环境:RabbitMQ + Lumen 5.5,消费者处理逻辑是在处理完消息之后 ack。
Worker 源码路径 。
今天突然发现一个 qa 环境的一个队列消息累积了几百万,最终发现是因为消费消息的 Worker 进程没有处理完消息就退出了处理,而且没有任何的记录。看源码发现 worker 里面有个 kill
函数,里面执行了 exit()
函数,我们都知道,这个函数是退出进程的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
/** * Kill the process. * * @param int $status * @return void */ public function kill($status = 0) { $this->events->dispatch(new Events\WorkerStopping); if (extension_loaded(‘posix’)) { exit($status); |
退出其实问题不大,但是这个退出的逻辑并没有标记这个 job 失败,这就导致下一次 Worker 进程启动的时候,继续拿到这个消息处理,然后处理到一定时候,又退出,如此无限循环。
Worker exit 的原因
我们可以看看 Worker 里面所有 exit 的调用,其实有两个地方:
- stopIfNecessary,kill 和 stop 函数最终都是 exit。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
/** * Stop the process if necessary. * * @param \Illuminate\Queue\WorkerOptions $options * @param int $lastRestart */ protected function stopIfNecessary(WorkerOptions $options, $lastRestart) { if ($this->shouldQuit) { $this->kill(); } if ($this->memoryExceeded($options->memory)) { |
shouldQuit 是在一些连接断开的时候被设置为 true,又或者用户发送了 SIGTERM 信号给 Worker。
- registerTimeoutHandler,超时处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
/** * Register the worker timeout handler (PHP 7.1+). * * @param \Illuminate\Contracts\Queue\Job|null $job * @param \Illuminate\Queue\WorkerOptions $options * @return void */ protected function registerTimeoutHandler($job, WorkerOptions $options) { if ($this->supportsAsyncSignals()) { // We will register a signal handler for the alarm signal so that we can kill this // process if it is running too long because it has frozen. This uses the async // signals supported in recent versions of PHP to accomplish it conveniently. pcntl_signal(SIGALRM, function () { $this->kill(1); }); pcntl_alarm( |
所以总结一下有以下几个原因:
- 数据库连接或其他连接断开
- Worker 接收到 SIGTERM 信号
- Worker 处理完一个 Job 之后发现 Worker 占用的内存超出了指定的内存
- 用户执行了队列重启命令
- Worker 执行时间超出了 timeout
exit 的影响
1、如果使用的是 redis
如果这个消息需要消费者处理的时间大于指定的 timeout,会导致消息没处理完就丢失。
2、如果使用的是 RabbitMQ,并且关闭了 AutoAck
Worker 进程没处理完就退出,然后消息还在队列中,下次启动 Worker 的时候继续消费这个消息,导致无限的重复消费。
解决方案
- 评估一下 Job 的最大运行时间,设置一个合适的 timeout,这个是必须的。
- 可以监听 WorkerStopping 事件,记录 Worker 异常退出的日志,但是需要注意的是,正常退出也会 fire 这个事件。所以有可能没有办法根据 log 来判断是否是异常退出(超时)。
- 这是 5.8 以下版本的 bug,我们可以升级到 5.8 以上的版本,在新版本中超时也会记录为失败,而不是单纯地退出。
这是个 bug
我们通过上面的代码也可以发现,其实 Worker 的超时回调其实并没有多少实际的处理,dispatch 一个 WorkerStopping 事件然后就 exit 了。但是我们有可能并没有监听这个事件,这就导致了 Worker 存在 timeout 过短的问题难以被及时发现。
其实这个 5.8 版本以下的 bug,在 5.8 以上的版本中这个已经修复了,超时的时候,Job 会被标记为超时,超过重试次数就被记录为失败的 job。
源码可在 https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Worker.php#L137 查看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
/** * Register the worker timeout handler. * * @param \Illuminate\Contracts\Queue\Job|null $job * @param \Illuminate\Queue\WorkerOptions $options * @return void */ protected function registerTimeoutHandler($job, WorkerOptions $options) { // We will register a signal handler for the alarm signal so that we can kill this // process if it is running too long because it has frozen. This uses the async // signals supported in recent versions of PHP to accomplish it conveniently. pcntl_signal(SIGALRM, function () use ($job, $options) { if ($job) { $this->markJobAsFailedIfWillExceedMaxAttempts( $job->getConnectionName(), $job, (int) $options->maxTries, $this->maxAttemptsExceededException($job) ); } $this->kill(1); pcntl_alarm( |
其他问题
- Job 里面写死了 timeout 属性,会以这个 timeout 为准。
在调试过程中,发现明明 php artisan queue:work --timeout=
这里设置的 timeout 足够大了,但是 Worker 还是和原来一样退出了。在 RabbitMQ 的控制台发现消息里面有记录 Job 的 timeout 属性,然后 Worker 里面在判断到如果 job 里面有 timeout 属性的时候,就不会再使用命令行传递的 timeout。
1 2 3 4 5 6 7 8 9 10 11 |
/** * Get the appropriate timeout for the given job. * * @param \Illuminate\Contracts\Queue\Job|null $job * @param \Illuminate\Queue\WorkerOptions $options * @return int */ protected function timeoutForJob($job, WorkerOptions $options) { return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout; } |