nextId();

        // Enforce the content length limit by truncating over-long content.
        $content = $data['content'] ?? '';
        if (mb_strlen($content) > self::MAX_CONTENT_LENGTH) {
            $content = mb_substr($content, 0, self::MAX_CONTENT_LENGTH);
        }

        $message = [
            'msg_id' => $msgId,
            'session_id' => $data['session_id'],
            'sender_type' => $data['sender_type'],
            'sender_id' => $data['sender_id'],
            'msg_type' => $data['msg_type'] ?? ChatMessage::MSG_TYPE_TEXT,
            'content' => $content,
            'status' => ChatMessage::STATUS_PENDING,
            'retry_count' => 0,
            'create_time' => time(),
        ];

        // Idempotency check: return the stored row when this client message was already saved.
        if (isset($data['client_msg_id']) && ChatMessage::existsByMsgId($data['client_msg_id'])) {
            return ChatMessage::where('msg_id', $data['client_msg_id'])->find()->toArray();
        }

        // Persist the message.
        $id = ChatMessage::insertGetId($message);
        $message['id'] = $id;

        // Update the session's last-message pointers.
        ChatSession::where('id', $data['session_id'])->update([
            'last_msg_id' => $msgId,
            'last_msg_time' => time(),
            'update_time' => time(),
        ]);

        return $message;
    }

    /**
     * Push a message to a target connection, retrying on failure.
     *
     * The payload is JSON-encoded once, before the first attempt. If it cannot
     * be encoded the message is marked STATUS_FAILED immediately, because no
     * retry could ever succeed. Otherwise up to MAX_RETRY + 1 attempts are made:
     * - if the target connection is not established, the message is put back
     *   to STATUS_PENDING and the method returns false;
     * - if the push succeeds, the message is marked STATUS_SENT;
     * - if every attempt fails, the message is marked STATUS_FAILED.
     *
     * @param int   $msgId    Message ID
     * @param int   $targetFd Target connection FD
     * @param array $payload  Message content
     * @return bool true if the push succeeded, false otherwise
     */
    public function pushMessage(int $msgId, int $targetFd, array $payload): bool
    {
        $server = app('swoole.server');

        // Encode once: the payload does not change between retries, and a
        // json_encode() failure (false) must never be handed to push().
        $frame = json_encode($payload);
        if ($frame === false) {
            $this->updateMessageStatus($msgId, ChatMessage::STATUS_FAILED);
            return false;
        }

        for ($retry = 0; $retry <= self::MAX_RETRY; $retry++) {
            // Give up early when the target connection is no longer valid;
            // the message stays pending.
            if (!$server->isEstablished($targetFd)) {
                $this->updateMessageStatus($msgId, ChatMessage::STATUS_PENDING);
                return false;
            }

            // Attempt the push.
            if ($server->push($targetFd, $frame)) {
                $this->updateMessageStatus($msgId, ChatMessage::STATUS_SENT);
                return true;
            }

            // Push failed: record the retry.
            $this->incrementRetryCount($msgId);

            if ($retry < self::MAX_RETRY) {
                // Back off before the next attempt. The delay is scaled by 1000
                // for usleep(), so RETRY_DELAYS is presumably in milliseconds
                // (TODO confirm against the constant's definition).
                // NOTE(review): usleep() may block the worker unless Swoole's
                // sleep runtime hook is enabled; confirm.
                usleep(self::RETRY_DELAYS[$retry] * 1000);
            }
        }

        // Retries exhausted: mark the message as failed.
        $this->updateMessageStatus($msgId, ChatMessage::STATUS_FAILED);
        return false;
    }

    /**
     * Update a message's status.
     *
     * Also stamps `delivered_time` or `read_time` with the current time when
     * the new status is STATUS_DELIVERED or STATUS_READ respectively.
     *
     * @param int $msgId  Message ID
     * @param int $status New status (one of the ChatMessage::STATUS_* values)
     */
    public function updateMessageStatus(int $msgId, int $status): void
    {
        $update = ['status' => $status];

        if ($status === ChatMessage::STATUS_DELIVERED) {
            $update['delivered_time'] = time();
        } elseif ($status === ChatMessage::STATUS_READ) {
            $update['read_time'] = time();
        }

        ChatMessage::where('msg_id', $msgId)->update($update);
    }

    /**
     * Get a session's unread messages (fetched after a reconnect).
     *
     * @param int      $sessionId    Session ID
     * @param int      $receiverType Receiver type
     * @param int|null $limit        Maximum number of messages; null falls
     *                               back to RECONNECT_FETCH_LIMIT
     * @return array
     */
    public function getUnreadMessages(int $sessionId, int $receiverType, ?int $limit = null): array
    {
        $limit = $limit ?? self::RECONNECT_FETCH_LIMIT;

        return ChatMessage::getUnreadBySessionId($sessionId, $receiverType, $limit);
    }

    /**
     * Mark a batch of messages as read.
     *
     * Only rows whose status is below STATUS_READ are updated. An empty ID
     * list is a no-op.
     *
     * @param array $msgIds Message IDs
     */
    public function markMessagesAsRead(array $msgIds): void
    {
        if (empty($msgIds)) {
            return;
        }

        ChatMessage::whereIn('msg_id', $msgIds)
            ->where('status', '<', ChatMessage::STATUS_READ)
            ->update([
                'status' => ChatMessage::STATUS_READ,
                'read_time' => time(),
            ]);
    }

    /**
     * Mark a message as delivered.
     *
     * Only applies when the current status is below STATUS_DELIVERED.
     *
     * @param int $msgId Message ID
     */
    public function markMessageDelivered(int $msgId): void
    {
        ChatMessage::where('msg_id', $msgId)
            ->where('status', '<', ChatMessage::STATUS_DELIVERED)
            ->update([
                'status' => ChatMessage::STATUS_DELIVERED,
                'delivered_time' => time(),
            ]);
    }

    /**
     * Get a session's message history.
     *
     * Delegates to ChatMessage::getBySessionId().
     *
     * @param int $sessionId Session ID
     * @param int $limit     Maximum number of messages
     * @param int $lastId    Paging cursor passed through to the model
     * @return array
     */
    public function getMessageHistory(int $sessionId, int $limit = 50, int $lastId = 0): array
    {
        return ChatMessage::getBySessionId($sessionId, $limit, $lastId);
    }

    /**
     * Increment a message's retry counter by one.
     *
     * @param int $msgId Message ID
     */
    private function incrementRetryCount(int $msgId): void
    {
        ChatMessage::where('msg_id', $msgId)->inc('retry_count')->update();
    }

    /**
     * Validate message content for the given message type.
     *
     * Text messages must be non-blank and at most MAX_CONTENT_LENGTH
     * characters. Image messages must be a valid URL. Other message types are
     * accepted without checks.
     *
     * @param string $content Message content
     * @param int    $msgType Message type (ChatMessage::MSG_TYPE_* value)
     * @return array `['valid' => bool, 'error' => string|null]`
     */
    public function validateContent(string $content, int $msgType): array
    {
        if ($msgType === ChatMessage::MSG_TYPE_TEXT) {
            // Compare against '' strictly: empty() would wrongly reject the
            // legitimate message "0".
            if (trim($content) === '') {
                return ['valid' => false, 'error' => '消息内容不能为空'];
            }

            if (mb_strlen($content) > self::MAX_CONTENT_LENGTH) {
                return ['valid' => false, 'error' => '消息内容超过' . self::MAX_CONTENT_LENGTH . '字符限制'];
            }
        } elseif ($msgType === ChatMessage::MSG_TYPE_IMAGE) {
            if ($content === '' || filter_var($content, FILTER_VALIDATE_URL) === false) {
                return ['valid' => false, 'error' => '图片URL无效'];
            }
        }

        return ['valid' => true, 'error' => null];
    }
}