理解 Laravel 队列

老牛浏览 31评论 0发表于 更新于

一、简介

Laravel 队列(Queue)的本质是一个生产者+消费者的异步任务调度模型:把任务序列化后放入存储容器(如 Redis 中),由常驻 worker 进程不断拉取并执行,并通过 timeoutretry_after 保证可靠性。从架构上可以拆成 4 个核心层:任务定义→入队→队列驱动→Worker 执行。

二、核心概念

2.1 任务定义(Job)

php
class SendEmail implements ShouldQueue
{
    public function handle()
    {
        // 具体执行逻辑
    }
}
  • 实现 ShouldQueue :表示这是一个异步任务

  • handle() :worker 真正执行的入口

2.2 队列驱动(Queue)

Laravel 支持多种驱动:

  • database(数据库表)

  • redis(推荐,性能最好)

  • sqs(AWS)

  • sync(同步执行)

它的本质是一个任务存储容器。

2.3 消费者(Worker)

bash
php artisan queue:work

它是一个常驻进程,循环做三件事:

  1. 从队列取任务

  2. 执行任务

  3. 处理成功/失败

2.4 调度器(Queue Manager)

Laravel 内部 QueueManager 会根据配置选择 driver(redis / db 等),统一 dispatch / pop / delete 行为。

三、运行流程

3.1 任务入队(生产者)

php
SendEmail::dispatch($user);

这一步 Laravel 会序列化 Job(包括参数),将其推送到队列(例如 Redis list)。

Redis 结构示例:

queues:default = [
    job_payload_1,
    job_payload_2
]

Payload 大致是 JSON:

json
{
    "job": "App\\Jobs\\SendEmail",
    "data": {...},
    "attempts": 0
}

3.2 Woker 拉取任务(消费者)

Worker 核心循环逻辑:

php
while (true) {
    $job = $queue->pop();
    if ($job) {
        $this->process($job);
    }
}

不同驱动行为不同,在 Redis 中 BRPOP(阻塞弹出)或 Lua 脚本保证原子性。

同时会放入 reserved 队列(处理中队列):

queues:default          // 待执行
queues:default:reserved // 已取出但未完成

3.3 执行任务

Worker 调用:

php
$job->fire();

反序列化 Job,调用 handle(),捕获异常。

3.4 成功/失败处理

成功:从 reserved 队列删除

失败:记录 attempts++,判断是否超过最大重试次数,如果没有超过重新放回队列(延迟),超过了写入 failed_jobs

四、关键机制

4.1 retry_after(防丢失)

config/queue.php 中配置:

php
'retry_after' => 90

用来避免任务丢失,当 worker 拿到任务后,如果 90 秒内没有删除(完成),则认为它“死了”,任务会被重新投递。

常见的丢失情况包括:

  • Worker 崩溃

  • 进程被 kill

  • 任务卡死

4.2 timeout(执行超时)

bash
php artisan queue:work --timeout=60

意思是任务最多执行 60 秒,否则 worker 强制结束该任务。

4.3 重试

php
public $tries = 3;
public $backoff = 10;

分别用于控制最大尝试次数和重试间隔。

4.4 延迟队列

php
SendEmail::dispatch()->delay(now()->addMinutes(5));

Redis 会放到:

queues:default:delayed

Worker 会定期迁移到主队列。

4.5 分布式锁(避免重复执行)

常见方案:

Cache::lock('job-key')->get(function () {
    // 执行任务
});

或使用:

  • Redis SETNX

  • Laravel WithoutOverlapping

五、Horizon 的作用

Laravel Horizon 是 Laravel 的队列管理 UI + Supervisor:

  • 管理 Worker 进程

  • 监控任务吞吐量

  • 自动扩缩容

  • 可视化失败任务

六、实际案例

有一个多语言的内容管理系统,当用户发表一篇文章后,系统自动调用翻译 API 将其翻译成多种语言,这里比较耗时的翻译任务就比较适合采用队列,在后台异步执行。

我们创建一个翻译任务:

app/Jobs/TranslateJob.php

php
<?php

namespace App\Jobs;

use App\Enums\Locale;
use App\Models\TranslationJob;
use Exception;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Support\Facades\Http;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Str;

class TranslateJob implements ShouldQueue
{
    use Queueable;

    public $timeout = 60 * 10;

    public $tries = 3;

    /**
     * Create a new job instance.
     */
    public function __construct(public TranslationJob $translationJob, public Locale $locale)
    {
        //
    }

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        $this->translationJob->markAsProcessing();

        $translatable = $this->translationJob->translatable;
        if (! $translatable) {
            throw new Exception('要翻译的内容不存在');
        }

        $result = Http::withHeaders([
            'Authorization' => 'Bearer '.config('services.translation.token'),
            'Content-Type' => 'application/json',
        ])
            ->timeout(60 * 9)
            ->post(config('services.translation.api'), [
                'content' => $translatable->content,
                'locale' => $this->locale,
            ])
            ->throw()
            ->json();

        $translatedContent = $result['content'] ?? null;
        if (empty($translatedContent)) {
            throw new Exception('翻译API返回内容为空');
        }

        $translatable->translations()->updateOrCreate([
            'locale' => $this->locale,
        ], ['content' => $translatedContent]);

        $this->translationJob->markAsDone();
    }

    public function failed($e)
    {
        Log::error('翻译失败', [
            'job_id' => $this->translationJob->id,
            'translatable_type' => $this->translationJob->translatable_type,
            'translatable_id' => $this->translationJob->translatable_id,
            'error' => $e->getMessage(),
        ]);

        $this->translationJob->markAsFailed(Str::limit($e->getMessage(), 200));
    }
}

任务核心逻辑在 handle() 方法中,Laravel 会捕获其是否抛出异常,如果没有异常,则任务成功;如果抛出异常,会执行 failed() 里的逻辑,任务失败,优雅结束。

这里有两个超时时间:

  1. Job 的 $timeout 10 分钟,它定义了单个任务最长允许执行多久。超时后,抛出异常,标记任务失败。和 Horizon timeout 配置的对比如下:

    维度

    Horizon timeout

    Job $timeout

    作用范围

    Supervisor 下所有任务

    单个 Job 类

    本质

    控制一个 Worker 最长能执行一个 Job 多久

    控制 Job 最长允许执行多久

    行为机制

    Horizon 启动 Worker 时记录执行时间

    Laravel 在执行 Job 时,通过 pcntl_alarm 或内部计时机制

    控制层级

    进程管理级别

    应用逻辑级别

    超时后的行为

    强制终止 Worker 进程,不会优雅结束 Job

    抛出异常(可捕获),任务失败(或进入 retry)

    优先级

    较低(作为兜底)

    较高(如果设置且小于 Horizon timeout)

    适用场景

    防止进程被长期占用

    为特定任务设置个性化超时

  2. 向翻译 API 发起的 HTTP 请求,响应时间不确定,我们设置超时时间为 9 分钟。若请求超时,也会抛出异常,从而标记任务失败。

此外,还有一个相关的参数,即队列的 retry_after ,用于控制:一个 Job 被取出后,最多可以占用多久,超过这个时间就认为它“死”了,重新放回队列。

它们的大小需要满足: http timeout < job timeout < horizon timeout < queue retry_after

  • http timeout < job timeout:确保 HTTP 请求先结束,避免阻塞无法中断进而拖死 Job。

  • job timeout < horizon timeout:确保可以在 Job 内部优雅失败,能进入 failed() 方法,而不是 Worker 粗暴终止。

  • horizon timeout < queue retry_after:确保 worker 在 retry_after 之前结束任务,避免任务被重新投入队列。

生产环境下建议每一层值之间预留 30~60 秒以上的缓冲时间。因此一个合理的值是:

  • http timeout: 60 * 9

  • job timeout: 60 * 10

  • horizon timeout: 60 * 11

  • queue retry_after: 60 * 12

点赞
收藏
暂无评论,快来发表评论吧~