tg-channel-bot/bot.py

1551 lines
60 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import sys
import time
import asyncio
import random
import logging
from pathlib import Path
from telethon import TelegramClient, events, errors
from telethon.sessions import StringSession
from telethon.tl.functions.channels import JoinChannelRequest
from telethon.tl.functions.messages import ImportChatInviteRequest, CheckChatInviteRequest
import config
from database import (
init_db, save_message, search_messages, count_messages,
remove_watched_chat, is_watched_chat, get_watched_chats,
clear_watched_chats, set_owner, get_owner, add_admin, remove_admin,
get_admins, is_admin_user, is_owner_user, count_all_messages, clear_messages,
batch_save_messages,
# 多账号
add_account, remove_account, get_accounts, get_active_accounts, get_account,
update_account, set_account_active, set_account_proxy, clear_account_proxy,
count_account_chats, get_least_loaded_account, assign_chat_to_account,
get_account_chats, add_watched_chat_with_account,
# 代理池
add_proxy, batch_add_proxies, remove_proxy, get_proxies, get_available_proxy,
assign_proxy_to_account, release_proxy, mark_proxy_failed, reset_proxy_fails,
clear_proxy_pool,
# 汇总群
add_summary_group, remove_summary_group_keywords, remove_summary_group,
get_summary_groups, get_summary_group, is_summary_group, search_messages_asc,
)
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
SESSION_DIR = Path(__file__).parent
SESSION_NAME = "userbot"
SESSION_FILE = SESSION_DIR / f"{SESSION_NAME}.session"
# 全局:所有活跃的 client {account_id: TelegramClient}
active_clients: dict[int, TelegramClient] = {}
# 汇总群关键词缓存 (内存)
_keyword_cache: dict[str, dict] = {} # {group_id: {"name": str, "keywords": set}}
_summary_group_ids: set[str] = set() # 快速判断消息来源是否为汇总群
_forwarded_msg_ids: dict[str, float] = {} # {chat_id:msg_id: timestamp} 防止多client重复转发
def reload_keyword_cache():
"""从DB全量加载汇总群关键词到内存"""
global _keyword_cache, _summary_group_ids
groups = get_summary_groups()
new_cache = {}
new_ids = set()
for g in groups:
gid = g["group_id"]
kws = {k.strip() for k in g["keywords"].split(",") if k.strip()}
new_cache[gid] = {"name": g["group_name"], "keywords": kws}
new_ids.add(gid)
_keyword_cache = new_cache
_summary_group_ids = new_ids
logger.info(f"[缓存] 已加载 {len(new_cache)} 个汇总群, 共 {sum(len(v['keywords']) for v in new_cache.values())} 个关键词")
def match_keywords(text: str) -> list[str]:
"""遍历缓存text中包含任一关键词即命中返回匹配的 group_id 列表"""
if not text or not _keyword_cache:
return []
text_lower = text.lower()
matched = []
for gid, info in _keyword_cache.items():
for kw in info["keywords"]:
if kw in text_lower:
matched.append(gid)
break # 一个群匹配一个关键词即够
return matched
def build_proxy_tuple(proxy_type: str, host: str, port: int,
username: str = "", password: str = ""):
"""构建 telethon 代理参数元组"""
if not proxy_type or not host or not port:
return None
import socks
ptype = socks.SOCKS5 if proxy_type.lower() == "socks5" else socks.HTTP
if username and password:
return (ptype, host, port, True, username, password)
return (ptype, host, port)
def get_client_for_account(account: dict) -> TelegramClient:
"""根据账号信息创建 TelegramClient"""
proxy = build_proxy_tuple(
account.get("proxy_type", ""), account.get("proxy_host", ""),
account.get("proxy_port", 0), account.get("proxy_username", ""),
account.get("proxy_password", ""))
if account.get("session_type") == "file":
session = str(SESSION_DIR / account["session_name"])
else:
session = StringSession(account.get("session_string", ""))
return TelegramClient(session, config.API_ID, config.API_HASH, proxy=proxy)
def parse_invite(text: str):
"""从文本中提取群链接或用户名"""
m = re.search(r"(?:t\.me/\+|t\.me/joinchat/)([a-zA-Z0-9_-]+)", text)
if m:
return ("invite", m.group(1))
m = re.search(r"(?:t\.me/|@)([a-zA-Z]\w{3,})", text)
if m:
return ("username", m.group(1))
text = text.strip()
if re.match(r"^[a-zA-Z]\w{3,}$", text):
return ("username", text)
return None
def extract_media_info(msg):
"""从消息中提取媒体类型和 file_id"""
if msg.photo:
return "photo", msg.photo.id
if msg.document:
# 检查是否是图片类型的文档 (如压缩后的图)
if msg.document.mime_type and msg.document.mime_type.startswith("image/"):
return "photo", msg.document.id
return "", ""
async def join_single_chat(client, parsed, account_id: int = None):
"""用指定 client 加入单个群,返回 (chat_id, chat_name) 或抛异常"""
if parsed[0] == "invite":
result = await client(ImportChatInviteRequest(parsed[1]))
chat = result.chats[0]
else:
result = await client(JoinChannelRequest(parsed[1]))
chat = result.chats[0]
chat_id = str(chat.id)
chat_name = getattr(chat, "title", None) or str(chat_id)
add_watched_chat_with_account(chat_id, chat_name, account_id)
return chat_id, chat_name, chat
async def batch_join_chats(client, links: list, account_id: int = None, notify_event=None):
"""批量加入频道/群,带风控间隔和 FloodWait 处理"""
success, failed = [], []
for i, link in enumerate(links):
parsed = parse_invite(link.strip())
if not parsed:
failed.append((link, "无法识别链接"))
continue
try:
chat_id, chat_name, chat_entity = await join_single_chat(client, parsed, account_id)
success.append(chat_name)
logger.info(f"[批量加群] 成功: {chat_name} ({chat_id})")
# 后台拉历史
asyncio.create_task(
fetch_history(client, chat_entity, chat_id, chat_name, notify_event=None)
)
# 风控间隔
if i < len(links) - 1:
# 每批休息更久
if (i + 1) % config.JOIN_BATCH_SIZE == 0:
rest = random.randint(config.JOIN_BATCH_REST_MIN, config.JOIN_BATCH_REST_MAX)
logger.info(f"[批量加群] 批次休息 {rest}s...")
if notify_event:
await notify_event.reply(f"⏸ 已加 {i+1}/{len(links)} 个,批次休息 {rest}s...")
await asyncio.sleep(rest)
else:
wait = random.randint(config.JOIN_INTERVAL_MIN, config.JOIN_INTERVAL_MAX)
await asyncio.sleep(wait)
except errors.FloodWaitError as e:
wait_time = int(e.seconds * config.FLOOD_WAIT_MULTIPLIER)
logger.warning(f"[批量加群] FloodWait {e.seconds}s等待 {wait_time}s")
if notify_event:
await notify_event.reply(f"⚠️ 触发 FloodWait等待 {wait_time}s 后继续...")
await asyncio.sleep(wait_time)
# 重试一次
try:
chat_id, chat_name, chat_entity = await join_single_chat(client, parsed, account_id)
success.append(chat_name)
asyncio.create_task(
fetch_history(client, chat_entity, chat_id, chat_name, notify_event=None)
)
except Exception as e2:
failed.append((link, str(e2)))
except Exception as e:
failed.append((link, str(e)))
logger.error(f"[批量加群] 失败: {link} - {e}")
return success, failed
async def fetch_history(client, chat_entity, chat_id: str, chat_name: str, limit: int = None, notify_event=None):
"""拉取群组历史消息并存入数据库
Args:
client: TelegramClient 实例
chat_entity: 群组实体 (chat object 或 username)
chat_id: 群组 ID
chat_name: 群组名称
limit: 拉取条数,默认使用 config.HISTORY_LIMIT
notify_event: 如果提供,会通过 reply 通知进度
"""
if limit is None:
limit = config.HISTORY_LIMIT
if notify_event:
await notify_event.reply(f"⏳ 开始拉取「{chat_name}」历史消息 (最近 {limit} 条)...")
logger.info(f"开始拉取历史消息: {chat_name} ({chat_id}), 限制 {limit}")
# 获取群用户名,用于生成链接
chat_username = getattr(chat_entity, "username", None) or ""
batch = []
count = 0
skipped = 0
try:
async for msg in client.iter_messages(chat_entity, limit=limit):
content = ""
if msg.text:
content = re.sub(r"@\w+", "", msg.text).strip()
media_type, media_file_id = extract_media_info(msg)
# 跳过既没文字也没图片的消息
if not content and not media_type:
continue
# 纯图片没文字时,给个标记方便搜索
if not content and media_type:
content = "[图片]"
msg_date = msg.date.strftime("%Y-%m-%d %H:%M") if msg.date else ""
batch.append({
"chat_id": chat_id,
"chat_name": chat_name,
"chat_username": chat_username,
"message_id": msg.id,
"content": content,
"media_type": media_type,
"media_file_id": str(media_file_id) if media_file_id else "",
"msg_date": msg_date,
})
# 每 100 条批量写入一次
if len(batch) >= 100:
inserted = batch_save_messages(batch)
count += inserted
skipped += len(batch) - inserted
batch = []
# 写入剩余的
if batch:
inserted = batch_save_messages(batch)
count += inserted
skipped += len(batch) - inserted
logger.info(f"历史消息拉取完成: {chat_name} - 新增 {count} 条, 跳过 {skipped} 条(已存在)")
if notify_event:
await notify_event.reply(
f"✅ 历史消息拉取完成「{chat_name}\n"
f"📥 新增: {count}\n"
f"⏭ 跳过: {skipped} 条 (已存在)"
)
return count
except Exception as e:
logger.error(f"拉取历史消息失败: {chat_name} - {e}")
if notify_event:
await notify_event.reply(f"❌ 拉取历史消息失败: {e}")
return 0
def register_handlers(client, account_id: int = None):
"""注册消息处理器支持多账号account_id 标识当前 client 所属账号)"""
_me_cache = {}
@client.on(events.NewMessage(func=lambda e: e.is_private))
async def handle_private(event):
if "me" not in _me_cache:
_me_cache["me"] = await client.get_me()
me = _me_cache["me"]
uid = str(event.sender_id)
# 权限检查: owner + admin + 自己
is_me = event.sender_id == me.id
if not is_me and not is_admin_user(uid):
return
text = event.raw_text.strip()
# ===== 管理指令 (所有管理员可用) =====
# /addadmin <用户ID或@用户名>
if text.startswith("/addadmin "):
target = text[10:].strip().lstrip("@")
if not target:
await event.reply("❌ 用法: /addadmin <用户ID或用户名>")
return
if add_admin(target, target):
await event.reply(f"✅ 已添加管理员: {target}")
else:
await event.reply(f" {target} 已经是管理员")
return
# /deladmin <用户ID或@用户名>
if text.startswith("/deladmin "):
target = text[10:].strip().lstrip("@")
if remove_admin(target):
await event.reply(f"✅ 已移除管理员: {target}")
else:
await event.reply(f"❌ 未找到该管理员")
return
# /admins — 查看管理员列表
if text == "/admins":
admins = get_admins()
if not admins:
await event.reply("📋 管理员列表为空")
else:
lines = []
for i, a in enumerate(admins, 1):
role = "👑 Owner" if a["role"] == "owner" else "🔑 Admin"
name = a["username"] or a["user_id"]
lines.append(f"{i}. {role} {name} (ID: {a['user_id']})")
await event.reply("📋 管理员列表:\n" + "\n".join(lines))
return
# ===== 普通指令 (admin + owner 都能用) =====
# /join <链接> [账号ID] — 加入单个群
if text.startswith("/join "):
parts = text[6:].strip().split()
target = parts[0]
# 可选指定账号ID
target_account_id = None
if len(parts) > 1:
try:
target_account_id = int(parts[1])
except ValueError:
pass
parsed = parse_invite(target)
if not parsed:
await event.reply("❌ 无法识别链接,请发送 t.me/xxx 或 @xxx")
return
# 选择用哪个 client 加群
join_client = client
join_account_id = account_id
if target_account_id and target_account_id in active_clients:
join_client = active_clients[target_account_id]
join_account_id = target_account_id
elif not target_account_id and active_clients:
# 自动分配给最少群的号
least = get_least_loaded_account()
if least and least["id"] in active_clients:
join_client = active_clients[least["id"]]
join_account_id = least["id"]
try:
chat_id, chat_name, chat_entity = await join_single_chat(
join_client, parsed, join_account_id)
acct_info = f" (账号#{join_account_id})" if join_account_id else ""
await event.reply(f"✅ 已加入并监控: {chat_name}{acct_info}")
logger.info(f"加入群组: {chat_name} ({chat_id}){acct_info}")
asyncio.create_task(
fetch_history(join_client, chat_entity, chat_id, chat_name, notify_event=event)
)
except errors.FloodWaitError as e:
await event.reply(f"⚠️ FloodWait: 需等待 {e.seconds}s请稍后重试")
except Exception as e:
await event.reply(f"❌ 加入失败: {e}")
return
# /batch_join — 批量加群(一行一个链接)
if text.startswith("/batch_join"):
first_line = text.split("\n")[0]
parts = first_line.split()
target_account_id = None
if len(parts) > 1:
try:
target_account_id = int(parts[1])
except ValueError:
pass
lines = text.split("\n")[1:]
links = [l.strip() for l in lines if l.strip()]
if not links:
await event.reply(
"❌ 用法: /batch_join [账号ID]\n"
"<链接1>\n<链接2>\n<链接3>\n...\n\n"
"每行一个链接,支持 t.me/xxx 和 @xxx")
return
# 选择 client
join_client = client
join_account_id = account_id
if target_account_id and target_account_id in active_clients:
join_client = active_clients[target_account_id]
join_account_id = target_account_id
elif not target_account_id and active_clients:
least = get_least_loaded_account()
if least and least["id"] in active_clients:
join_client = active_clients[least["id"]]
join_account_id = least["id"]
await event.reply(
f"🚀 开始批量加群: {len(links)}\n"
f"📋 账号: #{join_account_id or '默认'}\n"
f"⏱ 间隔: {config.JOIN_INTERVAL_MIN}-{config.JOIN_INTERVAL_MAX}s\n"
f"📦 每批: {config.JOIN_BATCH_SIZE} 个,批间休息 {config.JOIN_BATCH_REST_MIN}-{config.JOIN_BATCH_REST_MAX}s")
asyncio.create_task(_do_batch_join(
join_client, links, join_account_id, event))
return
# /accounts — 查看所有账号
if text == "/accounts":
accounts = get_accounts()
if not accounts:
await event.reply("📋 账号列表为空\n发 /add_account <StringSession> [备注名] 添加")
else:
lines = []
for a in accounts:
status = "🟢" if a["is_active"] and a["id"] in active_clients else "🔴"
chats = count_account_chats(a["id"])
proxy = f" | 🌐 {a['proxy_type']}://{a['proxy_host']}:{a['proxy_port']}" if a.get("proxy_host") else ""
lines.append(
f"{status} #{a['id']} {a['nickname'] or a['session_name']} "
f"| 📡 {chats}/{a['max_chats']}{proxy}")
await event.reply("📋 账号列表:\n" + "\n".join(lines))
return
# /add_account <StringSession> [备注名] — 远程添加协议号
if text.startswith("/add_account "):
parts = text[13:].strip().split(None, 1)
ss = parts[0]
nickname = parts[1] if len(parts) > 1 else f"远程添加_{len(get_accounts())+1}"
aid = add_account(session_name=nickname, session_string=ss, session_type="string", nickname=nickname)
await event.reply(f"✅ 协议号已添加 #{aid} ({nickname})\n⚠️ 需重启生效")
return
# /del_account <账号ID> — 远程删除账号
if text.startswith("/del_account "):
aid = text[13:].strip()
if not aid.isdigit():
await event.reply("❌ 用法: /del_account <账号ID数字>")
return
acct = get_account(int(aid))
if not acct:
await event.reply("❌ 未找到该账号")
return
remove_account(int(aid))
await event.reply(f"✅ 账号 #{aid} ({acct['nickname']}) 已删除\n⚠️ 需重启生效")
return
# /proxies — 查看代理池
if text == "/proxies":
proxies = get_proxies()
if not proxies:
await event.reply("📋 代理池为空")
else:
lines = []
for p in proxies:
status = "🟢" if p["is_active"] else "🔴"
assigned = f" → 账号#{p['assigned_account_id']}" if p.get("assigned_account_id") else " (空闲)"
fails = f" ❌×{p['fail_count']}" if p["fail_count"] > 0 else ""
lines.append(f"{status} #{p['id']} {p['proxy_type']}://{p['host']}:{p['port']}{assigned}{fails}")
await event.reply("📋 代理池:\n" + "\n".join(lines))
return
# /add_proxy <type> <host:port> [user:pass] — 添加代理
if text.startswith("/add_proxy "):
parts = text[11:].strip().split()
if len(parts) < 2:
await event.reply("❌ 用法: /add_proxy <socks5|http> <host:port> [user:pass]")
return
ptype = parts[0]
hp = parts[1].split(":")
if len(hp) != 2:
await event.reply("❌ 格式: host:port")
return
user, pwd = "", ""
if len(parts) >= 3 and ":" in parts[2]:
user, pwd = parts[2].split(":", 1)
pid = add_proxy(ptype, hp[0], int(hp[1]), user, pwd)
await event.reply(f"✅ 代理已添加 #{pid}")
return
# /batch_proxy — 批量添加代理(一行一个)
if text.startswith("/batch_proxy"):
lines = text.split("\n")[1:]
proxies_list = []
for line in lines:
line = line.strip()
if not line:
continue
parts = line.split()
if len(parts) < 2:
continue
ptype = parts[0]
hp = parts[1].split(":")
if len(hp) != 2:
continue
user, pwd = "", ""
if len(parts) >= 3 and ":" in parts[2]:
user, pwd = parts[2].split(":", 1)
proxies_list.append({"type": ptype, "host": hp[0], "port": int(hp[1]),
"username": user, "password": pwd})
if not proxies_list:
await event.reply(
"❌ 用法: /batch_proxy\n"
"socks5 host1:port1\n"
"socks5 host2:port2 user:pass\n"
"http host3:port3")
return
count = batch_add_proxies(proxies_list)
await event.reply(f"✅ 已添加 {count} 个代理")
return
# /del_proxy <ID> — 删除代理
if text.startswith("/del_proxy "):
pid = text[11:].strip()
if not pid.isdigit():
await event.reply("❌ 用法: /del_proxy <代理ID数字>")
return
if remove_proxy(int(pid)):
await event.reply(f"✅ 代理 #{pid} 已删除")
else:
await event.reply("❌ 未找到该代理")
return
# /fetch <chat_id> [数量] — 手动拉取历史消息
if text.startswith("/fetch"):
parts = text.split()
if len(parts) < 2:
await event.reply("❌ 用法: /fetch <群ID> [数量]\n例: /fetch 123456\n例: /fetch 123456 1000")
return
target_cid = parts[1].strip()
fetch_limit = None
if len(parts) >= 3:
try:
fetch_limit = int(parts[2])
except ValueError:
await event.reply("❌ 数量必须是数字")
return
if not is_watched_chat(target_cid):
await event.reply(f"❌ 群 {target_cid} 不在监控列表中\n请先 /join 加入该群")
return
try:
chat_entity = await client.get_entity(int(target_cid))
chat_name = getattr(chat_entity, "title", None) or target_cid
asyncio.create_task(
fetch_history(client, chat_entity, target_cid, chat_name,
limit=fetch_limit, notify_event=event)
)
except Exception as e:
await event.reply(f"❌ 获取群信息失败: {e}")
return
# /unwatch <chat_id>
if text.startswith("/unwatch "):
cid = text[9:].strip()
if remove_watched_chat(cid):
await event.reply(f"✅ 已停止监控: {cid}")
else:
await event.reply("❌ 未找到该群")
return
# /list — 显示监控列表(含账号归属)
if text == "/list":
chats = get_watched_chats()
if not chats:
await event.reply("📋 监控列表为空")
else:
lines = []
for i, c in enumerate(chats, 1):
acct = f" [账号#{c['account_id']}]" if c["account_id"] else ""
lines.append(f"{i}. {c['chat_name']} (ID: {c['chat_id']}){acct}")
await event.reply("📋 监控列表:\n" + "\n".join(lines))
return
# ===== 汇总群管理指令 =====
# /bind <群链接> 关键词1,关键词2,...
if text.startswith("/bind "):
parts = text[6:].strip().split(None, 1)
if len(parts) < 2:
await event.reply("❌ 用法: /bind <群链接> 关键词1,关键词2,...")
return
group_link = parts[0]
keywords_str = parts[1]
parsed = parse_invite(group_link)
if not parsed:
await event.reply("❌ 无法识别群链接,请发送 t.me/xxx 或 @xxx 格式")
return
try:
if parsed[0] == "invite":
result = await client(ImportChatInviteRequest(parsed[1]))
chat = result.chats[0]
else:
try:
chat = await client.get_entity(parsed[1])
except Exception:
result = await client(JoinChannelRequest(parsed[1]))
chat = result.chats[0]
group_id = str(chat.id)
group_name = getattr(chat, "title", None) or group_id
add_summary_group(group_id, group_name, keywords_str)
reload_keyword_cache()
# 格式化显示
kws = [k.strip() for k in keywords_str.split(",") if k.strip()]
await event.reply(
f"✅ 汇总群绑定成功\n"
f"📋 群: {group_name} ({group_id})\n"
f"🔑 关键词: {', '.join(kws)}\n\n"
f"📡 正在后台拉所有账号入群..."
)
# 后台拉所有小号入群
asyncio.create_task(
_join_summary_group_all_accounts(group_link, group_id, group_name, event)
)
except errors.FloodWaitError as e:
await event.reply(f"⚠️ FloodWait: 需等待 {e.seconds}s请稍后重试")
except Exception as e:
await event.reply(f"❌ 绑定失败: {e}")
return
# /delbind <群ID或链接> 关键词1,关键词2
if text.startswith("/delbind "):
parts = text[9:].strip().split(None, 1)
if len(parts) < 2:
await event.reply("❌ 用法: /delbind <群ID或链接> 关键词1,关键词2")
return
target = parts[0]
keywords_str = parts[1]
group_id = await _resolve_group_id(client, target)
if not group_id:
await event.reply("❌ 无法解析群ID请输入纯数字ID或群链接")
return
sg = get_summary_group(group_id)
if not sg:
await event.reply(f"❌ 群 {group_id} 未绑定为汇总群")
return
removed_kws = [k.strip() for k in keywords_str.split(",") if k.strip()]
remove_summary_group_keywords(group_id, keywords_str)
reload_keyword_cache()
# 检查是否还有残留
sg_after = get_summary_group(group_id)
if sg_after:
remaining = sg_after["keywords"]
await event.reply(
f"✅ 已移除关键词: {', '.join(removed_kws)}\n"
f"📋 群: {sg['group_name']} ({group_id})\n"
f"🔑 剩余关键词: {remaining}"
)
else:
await event.reply(
f"✅ 关键词已清空,汇总群绑定已自动删除\n"
f"📋 群: {sg['group_name']} ({group_id})"
)
return
# /unbind <群ID或链接>
if text.startswith("/unbind "):
target = text[8:].strip()
if not target:
await event.reply("❌ 用法: /unbind <群ID或链接>")
return
group_id = await _resolve_group_id(client, target)
if not group_id:
await event.reply("❌ 无法解析群ID请输入纯数字ID或群链接")
return
sg = get_summary_group(group_id)
if not sg:
await event.reply(f"❌ 群 {group_id} 未绑定为汇总群")
return
remove_summary_group(group_id)
reload_keyword_cache()
await event.reply(
f"✅ 已解除汇总群绑定\n"
f"📋 群: {sg['group_name']} ({group_id})\n"
f"🔑 已移除关键词: {sg['keywords']}"
)
return
# /binds — 列出所有汇总群
if text == "/binds":
groups = get_summary_groups()
if not groups:
await event.reply("📋 汇总群列表为空\n发 /bind <群链接> 关键词1,关键词2 添加")
else:
lines = []
for g in groups:
kws = g["keywords"] or "(无)"
lines.append(f"📌 {g['group_name']} (ID: {g['group_id']})\n 🔑 {kws}")
await event.reply(f"📋 汇总群列表 ({len(groups)} 个):\n\n" + "\n\n".join(lines))
return
# /help
if text == "/help":
await event.reply(
"📌 指令列表:\n\n"
"── 群管理 ──\n"
"/join <链接> [账号ID] — 加入并监控\n"
"/batch_join [账号ID]\\n<链接1>\\n<链接2> — 批量加群\n"
"/fetch <群ID> [数量] — 拉取历史消息\n"
"/list — 监控列表\n"
"/unwatch <群ID> — 停止监控\n\n"
"── 汇总群(关键词转发) ──\n"
"/bind <群链接> 关键词1,关键词2 — 绑定汇总群+关键词\n"
"/delbind <群ID或链接> 关键词1,关键词2 — 删除部分关键词\n"
"/unbind <群ID或链接> — 解除整个汇总群绑定\n"
"/binds — 查看所有汇总群\n"
"在汇总群内发 /search <关键词> — 搜索历史消息\n\n"
"── 账号管理 ──\n"
"/accounts — 查看所有账号\n"
"/add_account <StringSession> [备注名] — 添加协议号\n"
"/del_account <账号ID> — 删除账号\n\n"
"── 代理管理 ──\n"
"/proxies — 查看代理池\n"
"/add_proxy <type> <host:port> [user:pass]\n"
"/batch_proxy\\n<type host:port>... — 批量添加\n"
"/del_proxy <ID> — 删除代理\n\n"
"── 管理员 ──\n"
"/addadmin <ID> — 添加管理员\n"
"/deladmin <ID> — 移除管理员\n"
"/admins — 管理员列表\n\n"
"直接发关键词 — 搜索历史消息"
)
return
# 关键词搜索
keyword = text
if keyword.startswith("/"):
return
# 清理搜索内容:去掉 @xxx和存储时保持一致
keyword_clean = re.sub(r"@\w+", "", keyword).strip()
if not keyword_clean:
return
if len(keyword_clean) > 50:
first_line = ""
for line in keyword_clean.split("\n"):
line = line.strip()
if line:
first_line = line
break
search_key = first_line[:50] if first_line else keyword_clean[:50]
else:
search_key = keyword_clean
total = count_messages(search_key)
if total == 0:
await event.reply(f"🔍 未找到相关结果")
return
results = search_messages(search_key, limit=config.PAGE_SIZE)
header = f"🔍 共找到 {total} 条结果"
if total > config.PAGE_SIZE:
header += f",显示最新 {config.PAGE_SIZE}"
await event.reply(header)
for row in results:
content = row["content"]
name = row["chat_name"] or row["chat_id"]
msg_date = row["msg_date"] or ""
chat_username = row["chat_username"] or ""
msg_id = row["message_id"]
cid = row["chat_id"]
media_type = row["media_type"] or ""
if chat_username:
link = f"https://t.me/{chat_username}/{msg_id}"
else:
link = f"https://t.me/c/{cid}/{msg_id}"
if media_type == "photo":
try:
chat_entity = await client.get_entity(int(cid))
orig_msg = await client.get_messages(chat_entity, ids=msg_id)
if orig_msg and orig_msg.photo:
caption = content if content != "[图片]" else ""
await client.send_file(
event.sender_id, orig_msg.photo, caption=caption
)
else:
await event.reply(content + "\n\n⚠️ 原图已被删除")
except Exception as e:
logger.warning(f"拉取原图失败: {e}")
await event.reply(content + "\n\n⚠️ 原图获取失败")
else:
await event.reply(content)
await asyncio.sleep(0.3)
source = f"👥 {name}"
if msg_date:
source += f" | 🕐 {msg_date}"
source += f"\n🔗 {link}"
await event.reply(source)
await asyncio.sleep(0.3)
@client.on(events.NewMessage(incoming=True))
async def handle_group_message(event):
if event.is_private:
return
chat = await event.get_chat()
chat_id = str(chat.id)
# ── 汇总群消息:只处理 /search 命令,不入库 ──
if chat_id in _summary_group_ids:
raw = event.raw_text.strip() if event.raw_text else ""
if raw.startswith("/search "):
search_key = raw[8:].strip()
asyncio.create_task(
_handle_search_in_group(client, event, chat_id, search_key)
)
# 汇总群消息一律不入库、不触发转发
return
# ── 源频道消息处理 ──
if not is_watched_chat(chat_id):
return
content = ""
if event.raw_text:
content = re.sub(r"@\w+", "", event.raw_text).strip()
media_type, media_file_id = extract_media_info(event)
# 跳过既没文字也没图片的消息
if not content and not media_type:
return
if not content and media_type:
content = "[图片]"
chat_name = getattr(chat, "title", None) or chat_id
chat_username = getattr(chat, "username", None) or ""
msg_date = event.date.strftime("%Y-%m-%d %H:%M") if event.date else ""
save_message(
chat_id=chat_id, chat_name=chat_name, message_id=event.id,
content=content, chat_username=chat_username, msg_date=msg_date,
media_type=media_type, media_file_id=str(media_file_id) if media_file_id else ""
)
logger.info(f"[{chat_name}] {content[:50]}{'[有图]' if media_type else ''}...")
# ── 关键词匹配 → 转发到汇总群 ──
matched_groups = match_keywords(content)
if matched_groups:
# 多client去重同一消息只转发一次
dedup_key = f"{chat_id}:{event.id}"
now = time.time()
if dedup_key in _forwarded_msg_ids:
return # 已被另一个client转发过
_forwarded_msg_ids[dedup_key] = now
# 清理超过5分钟的旧记录防止内存泄漏
if len(_forwarded_msg_ids) > 1000:
cutoff = now - 300
expired = [k for k, v in _forwarded_msg_ids.items() if v < cutoff]
for k in expired:
del _forwarded_msg_ids[k]
# 转发使用原始文本(保留@等内容匹配使用清洗后的content
forward_text = event.raw_text.strip() if event.raw_text else content
asyncio.create_task(
_forward_to_summary_groups(client, event, forward_text, media_type, matched_groups)
)
async def _do_batch_join(client, links, account_id, event):
"""批量加群后台任务,完成后汇报"""
success, failed = await batch_join_chats(client, links, account_id, notify_event=event)
report = f"✅ 批量加群完成\n成功: {len(success)}\n失败: {len(failed)}"
if failed:
fail_lines = [f"{l}{r}" for l, r in failed[:10]]
report += "\n\n失败详情:\n" + "\n".join(fail_lines)
if len(failed) > 10:
report += f"\n ...还有 {len(failed)-10}"
await event.reply(report)
# ========== 汇总群辅助函数 ==========
async def _resolve_group_id(client, target: str) -> str | None:
"""解析群ID纯数字直接返回否则解析链接不执行加群操作"""
target = target.strip()
# 纯数字(可能带负号)视为group_id
if re.match(r"^-?\d+$", target):
return target.lstrip("-")
# 尝试解析链接
parsed = parse_invite(target)
if not parsed:
return None
try:
if parsed[0] == "username":
entity = await client.get_entity(parsed[1])
return str(entity.id)
else:
# invite链接不能直接get_entity遍历已绑定的汇总群尝试匹配
# 或者尝试通过 CheckChatInviteRequest 获取信息(不加入)
try:
invite_info = await client(CheckChatInviteRequest(parsed[1]))
# ChatInviteAlready 表示已在群内,可以拿到 chat
if hasattr(invite_info, 'chat') and invite_info.chat:
return str(invite_info.chat.id)
except Exception:
pass
return None
except Exception:
return None
async def _join_summary_group_all_accounts(group_link: str, group_id: str,
group_name: str, notify_event):
"""后台拉所有小号加入汇总群"""
parsed = parse_invite(group_link)
if not parsed:
return
joined, skipped, failed_list = 0, 0, []
for acct_id, c in active_clients.items():
try:
# 检查是否已在群尝试多种ID格式
already_in = False
for eid in [int(group_id), int(f"-100{group_id}")]:
try:
await c.get_entity(eid)
already_in = True
break
except Exception:
continue
if already_in:
skipped += 1
continue
# 加入群
try:
if parsed[0] == "invite":
await c(ImportChatInviteRequest(parsed[1]))
else:
await c(JoinChannelRequest(parsed[1]))
joined += 1
logger.info(f"[汇总群入群] 账号#{acct_id} 已加入 {group_name}")
except errors.UserAlreadyParticipantError:
skipped += 1
except errors.FloodWaitError as e:
wait_time = int(e.seconds * config.FLOOD_WAIT_MULTIPLIER)
logger.warning(f"[汇总群入群] 账号#{acct_id} FloodWait {e.seconds}s等待 {wait_time}s")
await asyncio.sleep(wait_time)
# 重试一次
try:
if parsed[0] == "invite":
await c(ImportChatInviteRequest(parsed[1]))
else:
await c(JoinChannelRequest(parsed[1]))
joined += 1
except Exception as e2:
failed_list.append(f"#{acct_id}: {e2}")
# 风控间隔
await asyncio.sleep(random.randint(config.JOIN_INTERVAL_MIN, config.JOIN_INTERVAL_MAX))
except Exception as e:
failed_list.append(f"#{acct_id}: {e}")
if notify_event:
report = f"📡 汇总群入群完成「{group_name}\n✅ 新加入: {joined}\n⏭ 已在群: {skipped}"
if failed_list:
report += f"\n❌ 失败: {len(failed_list)}"
for fl in failed_list[:5]:
report += f"\n {fl}"
await notify_event.reply(report)
async def _forward_to_summary_groups(client, event, content: str,
media_type: str, group_ids: list[str]):
"""将消息转发到匹配的汇总群"""
for gid in group_ids:
try:
# 尝试多种ID格式获取群实体
group_entity = None
for eid in [int(gid), int(f"-100{gid}")]:
try:
group_entity = await client.get_entity(eid)
break
except Exception:
continue
if not group_entity:
logger.error(f"[转发] 无法获取汇总群 {gid} 实体")
continue
if media_type == "photo" and event.photo:
# 有图片 → 发送图片+文字
caption = content if content and content != "[图片]" else ""
await client.send_file(group_entity, event.photo, caption=caption)
elif media_type == "photo" and event.document:
# document类型的图片
caption = content if content and content != "[图片]" else ""
await client.send_file(group_entity, event.document, caption=caption)
elif event.video or (event.document and getattr(event.document, 'mime_type', '') and
event.document.mime_type.startswith('video/')):
# 视频 → 跳过视频文件,有文字则只发文字
if content and content != "[图片]":
await client.send_message(group_entity, content)
# 无文字则跳过
else:
# 纯文字
if content:
await client.send_message(group_entity, content)
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"[转发] 转发到汇总群 {gid} 失败: {e}")
async def _handle_search_in_group(client, event, group_id: str, search_key: str):
"""处理汇总群内的 /search 命令"""
uid = str(event.sender_id)
if not is_admin_user(uid):
return
if not search_key:
await event.reply("❌ 用法: /search <关键词>")
return
results = search_messages_asc(search_key, limit=config.PAGE_SIZE)
total = count_messages(search_key)
if not results:
await event.reply(f"🔍 未找到「{search_key}」的相关结果")
return
header = f"🔍 搜索「{search_key}」共 {total} 条结果"
if total > config.PAGE_SIZE:
header += f",显示前 {config.PAGE_SIZE} 条(按时间正序)"
await event.reply(header)
for row in results:
content = row["content"]
name = row["chat_name"] or row["chat_id"]
msg_date = row["msg_date"] or ""
chat_username = row["chat_username"] or ""
msg_id = row["message_id"]
cid = row["chat_id"]
if chat_username:
link = f"https://t.me/{chat_username}/{msg_id}"
else:
link = f"https://t.me/c/{cid}/{msg_id}"
# 合并为一条消息发送
msg_text = content
msg_text += f"\n\n👥 {name}"
if msg_date:
msg_text += f" | 🕐 {msg_date}"
msg_text += f"\n🔗 {link}"
await event.reply(msg_text)
await asyncio.sleep(0.3)
# ========== 交互式菜单 ==========
def show_banner():
print()
print("=" * 45)
print(" Telegram 群消息监控 (Userbot)")
print("=" * 45)
def show_menu(has_session: bool):
accounts = get_accounts()
print()
if has_session or accounts:
print(" 📱 状态: 已有登录记录")
print()
print(" 1. 启动监听")
print(" 2. 查看监控列表")
print(" 3. 清空监控列表")
print(" 4. 管理员设置")
print(" 5. 清空消息记录")
print(" 6. 账号管理")
print(" 7. 代理管理")
print(" 0. 退出")
else:
print(" 📱 状态: 未登录")
print()
print(" 1. 登录账号")
print(" 2. 查看监控列表")
print(" 3. 清空监控列表")
print(" 4. 管理员设置")
print(" 5. 清空消息记录")
print(" 6. 账号管理")
print(" 7. 代理管理")
print(" 0. 退出")
print()
def manage_admins():
"""管理员设置子菜单"""
while True:
admins = get_admins()
print()
print(" === 管理员设置 ===")
if admins:
for i, a in enumerate(admins, 1):
role = "👑 Owner" if a["role"] == "owner" else "🔑 Admin"
name = a["username"] or a["user_id"]
print(f" {i}. {role} {name} (ID: {a['user_id']})")
else:
print(" (空)")
print()
print(" a. 添加管理员")
print(" d. 删除管理员")
print(" b. 返回")
print()
ch = input(" 请选择: ").strip().lower()
if ch == "a":
uid = input(" 输入用户ID或用户名: ").strip().lstrip("@")
if not uid:
continue
# 如果没有 owner第一个添加的自动设为 owner
owner = get_owner()
if not owner:
set_owner(uid, uid)
print(f" 👑 已设为最高管理员(Owner): {uid}")
else:
if add_admin(uid, uid):
print(f" ✅ 已添加管理员: {uid}")
else:
print(f" {uid} 已存在")
elif ch == "d":
uid = input(" 输入要删除的用户ID或用户名: ").strip().lstrip("@")
if not uid:
continue
if is_owner_user(uid):
print(" ❌ 不能删除 Owner")
elif remove_admin(uid):
print(f" ✅ 已删除: {uid}")
else:
print(f" ❌ 未找到: {uid}")
elif ch == "b":
break
def manage_accounts():
"""账号管理子菜单"""
while True:
accounts = get_accounts()
print()
print(" === 账号管理 ===")
if accounts:
for a in accounts:
status = "🟢" if a["is_active"] else "🔴"
chats = count_account_chats(a["id"])
proxy = f" | 🌐 {a['proxy_type']}://{a['proxy_host']}:{a['proxy_port']}" if a.get("proxy_host") else ""
print(f" {status} #{a['id']} {a['nickname'] or a['session_name']} "
f"| {chats}/{a['max_chats']}{proxy}")
else:
print(" (空)")
print()
print(" a. 添加协议号 (StringSession)")
print(" b. 添加直登号 (手机号+验证码)")
print(" c. 设置代理 (给某个号)")
print(" d. 启用/禁用账号")
print(" e. 删除账号")
print(" f. 返回")
print()
ch = input(" 请选择: ").strip().lower()
if ch == "a":
name = input(" 账号备注名: ").strip() or f"account_{len(accounts)+1}"
ss = input(" 粘贴 StringSession: ").strip()
if not ss:
print(" ❌ StringSession 不能为空")
continue
aid = add_account(session_name=name, session_string=ss, session_type="string", nickname=name)
print(f" ✅ 协议号已添加 #{aid}")
elif ch == "b":
name = input(" 账号备注名: ").strip() or f"account_{len(accounts)+1}"
session_file = name.replace(" ", "_")
aid = add_account(session_name=session_file, session_type="file", nickname=name)
print(f" ✅ 直登号已添加 #{aid},启动时会要求输入手机号验证码")
elif ch == "c":
if not accounts:
print(" ❌ 没有账号")
continue
if len(accounts) == 1:
acct = accounts[0]
print(f" 当前账号: #{acct['id']} {acct['nickname'] or acct['session_name']}")
else:
print(" 选择账号:")
for idx, a in enumerate(accounts, 1):
print(f" {idx}. #{a['id']} {a['nickname'] or a['session_name']}")
sel = input(" 输入序号: ").strip()
if not sel.isdigit() or int(sel) < 1 or int(sel) > len(accounts):
print(" ❌ 无效序号")
continue
acct = accounts[int(sel) - 1]
ptype = input(" 代理类型 (socks5/http) [默认socks5]: ").strip() or "socks5"
host = input(" 代理地址: ").strip()
port = input(" 代理端口: ").strip()
user = input(" 用户名 (可选): ").strip()
pwd = input(" 密码 (可选): ").strip()
if host and port:
set_account_proxy(acct["id"], ptype, host, int(port), user, pwd)
print(f" ✅ 账号 #{acct['id']} 代理已设置")
else:
print(" ❌ 地址和端口不能为空")
elif ch == "d":
if not accounts:
print(" ❌ 没有账号")
continue
if len(accounts) == 1:
acct = accounts[0]
else:
print(" 选择账号:")
for idx, a in enumerate(accounts, 1):
status = "🟢" if a["is_active"] else "🔴"
print(f" {idx}. {status} #{a['id']} {a['nickname'] or a['session_name']}")
sel = input(" 输入序号: ").strip()
if not sel.isdigit() or int(sel) < 1 or int(sel) > len(accounts):
print(" ❌ 无效序号")
continue
acct = accounts[int(sel) - 1]
new_state = not acct["is_active"]
set_account_active(acct["id"], new_state)
print(f" ✅ 账号 #{acct['id']}{'启用' if new_state else '禁用'}")
elif ch == "e":
if not accounts:
print(" ❌ 没有账号")
continue
if len(accounts) == 1:
acct = accounts[0]
else:
print(" 选择账号:")
for idx, a in enumerate(accounts, 1):
print(f" {idx}. #{a['id']} {a['nickname'] or a['session_name']}")
sel = input(" 输入序号: ").strip()
if not sel.isdigit() or int(sel) < 1 or int(sel) > len(accounts):
print(" ❌ 无效序号")
continue
acct = accounts[int(sel) - 1]
confirm = input(f" ⚠️ 确认删除 #{acct['id']} {acct['nickname']}(y/n): ").strip().lower()
if confirm == "y":
if remove_account(acct["id"]):
print(f" ✅ 账号 #{acct['id']} 已删除")
else:
print(" ❌ 删除失败")
elif ch == "f":
break
def manage_proxies():
"""代理管理子菜单"""
while True:
proxies = get_proxies()
print()
print(" === 代理管理 ===")
if proxies:
for p in proxies:
status = "🟢" if p["is_active"] else "🔴"
assigned = f" → 账号#{p['assigned_account_id']}" if p.get("assigned_account_id") else " (空闲)"
fails = f" ❌×{p['fail_count']}" if p["fail_count"] > 0 else ""
print(f" {status} #{p['id']} {p['proxy_type']}://{p['host']}:{p['port']}{assigned}{fails}")
else:
print(" (空)")
print()
print(f" 代理模式: {config.PROXY_MODE}")
print()
print(" a. 添加代理")
print(" b. 批量添加")
print(" c. 删除代理")
print(" d. 清空代理池")
print(" e. 返回")
print()
ch = input(" 请选择: ").strip().lower()
if ch == "a":
ptype = input(" 类型 (socks5/http): ").strip() or "socks5"
host = input(" 地址: ").strip()
port = input(" 端口: ").strip()
user = input(" 用户名 (可选): ").strip()
pwd = input(" 密码 (可选): ").strip()
if host and port:
pid = add_proxy(ptype, host, int(port), user, pwd)
print(f" ✅ 代理已添加 #{pid}")
else:
print(" ❌ 地址和端口不能为空")
elif ch == "b":
print(" 每行一个,格式: type host:port [user:pass]")
print(" 输入空行结束:")
proxy_list = []
while True:
line = input(" > ").strip()
if not line:
break
parts = line.split()
if len(parts) < 2:
continue
hp = parts[1].split(":")
if len(hp) != 2:
continue
user, pwd = "", ""
if len(parts) >= 3 and ":" in parts[2]:
user, pwd = parts[2].split(":", 1)
proxy_list.append({"type": parts[0], "host": hp[0], "port": int(hp[1]),
"username": user, "password": pwd})
if proxy_list:
n = batch_add_proxies(proxy_list)
print(f" ✅ 已添加 {n} 个代理")
elif ch == "c":
pid = input(" 输入代理ID: ").strip()
if pid and remove_proxy(int(pid)):
print(f" ✅ 代理 #{pid} 已删除")
else:
print(" ❌ 未找到")
elif ch == "d":
confirm = input(" ⚠️ 确认清空代理池?(y/n): ").strip().lower()
if confirm == "y":
n = clear_proxy_pool()
print(f" ✅ 已清空 {n} 个代理")
elif ch == "e":
break
async def do_login():
"""登录流程 — 通过账号管理添加直登号后首次启动时触发"""
accounts = get_active_accounts()
if not accounts:
# 没有任何账号,创建一个直登号
aid = add_account(session_name=SESSION_NAME, session_type="file", nickname="主号")
print(f" 📥 已创建直登账号 #{aid}")
# 直接走 do_run
await do_run()
async def do_run():
"""启动监听 — 支持多账号并行"""
accounts = get_active_accounts()
clients_to_run = []
# 1. 启动数据库中的多账号
for acct in accounts:
try:
c = get_client_for_account(acct)
register_handlers(c, account_id=acct["id"])
await c.start()
me = await c.get_me()
name = me.first_name or ""
if me.last_name:
name += f" {me.last_name}"
active_clients[acct["id"]] = c
clients_to_run.append(c)
update_account(acct["id"], phone=me.phone or "", nickname=name)
print(f" ✅ 账号#{acct['id']} {name} (+{me.phone or '?'}) 已上线")
except Exception as e:
logger.error(f"账号#{acct['id']} 启动失败: {e}")
print(f" ❌ 账号#{acct['id']} {acct['nickname']} 启动失败: {e}")
# 2. 自动导入旧的 userbot.session 到 accounts 表
if not accounts and SESSION_FILE.exists():
# 检查是否已导入过
all_accts = get_accounts()
already = any(a["session_name"] == SESSION_NAME and a["session_type"] == "file" for a in all_accts)
if not already:
aid = add_account(session_name=SESSION_NAME, session_type="file", nickname="主号(自动导入)")
print(f" 📥 已将 userbot.session 自动导入为账号#{aid}")
acct = get_account(aid)
try:
c = get_client_for_account(acct)
register_handlers(c, account_id=aid)
await c.start()
me = await c.get_me()
name = me.first_name or ""
if me.last_name:
name += f" {me.last_name}"
active_clients[aid] = c
clients_to_run.append(c)
update_account(aid, phone=me.phone or "", nickname=name)
print(f" ✅ 账号#{aid} {name} (+{me.phone or '?'}) 已上线")
except Exception as e:
logger.error(f"自动导入账号启动失败: {e}")
print(f" ❌ 自动导入账号启动失败: {e}")
if not clients_to_run:
print(" ❌ 没有可用账号,请先登录或添加协议号")
return
# 确保 owner
me = await clients_to_run[0].get_me()
owner = get_owner()
if not owner:
set_owner(str(me.id), me.username or me.first_name or "")
# 加载汇总群关键词缓存
reload_keyword_cache()
print()
print(f"🚀 共 {len(clients_to_run)} 个账号在线")
print()
print("📌 在 Telegram 私聊任一账号发指令控制:")
print(" /join <链接> [账号ID] — 加入并监控")
print(" /batch_join [账号ID] — 批量加群")
print(" /accounts — 查看账号")
print(" /proxies — 查看代理池")
print(" /help — 查看帮助")
print()
print("🔇 静默监听中... (Ctrl+C 停止)")
print()
# 所有 client 并行运行
await asyncio.gather(*(c.run_until_disconnected() for c in clients_to_run))
def main():
init_db()
# 命令行参数模式python bot.py run 直接启动(服务器部署用)
if len(sys.argv) > 1 and sys.argv[1] == "run":
try:
asyncio.run(do_run())
except KeyboardInterrupt:
print("\n\n👋 已停止监听")
return
# 交互式菜单模式(本地调试用)
show_banner()
while True:
has_session = SESSION_FILE.exists()
show_menu(has_session)
choice = input(" 请选择: ").strip()
if choice == "1":
accounts = get_active_accounts()
if has_session or accounts:
try:
asyncio.run(do_run())
except KeyboardInterrupt:
print("\n\n👋 已停止监听")
else:
asyncio.run(do_login())
continue
elif choice == "2":
chats = get_watched_chats()
if not chats:
print("\n 📋 监控列表为空")
else:
print(f"\n 📋 监控列表 (共 {len(chats)} 个):")
for i, c in enumerate(chats, 1):
acct = f" [账号#{c['account_id']}]" if c["account_id"] else ""
print(f" {i}. {c['chat_name']} (ID: {c['chat_id']}){acct}")
continue
elif choice == "3":
chats = get_watched_chats()
if not chats:
print("\n 📋 监控列表已经是空的")
else:
print(f"\n 当前有 {len(chats)} 个监控群:")
for i, c in enumerate(chats, 1):
print(f" {i}. {c['chat_name']}")
confirm = input("\n ⚠️ 确认清空所有监控?(y/n): ").strip().lower()
if confirm == "y":
n = clear_watched_chats()
print(f" ✅ 已清空 {n} 个监控群")
continue
elif choice == "4":
manage_admins()
continue
elif choice == "5":
total = count_all_messages()
if total == 0:
print("\n 📋 消息记录已经是空的")
else:
print(f"\n 当前共有 {total} 条消息记录")
confirm = input(" ⚠️ 确认清空所有消息?(y/n): ").strip().lower()
if confirm == "y":
n = clear_messages()
print(f" ✅ 已清空 {n} 条消息")
continue
elif choice == "6":
manage_accounts()
continue
elif choice == "7":
manage_proxies()
continue
elif choice == "0":
print("\n👋 再见!")
sys.exit(0)
else:
print(" ❌ 无效选项,请重新选择")
if __name__ == "__main__":
main()