剖析 Laravel 队列系统–Worker

1,190次阅读
没有评论

译文 GitHub https://github.com/yuansir/diving-laravel-…

原文链接 https://divinglaravel.com/queue-system/wor…

现在,我们知道了 Laravel 如何将作业推到不同的队列中,让我们来深入了解 workers 如何运作你的作业。 首先,我将 workers 定义为一个在后台运行的简单 PHP 进程,目的是从存储空间中提取作业并针对多个配置选项运行它们。

php artisan queue:work
运行此命令将指示 Laravel 创建应用程序的一个实例并开始执行作业,这个实例将一直存活着,启动 Laravel 应用程序的操作只在运行命令时发生一次,同一个实例将被用于执行你的作业,这意味着:

避免在每个作业上启动整个应用程序来节省服务器资源。
在应用程序中所做的任何代码更改后必须手动重启 worker。
你也可以这样运行:

php artisan queue:work –once
这将启动应用程序的一个实例,处理单个作业,然后干掉脚本。

php artisan queue:listen
queue:listen 命令相当于无限循环地运行 queue:work –once 命令,这将导致以下问题:

每个循环都会启动一个应用程序实例。
分配的 worker 将选择一个工作并执行。
worker 进程将被干掉。
使用 queue:listen 确保为每个作业创建一个新的应用程序实例,这意味着代码更改以后不必手动重启 worker,同时也意味着将消耗更多的服务器资源。

queue:work 命令
我们来看看 Queue\Console\WorkCommand 类的 handle() 方法,这是当你运行 php artisan queue:work 时会执行的方法:

public function handle()
{
if ($this->downForMaintenance() && $this->option(‘once’)) {
return $this->worker->sleep($this->option(‘sleep’));
}

$this->listenForEvents();

$connection = $this->argument('connection')
                ?: $this->laravel['config']['queue.default'];

$queue = $this->getQueue($connection);

$this->runWorker(
    $connection, $queue
);

}
首先,我们检查应用程序是否处于维护模式,并使用 –once 选项,在这种情况下,我们希望脚本正常运行,因此我们不执行任何作业,我们只需要在完全杀死脚本前让 worker 在一段时间内休眠。

Queue\Worker 的 sleep() 方法看起来像这样:

public function sleep($seconds)
{
sleep($seconds);
}
为什么我们不能在 handle () 方法中返回 null 来终止脚本?
如前所述, queue:listen 命令在循环中运行 WorkCommand :

while (true) {
// This process simply calls ‘php artisan queue:work –once’
$this->runProcess($process, $options->memory);
}
如果应用程序处于维护模式,并且 WorkCommand 立即终止,这将导致循环结束,下一个在很短的时间内启动,最好在这种情况下导致一些延迟,而不是通过创建我们不会真正使用的大量应用程序实例。

监听事件
在 handle() 方法里面我们调用 listenForEvents() 方法:

protected function listenForEvents()
{
$this->laravel[‘events’]->listen(JobProcessing::class, function ($event) {
$this->writeOutput($event->job, ‘starting’);
});

$this->laravel['events']->listen(JobProcessed::class, function ($event) {
    $this->writeOutput($event->job, 'success');
});

$this->laravel['events']->listen(JobFailed::class, function ($event) {
    $this->writeOutput($event->job, 'failed');

    $this->logFailedJob($event);
});

}
在这个方法中我们会监听几个事件,这样我们可以在每次作业处理中,处理完或处理失败时向用户打印一些信息。

记录失败作业
一旦作业失败 logFailedJob() 方法会被调用

$this->laravel[‘queue.failer’]->log(
$event->connectionName, $event->job->getQueue(),
$event->job->getRawBody(), $event->exception
);
queue.failer 容器别名在 Queue\QueueServiceProvider::registerFailedJobServices() 中注册:

protected function registerFailedJobServices()
{
$this->app->singleton(‘queue.failer’, function () {
$config = $this->app[‘config’][‘queue.failed’];

    return isset($config['table'])
                ? $this->databaseFailedJobProvider($config)
                : new NullFailedJobProvider;
});

}

/**

  • Create a new database failed job provider.
    *
  • @param array $config
  • @return \Illuminate\Queue\Failed\DatabaseFailedJobProvider
    */
    protected function databaseFailedJobProvider($config)
    {
    return new DatabaseFailedJobProvider(
    $this->app[‘db’], $config[‘database’], $config[‘table’]
    );
    }
    如果配置了 queue.failed ,则将使用数据库队列失败,并将有关失败作业的信息简单地存储在数据库表中的:

$this->getTable()->insertGetId(compact(
‘connection’, ‘queue’, ‘payload’, ‘exception’, ‘failed_at’
));
运行 worker
要运行 worker,我们需要收集两条信息:

worker 的连接信息从作业中提取
worker 找到作业的队列
如果没有使用 queue.default 配置定义的默认连接。您可以为 queue:work 命令提供 –connection=default 选项。

队列也是一样,您可以提供一个 –queue=emails 选项,或选择连接配置中的 queue 选项。一旦这一切完成, WorkCommand::handle() 方法会运行 runWorker():

protected function runWorker($connection, $queue)
{
$this->worker->setCache($this->laravel[‘cache’]->driver());

return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
    $connection, $queue, $this->gatherWorkerOptions()
);

}
在 worker 类属性在命令构造后设置:

public function __construct(Worker $worker)
{
parent::__construct();

$this->worker = $worker;

}
容器解析 Queue\Worker 实例,在 runWorker() 中我们设置了 worker 将使用的缓存驱动,我们也根据 –once 命令来决定我们调用什么方法。

如果使用 –once 选项,我们只需调用 runNextJob 来运行下一个可用的作业,然后脚本就会终止。 否则,我们将调用 daemon 方法来始终保持进程处理作业。

在开始工作时,我们使用 gatherWorkerOptions() 方法收集用户给出的命令选项,我们稍后会提供这些选项,这个工具是 runNextJob 或 daemon 方法。

protected function gatherWorkerOptions()
{
return new WorkerOptions(
$this->option(‘delay’), $this->option(‘memory’),
$this->option(‘timeout’), $this->option(‘sleep’),
$this->option(‘tries’), $this->option(‘force’)
);
}
守护进程
让我看看 Worker::daemon() 方法,这个方法的第一行调用了 Worker::daemon() 方法

protected function listenForSignals()
{
if ($this->supportsAsyncSignals()) {
pcntl_async_signals(true);

    pcntl_signal(SIGTERM, function () {
        $this->shouldQuit = true;
    });

    pcntl_signal(SIGUSR2, function () {
        $this->paused = true;
    });

    pcntl_signal(SIGCONT, function () {
        $this->paused = false;
    });
}

}
这种方法使用 PHP7.1 的信号处理, supportsAsyncSignals() 方法检查我们是否在 PHP7.1 上,并加载 pcntl 扩展名。

之后 pcntl_async_signals() 被调用来启用信号处理,然后我们为多个信号注册处理程序:

当脚本被指示关闭时,会引发 SIGTERM。
SIGUSR2 是用户定义的信号,Laravel 用来表示脚本应该暂停。
当暂停的脚本继续进行时,会引发 SIGCONT。
这些信号从 Process Monitor(如 Supervisor )发送并与我们的脚本进行通信。

Worker::daemon() 方法中的第二行读取最后一个队列重新启动的时间戳,当我们调用 queue:restart 时该值存储在缓存中,稍后我们将检查是否和上次重新启动的时间戳不符合,来指示 worker 在之后多次重启。

最后,该方法启动一个循环,在这个循环中,我们将完成其余获取作业的 worker,运行它们,并对 worker 进程执行多个操作。

while (true) {
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
$this->pauseWorker($options, $lastRestart);

    continue;
}

$job = $this->getNextJob(
    $this->manager->connection($connectionName), $queue
);

$this->registerTimeoutHandler($job, $options);

if ($job) {
    $this->runJob($job, $connectionName, $options);
} else {
    $this->sleep($options->sleep);
}

$this->stopIfNecessary($options, $lastRestart);

}
确定 worker 是否应该处理作业
调用 daemonShouldRun() 检查以下情况:

应用程序不处于维护模式
Worker 没有暂停
没有事件监听器阻止循环继续
如果应用程序在维护模式下,worker 使用 –force 选项仍然可以处理作业:

php artisan queue:work –force
确定 worker 是否应该继续的条件之一是:

$this->events->until(new Events\Looping($connectionName, $queue)) === false)
这行触发 Queue\Event\Looping 事件,并检查是否有任何监听器在 handle() 方法中返回 false,这种情况下你可以强制您的 workers 暂时停止处理作业。

如果 worker 应该暂停,则调用 pauseWorker() 方法:

protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
$this->sleep($options->sleep > 0 ? $options->sleep : 1);

$this->stopIfNecessary($options, $lastRestart);

}
sleep 方法并传递给控制台命令的 –sleep 选项,这个方法调用

public function sleep($seconds)
{
sleep($seconds);
}
脚本休眠了一段时间后,我们检查 worker 是否应该在这种情况下退出并杀死脚本,稍后我们看一下 stopIfNecessary 方法,以防脚本不能被杀死,我们只需调用 continue; 开始一个新的循环:

if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
$this->pauseWorker($options, $lastRestart);

continue;

}
Retrieving 要运行的作业
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);
getNextJob() 方法接受一个队列连接的实例,我们从队列中获取作业

protected function getNextJob($connection, $queue)
{
try {
foreach (explode(‘,’, $queue) as $queue) {
if (! is_null($job = $connection->pop($queue))) {
return $job;
}
}
} catch (Exception $e) {
$this->exceptions->report($e);

    $this->stopWorkerIfLostConnection($e);
}

}
我们简单地循环给定的队列,使用选择的队列连接从存储空间(数据库,redis,sqs,…)获取作业并返回该作业。

要从存储中 retrieve 作业,我们查询满足以下条件的最旧作业:

推送到 queue ,我们试图从中找到作业
没有被其他 worker reserved
可以在给定的时间内运行,有些作业在将来被推迟运行
我们也取到了很久以来被冻结的作业并重试
一旦我们找到符合这一标准的作业,我们将这个作业标记为 reserved,以便其他 workers 获取到,我们还会增加作业监控次数。

监控作业超时
下一个作业被 retrieved 之后,我们调用 registerTimeoutHandler() 方法:

protected function registerTimeoutHandler($job, WorkerOptions $options)
{
if ($this->supportsAsyncSignals()) {
pcntl_signal(SIGALRM, function () {
$this->kill(1);
});the

    $timeout = $this->timeoutForJob($job, $options);

    pcntl_alarm($timeout > 0 ? $timeout + $options->sleep : 0);
}

}
再次,如果 pcntl 扩展被加载,我们将注册一个信号处理程序干掉 worker 进程如果该作业超时的话,在配置了超时之后我们使用 pcntl_alarm() 来发送一个 SIGALRM 信号。

如果作业所花费的时间超过了超时值,处理程序将会终止该脚本,如果不是该作业将通过,并且下一个循环将设置一个新的报警覆盖第一个报警,因为进程中可能存在单个报警。

作业只在 PHP7.1 以上起效,在 window 上也无效 ¯(ツ)

处理作业
runJob() 方法调用 process():

public function process($connectionName, $job, WorkerOptions $options)
{
try {
$this->raiseBeforeJobEvent($connectionName, $job);

    $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
        $connectionName, $job, (int) $options->maxTries
    );

    $job->fire();

    $this->raiseAfterJobEvent($connectionName, $job);
} catch (Exception $e) {
    $this->handleJobException($connectionName, $job, $options, $e);
}

}
raiseBeforeJobEvent() 触发 Queue\Events\JobProcessing 事件,raiseAfterJobEvent() 触发 Queue\Events\JobProcessed 事件。 markJobAsFailedIfAlreadyExceedsMaxAttempts() 检查进程是否达到最大尝试次数,并将该作业标记为失败:

protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
$maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

if ($maxTries === 0 || $job->attempts() <= $maxTries) {
    return;
}

$this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
    'A queued job has been attempted too many times. The job may have previously timed out.'
));

throw $e;

}
否则我们在作业对象上调用 fire() 方法来运行作业。

从哪里获取作业对象
getNextJob() 方法返回一个 Contracts\Queue\Job 的实例,这取决于我们使用相应的 Job 实例的队列驱动程序,例如如果数据库队列驱动则选择 Queue\Jobs\DatabaseJob 。

循环结束
在循环结束时,我们调用 stopIfNecessary() 来检查在下一个循环开始之前是否应该停止进程:

protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
{
if ($this->shouldQuit) {
$this->kill();
}

if ($this->memoryExceeded($options->memory)) {
    $this->stop(12);
} elseif ($this->queueShouldRestart($lastRestart)) {
    $this->stop();
}

}
shouldQuit 属性在两种情况下设置,首先 listenForSignals() 内部的作为 SIGTERM 信号处理程序,其次在 stopWorkerIfLostConnection() 中

protected function stopWorkerIfLostConnection($e)
{
if ($this->causedByLostConnection($e)) {
$this->shouldQuit = true;
}
}
在 retrieving 和处理作业时,会在几个 try … catch 语句中调用此方法,以确保 worker 应该处于被干掉的状态,以便我们的 Process Control 可能会启动一个新的数据库连接。

causedByLostConnection() 方法可以在 Database\DetectsLostConnections trait 中找到。
memoryExceeded() 检查内存使用情况是否超过当前设置的内存限制,您可以使用 –memory 选项设置限制。

正文完
可以使用微信扫码关注公众号(ID:xzluomor)
post-qrcode
 0
评论(没有评论)

文心AIGC

2023 年 11 月
 12345
6789101112
13141516171819
20212223242526
27282930  
文心AIGC
文心AIGC
人工智能ChatGPT,AIGC指利用人工智能技术来生成内容,其中包括文字、语音、代码、图像、视频、机器人动作等等。被认为是继PGC、UGC之后的新型内容创作方式。AIGC作为元宇宙的新方向,近几年迭代速度呈现指数级爆发,谷歌、Meta、百度等平台型巨头持续布局
文章搜索
热门文章
潞晨尤洋:日常办公没必要上私有模型,这三类企业才需要 | MEET2026

潞晨尤洋:日常办公没必要上私有模型,这三类企业才需要 | MEET2026

潞晨尤洋:日常办公没必要上私有模型,这三类企业才需要 | MEET2026 Jay 2025-12-22 09...
共推空天领域智能化升级!趋境科技与金航数码强强联手

共推空天领域智能化升级!趋境科技与金航数码强强联手

共推空天领域智能化升级!趋境科技与金航数码强强联手 十三 2025-12-09 18:18:41 来源:量子位...
起底“豆包手机”:核心技术探索早已开源,GUI Agent布局近两年,“全球首款真正的AI手机”

起底“豆包手机”:核心技术探索早已开源,GUI Agent布局近两年,“全球首款真正的AI手机”

起底“豆包手机”:核心技术探索早已开源,GUI Agent布局近两年,“全球首款真正的AI手机” 西风 202...
面向「空天具身智能」,北航团队提出星座规划新基准丨NeurIPS’25

面向「空天具身智能」,北航团队提出星座规划新基准丨NeurIPS’25

面向「空天具身智能」,北航团队提出星座规划新基准丨NeurIPS’25 鹭羽 2025-12-13 22:37...
5天连更5次,可灵AI年末“狂飙式”升级

5天连更5次,可灵AI年末“狂飙式”升级

5天连更5次,可灵AI年末“狂飙式”升级 思邈 2025-12-10 14:28:37 来源:量子位 让更大规...
最新评论
ufabet ufabet มีเกมให้เลือกเล่นมากมาย: เกมเดิมพันหลากหลาย ครบทุกค่ายดัง
tornado crypto mixer tornado crypto mixer Discover the power of privacy with TornadoCash! Learn how this decentralized mixer ensures your transactions remain confidential.
ดูบอลสด ดูบอลสด Very well presented. Every quote was awesome and thanks for sharing the content. Keep sharing and keep motivating others.
ดูบอลสด ดูบอลสด Pretty! This has been a really wonderful post. Many thanks for providing these details.
ดูบอลสด ดูบอลสด Pretty! This has been a really wonderful post. Many thanks for providing these details.
ดูบอลสด ดูบอลสด Hi there to all, for the reason that I am genuinely keen of reading this website’s post to be updated on a regular basis. It carries pleasant stuff.
Obrazy Sztuka Nowoczesna Obrazy Sztuka Nowoczesna Thank you for this wonderful contribution to the topic. Your ability to explain complex ideas simply is admirable.
ufabet ufabet Hi there to all, for the reason that I am genuinely keen of reading this website’s post to be updated on a regular basis. It carries pleasant stuff.
ufabet ufabet You’re so awesome! I don’t believe I have read a single thing like that before. So great to find someone with some original thoughts on this topic. Really.. thank you for starting this up. This website is something that is needed on the internet, someone with a little originality!
ufabet ufabet Very well presented. Every quote was awesome and thanks for sharing the content. Keep sharing and keep motivating others.
热评文章
小冰之父李笛智能体创业,公司取名Nextie!陆奇是股东

小冰之父李笛智能体创业,公司取名Nextie!陆奇是股东

小冰之父李笛智能体创业,公司取名Nextie!陆奇是股东 Jay 2025-12-09 08:26:01 来源...
梁文锋,Nature全球年度十大科学人物!

梁文锋,Nature全球年度十大科学人物!

梁文锋,Nature全球年度十大科学人物! 一水 2025-12-09 09:46:23 来源:量子位 来自安...
起底“豆包手机”:核心技术探索早已开源,GUI Agent布局近两年,“全球首款真正的AI手机”

起底“豆包手机”:核心技术探索早已开源,GUI Agent布局近两年,“全球首款真正的AI手机”

起底“豆包手机”:核心技术探索早已开源,GUI Agent布局近两年,“全球首款真正的AI手机” 西风 202...
摩尔线程新一代GPU架构10天后发布

摩尔线程新一代GPU架构10天后发布

摩尔线程新一代GPU架构10天后发布 思邈 2025-12-09 15:46:09 来源:量子位 国内首个聚焦...
极客公园创新大会 2026在京落幕,罗永浩、张楠、何小鹏、刘靖康等共议 AI 时代「进程由我」

极客公园创新大会 2026在京落幕,罗永浩、张楠、何小鹏、刘靖康等共议 AI 时代「进程由我」

极客公园创新大会 2026在京落幕,罗永浩、张楠、何小鹏、刘靖康等共议 AI 时代「进程由我」 henry 2...