emit('chat_error', ['error' => 'session_not_found']); return; } $session = $session->toArray(); echo "[MessageService] 会话信息: user_id={$session['user_id']}, admin_id=" . ($session['admin_id'] ?? 'null') . "\n"; // 验证内容 $validation = $this->validateContent($content, $msgType); if (!$validation['valid']) { echo "[MessageService] 内容验证失败: {$validation['error']}\n"; $ws->emit('chat_error', ['error' => $validation['error']]); return; } // 判断发送者类型(通过fd查找) $fd = $ws->getSender(); $redis = $this->getRedis(); echo "[MessageService] 发送者FD: {$fd}\n"; // 检查是用户还是客服 $senderType = null; $senderId = null; $targetFd = null; // 检查是否是客服发送 $agentId = $session['admin_id']; if ($agentId) { $agentFd = $redis->get(self::CONN_AGENT_PREFIX . $agentId); echo "[MessageService] 客服FD (Redis): " . ($agentFd ?: 'null') . "\n"; if ($agentFd && (int)$agentFd === $fd) { $senderType = ChatMessage::SENDER_ADMIN; $senderId = $agentId; // 目标是用户 $targetFd = $redis->get(self::CONN_USER_PREFIX . $session['user_id']); echo "[MessageService] 识别为客服发送, 目标用户FD: " . ($targetFd ?: 'null') . "\n"; } } // 检查是否是用户发送 if ($senderType === null) { $userId = $session['user_id']; $userFd = $redis->get(self::CONN_USER_PREFIX . $userId); echo "[MessageService] 用户FD (Redis): " . ($userFd ?: 'null') . "\n"; if ($userFd && (int)$userFd === $fd) { $senderType = ChatMessage::SENDER_USER; $senderId = $userId; // 目标是客服 if ($agentId) { $targetFd = $redis->get(self::CONN_AGENT_PREFIX . $agentId); echo "[MessageService] 识别为用户发送, 目标客服FD: " . ($targetFd ?: 'null') . "\n"; } } } if ($senderType === null) { echo "[MessageService] 无法识别发送者身份, FD: {$fd}\n"; $ws->emit('chat_error', ['error' => 'unauthorized']); return; } echo "[MessageService] 发送者类型: {$senderType}, 发送者ID: {$senderId}\n"; // 创建消息 $message = $this->createMessage([ 'session_id' => $sessionId, 'sender_type' => $senderType, 'sender_id' => $senderId, 'msg_type' => $msgType, 'content' => $content, 'client_msg_id' => $clientMsgId, ]); // 发送确认给发送者 $ws->emit('chat_message_sent', [ 'success' => true, 'msgId' => $message['msg_id'], 'clientMsgId' => $clientMsgId, 'sessionId' => $sessionId, ]); // 推送给目标 if ($targetFd) { $targetFd = (int)$targetFd; $server = app('swoole.server'); echo "[MessageService] 准备推送消息到FD: {$targetFd}\n"; if ($server->isEstablished($targetFd)) { $payload = [ 'event' => 'chat_message_new', 'data' => [ 'msgId' => $message['msg_id'], 'sessionId' => $sessionId, 'senderType' => $senderType, 'msgType' => $msgType, 'content' => $content, 'createTime' => $message['create_time'], ] ]; // 使用 Socket.IO 协议格式发送 $result = $this->emitToFd($targetFd, 'chat_message_new', $payload['data']); echo "[MessageService] 推送结果: " . ($result ? '成功' : '失败') . "\n"; // 更新消息状态为已发送 $this->updateMessageStatus($message['msg_id'], ChatMessage::STATUS_SENT); } else { echo "[MessageService] 目标FD {$targetFd} 连接已断开\n"; } } else { echo "[MessageService] 目标FD为空,无法推送\n"; } } /** * 向指定fd发送Socket.IO事件 */ private function emitToFd($fd, $event, $data) { $server = app('swoole.server'); if (!$server->isEstablished($fd)) { return false; } // Socket.IO v2 协议格式: 42["event",{data}] $packet = '42' . json_encode([$event, $data]); return $server->push($fd, $packet); } /** * 获取Redis实例 */ private function getRedis() { return Cache::store('redis')->handler(); } /** * 创建消息 * @param array $data [session_id, sender_type, sender_id, msg_type, content] * @return array 消息数据 */ public function createMessage($data) { // 生成雪花ID $msgId = Snowflake::getInstance()->nextId(); // 内容长度限制 $content = $data['content'] ?? ''; if (mb_strlen($content) > self::MAX_CONTENT_LENGTH) { $content = mb_substr($content, 0, self::MAX_CONTENT_LENGTH); } $message = [ 'msg_id' => $msgId, 'session_id' => $data['session_id'], 'sender_type' => $data['sender_type'], 'sender_id' => $data['sender_id'], 'msg_type' => $data['msg_type'] ?? ChatMessage::MSG_TYPE_TEXT, 'content' => $content, 'status' => ChatMessage::STATUS_PENDING, 'retry_count' => 0, 'create_time' => time(), ]; // 幂等性检查 if (isset($data['client_msg_id']) && ChatMessage::existsByMsgId($data['client_msg_id'])) { return ChatMessage::where('msg_id', $data['client_msg_id'])->find()->toArray(); } // 入库 $id = ChatMessage::insertGetId($message); $message['id'] = $id; // 更新会话最后消息 ChatSession::where('id', $data['session_id'])->update([ 'last_msg_id' => $msgId, 'last_msg_time' => time(), 'update_time' => time(), ]); return $message; } /** * 推送消息到目标连接 * @param int $msgId 消息ID * @param int $targetFd 目标连接FD * @param array $payload 消息内容 * @return bool */ public function pushMessage($msgId, $targetFd, $payload) { $server = app('swoole.server'); for ($retry = 0; $retry <= self::MAX_RETRY; $retry++) { // 检查目标连接是否有效 if (!$server->isEstablished($targetFd)) { $this->updateMessageStatus($msgId, ChatMessage::STATUS_PENDING); return false; } // 尝试推送 $result = $server->push($targetFd, json_encode($payload)); if ($result) { $this->updateMessageStatus($msgId, ChatMessage::STATUS_SENT); return true; } // 推送失败,记录重试次数 $this->incrementRetryCount($msgId); if ($retry < self::MAX_RETRY) { // 指数退避等待 usleep(self::RETRY_DELAYS[$retry] * 1000); } } // 超过最大重试次数,标记为failed $this->updateMessageStatus($msgId, ChatMessage::STATUS_FAILED); return false; } /** * 更新消息状态 */ public function updateMessageStatus($msgId, $status) { $update = ['status' => $status]; if ($status === ChatMessage::STATUS_DELIVERED) { $update['delivered_time'] = time(); } elseif ($status === ChatMessage::STATUS_READ) { $update['read_time'] = time(); } ChatMessage::where('msg_id', $msgId)->update($update); } /** * 获取会话未读消息 (重连后拉取) */ public function getUnreadMessages($sessionId, $receiverType, $limit = null) { $limit = $limit ?? self::RECONNECT_FETCH_LIMIT; return ChatMessage::getUnreadBySessionId($sessionId, $receiverType, $limit); } /** * 批量标记消息为已读 */ public function markMessagesAsRead($msgIds) { if (empty($msgIds)) { return; } ChatMessage::whereIn('msg_id', $msgIds) ->where('status', '<', ChatMessage::STATUS_READ) ->update([ 'status' => ChatMessage::STATUS_READ, 'read_time' => time(), ]); } /** * 标记消息为已送达 */ public function markMessageDelivered($msgId) { ChatMessage::where('msg_id', $msgId) ->where('status', '<', ChatMessage::STATUS_DELIVERED) ->update([ 'status' => ChatMessage::STATUS_DELIVERED, 'delivered_time' => time(), ]); } /** * 获取会话消息历史 */ public function getMessageHistory($sessionId, $limit = 50, $lastId = 0) { return ChatMessage::getBySessionId($sessionId, $limit, $lastId); } /** * 增加重试次数 */ private function incrementRetryCount($msgId) { ChatMessage::where('msg_id', $msgId)->inc('retry_count')->update(); } /** * 验证消息内容 */ public function validateContent($content, $msgType) { if ($msgType === ChatMessage::MSG_TYPE_TEXT) { if (empty(trim($content))) { return ['valid' => false, 'error' => '消息内容不能为空']; } if (mb_strlen($content) > self::MAX_CONTENT_LENGTH) { return ['valid' => false, 'error' => '消息内容超过' . self::MAX_CONTENT_LENGTH . '字符限制']; } } elseif ($msgType === ChatMessage::MSG_TYPE_IMAGE) { if (empty($content) || !filter_var($content, FILTER_VALIDATE_URL)) { return ['valid' => false, 'error' => '图片URL无效']; } } return ['valid' => true, 'error' => null]; } }