diff --git a/.DS_Store b/.DS_Store index 084ac98..40b5bee 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/app/.DS_Store b/app/.DS_Store index 04495c0..79b513f 100644 Binary files a/app/.DS_Store and b/app/.DS_Store differ diff --git a/app/listener/.DS_Store b/app/listener/.DS_Store index cd015b8..b533d60 100644 Binary files a/app/listener/.DS_Store and b/app/listener/.DS_Store differ diff --git a/app/listener/chat/ChatAgentOffline.php b/app/listener/chat/ChatAgentOffline.php new file mode 100644 index 0000000..fd6c860 --- /dev/null +++ b/app/listener/chat/ChatAgentOffline.php @@ -0,0 +1,17 @@ +handleAgentConnect($ws, $token, $data); + $this->handleAgentConnect($ws, $token, $event); } else { $this->handleUserConnect($ws, $token, $source); } @@ -49,7 +49,7 @@ class ChatConnect /** * 处理用户连接 */ - private function handleUserConnect(WebSocket $ws, string $token, int $source): void + private function handleUserConnect($ws, $token, $source) { $fd = $ws->getSender(); @@ -65,7 +65,7 @@ class ChatConnect $count = Db::name('user')->where('login_token', $token)->count(); echo "[Chat] 数据库中login_token={$token}的记录数: {$count}\n"; - $ws->emit('chat.connected', [ + $ws->emit('chat_connected', [ 'success' => false, 'error' => 'invalid_token' ]); @@ -105,17 +105,17 @@ class ChatConnect ]; } - $ws->emit('chat.connected', [ + $ws->emit('chat_connected', [ 'success' => true, 'sessionId' => $session['id'], 'status' => $session['status'], 'agentInfo' => $agentInfo, ]); - echo "[Chat] 发送chat.connected响应成功\n"; + echo "[Chat] 发送chat_connected响应成功\n"; // 如果会话待分配,通知用户 if ($session['status'] === ChatSession::STATUS_PENDING) { - $ws->emit('chat.offline_notice', [ + $ws->emit('chat_offline_notice', [ 'message' => '当前客服繁忙,请稍候...' ]); } @@ -124,14 +124,14 @@ class ChatConnect /** * 处理客服连接 */ - private function handleAgentConnect(WebSocket $ws, string $token, array $data): void + private function handleAgentConnect($ws, $token, $event) { $fd = $ws->getSender(); // 验证客服Token (复用Admin Session) $admin = $this->verifyAdminToken($token); if (!$admin) { - $ws->emit('chat.connected', [ + $ws->emit('chat_connected', [ 'success' => false, 'error' => 'invalid_token' ]); @@ -139,7 +139,7 @@ class ChatConnect } $adminId = (int)$admin['id']; - $maxSessions = (int)($data['maxSessions'] ?? 10); + $maxSessions = (int)($event['maxSessions'] ?? 10); // 注册连接 $this->sessionService->registerAgentConnection($adminId, $fd); @@ -166,7 +166,7 @@ class ChatConnect // 获取当前会话列表 $sessions = $this->sessionService->getAgentSessions($adminId); - $ws->emit('chat.connected', [ + $ws->emit('chat_connected', [ 'success' => true, 'sessions' => $sessions, 'processedFromQueue' => count($processed), @@ -181,7 +181,7 @@ class ChatConnect /** * 验证用户Token */ - private function verifyUserToken(string $token): ?array + private function verifyUserToken($token) { if (empty($token)) { echo "[Chat] Token为空\n"; @@ -211,7 +211,7 @@ class ChatConnect /** * 验证客服Token */ - private function verifyAdminToken(string $token): ?array + private function verifyAdminToken($token) { if (empty($token)) { return null; @@ -227,7 +227,7 @@ class ChatConnect /** * 通知用户会话已分配 */ - private function notifyUserAssigned(int $userId, int $sessionId, int $adminId): void + private function notifyUserAssigned($userId, $sessionId, $adminId) { $ws = app('\think\swoole\WebSocket'); $admin = Db::name('admin')->where('id', $adminId)->find(); diff --git a/app/listener/chat/ChatConnectListener.php b/app/listener/chat/ChatConnectListener.php index 45d5d33..4ad5f95 100644 --- a/app/listener/chat/ChatConnectListener.php +++ b/app/listener/chat/ChatConnectListener.php @@ -1,5 +1,5 @@ handleAgentConnect($ws, $token, $data); + $this->handleAgentConnect($ws, $token, $event); } else { $this->handleUserConnect($ws, $token, $source); } @@ -55,7 +55,7 @@ class ChatConnectListener /** * 处理用户连接 */ - private function handleUserConnect(WebSocket $ws, string $token, int $source): void + private function handleUserConnect($ws, $token, $source) { $fd = $ws->getSender(); @@ -130,7 +130,7 @@ class ChatConnectListener /** * 处理客服连接 */ - private function handleAgentConnect(WebSocket $ws, string $token, array $data): void + private function handleAgentConnect($ws, $token, $event) { $fd = $ws->getSender(); @@ -145,7 +145,7 @@ class ChatConnectListener } $adminId = (int)$admin['id']; - $maxSessions = (int)($data['maxSessions'] ?? 10); + $maxSessions = (int)($event['maxSessions'] ?? 10); // 注册连接 $this->sessionService->registerAgentConnection($adminId, $fd); @@ -187,7 +187,7 @@ class ChatConnectListener /** * 处理心跳 */ - public function onPing(WebSocket $ws, array $data): void + public function onPing($ws, $event) { $fd = $ws->getSender(); $tableFd = app('swoole.table.fd'); @@ -210,7 +210,7 @@ class ChatConnectListener /** * 处理断开连接 */ - public function onClose(WebSocket $ws): void + public function onClose($ws) { $fd = $ws->getSender(); $tableFd = app('swoole.table.fd'); @@ -281,7 +281,7 @@ class ChatConnectListener /** * 通知用户会话已分配 */ - private function notifyUserAssigned(int $userId, int $sessionId, int $adminId): void + private function notifyUserAssigned($userId, $sessionId, $adminId) { $ws = app('\think\swoole\WebSocket'); $admin = Db::name('admin')->where('id', $adminId)->find(); diff --git a/app/listener/chat/ChatMessageAck.php b/app/listener/chat/ChatMessageAck.php index 446aa25..39a9dad 100644 --- a/app/listener/chat/ChatMessageAck.php +++ b/app/listener/chat/ChatMessageAck.php @@ -1,5 +1,5 @@ messageService = new MessageService(); } - public function handle(array $data, WebSocket $ws): void + public function handle(array $event, WebSocket $ws) { - $this->messageService->acknowledgeMessage($ws, $data); + $this->messageService->acknowledgeMessage($ws, $event); } } diff --git a/app/listener/chat/ChatMessageListener.php b/app/listener/chat/ChatMessageListener.php index b49d8b9..10669c3 100644 --- a/app/listener/chat/ChatMessageListener.php +++ b/app/listener/chat/ChatMessageListener.php @@ -1,5 +1,5 @@ getSender(); $tableFd = app('swoole.table.fd'); @@ -45,10 +45,10 @@ class ChatMessageListener return; } - $sessionId = (int)($data['sessionId'] ?? 0); - $msgType = (int)($data['msgType'] ?? ChatMessage::MSG_TYPE_TEXT); - $content = $data['content'] ?? ''; - $clientMsgId = $data['clientMsgId'] ?? null; + $sessionId = (int)($event['sessionId'] ?? 0); + $msgType = (int)($event['msgType'] ?? ChatMessage::MSG_TYPE_TEXT); + $content = $event['content'] ?? ''; + $clientMsgId = $event['clientMsgId'] ?? null; // 验证会话 $session = ChatSession::find($sessionId); @@ -121,10 +121,10 @@ class ChatMessageListener /** * 处理消息确认 (已读回执) */ - public function onMessageAck(WebSocket $ws, array $data): void + public function onMessageAck($ws, $event) { - $msgId = $data['msgId'] ?? null; - $msgIds = $data['msgIds'] ?? []; + $msgId = $event['msgId'] ?? null; + $msgIds = $event['msgIds'] ?? []; if ($msgId) { $msgIds[] = $msgId; @@ -183,7 +183,7 @@ class ChatMessageListener /** * 处理正在输入状态 */ - public function onTyping(WebSocket $ws, array $data): void + public function onTyping($ws, $event) { $fd = $ws->getSender(); $tableFd = app('swoole.table.fd'); @@ -193,8 +193,8 @@ class ChatMessageListener return; } - $sessionId = (int)($data['sessionId'] ?? 0); - $isTyping = (bool)($data['isTyping'] ?? false); + $sessionId = (int)($event['sessionId'] ?? 0); + $isTyping = (bool)($event['isTyping'] ?? false); $session = ChatSession::find($sessionId); if (!$session || $session['status'] !== ChatSession::STATUS_ACTIVE) { @@ -224,7 +224,7 @@ class ChatMessageListener /** * 推送消息给对端 */ - private function pushToPeer(WebSocket $ws, array $session, array $message, int $senderType): void + private function pushToPeer($ws, $session, $message, $senderType) { $payload = [ 'event' => 'chat.message.new', diff --git a/app/listener/chat/ChatMessageSend.php b/app/listener/chat/ChatMessageSend.php index b2b342e..f71e8a8 100644 --- a/app/listener/chat/ChatMessageSend.php +++ b/app/listener/chat/ChatMessageSend.php @@ -1,5 +1,4 @@ messageService = new MessageService(); } - public function handle(array $data, WebSocket $ws): void + public function handle(array $event, WebSocket $ws) { - $this->messageService->sendMessage($ws, $data); + $this->messageService->sendMessage($ws, $event); } } diff --git a/app/listener/chat/ChatPing.php b/app/listener/chat/ChatPing.php index b2c2262..3f17b56 100644 --- a/app/listener/chat/ChatPing.php +++ b/app/listener/chat/ChatPing.php @@ -1,5 +1,5 @@ getSender(); $tableFd = app('swoole.table.fd'); @@ -38,6 +38,6 @@ class ChatPing $this->sessionService->refreshAgentConnection($fdInfo['user_id']); } - $ws->emit('chat.pong', []); + $ws->emit('chat_pong', []); } } diff --git a/app/listener/chat/ChatQueueList.php b/app/listener/chat/ChatQueueList.php new file mode 100644 index 0000000..6329099 --- /dev/null +++ b/app/listener/chat/ChatQueueList.php @@ -0,0 +1,17 @@ +sessionService = new SessionService(); + } + + public function handle(array $event, WebSocket $ws) + { + $sessionId = $event['sessionId'] ?? 0; + + if (!$sessionId) { + $ws->emit('chat_error', ['error' => 'session_id_required']); + return; + } + + // 获取操作者ID(从fd映射表获取) + $fd = $ws->getSender(); + $tableFd = app('swoole.table.fd'); + $fdInfo = $tableFd->get((string)$fd); + $operatorId = $fdInfo['user_id'] ?? 0; + + $result = $this->sessionService->endSession($sessionId, $operatorId); + + if ($result) { + $ws->emit('chat_session_ended', [ + 'success' => true, + 'sessionId' => $sessionId + ]); + } else { + $ws->emit('chat_error', ['error' => 'end_session_failed']); + } + } +} diff --git a/app/listener/chat/ChatSessionListener.php b/app/listener/chat/ChatSessionListener.php index b99d402..d58664a 100644 --- a/app/listener/chat/ChatSessionListener.php +++ b/app/listener/chat/ChatSessionListener.php @@ -1,5 +1,5 @@ getSender(); $tableFd = app('swoole.table.fd'); @@ -43,7 +43,7 @@ class ChatSessionListener return; } - $sessionId = (int)($data['sessionId'] ?? 0); + $sessionId = (int)($event['sessionId'] ?? 0); $session = ChatSession::find($sessionId); if (!$session) { @@ -99,7 +99,7 @@ class ChatSessionListener /** * 处理会话评价 */ - public function onSessionRate(WebSocket $ws, array $data): void + public function onSessionRate($ws, $event) { $fd = $ws->getSender(); $tableFd = app('swoole.table.fd'); @@ -110,9 +110,9 @@ class ChatSessionListener return; } - $sessionId = (int)($data['sessionId'] ?? 0); - $rating = (int)($data['rating'] ?? 0); - $content = $data['content'] ?? null; + $sessionId = (int)($event['sessionId'] ?? 0); + $rating = (int)($event['rating'] ?? 0); + $content = $event['content'] ?? null; $session = ChatSession::find($sessionId); if (!$session || $session['user_id'] != $fdInfo['user_id']) { @@ -134,7 +134,7 @@ class ChatSessionListener /** * 处理客服上线 */ - public function onAgentOnline(WebSocket $ws, array $data): void + public function onAgentOnline($ws, $event) { $fd = $ws->getSender(); $tableFd = app('swoole.table.fd'); @@ -145,7 +145,7 @@ class ChatSessionListener } $adminId = $fdInfo['user_id']; - $maxSessions = (int)($data['maxSessions'] ?? 10); + $maxSessions = (int)($event['maxSessions'] ?? 10); // 设置在线状态 $this->assignService->setAgentOnline($adminId); @@ -170,7 +170,7 @@ class ChatSessionListener /** * 处理客服下线 */ - public function onAgentOffline(WebSocket $ws, array $data): void + public function onAgentOffline($ws, $event) { $fd = $ws->getSender(); $tableFd = app('swoole.table.fd'); @@ -204,7 +204,7 @@ class ChatSessionListener /** * 处理会话转接 */ - public function onSessionTransfer(WebSocket $ws, array $data): void + public function onSessionTransfer($ws, $event) { $fd = $ws->getSender(); $tableFd = app('swoole.table.fd'); @@ -214,8 +214,8 @@ class ChatSessionListener return; } - $sessionId = (int)($data['sessionId'] ?? 0); - $targetAdminId = (int)($data['targetAdminId'] ?? 0); + $sessionId = (int)($event['sessionId'] ?? 0); + $targetAdminId = (int)($event['targetAdminId'] ?? 0); $session = ChatSession::find($sessionId); if (!$session || $session['admin_id'] != $fdInfo['user_id']) { @@ -275,7 +275,7 @@ class ChatSessionListener /** * 获取待处理队列列表 (客服端) */ - public function onQueueList(WebSocket $ws, array $data): void + public function onQueueList($ws, $event) { $fd = $ws->getSender(); $tableFd = app('swoole.table.fd'); @@ -295,7 +295,7 @@ class ChatSessionListener /** * 通知用户会话已分配 */ - private function notifyUserAssigned(WebSocket $ws, int $userId, int $sessionId, int $adminId): void + private function notifyUserAssigned($ws, $userId, $sessionId, $adminId) { $admin = Db::name('admin')->where('id', $adminId)->find(); diff --git a/app/listener/chat/ChatSessionRate.php b/app/listener/chat/ChatSessionRate.php new file mode 100644 index 0000000..616dae4 --- /dev/null +++ b/app/listener/chat/ChatSessionRate.php @@ -0,0 +1,17 @@ +find(); return $status ? $status->toArray() : null; @@ -27,7 +27,7 @@ class ChatAdminStatus extends Model /** * 获取或创建客服配置 */ - public static function getOrCreate(int $adminId): array + public static function getOrCreate($adminId) { $status = self::getByAdminId($adminId); if ($status === null) { @@ -47,7 +47,7 @@ class ChatAdminStatus extends Model /** * 更新最后在线时间 */ - public static function updateLastOnlineTime(int $adminId): void + public static function updateLastOnlineTime($adminId) { self::where('admin_id', $adminId)->update([ 'last_online_time' => time(), @@ -58,7 +58,7 @@ class ChatAdminStatus extends Model /** * 获取启用客服功能的管理员ID列表 */ - public static function getEnabledAdminIds(): array + public static function getEnabledAdminIds() { return self::where('is_enabled', 1)->column('admin_id'); } diff --git a/app/models/chat/ChatMessage.php b/app/models/chat/ChatMessage.php index e7e5800..19f528d 100644 --- a/app/models/chat/ChatMessage.php +++ b/app/models/chat/ChatMessage.php @@ -33,7 +33,7 @@ class ChatMessage extends Model /** * 获取会话消息列表 */ - public static function getBySessionId(int $sessionId, int $limit = 50, int $lastId = 0): array + public static function getBySessionId($sessionId, $limit = 50, $lastId = 0) { $query = self::where('session_id', $sessionId); if ($lastId > 0) { @@ -48,7 +48,7 @@ class ChatMessage extends Model /** * 获取会话未读消息 */ - public static function getUnreadBySessionId(int $sessionId, int $senderType, int $limit = 50): array + public static function getUnreadBySessionId($sessionId, $senderType, $limit = 50) { return self::where('session_id', $sessionId) ->where('sender_type', '<>', $senderType) @@ -62,7 +62,7 @@ class ChatMessage extends Model /** * 检查消息ID是否存在(幂等性) */ - public static function existsByMsgId(int $msgId): bool + public static function existsByMsgId($msgId) { return self::where('msg_id', $msgId)->count() > 0; } diff --git a/app/models/chat/ChatQuickReply.php b/app/models/chat/ChatQuickReply.php index 67691eb..5144509 100644 --- a/app/models/chat/ChatQuickReply.php +++ b/app/models/chat/ChatQuickReply.php @@ -18,7 +18,7 @@ class ChatQuickReply extends Model /** * 获取启用的快捷回复列表 */ - public static function getEnabledList(?string $category = null): array + public static function getEnabledList($category = null) { $query = self::where('status', 1); if ($category !== null) { @@ -33,7 +33,7 @@ class ChatQuickReply extends Model /** * 获取分类列表 */ - public static function getCategories(): array + public static function getCategories() { return self::where('status', 1) ->whereNotNull('category') diff --git a/app/models/chat/ChatSession.php b/app/models/chat/ChatSession.php index 86545f8..9c9ade4 100644 --- a/app/models/chat/ChatSession.php +++ b/app/models/chat/ChatSession.php @@ -28,7 +28,7 @@ class ChatSession extends Model /** * 获取用户活跃会话 */ - public static function getActiveByUserId(int $userId): ?array + public static function getActiveByUserId($userId) { $session = self::where('user_id', $userId) ->whereIn('status', [self::STATUS_PENDING, self::STATUS_ACTIVE]) @@ -39,7 +39,7 @@ class ChatSession extends Model /** * 获取客服进行中的会话列表 */ - public static function getActiveByAdminId(int $adminId): array + public static function getActiveByAdminId($adminId) { return self::where('admin_id', $adminId) ->where('status', self::STATUS_ACTIVE) @@ -51,7 +51,7 @@ class ChatSession extends Model /** * 获取待分配会话数量 */ - public static function getPendingCount(): int + public static function getPendingCount() { return self::where('status', self::STATUS_PENDING)->count(); } diff --git a/app/services/.DS_Store b/app/services/.DS_Store new file mode 100644 index 0000000..3c16336 Binary files /dev/null and b/app/services/.DS_Store differ diff --git a/app/services/chat/AssignService.php b/app/services/chat/AssignService.php index 6826d6e..26066eb 100644 --- a/app/services/chat/AssignService.php +++ b/app/services/chat/AssignService.php @@ -31,7 +31,7 @@ class AssignService * @param int $sessionId 会话ID * @return int|null 分配的客服ID,null表示无可用客服 */ - public function assignSession(int $userId, int $sessionId): ?int + public function assignSession($userId, $sessionId): ?int { $redis = $this->getRedis(); $lockKey = self::LOCK_PREFIX . $userId; @@ -85,7 +85,7 @@ class AssignService /** * 获取在线客服列表 */ - public function getOnlineAgents(): array + public function getOnlineAgents() { $redis = $this->getRedis(); $enabledAdmins = ChatAdminStatus::getEnabledAdminIds(); @@ -103,7 +103,7 @@ class AssignService /** * 选择会话数最少的客服 */ - public function selectLeastLoadAgent(array $onlineAgents): ?int + public function selectLeastLoadAgent($onlineAgents): ?int { $redis = $this->getRedis(); $loads = []; @@ -129,7 +129,7 @@ class AssignService /** * 添加到离线队列 (按余额降序) */ - public function addToOfflineQueue(int $userId, int $sessionId): void + public function addToOfflineQueue($userId, $sessionId) { $redis = $this->getRedis(); @@ -151,7 +151,7 @@ class AssignService /** * 客服上线时处理队列 */ - public function processOfflineQueue(int $adminId): array + public function processOfflineQueue($adminId) { $redis = $this->getRedis(); $processed = []; @@ -190,7 +190,7 @@ class AssignService /** * 释放会话 (会话结束时调用) */ - public function releaseSession(int $sessionId, int $adminId): void + public function releaseSession($sessionId, $adminId) { $redis = $this->getRedis(); @@ -207,7 +207,7 @@ class AssignService /** * 设置客服在线状态 */ - public function setAgentOnline(int $adminId): void + public function setAgentOnline($adminId) { $redis = $this->getRedis(); $redis->setex(self::ONLINE_PREFIX . $adminId, 60, '1'); @@ -217,7 +217,7 @@ class AssignService /** * 刷新客服在线状态 (心跳续期) */ - public function refreshAgentOnline(int $adminId): void + public function refreshAgentOnline($adminId) { $redis = $this->getRedis(); $redis->expire(self::ONLINE_PREFIX . $adminId, 60); @@ -226,7 +226,7 @@ class AssignService /** * 设置客服离线 */ - public function setAgentOffline(int $adminId): void + public function setAgentOffline($adminId) { $redis = $this->getRedis(); $redis->del(self::ONLINE_PREFIX . $adminId); @@ -236,7 +236,7 @@ class AssignService /** * 获取客服最大会话数 */ - private function getAgentMaxSessions(int $adminId): int + private function getAgentMaxSessions($adminId) { $status = ChatAdminStatus::getByAdminId($adminId); return $status['max_sessions'] ?? self::MAX_SESSIONS; @@ -245,7 +245,7 @@ class AssignService /** * 获取用户已有会话的客服ID */ - private function getExistingSessionAgent(int $userId): ?int + private function getExistingSessionAgent($userId): ?int { $session = ChatSession::getActiveByUserId($userId); return $session['admin_id'] ?? null; @@ -254,7 +254,7 @@ class AssignService /** * 绑定会话到客服 */ - private function bindSessionToAgent(int $sessionId, int $adminId): void + private function bindSessionToAgent($sessionId, $adminId) { $redis = $this->getRedis(); @@ -272,7 +272,7 @@ class AssignService /** * 释放分配锁 */ - private function releaseLock(string $key, string $value): void + private function releaseLock($key, $value) { $redis = $this->getRedis(); @@ -290,7 +290,7 @@ class AssignService /** * 获取Redis实例 */ - private function getRedis(): \Redis + private function getRedis() { return Cache::store('redis')->handler(); } diff --git a/app/services/chat/MessageService.php b/app/services/chat/MessageService.php index 6298ade..92d1adc 100644 --- a/app/services/chat/MessageService.php +++ b/app/services/chat/MessageService.php @@ -6,6 +6,7 @@ use app\models\chat\ChatMessage; use app\models\chat\ChatSession; use app\utils\Snowflake; use think\facade\Cache; +use think\swoole\WebSocket; /** * 消息服务 @@ -20,12 +21,173 @@ class MessageService private const MAX_CONTENT_LENGTH = 500; private const RECONNECT_FETCH_LIMIT = 50; + // Redis Key 前缀 + private const CONN_USER_PREFIX = 'cs:conn:user:'; + private const CONN_AGENT_PREFIX = 'cs:conn:agent:'; + + /** + * 发送消息(WebSocket事件处理入口) + * @param WebSocket $ws + * @param array $data [sessionId, msgType, content, clientMsgId] + */ + public function sendMessage($ws, $data) + { + echo "[MessageService] ========== 收到发送消息请求 ==========\n"; + echo "[MessageService] 数据: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n"; + + $sessionId = $data['sessionId'] ?? 0; + $msgType = $data['msgType'] ?? ChatMessage::MSG_TYPE_TEXT; + $content = $data['content'] ?? ''; + $clientMsgId = $data['clientMsgId'] ?? null; + + // 验证会话 + $session = ChatSession::find($sessionId); + if (!$session) { + echo "[MessageService] 会话不存在: {$sessionId}\n"; + $ws->emit('chat_error', ['error' => 'session_not_found']); + return; + } + $session = $session->toArray(); + echo "[MessageService] 会话信息: user_id={$session['user_id']}, admin_id=" . ($session['admin_id'] ?? 'null') . "\n"; + + // 验证内容 + $validation = $this->validateContent($content, $msgType); + if (!$validation['valid']) { + echo "[MessageService] 内容验证失败: {$validation['error']}\n"; + $ws->emit('chat_error', ['error' => $validation['error']]); + return; + } + + // 判断发送者类型(通过fd查找) + $fd = $ws->getSender(); + $redis = $this->getRedis(); + + echo "[MessageService] 发送者FD: {$fd}\n"; + + // 检查是用户还是客服 + $senderType = null; + $senderId = null; + $targetFd = null; + + // 检查是否是客服发送 + $agentId = $session['admin_id']; + if ($agentId) { + $agentFd = $redis->get(self::CONN_AGENT_PREFIX . $agentId); + echo "[MessageService] 客服FD (Redis): " . ($agentFd ?: 'null') . "\n"; + if ($agentFd && (int)$agentFd === $fd) { + $senderType = ChatMessage::SENDER_ADMIN; + $senderId = $agentId; + // 目标是用户 + $targetFd = $redis->get(self::CONN_USER_PREFIX . $session['user_id']); + echo "[MessageService] 识别为客服发送, 目标用户FD: " . ($targetFd ?: 'null') . "\n"; + } + } + + // 检查是否是用户发送 + if ($senderType === null) { + $userId = $session['user_id']; + $userFd = $redis->get(self::CONN_USER_PREFIX . $userId); + echo "[MessageService] 用户FD (Redis): " . ($userFd ?: 'null') . "\n"; + if ($userFd && (int)$userFd === $fd) { + $senderType = ChatMessage::SENDER_USER; + $senderId = $userId; + // 目标是客服 + if ($agentId) { + $targetFd = $redis->get(self::CONN_AGENT_PREFIX . $agentId); + echo "[MessageService] 识别为用户发送, 目标客服FD: " . ($targetFd ?: 'null') . "\n"; + } + } + } + + if ($senderType === null) { + echo "[MessageService] 无法识别发送者身份, FD: {$fd}\n"; + $ws->emit('chat_error', ['error' => 'unauthorized']); + return; + } + + echo "[MessageService] 发送者类型: {$senderType}, 发送者ID: {$senderId}\n"; + + // 创建消息 + $message = $this->createMessage([ + 'session_id' => $sessionId, + 'sender_type' => $senderType, + 'sender_id' => $senderId, + 'msg_type' => $msgType, + 'content' => $content, + 'client_msg_id' => $clientMsgId, + ]); + + // 发送确认给发送者 + $ws->emit('chat_message_sent', [ + 'success' => true, + 'msgId' => $message['msg_id'], + 'clientMsgId' => $clientMsgId, + 'sessionId' => $sessionId, + ]); + + // 推送给目标 + if ($targetFd) { + $targetFd = (int)$targetFd; + $server = app('swoole.server'); + + echo "[MessageService] 准备推送消息到FD: {$targetFd}\n"; + + if ($server->isEstablished($targetFd)) { + $payload = [ + 'event' => 'chat_message_new', + 'data' => [ + 'msgId' => $message['msg_id'], + 'sessionId' => $sessionId, + 'senderType' => $senderType, + 'msgType' => $msgType, + 'content' => $content, + 'createTime' => $message['create_time'], + ] + ]; + + // 使用 Socket.IO 协议格式发送 + $result = $this->emitToFd($targetFd, 'chat_message_new', $payload['data']); + echo "[MessageService] 推送结果: " . ($result ? '成功' : '失败') . "\n"; + + // 更新消息状态为已发送 + $this->updateMessageStatus($message['msg_id'], ChatMessage::STATUS_SENT); + } else { + echo "[MessageService] 目标FD {$targetFd} 连接已断开\n"; + } + } else { + echo "[MessageService] 目标FD为空,无法推送\n"; + } + } + + /** + * 向指定fd发送Socket.IO事件 + */ + private function emitToFd($fd, $event, $data) + { + $server = app('swoole.server'); + if (!$server->isEstablished($fd)) { + return false; + } + + // Socket.IO v2 协议格式: 42["event",{data}] + $packet = '42' . json_encode([$event, $data]); + return $server->push($fd, $packet); + } + + /** + * 获取Redis实例 + */ + private function getRedis() + { + return Cache::store('redis')->handler(); + } + /** * 创建消息 * @param array $data [session_id, sender_type, sender_id, msg_type, content] * @return array 消息数据 */ - public function createMessage(array $data): array + public function createMessage($data) { // 生成雪花ID $msgId = Snowflake::getInstance()->nextId(); @@ -74,7 +236,7 @@ class MessageService * @param array $payload 消息内容 * @return bool */ - public function pushMessage(int $msgId, int $targetFd, array $payload): bool + public function pushMessage($msgId, $targetFd, $payload) { $server = app('swoole.server'); @@ -110,7 +272,7 @@ class MessageService /** * 更新消息状态 */ - public function updateMessageStatus(int $msgId, int $status): void + public function updateMessageStatus($msgId, $status) { $update = ['status' => $status]; @@ -126,7 +288,7 @@ class MessageService /** * 获取会话未读消息 (重连后拉取) */ - public function getUnreadMessages(int $sessionId, int $receiverType, int $limit = null): array + public function getUnreadMessages($sessionId, $receiverType, $limit = null) { $limit = $limit ?? self::RECONNECT_FETCH_LIMIT; return ChatMessage::getUnreadBySessionId($sessionId, $receiverType, $limit); @@ -135,7 +297,7 @@ class MessageService /** * 批量标记消息为已读 */ - public function markMessagesAsRead(array $msgIds): void + public function markMessagesAsRead($msgIds) { if (empty($msgIds)) { return; @@ -152,7 +314,7 @@ class MessageService /** * 标记消息为已送达 */ - public function markMessageDelivered(int $msgId): void + public function markMessageDelivered($msgId) { ChatMessage::where('msg_id', $msgId) ->where('status', '<', ChatMessage::STATUS_DELIVERED) @@ -165,7 +327,7 @@ class MessageService /** * 获取会话消息历史 */ - public function getMessageHistory(int $sessionId, int $limit = 50, int $lastId = 0): array + public function getMessageHistory($sessionId, $limit = 50, $lastId = 0) { return ChatMessage::getBySessionId($sessionId, $limit, $lastId); } @@ -173,7 +335,7 @@ class MessageService /** * 增加重试次数 */ - private function incrementRetryCount(int $msgId): void + private function incrementRetryCount($msgId) { ChatMessage::where('msg_id', $msgId)->inc('retry_count')->update(); } @@ -181,7 +343,7 @@ class MessageService /** * 验证消息内容 */ - public function validateContent(string $content, int $msgType): array + public function validateContent($content, $msgType) { if ($msgType === ChatMessage::MSG_TYPE_TEXT) { if (empty(trim($content))) { diff --git a/app/services/chat/SessionService.php b/app/services/chat/SessionService.php index 0c92df9..2e8573e 100644 --- a/app/services/chat/SessionService.php +++ b/app/services/chat/SessionService.php @@ -19,7 +19,7 @@ class SessionService private const CONN_USER_PREFIX = 'cs:conn:user:'; private const CONN_AGENT_PREFIX = 'cs:conn:agent:'; - private AssignService $assignService; + private $assignService; public function __construct() { @@ -32,7 +32,7 @@ class SessionService * @param int $source 来源 1=PC 2=Game 3=Portal * @return array 会话数据 */ - public function createSession(int $userId, int $source): array + public function createSession($userId, $source) { $redis = $this->getRedis(); @@ -70,7 +70,7 @@ class SessionService /** * 获取用户活跃会话 */ - public function getActiveSession(int $userId): ?array + public function getActiveSession($userId) { return ChatSession::getActiveByUserId($userId); } @@ -78,7 +78,7 @@ class SessionService /** * 获取会话详情(含用户信息) */ - public function getSessionDetail(int $sessionId): ?array + public function getSessionDetail($sessionId) { $session = ChatSession::find($sessionId); if (!$session) { @@ -105,7 +105,7 @@ class SessionService /** * 结束会话 */ - public function endSession(int $sessionId, int $operatorId): bool + public function endSession($sessionId, $operatorId) { $session = ChatSession::find($sessionId); if (!$session) { @@ -141,7 +141,7 @@ class SessionService /** * 转接会话 */ - public function transferSession(int $sessionId, int $newAdminId): bool + public function transferSession($sessionId, $newAdminId) { $session = ChatSession::find($sessionId); if (!$session || $session['status'] !== ChatSession::STATUS_ACTIVE) { @@ -175,7 +175,7 @@ class SessionService /** * 会话评价 */ - public function rateSession(int $sessionId, int $rating, ?string $content = null): bool + public function rateSession($sessionId, $rating, $content = null) { $session = ChatSession::find($sessionId); if (!$session) { @@ -197,7 +197,7 @@ class SessionService /** * 注册用户连接 */ - public function registerUserConnection(int $userId, int $fd): void + public function registerUserConnection($userId, $fd) { $redis = $this->getRedis(); $redis->setex(self::CONN_USER_PREFIX . $userId, 60, $fd); @@ -206,7 +206,7 @@ class SessionService /** * 注册客服连接 */ - public function registerAgentConnection(int $adminId, int $fd): void + public function registerAgentConnection($adminId, $fd) { $redis = $this->getRedis(); $redis->setex(self::CONN_AGENT_PREFIX . $adminId, 60, $fd); @@ -215,7 +215,7 @@ class SessionService /** * 刷新用户连接TTL */ - public function refreshUserConnection(int $userId): void + public function refreshUserConnection($userId) { $redis = $this->getRedis(); $redis->expire(self::CONN_USER_PREFIX . $userId, 60); @@ -224,7 +224,7 @@ class SessionService /** * 刷新客服连接TTL */ - public function refreshAgentConnection(int $adminId): void + public function refreshAgentConnection($adminId) { $redis = $this->getRedis(); $redis->expire(self::CONN_AGENT_PREFIX . $adminId, 60); @@ -233,7 +233,7 @@ class SessionService /** * 获取用户连接FD */ - public function getUserFd(int $userId): ?int + public function getUserFd($userId): ?int { $redis = $this->getRedis(); $fd = $redis->get(self::CONN_USER_PREFIX . $userId); @@ -243,7 +243,7 @@ class SessionService /** * 获取客服连接FD */ - public function getAgentFd(int $adminId): ?int + public function getAgentFd($adminId): ?int { $redis = $this->getRedis(); $fd = $redis->get(self::CONN_AGENT_PREFIX . $adminId); @@ -253,7 +253,7 @@ class SessionService /** * 清理用户连接 */ - public function clearUserConnection(int $userId): void + public function clearUserConnection($userId) { $redis = $this->getRedis(); $redis->del(self::CONN_USER_PREFIX . $userId); @@ -262,7 +262,7 @@ class SessionService /** * 清理客服连接 */ - public function clearAgentConnection(int $adminId): void + public function clearAgentConnection($adminId) { $redis = $this->getRedis(); $redis->del(self::CONN_AGENT_PREFIX . $adminId); @@ -271,7 +271,7 @@ class SessionService /** * 获取客服会话列表 */ - public function getAgentSessions(int $adminId): array + public function getAgentSessions($adminId) { $sessions = ChatSession::getActiveByAdminId($adminId); @@ -297,7 +297,7 @@ class SessionService /** * 获取待分配会话列表 */ - public function getPendingSessions(): array + public function getPendingSessions() { $sessions = ChatSession::where('status', ChatSession::STATUS_PENDING) ->order('create_time', 'asc') @@ -319,7 +319,7 @@ class SessionService /** * 获取Redis实例 */ - private function getRedis(): \Redis + private function getRedis() { return Cache::store('redis')->handler(); } diff --git a/app/utils/Snowflake.php b/app/utils/Snowflake.php index 8ef9129..92222df 100644 --- a/app/utils/Snowflake.php +++ b/app/utils/Snowflake.php @@ -14,14 +14,14 @@ class Snowflake private const DATACENTER_ID_BITS = 5; private const SEQUENCE_BITS = 12; - private int $workerId; - private int $datacenterId; - private int $sequence = 0; - private int $lastTimestamp = -1; + private $workerId; + private $datacenterId; + private $sequence = 0; + private $lastTimestamp = -1; - private static ?Snowflake $instance = null; + private static $instance = null; - public function __construct(int $workerId = 1, int $datacenterId = 1) + public function __construct($workerId = 1, $datacenterId = 1) { $this->workerId = $workerId & 0x1F; $this->datacenterId = $datacenterId & 0x1F; @@ -30,7 +30,7 @@ class Snowflake /** * 获取单例实例 */ - public static function getInstance(): Snowflake + public static function getInstance() { if (self::$instance === null) { self::$instance = new self(1, 1); @@ -41,7 +41,7 @@ class Snowflake /** * 生成下一个ID */ - public function nextId(): int + public function nextId() { $timestamp = $this->currentTimeMillis(); @@ -65,17 +65,17 @@ class Snowflake /** * 生成字符串ID */ - public function nextIdString(): string + public function nextIdString() { return (string)$this->nextId(); } - private function currentTimeMillis(): int + private function currentTimeMillis() { return (int)(microtime(true) * 1000); } - private function waitNextMillis(int $lastTimestamp): int + private function waitNextMillis($lastTimestamp) { $timestamp = $this->currentTimeMillis(); while ($timestamp <= $lastTimestamp) { diff --git a/config/swoole.php b/config/swoole.php index 4b25e64..498cb86 100644 --- a/config/swoole.php +++ b/config/swoole.php @@ -28,8 +28,8 @@ return [ 'enable' => true, 'handler' => Handler::class, 'parser' => Parser::class, - 'ping_interval' => 1000, - 'ping_timeout' => 1000, + 'ping_interval' => 25000, + 'ping_timeout' => 60000, 'room' => [ 'type' => 'redis', 'table' => [