import sqlite3 from datetime import datetime from typing import Optional import config def get_connection(): """获取数据库连接""" conn = sqlite3.connect(config.DATABASE_PATH) conn.row_factory = sqlite3.Row return conn def init_db(): """初始化数据库表""" conn = get_connection() cursor = conn.cursor() # 消息表 cursor.execute(""" CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id TEXT NOT NULL, chat_name TEXT, chat_username TEXT, message_id INTEGER, content TEXT NOT NULL, media_type TEXT, media_file_id TEXT, msg_date TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # 监控的群组表 cursor.execute(""" CREATE TABLE IF NOT EXISTS watched_chats ( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id TEXT UNIQUE NOT NULL, chat_name TEXT, account_id INTEGER DEFAULT NULL, added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_content ON messages(content) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_chat ON messages(chat_id) """) cursor.execute(""" CREATE UNIQUE INDEX IF NOT EXISTS idx_chat_msg ON messages(chat_id, message_id) """) # 管理员表 cursor.execute(""" CREATE TABLE IF NOT EXISTS admins ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT UNIQUE NOT NULL, username TEXT, role TEXT DEFAULT 'admin', added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # ========== 多账号相关表 ========== # 协议号/账号表 cursor.execute(""" CREATE TABLE IF NOT EXISTS accounts ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_name TEXT NOT NULL, session_string TEXT DEFAULT '', session_type TEXT DEFAULT 'string', phone TEXT DEFAULT '', nickname TEXT DEFAULT '', proxy_type TEXT DEFAULT '', proxy_host TEXT DEFAULT '', proxy_port INTEGER DEFAULT 0, proxy_username TEXT DEFAULT '', proxy_password TEXT DEFAULT '', is_active INTEGER DEFAULT 1, max_chats INTEGER DEFAULT 400, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # 代理池表 (动态IP轮换) cursor.execute(""" CREATE TABLE IF NOT EXISTS proxy_pool ( id INTEGER PRIMARY KEY AUTOINCREMENT, proxy_type TEXT NOT NULL DEFAULT 'socks5', host TEXT NOT NULL, port INTEGER NOT NULL, username TEXT DEFAULT '', password TEXT DEFAULT '', is_active INTEGER DEFAULT 1, assigned_account_id INTEGER DEFAULT NULL, last_used_at TIMESTAMP DEFAULT NULL, fail_count INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_proxy_active ON proxy_pool(is_active) """) # 汇总群表 (关键词自动转发) cursor.execute(""" CREATE TABLE IF NOT EXISTS summary_groups ( id INTEGER PRIMARY KEY AUTOINCREMENT, group_id TEXT UNIQUE NOT NULL, group_name TEXT DEFAULT '', keywords TEXT DEFAULT '', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_summary_group_id ON summary_groups(group_id) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_accounts_active ON accounts(is_active) """) # 数据库迁移: 给旧表补新列 try: cursor.execute("ALTER TABLE messages ADD COLUMN chat_username TEXT") except sqlite3.OperationalError: pass # 列已存在 try: cursor.execute("ALTER TABLE messages ADD COLUMN msg_date TEXT") except sqlite3.OperationalError: pass # 列已存在 try: cursor.execute("ALTER TABLE messages ADD COLUMN media_type TEXT") except sqlite3.OperationalError: pass try: cursor.execute("ALTER TABLE messages ADD COLUMN media_file_id TEXT") except sqlite3.OperationalError: pass # 迁移: watched_chats 补 account_id 列 try: cursor.execute("ALTER TABLE watched_chats ADD COLUMN account_id INTEGER DEFAULT NULL") except sqlite3.OperationalError: pass conn.commit() conn.close() def save_message(chat_id: str, chat_name: str, message_id: int, content: str, chat_username: str = "", msg_date: str = "", media_type: str = "", media_file_id: str = ""): """保存消息到数据库""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" INSERT OR IGNORE INTO messages (chat_id, chat_name, chat_username, message_id, content, media_type, media_file_id, msg_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, (chat_id, chat_name, chat_username, message_id, content, media_type, media_file_id, msg_date)) conn.commit() conn.close() def message_exists(chat_id: str, message_id: int) -> bool: """检查消息是否已存在 (用于去重)""" conn = get_connection() cursor = conn.cursor() cursor.execute( "SELECT 1 FROM messages WHERE chat_id = ? AND message_id = ?", (chat_id, message_id) ) result = cursor.fetchone() conn.close() return result is not None def batch_save_messages(messages_data: list) -> int: """批量保存消息,自动去重,返回实际插入条数""" if not messages_data: return 0 conn = get_connection() cursor = conn.cursor() inserted = 0 for msg in messages_data: # 先检查是否已存在 cursor.execute( "SELECT 1 FROM messages WHERE chat_id = ? AND message_id = ?", (msg["chat_id"], msg["message_id"]) ) if cursor.fetchone(): continue cursor.execute(""" INSERT INTO messages (chat_id, chat_name, chat_username, message_id, content, media_type, media_file_id, msg_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, (msg["chat_id"], msg["chat_name"], msg.get("chat_username", ""), msg["message_id"], msg["content"], msg.get("media_type", ""), msg.get("media_file_id", ""), msg.get("msg_date", ""))) inserted += 1 conn.commit() conn.close() return inserted def search_messages(keyword: str, limit: int = 5, offset: int = 0) -> list: """根据关键词搜索消息,按消息发送时间倒序""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" SELECT * FROM messages WHERE content LIKE ? ORDER BY msg_date DESC, created_at DESC LIMIT ? OFFSET ? """, (f"%{keyword}%", limit, offset)) results = cursor.fetchall() conn.close() return [dict(r) for r in results] def count_messages(keyword: str) -> int: """统计匹配关键词的消息数量""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" SELECT COUNT(*) FROM messages WHERE content LIKE ? """, (f"%{keyword}%",)) count = cursor.fetchone()[0] conn.close() return count def count_all_messages() -> int: """统计所有消息数量""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM messages") count = cursor.fetchone()[0] conn.close() return count def clear_messages() -> int: """清空所有消息记录""" conn = get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM messages") conn.commit() deleted = cursor.rowcount conn.close() return deleted # 监控群组相关 def add_watched_chat(chat_id: str, chat_name: str) -> bool: """添加监控群组""" conn = get_connection() cursor = conn.cursor() try: cursor.execute(""" INSERT OR IGNORE INTO watched_chats (chat_id, chat_name) VALUES (?, ?) """, (chat_id, chat_name)) conn.commit() inserted = cursor.rowcount > 0 except: inserted = False finally: conn.close() return inserted def remove_watched_chat(chat_id: str) -> bool: """移除监控群组""" conn = get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM watched_chats WHERE chat_id = ?", (chat_id,)) conn.commit() deleted = cursor.rowcount > 0 conn.close() return deleted def is_watched_chat(chat_id: str) -> bool: """检查是否是监控的群组""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT 1 FROM watched_chats WHERE chat_id = ?", (chat_id,)) result = cursor.fetchone() conn.close() return result is not None def get_watched_chats() -> list: """获取所有监控的群组""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM watched_chats ORDER BY added_at DESC") results = cursor.fetchall() conn.close() return [dict(r) for r in results] def clear_watched_chats() -> int: """清空所有监控群组,返回删除数量""" conn = get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM watched_chats") conn.commit() deleted = cursor.rowcount conn.close() return deleted # ========== 管理员相关 ========== def set_owner(user_id: str, username: str = ""): """设置 owner (最高权限,只能有一个)""" conn = get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM admins WHERE role = 'owner'") cursor.execute(""" INSERT OR REPLACE INTO admins (user_id, username, role) VALUES (?, ?, 'owner') """, (user_id, username)) conn.commit() conn.close() def get_owner() -> dict | None: """获取 owner""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM admins WHERE role = 'owner' LIMIT 1") row = cursor.fetchone() conn.close() return dict(row) if row else None def add_admin(user_id: str, username: str = "") -> bool: """添加管理员""" conn = get_connection() cursor = conn.cursor() try: cursor.execute(""" INSERT OR IGNORE INTO admins (user_id, username, role) VALUES (?, ?, 'admin') """, (user_id, username)) conn.commit() inserted = cursor.rowcount > 0 except: inserted = False finally: conn.close() return inserted def remove_admin(user_id: str) -> bool: """移除管理员 (不能移除 owner)""" conn = get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM admins WHERE user_id = ? AND role = 'admin'", (user_id,)) conn.commit() deleted = cursor.rowcount > 0 conn.close() return deleted def get_admins() -> list: """获取所有管理员 (含 owner)""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM admins ORDER BY role DESC, added_at ASC") results = cursor.fetchall() conn.close() return [dict(r) for r in results] def is_admin_user(user_id: str) -> bool: """检查是否是管理员或 owner""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,)) result = cursor.fetchone() conn.close() return result is not None def is_owner_user(user_id: str) -> bool: """检查是否是 owner""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT 1 FROM admins WHERE user_id = ? AND role = 'owner'", (user_id,)) result = cursor.fetchone() conn.close() return result is not None # ========== 多账号管理 ========== def add_account(session_name: str, session_string: str = "", session_type: str = "string", phone: str = "", nickname: str = "", proxy_type: str = "", proxy_host: str = "", proxy_port: int = 0, proxy_username: str = "", proxy_password: str = "", max_chats: int = 400) -> int: """添加协议号/账号,返回账号ID""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO accounts (session_name, session_string, session_type, phone, nickname, proxy_type, proxy_host, proxy_port, proxy_username, proxy_password, is_active, max_chats) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, ?) """, (session_name, session_string, session_type, phone, nickname, proxy_type, proxy_host, proxy_port, proxy_username, proxy_password, max_chats)) conn.commit() account_id = cursor.lastrowid conn.close() return account_id def remove_account(account_id: int) -> bool: """删除账号""" conn = get_connection() cursor = conn.cursor() # 解除该账号关联的群 cursor.execute("UPDATE watched_chats SET account_id = NULL WHERE account_id = ?", (account_id,)) cursor.execute("DELETE FROM accounts WHERE id = ?", (account_id,)) conn.commit() deleted = cursor.rowcount > 0 conn.close() return deleted def get_accounts() -> list: """获取所有账号""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM accounts ORDER BY created_at ASC") results = cursor.fetchall() conn.close() return [dict(r) for r in results] def get_active_accounts() -> list: """获取所有活跃账号""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM accounts WHERE is_active = 1 ORDER BY created_at ASC") results = cursor.fetchall() conn.close() return [dict(r) for r in results] def get_account(account_id: int) -> dict | None: """获取单个账号""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM accounts WHERE id = ?", (account_id,)) row = cursor.fetchone() conn.close() return dict(row) if row else None def update_account(account_id: int, **kwargs) -> bool: """更新账号信息""" if not kwargs: return False conn = get_connection() cursor = conn.cursor() sets = ", ".join(f"{k} = ?" for k in kwargs.keys()) values = list(kwargs.values()) + [account_id] cursor.execute(f"UPDATE accounts SET {sets} WHERE id = ?", values) conn.commit() updated = cursor.rowcount > 0 conn.close() return updated def set_account_active(account_id: int, is_active: bool) -> bool: """启用/禁用账号""" return update_account(account_id, is_active=1 if is_active else 0) def set_account_proxy(account_id: int, proxy_type: str, proxy_host: str, proxy_port: int, proxy_username: str = "", proxy_password: str = "") -> bool: """设置账号代理""" return update_account(account_id, proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port, proxy_username=proxy_username, proxy_password=proxy_password) def clear_account_proxy(account_id: int) -> bool: """清除账号代理""" return update_account(account_id, proxy_type="", proxy_host="", proxy_port=0, proxy_username="", proxy_password="") def count_account_chats(account_id: int) -> int: """统计某个账号监控的群数""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM watched_chats WHERE account_id = ?", (account_id,)) count = cursor.fetchone()[0] conn.close() return count def get_least_loaded_account() -> dict | None: """获取监控群最少的活跃账号(用于自动分配)""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" SELECT a.*, COUNT(w.id) as chat_count FROM accounts a LEFT JOIN watched_chats w ON w.account_id = a.id WHERE a.is_active = 1 GROUP BY a.id HAVING chat_count < a.max_chats ORDER BY chat_count ASC LIMIT 1 """) row = cursor.fetchone() conn.close() return dict(row) if row else None def assign_chat_to_account(chat_id: str, account_id: int) -> bool: """将群分配给某个账号""" conn = get_connection() cursor = conn.cursor() cursor.execute("UPDATE watched_chats SET account_id = ? WHERE chat_id = ?", (account_id, chat_id)) conn.commit() updated = cursor.rowcount > 0 conn.close() return updated def get_account_chats(account_id: int) -> list: """获取某个账号监控的所有群""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM watched_chats WHERE account_id = ? ORDER BY added_at DESC", (account_id,)) results = cursor.fetchall() conn.close() return [dict(r) for r in results] def add_watched_chat_with_account(chat_id: str, chat_name: str, account_id: int = None) -> bool: """添加监控群组,可指定账号""" conn = get_connection() cursor = conn.cursor() try: cursor.execute(""" INSERT OR IGNORE INTO watched_chats (chat_id, chat_name, account_id) VALUES (?, ?, ?) """, (chat_id, chat_name, account_id)) conn.commit() inserted = cursor.rowcount > 0 except: inserted = False finally: conn.close() return inserted # ========== 代理池管理 ========== def add_proxy(proxy_type: str, host: str, port: int, username: str = "", password: str = "") -> int: """添加代理到代理池,返回代理ID""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO proxy_pool (proxy_type, host, port, username, password, is_active) VALUES (?, ?, ?, ?, ?, 1) """, (proxy_type, host, port, username, password)) conn.commit() proxy_id = cursor.lastrowid conn.close() return proxy_id def batch_add_proxies(proxies: list) -> int: """批量添加代理,返回添加数量 proxies: [{"type": "socks5", "host": "x.x.x.x", "port": 1080, "username": "", "password": ""}, ...] """ if not proxies: return 0 conn = get_connection() cursor = conn.cursor() count = 0 for p in proxies: cursor.execute(""" INSERT INTO proxy_pool (proxy_type, host, port, username, password, is_active) VALUES (?, ?, ?, ?, ?, 1) """, (p.get("type", "socks5"), p["host"], p["port"], p.get("username", ""), p.get("password", ""))) count += 1 conn.commit() conn.close() return count def remove_proxy(proxy_id: int) -> bool: """删除代理""" conn = get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM proxy_pool WHERE id = ?", (proxy_id,)) conn.commit() deleted = cursor.rowcount > 0 conn.close() return deleted def get_proxies(active_only: bool = False) -> list: """获取代理池""" conn = get_connection() cursor = conn.cursor() if active_only: cursor.execute("SELECT * FROM proxy_pool WHERE is_active = 1 ORDER BY last_used_at ASC, fail_count ASC") else: cursor.execute("SELECT * FROM proxy_pool ORDER BY created_at ASC") results = cursor.fetchall() conn.close() return [dict(r) for r in results] def get_available_proxy(exclude_account_id: int = None) -> dict | None: """获取一个可用代理(未分配或最久未使用的),用于轮换""" conn = get_connection() cursor = conn.cursor() if exclude_account_id: cursor.execute(""" SELECT * FROM proxy_pool WHERE is_active = 1 AND (assigned_account_id IS NULL OR assigned_account_id != ?) ORDER BY last_used_at ASC NULLS FIRST, fail_count ASC LIMIT 1 """, (exclude_account_id,)) else: cursor.execute(""" SELECT * FROM proxy_pool WHERE is_active = 1 AND assigned_account_id IS NULL ORDER BY last_used_at ASC NULLS FIRST, fail_count ASC LIMIT 1 """) row = cursor.fetchone() conn.close() return dict(row) if row else None def assign_proxy_to_account(proxy_id: int, account_id: int) -> bool: """分配代理给账号""" conn = get_connection() cursor = conn.cursor() # 先释放该账号之前的代理 cursor.execute("UPDATE proxy_pool SET assigned_account_id = NULL WHERE assigned_account_id = ?", (account_id,)) # 分配新代理 cursor.execute(""" UPDATE proxy_pool SET assigned_account_id = ?, last_used_at = CURRENT_TIMESTAMP WHERE id = ? """, (account_id, proxy_id)) conn.commit() updated = cursor.rowcount > 0 conn.close() return updated def release_proxy(account_id: int) -> bool: """释放账号的代理""" conn = get_connection() cursor = conn.cursor() cursor.execute("UPDATE proxy_pool SET assigned_account_id = NULL WHERE assigned_account_id = ?", (account_id,)) conn.commit() updated = cursor.rowcount > 0 conn.close() return updated def mark_proxy_failed(proxy_id: int) -> bool: """标记代理失败(fail_count + 1,超过3次自动禁用)""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" UPDATE proxy_pool SET fail_count = fail_count + 1, is_active = CASE WHEN fail_count + 1 >= 3 THEN 0 ELSE 1 END WHERE id = ? """, (proxy_id,)) conn.commit() updated = cursor.rowcount > 0 conn.close() return updated def reset_proxy_fails(proxy_id: int) -> bool: """重置代理失败计数""" conn = get_connection() cursor = conn.cursor() cursor.execute("UPDATE proxy_pool SET fail_count = 0, is_active = 1 WHERE id = ?", (proxy_id,)) conn.commit() updated = cursor.rowcount > 0 conn.close() return updated def clear_proxy_pool() -> int: """清空代理池""" conn = get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM proxy_pool") conn.commit() deleted = cursor.rowcount conn.close() return deleted # ========== 汇总群管理 ========== def add_summary_group(group_id: str, group_name: str, keywords: str) -> int: """添加/更新汇总群绑定,追加关键词(set去重),返回记录ID keywords: 逗号分隔的关键词字符串 """ # 统一处理关键词:小写、strip、去重 new_kws = {k.strip().lower() for k in keywords.split(",") if k.strip()} conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT id, keywords FROM summary_groups WHERE group_id = ?", (group_id,)) row = cursor.fetchone() if row: # 已存在,追加关键词(旧关键词也统一小写) existing_kws = {k.strip().lower() for k in row["keywords"].split(",") if k.strip()} merged = existing_kws | new_kws cursor.execute( "UPDATE summary_groups SET keywords = ?, group_name = ? WHERE group_id = ?", (",".join(sorted(merged)), group_name, group_id) ) conn.commit() rid = row["id"] else: # 新建 cursor.execute( "INSERT INTO summary_groups (group_id, group_name, keywords) VALUES (?, ?, ?)", (group_id, group_name, ",".join(sorted(new_kws))) ) conn.commit() rid = cursor.lastrowid conn.close() return rid def remove_summary_group_keywords(group_id: str, keywords: str) -> bool: """移除汇总群的指定关键词,删光则删除整条记录 keywords: 逗号分隔的待移除关键词 返回: 是否执行了修改 """ remove_kws = {k.strip().lower() for k in keywords.split(",") if k.strip()} conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT keywords FROM summary_groups WHERE group_id = ?", (group_id,)) row = cursor.fetchone() if not row: conn.close() return False existing_kws = {k.strip() for k in row["keywords"].split(",") if k.strip()} remaining = existing_kws - remove_kws if remaining: cursor.execute( "UPDATE summary_groups SET keywords = ? WHERE group_id = ?", (",".join(sorted(remaining)), group_id) ) else: # 关键词删光,删除整条记录 cursor.execute("DELETE FROM summary_groups WHERE group_id = ?", (group_id,)) conn.commit() conn.close() return True def remove_summary_group(group_id: str) -> bool: """删除整个汇总群绑定""" conn = get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM summary_groups WHERE group_id = ?", (group_id,)) conn.commit() deleted = cursor.rowcount > 0 conn.close() return deleted def get_summary_groups() -> list: """获取所有汇总群""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM summary_groups ORDER BY created_at ASC") results = cursor.fetchall() conn.close() return [dict(r) for r in results] def get_summary_group(group_id: str) -> dict | None: """获取单个汇总群""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM summary_groups WHERE group_id = ?", (group_id,)) row = cursor.fetchone() conn.close() return dict(row) if row else None def is_summary_group(group_id: str) -> bool: """快速判断是否是汇总群""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT 1 FROM summary_groups WHERE group_id = ?", (group_id,)) result = cursor.fetchone() conn.close() return result is not None def search_messages_asc(keyword: str, limit: int = 20, offset: int = 0) -> list: """根据关键词搜索消息,按消息发送时间正序(群内搜索用)""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" SELECT * FROM messages WHERE content LIKE ? ORDER BY msg_date ASC, created_at ASC LIMIT ? OFFSET ? """, (f"%{keyword}%", limit, offset)) results = cursor.fetchall() conn.close() return [dict(r) for r in results]