tg-channel-bot/database.py

861 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
from datetime import datetime
from typing import Optional
import config
def get_connection():
"""获取数据库连接"""
conn = sqlite3.connect(config.DATABASE_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
"""初始化数据库表"""
conn = get_connection()
cursor = conn.cursor()
# 消息表
cursor.execute("""
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id TEXT NOT NULL,
chat_name TEXT,
chat_username TEXT,
message_id INTEGER,
content TEXT NOT NULL,
media_type TEXT,
media_file_id TEXT,
msg_date TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 监控的群组表
cursor.execute("""
CREATE TABLE IF NOT EXISTS watched_chats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id TEXT UNIQUE NOT NULL,
chat_name TEXT,
account_id INTEGER DEFAULT NULL,
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_content ON messages(content)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_chat ON messages(chat_id)
""")
cursor.execute("""
CREATE UNIQUE INDEX IF NOT EXISTS idx_chat_msg ON messages(chat_id, message_id)
""")
# 管理员表
cursor.execute("""
CREATE TABLE IF NOT EXISTS admins (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT UNIQUE NOT NULL,
username TEXT,
role TEXT DEFAULT 'admin',
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# ========== 多账号相关表 ==========
# 协议号/账号表
cursor.execute("""
CREATE TABLE IF NOT EXISTS accounts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_name TEXT NOT NULL,
session_string TEXT DEFAULT '',
session_type TEXT DEFAULT 'string',
phone TEXT DEFAULT '',
nickname TEXT DEFAULT '',
proxy_type TEXT DEFAULT '',
proxy_host TEXT DEFAULT '',
proxy_port INTEGER DEFAULT 0,
proxy_username TEXT DEFAULT '',
proxy_password TEXT DEFAULT '',
is_active INTEGER DEFAULT 1,
max_chats INTEGER DEFAULT 400,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 代理池表 (动态IP轮换)
cursor.execute("""
CREATE TABLE IF NOT EXISTS proxy_pool (
id INTEGER PRIMARY KEY AUTOINCREMENT,
proxy_type TEXT NOT NULL DEFAULT 'socks5',
host TEXT NOT NULL,
port INTEGER NOT NULL,
username TEXT DEFAULT '',
password TEXT DEFAULT '',
is_active INTEGER DEFAULT 1,
assigned_account_id INTEGER DEFAULT NULL,
last_used_at TIMESTAMP DEFAULT NULL,
fail_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_proxy_active ON proxy_pool(is_active)
""")
# 汇总群表 (关键词自动转发)
cursor.execute("""
CREATE TABLE IF NOT EXISTS summary_groups (
id INTEGER PRIMARY KEY AUTOINCREMENT,
group_id TEXT UNIQUE NOT NULL,
group_name TEXT DEFAULT '',
keywords TEXT DEFAULT '',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_summary_group_id ON summary_groups(group_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_accounts_active ON accounts(is_active)
""")
# 数据库迁移: 给旧表补新列
try:
cursor.execute("ALTER TABLE messages ADD COLUMN chat_username TEXT")
except sqlite3.OperationalError:
pass # 列已存在
try:
cursor.execute("ALTER TABLE messages ADD COLUMN msg_date TEXT")
except sqlite3.OperationalError:
pass # 列已存在
try:
cursor.execute("ALTER TABLE messages ADD COLUMN media_type TEXT")
except sqlite3.OperationalError:
pass
try:
cursor.execute("ALTER TABLE messages ADD COLUMN media_file_id TEXT")
except sqlite3.OperationalError:
pass
# 迁移: watched_chats 补 account_id 列
try:
cursor.execute("ALTER TABLE watched_chats ADD COLUMN account_id INTEGER DEFAULT NULL")
except sqlite3.OperationalError:
pass
conn.commit()
conn.close()
def save_message(chat_id: str, chat_name: str, message_id: int, content: str,
chat_username: str = "", msg_date: str = "",
media_type: str = "", media_file_id: str = ""):
"""保存消息到数据库"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT OR IGNORE INTO messages (chat_id, chat_name, chat_username, message_id, content, media_type, media_file_id, msg_date)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (chat_id, chat_name, chat_username, message_id, content, media_type, media_file_id, msg_date))
conn.commit()
conn.close()
def message_exists(chat_id: str, message_id: int) -> bool:
"""检查消息是否已存在 (用于去重)"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT 1 FROM messages WHERE chat_id = ? AND message_id = ?",
(chat_id, message_id)
)
result = cursor.fetchone()
conn.close()
return result is not None
def batch_save_messages(messages_data: list) -> int:
"""批量保存消息,自动去重,返回实际插入条数"""
if not messages_data:
return 0
conn = get_connection()
cursor = conn.cursor()
inserted = 0
for msg in messages_data:
# 先检查是否已存在
cursor.execute(
"SELECT 1 FROM messages WHERE chat_id = ? AND message_id = ?",
(msg["chat_id"], msg["message_id"])
)
if cursor.fetchone():
continue
cursor.execute("""
INSERT INTO messages (chat_id, chat_name, chat_username, message_id, content, media_type, media_file_id, msg_date)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (msg["chat_id"], msg["chat_name"], msg.get("chat_username", ""),
msg["message_id"], msg["content"],
msg.get("media_type", ""), msg.get("media_file_id", ""),
msg.get("msg_date", "")))
inserted += 1
conn.commit()
conn.close()
return inserted
def search_messages(keyword: str, limit: int = 5, offset: int = 0) -> list:
"""根据关键词搜索消息,按消息发送时间倒序"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM messages
WHERE content LIKE ?
ORDER BY msg_date DESC, created_at DESC
LIMIT ? OFFSET ?
""", (f"%{keyword}%", limit, offset))
results = cursor.fetchall()
conn.close()
return [dict(r) for r in results]
def count_messages(keyword: str) -> int:
"""统计匹配关键词的消息数量"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT COUNT(*) FROM messages WHERE content LIKE ?
""", (f"%{keyword}%",))
count = cursor.fetchone()[0]
conn.close()
return count
def count_all_messages() -> int:
"""统计所有消息数量"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM messages")
count = cursor.fetchone()[0]
conn.close()
return count
def clear_messages() -> int:
"""清空所有消息记录"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM messages")
conn.commit()
deleted = cursor.rowcount
conn.close()
return deleted
# 监控群组相关
def add_watched_chat(chat_id: str, chat_name: str) -> bool:
"""添加监控群组"""
conn = get_connection()
cursor = conn.cursor()
try:
cursor.execute("""
INSERT OR IGNORE INTO watched_chats (chat_id, chat_name)
VALUES (?, ?)
""", (chat_id, chat_name))
conn.commit()
inserted = cursor.rowcount > 0
except:
inserted = False
finally:
conn.close()
return inserted
def remove_watched_chat(chat_id: str) -> bool:
"""移除监控群组"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM watched_chats WHERE chat_id = ?", (chat_id,))
conn.commit()
deleted = cursor.rowcount > 0
conn.close()
return deleted
def is_watched_chat(chat_id: str) -> bool:
"""检查是否是监控的群组"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT 1 FROM watched_chats WHERE chat_id = ?", (chat_id,))
result = cursor.fetchone()
conn.close()
return result is not None
def get_watched_chats() -> list:
"""获取所有监控的群组"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM watched_chats ORDER BY added_at DESC")
results = cursor.fetchall()
conn.close()
return [dict(r) for r in results]
def clear_watched_chats() -> int:
"""清空所有监控群组,返回删除数量"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM watched_chats")
conn.commit()
deleted = cursor.rowcount
conn.close()
return deleted
# ========== 管理员相关 ==========
def set_owner(user_id: str, username: str = ""):
"""设置 owner (最高权限,只能有一个)"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM admins WHERE role = 'owner'")
cursor.execute("""
INSERT OR REPLACE INTO admins (user_id, username, role) VALUES (?, ?, 'owner')
""", (user_id, username))
conn.commit()
conn.close()
def get_owner() -> dict | None:
"""获取 owner"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM admins WHERE role = 'owner' LIMIT 1")
row = cursor.fetchone()
conn.close()
return dict(row) if row else None
def add_admin(user_id: str, username: str = "") -> bool:
"""添加管理员"""
conn = get_connection()
cursor = conn.cursor()
try:
cursor.execute("""
INSERT OR IGNORE INTO admins (user_id, username, role) VALUES (?, ?, 'admin')
""", (user_id, username))
conn.commit()
inserted = cursor.rowcount > 0
except:
inserted = False
finally:
conn.close()
return inserted
def remove_admin(user_id: str) -> bool:
"""移除管理员 (不能移除 owner)"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM admins WHERE user_id = ? AND role = 'admin'", (user_id,))
conn.commit()
deleted = cursor.rowcount > 0
conn.close()
return deleted
def get_admins() -> list:
"""获取所有管理员 (含 owner)"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM admins ORDER BY role DESC, added_at ASC")
results = cursor.fetchall()
conn.close()
return [dict(r) for r in results]
def is_admin_user(user_id: str) -> bool:
"""检查是否是管理员或 owner"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,))
result = cursor.fetchone()
conn.close()
return result is not None
def is_owner_user(user_id: str) -> bool:
"""检查是否是 owner"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT 1 FROM admins WHERE user_id = ? AND role = 'owner'", (user_id,))
result = cursor.fetchone()
conn.close()
return result is not None
# ========== 多账号管理 ==========
def add_account(session_name: str, session_string: str = "", session_type: str = "string",
phone: str = "", nickname: str = "",
proxy_type: str = "", proxy_host: str = "", proxy_port: int = 0,
proxy_username: str = "", proxy_password: str = "",
max_chats: int = 400) -> int:
"""添加协议号/账号返回账号ID"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO accounts (session_name, session_string, session_type, phone, nickname,
proxy_type, proxy_host, proxy_port, proxy_username, proxy_password,
is_active, max_chats)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, ?)
""", (session_name, session_string, session_type, phone, nickname,
proxy_type, proxy_host, proxy_port, proxy_username, proxy_password, max_chats))
conn.commit()
account_id = cursor.lastrowid
conn.close()
return account_id
def remove_account(account_id: int) -> bool:
"""删除账号"""
conn = get_connection()
cursor = conn.cursor()
# 解除该账号关联的群
cursor.execute("UPDATE watched_chats SET account_id = NULL WHERE account_id = ?", (account_id,))
cursor.execute("DELETE FROM accounts WHERE id = ?", (account_id,))
conn.commit()
deleted = cursor.rowcount > 0
conn.close()
return deleted
def get_accounts() -> list:
"""获取所有账号"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM accounts ORDER BY created_at ASC")
results = cursor.fetchall()
conn.close()
return [dict(r) for r in results]
def get_active_accounts() -> list:
"""获取所有活跃账号"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM accounts WHERE is_active = 1 ORDER BY created_at ASC")
results = cursor.fetchall()
conn.close()
return [dict(r) for r in results]
def get_account(account_id: int) -> dict | None:
"""获取单个账号"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM accounts WHERE id = ?", (account_id,))
row = cursor.fetchone()
conn.close()
return dict(row) if row else None
def update_account(account_id: int, **kwargs) -> bool:
"""更新账号信息"""
if not kwargs:
return False
conn = get_connection()
cursor = conn.cursor()
sets = ", ".join(f"{k} = ?" for k in kwargs.keys())
values = list(kwargs.values()) + [account_id]
cursor.execute(f"UPDATE accounts SET {sets} WHERE id = ?", values)
conn.commit()
updated = cursor.rowcount > 0
conn.close()
return updated
def set_account_active(account_id: int, is_active: bool) -> bool:
"""启用/禁用账号"""
return update_account(account_id, is_active=1 if is_active else 0)
def set_account_proxy(account_id: int, proxy_type: str, proxy_host: str, proxy_port: int,
proxy_username: str = "", proxy_password: str = "") -> bool:
"""设置账号代理"""
return update_account(account_id,
proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
proxy_username=proxy_username, proxy_password=proxy_password)
def clear_account_proxy(account_id: int) -> bool:
"""清除账号代理"""
return update_account(account_id,
proxy_type="", proxy_host="", proxy_port=0,
proxy_username="", proxy_password="")
def count_account_chats(account_id: int) -> int:
"""统计某个账号监控的群数"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM watched_chats WHERE account_id = ?", (account_id,))
count = cursor.fetchone()[0]
conn.close()
return count
def get_least_loaded_account() -> dict | None:
"""获取监控群最少的活跃账号(用于自动分配)"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT a.*, COUNT(w.id) as chat_count
FROM accounts a
LEFT JOIN watched_chats w ON w.account_id = a.id
WHERE a.is_active = 1
GROUP BY a.id
HAVING chat_count < a.max_chats
ORDER BY chat_count ASC
LIMIT 1
""")
row = cursor.fetchone()
conn.close()
return dict(row) if row else None
def assign_chat_to_account(chat_id: str, account_id: int) -> bool:
"""将群分配给某个账号"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("UPDATE watched_chats SET account_id = ? WHERE chat_id = ?", (account_id, chat_id))
conn.commit()
updated = cursor.rowcount > 0
conn.close()
return updated
def get_account_chats(account_id: int) -> list:
"""获取某个账号监控的所有群"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM watched_chats WHERE account_id = ? ORDER BY added_at DESC", (account_id,))
results = cursor.fetchall()
conn.close()
return [dict(r) for r in results]
def add_watched_chat_with_account(chat_id: str, chat_name: str, account_id: int = None) -> bool:
"""添加监控群组,可指定账号"""
conn = get_connection()
cursor = conn.cursor()
try:
cursor.execute("""
INSERT OR IGNORE INTO watched_chats (chat_id, chat_name, account_id)
VALUES (?, ?, ?)
""", (chat_id, chat_name, account_id))
conn.commit()
inserted = cursor.rowcount > 0
except:
inserted = False
finally:
conn.close()
return inserted
# ========== 代理池管理 ==========
def add_proxy(proxy_type: str, host: str, port: int,
username: str = "", password: str = "") -> int:
"""添加代理到代理池返回代理ID"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO proxy_pool (proxy_type, host, port, username, password, is_active)
VALUES (?, ?, ?, ?, ?, 1)
""", (proxy_type, host, port, username, password))
conn.commit()
proxy_id = cursor.lastrowid
conn.close()
return proxy_id
def batch_add_proxies(proxies: list) -> int:
"""批量添加代理,返回添加数量
proxies: [{"type": "socks5", "host": "x.x.x.x", "port": 1080, "username": "", "password": ""}, ...]
"""
if not proxies:
return 0
conn = get_connection()
cursor = conn.cursor()
count = 0
for p in proxies:
cursor.execute("""
INSERT INTO proxy_pool (proxy_type, host, port, username, password, is_active)
VALUES (?, ?, ?, ?, ?, 1)
""", (p.get("type", "socks5"), p["host"], p["port"],
p.get("username", ""), p.get("password", "")))
count += 1
conn.commit()
conn.close()
return count
def remove_proxy(proxy_id: int) -> bool:
"""删除代理"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM proxy_pool WHERE id = ?", (proxy_id,))
conn.commit()
deleted = cursor.rowcount > 0
conn.close()
return deleted
def get_proxies(active_only: bool = False) -> list:
"""获取代理池"""
conn = get_connection()
cursor = conn.cursor()
if active_only:
cursor.execute("SELECT * FROM proxy_pool WHERE is_active = 1 ORDER BY last_used_at ASC, fail_count ASC")
else:
cursor.execute("SELECT * FROM proxy_pool ORDER BY created_at ASC")
results = cursor.fetchall()
conn.close()
return [dict(r) for r in results]
def get_available_proxy(exclude_account_id: int = None) -> dict | None:
"""获取一个可用代理(未分配或最久未使用的),用于轮换"""
conn = get_connection()
cursor = conn.cursor()
if exclude_account_id:
cursor.execute("""
SELECT * FROM proxy_pool
WHERE is_active = 1 AND (assigned_account_id IS NULL OR assigned_account_id != ?)
ORDER BY last_used_at ASC NULLS FIRST, fail_count ASC
LIMIT 1
""", (exclude_account_id,))
else:
cursor.execute("""
SELECT * FROM proxy_pool
WHERE is_active = 1 AND assigned_account_id IS NULL
ORDER BY last_used_at ASC NULLS FIRST, fail_count ASC
LIMIT 1
""")
row = cursor.fetchone()
conn.close()
return dict(row) if row else None
def assign_proxy_to_account(proxy_id: int, account_id: int) -> bool:
"""分配代理给账号"""
conn = get_connection()
cursor = conn.cursor()
# 先释放该账号之前的代理
cursor.execute("UPDATE proxy_pool SET assigned_account_id = NULL WHERE assigned_account_id = ?", (account_id,))
# 分配新代理
cursor.execute("""
UPDATE proxy_pool SET assigned_account_id = ?, last_used_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (account_id, proxy_id))
conn.commit()
updated = cursor.rowcount > 0
conn.close()
return updated
def release_proxy(account_id: int) -> bool:
"""释放账号的代理"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("UPDATE proxy_pool SET assigned_account_id = NULL WHERE assigned_account_id = ?", (account_id,))
conn.commit()
updated = cursor.rowcount > 0
conn.close()
return updated
def mark_proxy_failed(proxy_id: int) -> bool:
"""标记代理失败fail_count + 1超过3次自动禁用"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE proxy_pool SET fail_count = fail_count + 1,
is_active = CASE WHEN fail_count + 1 >= 3 THEN 0 ELSE 1 END
WHERE id = ?
""", (proxy_id,))
conn.commit()
updated = cursor.rowcount > 0
conn.close()
return updated
def reset_proxy_fails(proxy_id: int) -> bool:
"""重置代理失败计数"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("UPDATE proxy_pool SET fail_count = 0, is_active = 1 WHERE id = ?", (proxy_id,))
conn.commit()
updated = cursor.rowcount > 0
conn.close()
return updated
def clear_proxy_pool() -> int:
"""清空代理池"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM proxy_pool")
conn.commit()
deleted = cursor.rowcount
conn.close()
return deleted
# ========== 汇总群管理 ==========
def add_summary_group(group_id: str, group_name: str, keywords: str) -> int:
"""添加/更新汇总群绑定追加关键词set去重返回记录ID
keywords: 逗号分隔的关键词字符串
"""
# 统一处理关键词小写、strip、去重
new_kws = {k.strip().lower() for k in keywords.split(",") if k.strip()}
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT id, keywords FROM summary_groups WHERE group_id = ?", (group_id,))
row = cursor.fetchone()
if row:
# 已存在,追加关键词(旧关键词也统一小写)
existing_kws = {k.strip().lower() for k in row["keywords"].split(",") if k.strip()}
merged = existing_kws | new_kws
cursor.execute(
"UPDATE summary_groups SET keywords = ?, group_name = ? WHERE group_id = ?",
(",".join(sorted(merged)), group_name, group_id)
)
conn.commit()
rid = row["id"]
else:
# 新建
cursor.execute(
"INSERT INTO summary_groups (group_id, group_name, keywords) VALUES (?, ?, ?)",
(group_id, group_name, ",".join(sorted(new_kws)))
)
conn.commit()
rid = cursor.lastrowid
conn.close()
return rid
def remove_summary_group_keywords(group_id: str, keywords: str) -> bool:
"""移除汇总群的指定关键词,删光则删除整条记录
keywords: 逗号分隔的待移除关键词
返回: 是否执行了修改
"""
remove_kws = {k.strip().lower() for k in keywords.split(",") if k.strip()}
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT keywords FROM summary_groups WHERE group_id = ?", (group_id,))
row = cursor.fetchone()
if not row:
conn.close()
return False
existing_kws = {k.strip() for k in row["keywords"].split(",") if k.strip()}
remaining = existing_kws - remove_kws
if remaining:
cursor.execute(
"UPDATE summary_groups SET keywords = ? WHERE group_id = ?",
(",".join(sorted(remaining)), group_id)
)
else:
# 关键词删光,删除整条记录
cursor.execute("DELETE FROM summary_groups WHERE group_id = ?", (group_id,))
conn.commit()
conn.close()
return True
def remove_summary_group(group_id: str) -> bool:
"""删除整个汇总群绑定"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM summary_groups WHERE group_id = ?", (group_id,))
conn.commit()
deleted = cursor.rowcount > 0
conn.close()
return deleted
def get_summary_groups() -> list:
"""获取所有汇总群"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM summary_groups ORDER BY created_at ASC")
results = cursor.fetchall()
conn.close()
return [dict(r) for r in results]
def get_summary_group(group_id: str) -> dict | None:
"""获取单个汇总群"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM summary_groups WHERE group_id = ?", (group_id,))
row = cursor.fetchone()
conn.close()
return dict(row) if row else None
def is_summary_group(group_id: str) -> bool:
"""快速判断是否是汇总群"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT 1 FROM summary_groups WHERE group_id = ?", (group_id,))
result = cursor.fetchone()
conn.close()
return result is not None
def search_messages_asc(keyword: str, limit: int = 20, offset: int = 0) -> list:
"""根据关键词搜索消息,按消息发送时间正序(群内搜索用)"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM messages
WHERE content LIKE ?
ORDER BY msg_date ASC, created_at ASC
LIMIT ? OFFSET ?
""", (f"%{keyword}%", limit, offset))
results = cursor.fetchall()
conn.close()
return [dict(r) for r in results]