364 lines
12 KiB
PHP
364 lines
12 KiB
PHP
<?php
|
||
|
||
namespace app\services\chat;
|
||
|
||
use app\models\chat\ChatMessage;
|
||
use app\models\chat\ChatSession;
|
||
use app\utils\Snowflake;
|
||
use think\facade\Cache;
|
||
use think\swoole\WebSocket;
|
||
|
||
/**
|
||
* 消息服务
|
||
* Class MessageService
|
||
* @package app\services\chat
|
||
*/
|
||
class MessageService
|
||
{
|
||
// 约束参数 (specs.md CC-05~CC-09)
|
||
private const MAX_RETRY = 3;
|
||
private const RETRY_DELAYS = [1000, 2000, 4000]; // ms
|
||
private const MAX_CONTENT_LENGTH = 500;
|
||
private const RECONNECT_FETCH_LIMIT = 50;
|
||
|
||
// Redis Key 前缀
|
||
private const CONN_USER_PREFIX = 'cs:conn:user:';
|
||
private const CONN_AGENT_PREFIX = 'cs:conn:agent:';
|
||
|
||
/**
|
||
* 发送消息(WebSocket事件处理入口)
|
||
* @param WebSocket $ws
|
||
* @param array $data [sessionId, msgType, content, clientMsgId]
|
||
*/
|
||
public function sendMessage($ws, $data)
|
||
{
|
||
echo "[MessageService] ========== 收到发送消息请求 ==========\n";
|
||
echo "[MessageService] 数据: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
|
||
|
||
$sessionId = $data['sessionId'] ?? 0;
|
||
$msgType = $data['msgType'] ?? ChatMessage::MSG_TYPE_TEXT;
|
||
$content = $data['content'] ?? '';
|
||
$clientMsgId = $data['clientMsgId'] ?? null;
|
||
|
||
// 验证会话
|
||
$session = ChatSession::find($sessionId);
|
||
if (!$session) {
|
||
echo "[MessageService] 会话不存在: {$sessionId}\n";
|
||
$ws->emit('chat_error', ['error' => 'session_not_found']);
|
||
return;
|
||
}
|
||
$session = $session->toArray();
|
||
echo "[MessageService] 会话信息: user_id={$session['user_id']}, admin_id=" . ($session['admin_id'] ?? 'null') . "\n";
|
||
|
||
// 验证内容
|
||
$validation = $this->validateContent($content, $msgType);
|
||
if (!$validation['valid']) {
|
||
echo "[MessageService] 内容验证失败: {$validation['error']}\n";
|
||
$ws->emit('chat_error', ['error' => $validation['error']]);
|
||
return;
|
||
}
|
||
|
||
// 判断发送者类型(通过fd查找)
|
||
$fd = $ws->getSender();
|
||
$redis = $this->getRedis();
|
||
|
||
echo "[MessageService] 发送者FD: {$fd}\n";
|
||
|
||
// 检查是用户还是客服
|
||
$senderType = null;
|
||
$senderId = null;
|
||
$targetFd = null;
|
||
|
||
// 检查是否是客服发送
|
||
$agentId = $session['admin_id'];
|
||
if ($agentId) {
|
||
$agentFd = $redis->get(self::CONN_AGENT_PREFIX . $agentId);
|
||
echo "[MessageService] 客服FD (Redis): " . ($agentFd ?: 'null') . "\n";
|
||
if ($agentFd && (int)$agentFd === $fd) {
|
||
$senderType = ChatMessage::SENDER_ADMIN;
|
||
$senderId = $agentId;
|
||
// 目标是用户
|
||
$targetFd = $redis->get(self::CONN_USER_PREFIX . $session['user_id']);
|
||
echo "[MessageService] 识别为客服发送, 目标用户FD: " . ($targetFd ?: 'null') . "\n";
|
||
}
|
||
}
|
||
|
||
// 检查是否是用户发送
|
||
if ($senderType === null) {
|
||
$userId = $session['user_id'];
|
||
$userFd = $redis->get(self::CONN_USER_PREFIX . $userId);
|
||
echo "[MessageService] 用户FD (Redis): " . ($userFd ?: 'null') . "\n";
|
||
if ($userFd && (int)$userFd === $fd) {
|
||
$senderType = ChatMessage::SENDER_USER;
|
||
$senderId = $userId;
|
||
// 目标是客服
|
||
if ($agentId) {
|
||
$targetFd = $redis->get(self::CONN_AGENT_PREFIX . $agentId);
|
||
echo "[MessageService] 识别为用户发送, 目标客服FD: " . ($targetFd ?: 'null') . "\n";
|
||
}
|
||
}
|
||
}
|
||
|
||
if ($senderType === null) {
|
||
echo "[MessageService] 无法识别发送者身份, FD: {$fd}\n";
|
||
$ws->emit('chat_error', ['error' => 'unauthorized']);
|
||
return;
|
||
}
|
||
|
||
echo "[MessageService] 发送者类型: {$senderType}, 发送者ID: {$senderId}\n";
|
||
|
||
// 创建消息
|
||
$message = $this->createMessage([
|
||
'session_id' => $sessionId,
|
||
'sender_type' => $senderType,
|
||
'sender_id' => $senderId,
|
||
'msg_type' => $msgType,
|
||
'content' => $content,
|
||
'client_msg_id' => $clientMsgId,
|
||
]);
|
||
|
||
// 发送确认给发送者
|
||
$ws->emit('chat_message_sent', [
|
||
'success' => true,
|
||
'msgId' => $message['msg_id'],
|
||
'clientMsgId' => $clientMsgId,
|
||
'sessionId' => $sessionId,
|
||
]);
|
||
|
||
// 推送给目标
|
||
if ($targetFd) {
|
||
$targetFd = (int)$targetFd;
|
||
$server = app('swoole.server');
|
||
|
||
echo "[MessageService] 准备推送消息到FD: {$targetFd}\n";
|
||
|
||
if ($server->isEstablished($targetFd)) {
|
||
$payload = [
|
||
'event' => 'chat_message_new',
|
||
'data' => [
|
||
'msgId' => $message['msg_id'],
|
||
'sessionId' => $sessionId,
|
||
'senderType' => $senderType,
|
||
'msgType' => $msgType,
|
||
'content' => $content,
|
||
'createTime' => $message['create_time'],
|
||
]
|
||
];
|
||
|
||
// 使用 Socket.IO 协议格式发送
|
||
$result = $this->emitToFd($targetFd, 'chat_message_new', $payload['data']);
|
||
echo "[MessageService] 推送结果: " . ($result ? '成功' : '失败') . "\n";
|
||
|
||
// 更新消息状态为已发送
|
||
$this->updateMessageStatus($message['msg_id'], ChatMessage::STATUS_SENT);
|
||
} else {
|
||
echo "[MessageService] 目标FD {$targetFd} 连接已断开\n";
|
||
}
|
||
} else {
|
||
echo "[MessageService] 目标FD为空,无法推送\n";
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 向指定fd发送Socket.IO事件
|
||
*/
|
||
private function emitToFd($fd, $event, $data)
|
||
{
|
||
$server = app('swoole.server');
|
||
if (!$server->isEstablished($fd)) {
|
||
return false;
|
||
}
|
||
|
||
// Socket.IO v2 协议格式: 42["event",{data}]
|
||
$packet = '42' . json_encode([$event, $data]);
|
||
return $server->push($fd, $packet);
|
||
}
|
||
|
||
/**
|
||
* 获取Redis实例
|
||
*/
|
||
private function getRedis()
|
||
{
|
||
return Cache::store('redis')->handler();
|
||
}
|
||
|
||
/**
|
||
* 创建消息
|
||
* @param array $data [session_id, sender_type, sender_id, msg_type, content]
|
||
* @return array 消息数据
|
||
*/
|
||
public function createMessage($data)
|
||
{
|
||
// 生成雪花ID
|
||
$msgId = Snowflake::getInstance()->nextId();
|
||
|
||
// 内容长度限制
|
||
$content = $data['content'] ?? '';
|
||
if (mb_strlen($content) > self::MAX_CONTENT_LENGTH) {
|
||
$content = mb_substr($content, 0, self::MAX_CONTENT_LENGTH);
|
||
}
|
||
|
||
$message = [
|
||
'msg_id' => $msgId,
|
||
'session_id' => $data['session_id'],
|
||
'sender_type' => $data['sender_type'],
|
||
'sender_id' => $data['sender_id'],
|
||
'msg_type' => $data['msg_type'] ?? ChatMessage::MSG_TYPE_TEXT,
|
||
'content' => $content,
|
||
'status' => ChatMessage::STATUS_PENDING,
|
||
'retry_count' => 0,
|
||
'create_time' => time(),
|
||
];
|
||
|
||
// 幂等性检查
|
||
if (isset($data['client_msg_id']) && ChatMessage::existsByMsgId($data['client_msg_id'])) {
|
||
return ChatMessage::where('msg_id', $data['client_msg_id'])->find()->toArray();
|
||
}
|
||
|
||
// 入库
|
||
$id = ChatMessage::insertGetId($message);
|
||
$message['id'] = $id;
|
||
|
||
// 更新会话最后消息
|
||
ChatSession::where('id', $data['session_id'])->update([
|
||
'last_msg_id' => $msgId,
|
||
'last_msg_time' => time(),
|
||
'update_time' => time(),
|
||
]);
|
||
|
||
return $message;
|
||
}
|
||
|
||
/**
|
||
* 推送消息到目标连接
|
||
* @param int $msgId 消息ID
|
||
* @param int $targetFd 目标连接FD
|
||
* @param array $payload 消息内容
|
||
* @return bool
|
||
*/
|
||
public function pushMessage($msgId, $targetFd, $payload)
|
||
{
|
||
$server = app('swoole.server');
|
||
|
||
for ($retry = 0; $retry <= self::MAX_RETRY; $retry++) {
|
||
// 检查目标连接是否有效
|
||
if (!$server->isEstablished($targetFd)) {
|
||
$this->updateMessageStatus($msgId, ChatMessage::STATUS_PENDING);
|
||
return false;
|
||
}
|
||
|
||
// 尝试推送
|
||
$result = $server->push($targetFd, json_encode($payload));
|
||
|
||
if ($result) {
|
||
$this->updateMessageStatus($msgId, ChatMessage::STATUS_SENT);
|
||
return true;
|
||
}
|
||
|
||
// 推送失败,记录重试次数
|
||
$this->incrementRetryCount($msgId);
|
||
|
||
if ($retry < self::MAX_RETRY) {
|
||
// 指数退避等待
|
||
usleep(self::RETRY_DELAYS[$retry] * 1000);
|
||
}
|
||
}
|
||
|
||
// 超过最大重试次数,标记为failed
|
||
$this->updateMessageStatus($msgId, ChatMessage::STATUS_FAILED);
|
||
return false;
|
||
}
|
||
|
||
/**
|
||
* 更新消息状态
|
||
*/
|
||
public function updateMessageStatus($msgId, $status)
|
||
{
|
||
$update = ['status' => $status];
|
||
|
||
if ($status === ChatMessage::STATUS_DELIVERED) {
|
||
$update['delivered_time'] = time();
|
||
} elseif ($status === ChatMessage::STATUS_READ) {
|
||
$update['read_time'] = time();
|
||
}
|
||
|
||
ChatMessage::where('msg_id', $msgId)->update($update);
|
||
}
|
||
|
||
/**
|
||
* 获取会话未读消息 (重连后拉取)
|
||
*/
|
||
public function getUnreadMessages($sessionId, $receiverType, $limit = null)
|
||
{
|
||
$limit = $limit ?? self::RECONNECT_FETCH_LIMIT;
|
||
return ChatMessage::getUnreadBySessionId($sessionId, $receiverType, $limit);
|
||
}
|
||
|
||
/**
|
||
* 批量标记消息为已读
|
||
*/
|
||
public function markMessagesAsRead($msgIds)
|
||
{
|
||
if (empty($msgIds)) {
|
||
return;
|
||
}
|
||
|
||
ChatMessage::whereIn('msg_id', $msgIds)
|
||
->where('status', '<', ChatMessage::STATUS_READ)
|
||
->update([
|
||
'status' => ChatMessage::STATUS_READ,
|
||
'read_time' => time(),
|
||
]);
|
||
}
|
||
|
||
/**
|
||
* 标记消息为已送达
|
||
*/
|
||
public function markMessageDelivered($msgId)
|
||
{
|
||
ChatMessage::where('msg_id', $msgId)
|
||
->where('status', '<', ChatMessage::STATUS_DELIVERED)
|
||
->update([
|
||
'status' => ChatMessage::STATUS_DELIVERED,
|
||
'delivered_time' => time(),
|
||
]);
|
||
}
|
||
|
||
/**
|
||
* 获取会话消息历史
|
||
*/
|
||
public function getMessageHistory($sessionId, $limit = 50, $lastId = 0)
|
||
{
|
||
return ChatMessage::getBySessionId($sessionId, $limit, $lastId);
|
||
}
|
||
|
||
/**
|
||
* 增加重试次数
|
||
*/
|
||
private function incrementRetryCount($msgId)
|
||
{
|
||
ChatMessage::where('msg_id', $msgId)->inc('retry_count')->update();
|
||
}
|
||
|
||
/**
|
||
* 验证消息内容
|
||
*/
|
||
public function validateContent($content, $msgType)
|
||
{
|
||
if ($msgType === ChatMessage::MSG_TYPE_TEXT) {
|
||
if (empty(trim($content))) {
|
||
return ['valid' => false, 'error' => '消息内容不能为空'];
|
||
}
|
||
if (mb_strlen($content) > self::MAX_CONTENT_LENGTH) {
|
||
return ['valid' => false, 'error' => '消息内容超过' . self::MAX_CONTENT_LENGTH . '字符限制'];
|
||
}
|
||
} elseif ($msgType === ChatMessage::MSG_TYPE_IMAGE) {
|
||
if (empty($content) || !filter_var($content, FILTER_VALIDATE_URL)) {
|
||
return ['valid' => false, 'error' => '图片URL无效'];
|
||
}
|
||
}
|
||
|
||
return ['valid' => true, 'error' => null];
|
||
}
|
||
}
|