Socket/vendor/topthink/think-swoole/src/RpcManager.php
2026-01-28 23:48:20 +08:00

184 lines
4.7 KiB
PHP

<?php
namespace think\swoole;
use Swoole\Coroutine;
use Swoole\Server;
use Swoole\Server\Port;
use think\App;
use think\Event;
use think\helper\Str;
use think\swoole\concerns\InteractsWithPools;
use think\swoole\concerns\InteractsWithRpcClient;
use think\swoole\concerns\InteractsWithServer;
use think\swoole\concerns\InteractsWithSwooleTable;
use think\swoole\concerns\WithApplication;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\rpc\JsonParser;
use think\swoole\rpc\Packer;
use think\swoole\rpc\server\Channel;
use think\swoole\rpc\server\Dispatcher;
use Throwable;
class RpcManager
{
use InteractsWithServer,
InteractsWithSwooleTable,
InteractsWithPools,
InteractsWithRpcClient,
WithApplication;
/**
* @var App
*/
protected $container;
/**
* Server events.
*
* @var array
*/
protected $events = [
'start',
'shutDown',
'workerStart',
'workerStop',
'workerError',
'workerExit',
'packet',
'task',
'finish',
'pipeMessage',
'managerStart',
'managerStop',
];
protected $rpcEvents = [
'connect',
'receive',
'close',
];
/** @var Channel[] */
protected $channels = [];
/**
* Manager constructor.
* @param App $container
*/
public function __construct(App $container)
{
$this->container = $container;
}
/**
* Initialize.
*/
protected function initialize(): void
{
$this->events = array_merge($this->events ?? [], $this->rpcEvents);
$this->prepareTables();
$this->preparePools();
$this->setSwooleServerListeners();
$this->prepareRpcServer();
$this->prepareRpcClient();
}
protected function prepareRpcServer()
{
$this->onEvent('workerStart', function () {
$this->bindRpcParser();
$this->bindRpcDispatcher();
});
}
public function attachToServer(Port $port)
{
$port->set([]);
foreach ($this->rpcEvents as $event) {
$listener = Str::camel("on_$event");
$callback = method_exists($this, $listener) ? [$this, $listener] : function () use ($event) {
$this->triggerEvent("rpc." . $event, func_get_args());
};
$port->on($event, $callback);
}
$this->onEvent('workerStart', function (App $app) {
$this->app = $app;
});
$this->prepareRpcServer();
}
protected function bindRpcDispatcher()
{
$services = $this->getConfig('rpc.server.services', []);
$this->app->make(Dispatcher::class, [$services]);
}
protected function bindRpcParser()
{
$parserClass = $this->getConfig('rpc.server.parser', JsonParser::class);
$this->app->bind(ParserInterface::class, $parserClass);
$this->app->make(ParserInterface::class);
}
public function onConnect(Server $server, int $fd, int $reactorId)
{
$args = func_get_args();
$this->runInSandbox(function (Event $event) use ($args) {
$event->trigger("swoole.rpc.Connect", $args);
}, $fd, true);
}
protected function recv(Server $server, $fd, $data, $callback)
{
if (!isset($this->channels[$fd]) || empty($handle = $this->channels[$fd]->pop())) {
//解析包头
try {
[$header, $data] = Packer::unpack($data);
} catch (Throwable $e) {
//错误的包头
return $server->close($fd);
}
$this->channels[$fd] = new Channel($header);
$handle = $this->channels[$fd]->pop();
}
$result = $handle->write($data);
if (!empty($result)) {
Coroutine::create($callback, $result);
$this->channels[$fd]->close();
} else {
$this->channels[$fd]->push($handle);
}
if (!empty($data)) {
$this->recv($server, $fd, $data, $callback);
}
}
public function onReceive(Server $server, $fd, $reactorId, $data)
{
$this->recv($server, $fd, $data, function ($data) use ($fd, $server) {
$this->runInSandbox(function (Dispatcher $dispatcher) use ($fd, $data) {
$dispatcher->dispatch($fd, $data);
}, $fd, true);
});
}
public function onClose(Server $server, int $fd, int $reactorId)
{
unset($this->channels[$fd]);
$args = func_get_args();
$this->runInSandbox(function (Event $event) use ($args) {
$event->trigger("swoole.rpc.Close", $args);
}, $fd);
}
}