Swoft Task消息投递问题
今天,线上环境因一个SQL查询结果错误,出现大量推送消息。消息投递功能使用的是swoft的任务投递功能,所以特意来看下swoft源码,Swoft的任务功能基于Swoole的Task机制,或者说Swoft的Task机制本质就是对SwooleTask机制的封装和加强。
任务投递
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| class Task {
public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3) { $data = TaskHelper::pack($taskName, $methodName, $params, $type);
if(!App::isWorkerStatus() && !App::isCoContext()){ return self::deliverByQueue($data); }
if(!App::isWorkerStatus() && App::isCoContext()){ throw new TaskException('Please deliver task by http!'); }
$server = App::$server->getServer(); if ($type == self::TYPE_CO) { $tasks[0] = $data; $prifleKey = 'task' . '.' . $taskName . '.' . $methodName;
App::profileStart($prifleKey); $result = $server->taskCo($tasks, $timeout); App::profileEnd($prifleKey);
return $result; }
return $server->task($data); } }
|
任务投递Task::deliver()
将调用参数打包后根据$type
参数通过swoole的$server->taskCo()
或$server->task()
接口投递到Task进程。
Task本身始终是同步执行的,$type
仅仅影响投递这一操作的行为,Task::TYPE_ASYNC
对应的$server->task()
是异步投递,Task::deliver()
调用后马上返回;Task::TYPE_CO
对应的$server->taskCo()
是协程投递,投递后让出协程控制,任务完成或执行超时后Task::deliver()
才从协程返回。
任务执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
class TaskEventListener implements TaskInterface, FinishInterface {
public function onTask(Server $server, int $taskId, int $workerId, $data) { try { $taskExecutor = App::getBean(TaskExecutor::class); $result = $taskExecutor->run($data); } catch (\Throwable $throwable) { App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine())); $result = false;
App::trigger(AppEvent::RESOURCE_RELEASE);
App::trigger(TaskEvent::AFTER_TASK); } return $result; } }
|
此处是swoole.onTask
的事件回调,其职责仅仅是将将worker进程投递来的打包后的数据转发给TaskExecutor
。
Swoole的Task机制的本质是worker进程将耗时任务投递给同步的Task(又名TaskWorker)进程处理,所以swoole.onTask
的事件回调是在Task进程中执行的。上文说过,worker进程是你大部分http服务代码执行的环境,但是从TaskEventListener.onTask()
方法开始,代码的执行环境都是Task进程,也就是说,TaskExecutor和具体的TaskBean都是执行在Task进程中的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
class TaskExecutor {
public function run(string $data) { $data = TaskHelper::unpack($data);
$name = $data['name']; $type = $data['type']; $method = $data['method']; $params = $data['params']; $logid = $data['logid'] ?? uniqid('', true); $spanid = $data['spanid'] ?? 0;
$collector = TaskCollector::getCollector(); if (!isset($collector['task'][$name])) { return false; }
list(, $coroutine) = $collector['task'][$name]; $task = App::getBean($name); if ($coroutine) { $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type); } else { $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type); }
return $result; } }
|
任务执行思路很简单,将Worker进程发过来的数据解包还原成原来的调用参数,根据$name
参数找到对应的TaskBean并调用其对应的Task方法。其中TaskBean使用类级别注解@Task(name="TaskName")
或者@Task("TaskName")
声明。
值得一提的一点是,@Task
注解除了name属性,还有一个coroutine属性,上述代码会根据该参数选择使用协程的runCoTask()
或者同步的runSyncTask()
执行Task。但是由于而且由于Swoole的Task进程的执行是完全同步的,不支持协程,所以目前版本请该参数不要配置为true。同样的在TaskBean中编写的任务代码必须的同步的或者是要能根据环境自动将异步和协程降级为同步的
从Process中投递任务
Swoole的Task机制的本质是worker进程将耗时任务投递给同步的Task进程(又名TaskWorker进程)处理。
换句话说,swoole的$server->taskCo()
或$server->task()
都只能在worker进程中使用。
这个限制大大的限制了使用场景。 如何能够为了能够在Process中投递任务呢?Swoft为了绕过这个限制提供了Task::deliverByProcess()
方法。其实现原理也很简单,通过Swoole的$server->sendMessage()
方法将调用信息从Process中投递到Worker进程中,然后由Worker进程替其投递到Task进程当中,相关代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool { $server = App::$server->getServer(); $pipeMessage = App::getBean(PipeMessage::class); $data = [ 'name' => $taskName, 'method' => $methodName, 'params' => $params, 'timeout' => $timeout, 'type' => $type, ];
$message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data); return $server->sendMessage($message, $workId); }
|
数据打包后使用$server->sendMessage()
投递给woerker:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
public function onPipeMessage(Server $server, int $srcWorkerId, string $message) { $pipeMessage = App::getBean(PipeMessage::class); list($type, $data) = $pipeMessage->unpack($message);
App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId); }
|
$server->sendMessage
后,worker进程收到数据时会触发一个swoole.pipeMessage
事件的回调,Swoft会将其转换成自己的swoft.pipeMessage
事件并触发.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
class PipeMessageListener implements EventHandlerInterface {
public function handle(EventInterface $event) { $params = $event->getParams(); if (count($params) < 3) { return; }
list($type, $data, $srcWorkerId) = $params;
if ($type != PipeMessage::MESSAGE_TYPE_TASK) { return; }
$type = $data['type']; $taskName = $data['name']; $params = $data['params']; $timeout = $data['timeout']; $methodName = $data['method'];
Task::deliver($taskName, $methodName, $params, $type, $timeout); } }
|
swoft.pipeMessage
事件最终由PipeMessageListener
处理。在相关的监听其中,如果发现swoft.pipeMessage
事件由Task::deliverByProcess()
产生的,worker进程会替其执行一次Task::deliver()
,最终将任务数据投递到TaskWorker进程中。
回顾一下整个流程:从Task::deliverByProcess()
到某TaskBean 最终执行任务,经历了哪些进程,而调用链的哪些部分又分别是在哪些进程中执行?
从Command进程或其子进程中投递任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null) { if ($taskWorkerId === null) { $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum); }
if ($srcWorkerId === null) { $srcWorkerId = mt_rand(0, $this->workerNum - 1); }
$this->check(); $data = $this->pack($data, $srcWorkerId); $result = \msg_send($this->queueId, $taskWorkerId, $data, false); if (!$result) { return false; }
return true; }
|
对于Command进程的任务投递,情况会更复杂一点。
上文提到的Process,其往往衍生于Http/Rpc服务,作为同一个Manger的子孙进程,他们能够拿到Swoole\Server的句柄变量,从而通过$server->sendMessage()
,$server->task()
等方法进行任务投递。
但在Swoft的体系中,还有一个特殊进程: Command。
Command的进程从shell或cronb独立启动,和Http/Rpc服务相关的进程没有亲缘关系。因此Command进程以及从Command中启动的Process进程是没有办法拿到Swoole\server的调用句柄直接通过UnixSocket进行任务投递的。
为了为这种进程提供任务投递支持,Swoft利用了Swoole的Task进程的一个特殊功能—-消息队列。

同一个项目中Command和Http\RpcServer 通过约定一个message_queue_key
获取到系统内核中的同一条消息队列,然后Comand进程就可以通过该消息队列向Task进程投递任务了。
该机制没有提供对外的公开方法,仅仅被包含在Task::deliver()
方法中,Swoft会根据当前环境隐式切换投递方式。但该消息队列的实现依赖Semaphore拓展
,如果想使用,需要在编译PHP时加上--enable-sysvmsg
参数。
定时任务
除了手动执行的普通任务,Swoft还提供了精度为秒的定时任务功能用来在项目中替代Linux的Crontab功能.
Swoft用两个前置Process—任务计划进程:CronTimerProcess
和任务执行进程CronExecProcess
,和两张内存数据表—–RunTimeTable
(任务(配置)表)OriginTable
((任务)执行表)用于定时任务的管理调度。
两张表的每行记录的结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| \\Swoft\Task\Crontab\TableCrontab.php
private $originStruct = [ 'rule' => [\Swoole\Table::TYPE_STRING, 100], 'taskClass' => [\Swoole\Table::TYPE_STRING, 255], 'taskMethod' => [\Swoole\Table::TYPE_STRING, 255], 'add_time' => [\Swoole\Table::TYPE_STRING, 11], ];
private $runTimeStruct = [ 'taskClass' => [\Swoole\Table::TYPE_STRING, 255], 'taskMethod' => [\Swoole\Table::TYPE_STRING, 255], 'minute' => [\Swoole\Table::TYPE_STRING, 20], 'sec' => [\Swoole\Table::TYPE_STRING, 20], 'runStatus' => [\Swoole\TABLE::TYPE_INT, 4], ];
|
此处为何要使用Swoole的内存Table?
Swoft的的定时任务管理是分别由 任务计划进程和 任务执行进程进程负责的。两个进程的运行共同管理定时任务,如果使用进程间独立的array()
等结构,两个进程必然需要频繁的进程间通信。而使用跨进程的Table(本文的Table,除非特别说明,都指Swoole的Swoole\Table
结构)直接进行进程间数据共享,不仅性能高,操作简单 还解耦了两个进程。
为了Table能够在两个进程间共同使用,Table必须在Swoole Server启动前创建并分配内存。具体实现在Swoft\Task\Bootstrap\Listeners->onBeforeStart()
中,比较简单。
查看这两个定时任务进程的行为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
class CronTimerProcess implements ProcessInterface {
public function run(SwoftProcess $process) { $cron = App::getBean('crontab');
$server = App::$server->getServer();
$time = (60 - date('s')) * 1000; $server->after($time, function () use ($server, $cron) { // Every minute check all tasks, and prepare the tasks that next execution point needs $cron->checkTask(); $server->tick(60 * 1000, function () use ($cron) { $cron->checkTask(); }); }); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
private function initRunTimeTableData(array $task, array $parseResult): bool { $runTimeTableTasks = $this->getRunTimeTable()->table;
$min = date('YmdHi'); $sec = strtotime(date('Y-m-d H:i')); foreach ($parseResult as $time) { $this->checkTaskQueue(false); $key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec); $runTimeTableTasks->set($key, [ 'taskClass' => $task['taskClass'], 'taskMethod' => $task['taskMethod'], 'minute' => $min, 'sec' => $time + $sec, 'runStatus' => self::NORMAL ]); }
return true; }
|
CronTimerProcess
是Swoft的定时任务调度进程,其核心方法是Crontab->initRunTimeTableData()
。
该进程使用了Swoole的定时器功能,通过Swoole\Timer
在每分钟首秒时执行的回调,CronTimerProcess
每次被唤醒后都会遍历任务表
计算出当前这一分钟内的60秒分别需要执行的任务清单,写入执行表并标记为 未执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
class CronExecProcess implements ProcessInterface {
public function run(SwoftProcess $process) { $pname = App::$server->getPname(); $process->name(sprintf('%s cronexec process', $pname));
$cron = App::getBean('crontab');
$server = App::$server->getServer();
$server->tick(0.5 * 1000, function () use ($cron) { $tasks = $cron->getExecTasks(); if (!empty($tasks)) { foreach ($tasks as $task) { Task::deliverByProcess($task['taskClass'], $task['taskMethod']); $cron->finishTask($task['key']); } } }); } }
|
CronExecProcess
作为定时任务的执行者,通过Swoole\Timer
每0.5s唤醒自身一次,然后把 执行表 遍历一次,选取当下需要执行的任务,通过sendMessage()
投递出去并更新该 任务执行表中的状态。
该执行进程只负责任务的投递,任务的实际实际执行仍然在Task进程中由TaskExecutor处理。
定时任务的宏观执行情况如下:
