Laravel + Swoole 协程实战:一个多租户采集系统的真实落地方案

作为 PHP 开发者,我们迟早都会遇到一个瞬间:

代码没有报错,逻辑也没问题,但系统就是——慢。

不是慢一点,是那种

「明明只是检查 1000 个账号状态,却要跑半个小时」

「服务器 CPU 不高,但请求就是排着队等」

的慢。

最近搞项目刚好遇到了一个业务场景,多租户系统中,根据账号采集数据,采集前先要确认账号是否有效(就像根据IP代理爬数据一样,爬数据前先确认代理是否有效),于是我按照以往的经验,很自然的写了这样一条链路:

  1. 循环取出一个账号

  2. 检测状态

  3. 拉数据

  4. 账号异常,再换下一个账号,重复执行1、2、3

逻辑看似没问题,代码也很优雅。直到我测试的时候,跑了一次完整的任务。1000个账号,跑了30分钟还没跑完。我查看了CPU,内存也不高,但就是很慢。为什么呢?

问题不在算力,而在于PHP在大量IO等待中,被迫排队!

这个时候就是协程(Coroutine)大显身手的时候了。下文将结合业务和实战(编码)深入理解协程,并展示如何在Laravel中构建基于协程的高性能多租户采集系统。

一、协程基础:重新认识PHP并发编程

在介绍协程前,咱们先得明白一件事儿:

协程不会使CPU算的更快!

它解决的是“等待问题”,而不是“计算问题”。

只要系统大量在做以下几件事:

  • 调第三方API

  • 查数据库

  • 读写文件

  • 等网络相应

协程就有发挥空间。废话不多说

1.1 什么是协程?

协程是用户态的轻量级线程,这句话可能有点抽象,让我们通过一个简单的比喻来理解:

想象一个餐厅厨房:

  • 传统多进程/多线程:雇佣多个厨师(进程/线程),每个厨师有自己的工作台和工具,切换成本高(内存开销大)

  • 协程:一个超级厨师(单线程),但可以同时照看多个锅(协程),当一个锅在炖煮时(等待IO),就切换到另一个锅翻炒

技术定义:协程是一种比线程更轻量级的存在,由程序自身控制调度,在用户态进行上下文切换,避免了内核态切换的开销。

协程不是让 PHP 变快,而是让 PHP 不再傻等 IO

1.2 为什么选择协程?三大核心优势

1. 极低的资源消耗

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

// 对比:创建1000个并发任务

$threads = [];

for ($i = 0; $i < 1000; $i++) {

    $threads[] = new Thread(fn() => processTask($i)); // 每个线程约2MB内存

}

// 总内存:1000 * 2MB = 2GB



// 协程方式

Coroutine\run(function () {

    for ($i = 0; $i < 1000; $i++) {

        Coroutine::create(fn() => processTask($i)); // 每个协程约2KB内存

    }

});

// 总内存:1000 * 2KB = 2MB

2. 高效的IO并发

传统同步代码在等待数据库查询、API响应时会阻塞整个进程。协程可以在等待时让出控制权,处理其他任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

// 传统同步方式(串行执行,总耗时 = 各任务耗时之和)

function fetchUsers() {

    $result1 = $http->get('/api/users/1'); // 等待100ms

    $result2 = $http->get('/api/users/2'); // 等待100ms

    // 总耗时约200ms

}



// 协程方式(并发执行)

Coroutine\run(function () {

    $chan = new Channel(2);

    

    Coroutine::create(function () use ($chan) {

        $result1 = $http->get('/api/users/1');

        $chan->push($result1);

    });

    

    Coroutine::create(function () use ($chan) {

        $result2 = $http->get('/api/users/2');

        $chan->push($result2);

    });

    

    // 总耗时约100ms(同时发起请求)

});

3. 同步的编程体验,异步的执行效率

协程最大的魅力在于:用同步的代码风格,获得异步的执行性能。不需要复杂的回调地狱(Callback Hell),代码可读性大大提升。

1.3 协程 vs 其他并发方案对比

| 特性 | 多进程 | 多线程 | 异步回调 | 协程 |

| ———- | ——— | ——— | ——– | ———— |

| 并发能力 | 高 | 高 | 高 | 极高 |

| 内存消耗 | 高 | 中 | 低 | 极低 |

| 开发复杂度 | 低 | 中 | 高 | |

| 调试难度 | 低 | 高 | 高 | |

| 适用场景 | CPU密集型 | CPU密集型 | IO密集型 | IO密集型 |

二、Laravel中的协程集成:Swoole实战

2.1 环境配置:让Laravel飞起来

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 1. 安装Swoole扩展

pecl install swoole



# 2. 验证安装

php --ri swoole



# 3. 配置php.ini

extension=swoole.so



# 4. 安装Laravel Octane(可选,但推荐)

composer require laravel/octane

php artisan octane:install

2.2 协程核心概念:Channel与WaitGroup

在理解我们的采集系统之前,先掌握两个关键概念:

Channel(通道):协程间的通信管道

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

use Swoole\Coroutine;

use Swoole\Coroutine\Channel;



// 创建容量为10的通道

$chan = new Channel(10);



// 生产者协程

Coroutine::create(function () use ($chan) {

    for ($i = 0; $i < 100; $i++) {

        $chan->push($i); // 生产数据

        echo "生产: $i\n";

        Coroutine::sleep(0.1); // 模拟耗时

    }

    $chan->close(); // 生产完毕,关闭通道

});



// 消费者协程

Coroutine::create(function () use ($chan) {

    while ($data = $chan->pop()) { // 消费数据,通道空时自动挂起

        echo "消费: $data\n";

    }

    echo "所有数据处理完毕\n";

});

WaitGroup(等待组):协程同步工具

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

use Swoole\Coroutine\WaitGroup;



$wg = new WaitGroup();

$results = [];



for ($i = 0; $i < 10; $i++) {

    $wg->add(); // 增加计数

    Coroutine::create(function () use ($i, $wg, &$results) {

        try {

            // 模拟任务处理

            Coroutine::sleep(0.1);

            $results[$i] = "任务{$i}完成";

        } finally {

            $wg->done(); // 任务完成,减少计数

        }

    });

}



$wg->wait(); // 等待所有任务完成

print_r($results); // 所有任务完成后执行

三、实战:多租户采集系统设计

3.1 业务场景解析

回到文章开头提到的业务场景:

  1. 多租户系统:每个租户独立,有不同的采集配置和限制

  2. 账号分组:每个租户下有多个账号分组,每组包含若干账号

  3. 状态检测:采集前需要验证账号有效性

  4. 并发控制:需要限制单个租户的并发数,避免API被限流

  5. 任务管理:需要跟踪采集任务状态、进度和结果

3.2 架构设计:协程驱动的四层架构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

┌─────────────────────────────────────────────┐

│                 Web层(控制器)              │

│  - 接收采集请求                             │

│  - 参数验证                                 │

│  - 返回任务ID                               │

└─────────────────┬───────────────────────────┘


┌─────────────────▼───────────────────────────┐

│             任务调度层(Command)            │

│  - 定时扫描待处理任务                        │

│  - 动态创建协程池                           │

│  - 预热账号数据                             │

└─────────────────┬───────────────────────────┘


┌─────────────────▼───────────────────────────┐

│             业务逻辑层(Service)            │

│  - 账号状态检测                             │

│  - 数据采集逻辑                             │

│  - 数据存储                                 │

└─────────────────┬───────────────────────────┘


┌─────────────────▼───────────────────────────┐

│             基础设施层                       │

│  - 协程池(CoroutinePool)                  │

│  - 协程工具(CoroutineUtils)               │

│  - 账号缓存管理                             │

└─────────────────────────────────────────────┘

3.3 关键代码解读:从理论到实现

3.3.1 协程池(CoroutinePool) - 资源控制器

协程池的核心作用是限制并发数,避免系统过载。以下是简化版的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94

namespace App\Concurrency;



use Swoole\Coroutine;

use Swoole\Coroutine\Channel;



class CoroutinePool

{

    private $size;      // 池子大小(最大并发数)

    private $channel;   // 控制并发的通道

    private $count = 0; // 当前活跃协程数

    

    public function __construct(int $size)

    {

        $this->size = $size;

        // 创建容量为$size的通道,实现并发控制

        $this->channel = new Channel($size);

    }

    

    public function add(callable $task): void

    {

        // 关键:当并发数达到上限时,这里会阻塞,直到有协程完成

        // 这里的 channel 不是用来传数据的,而是用来“占坑位”的

		// 每 push 一次,代表占用一个并发名额

		// 当 channel 满了,push 会阻塞,新的任务进不来

        $this->channel->push(true);

        

        Coroutine::create(function () use ($task) {

            try {

                $task(); // 执行用户任务

            } finally {

                // 任务完成,释放一个位置

                $this->channel->pop();

                $this->count--;

            }

        });

        

        $this->count++;

    }

    

    public function close(): void

    {

        // 等待所有任务完成

        while ($this->count > 0) {

            Coroutine::sleep(0.1);

        }

    }

}

设计要点

  1. 通道作为信号量$channel->push()$channel->pop()实现了经典的信号量模式

  2. 优雅的资源释放:使用try...finally确保任务异常时也能释放资源

  3. 阻塞式等待:当并发数满时,新任务会阻塞而非拒绝,更适合队列场景

3.3.2 任务运行器(FetchBloggerTaskRunner) - 调度器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

public function handle()

{

    // 1. 设置定时器,定期检查数据库中的新任务

    Timer::tick(self::CHECK_INTERVAL * 1000, function () {

        Coroutine::create(function () {

            $tasks = $this->getUnHandleTasks();

            if ($tasks->isEmpty()) return;

            

            // 2. 创建协程池,限制最大并发

            $pool = new CoroutinePool(20);

            $pool->run();

            

            // 3. 异步预热账号(不阻塞主流程)

            $accountGroupIds = $tasks->pluck('account_group_id')->unique()->toArray();

            $manager = new AccountWarmupManager('', '', 10);

            $manager->asyncWarmupByGroupIds($accountGroupIds);

            

            // 4. 标记任务为处理中

            YkCollectBloggerTaskModel::whereIn('id', $tasks->pluck('id'))

                ->update(['status' => 'handing']);

            

            // 5. 为每个任务创建采集协程

            foreach ($tasks as $task) {

                // 分布式锁:避免重复处理

                $lock = Cache::lock("lock:fetch_blogger_data_task:{$task->id}", 300);

                if (!$lock->get()) continue;

                

                $pool->add(function () use ($task, $lock) {

                    try {

                        $service = new FetchBloggerTaskService($task);

                        $service->handle();

                    } finally {

                        $lock->release(); // 释放锁

                    }

                });

            }

            

            $pool->close(); // 等待所有任务完成

        });

    });

    

    Event::wait(); // 关键:阻塞主进程,让协程有机会执行

}

代码分析

  1. Timer::tick定时器:替代传统的while+sleep循环,更高效

  2. 协程内查询数据库:避免阻塞事件循环

  3. 异步预热:账号预热不阻塞任务分发

  4. 分布式锁:在多进程部署时避免任务重复执行

  5. Event::wait():维持进程运行,让定时器持续工作

3.3.3 账号预热管理器(AccountWarmupManager) - 缓存优化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

public function asyncWarmupByGroupIds(array $groupIds): void

{

    Coroutine::create(function () use ($groupIds) {

        // 1. 创建控制并发的通道

        $chan = new Channel($this->maxConcurrency);

        

        // 2. 分页查询账号,避免一次性加载过多数据

        $page = 1;

        while (true) {

            $accounts = YkAccountModel::query()

                ->whereIn('group_id', $groupIds)

                ->forPage($page, $this->pageSize)

                ->get();

            

            if ($accounts->isEmpty()) break;

            

            // 3. 并发验证每个账号

            foreach ($accounts as $account) {

                // 跳过已缓存的账号

                if (Redis::exists(self::VALID_ACCOUNT_CACHE_KEY . $account->id)) {

                    continue;

                }

                

                $chan->push(true); // 控制并发

                Coroutine::create(function () use ($account, $chan) {

                    try {

                        $this->checkAndCacheAccount($account->toArray());

                    } finally {

                        $chan->pop(); // 释放并发位

                    }

                });

            }

            

            $page++;

        }

    });

}

优化技巧

  1. 分页查询:避免一次性加载大量数据到内存

  2. 缓存跳过:已验证的账号跳过重复检查

  3. 并发控制:限制同时验证的账号数,保护API

  4. 异步执行:预热过程不阻塞主流程

3.3.4 抽象采集器(AbstractBloggerCollector) - 模板方法模式

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102

abstract class AbstractBloggerCollector

{

    // 模板方法:定义采集算法骨架

    public function execute(): int

    {

        $accountIndex = 0;

        $number = 0;

        $maxId = '';

        

        // 循环使用账号,直到采集完成

        while ($accountIndex < count($this->accounts)) {

            $account = $this->accounts[$accountIndex] ?? [];

            

            // 1. 检查账号有效性

            $checkAccount = $manager->getValidAccount($account['id']);

            if (empty($checkAccount) || empty($checkAccount['valid'])) {

                $accountIndex++; // 换下一个账号

                continue;

            }

            

            // 2. 带重试的采集

            try {

                $data = retryCoroutine(2, function () use ($account, $maxId) {

                    return $this->fetchPage($account, $maxId);

                });

            } catch (\Throwable $e) {

                $accountIndex++; // 当前账号失败,换账号

                continue;

            }

            

            // 3. 处理采集到的数据

            if (isset($data['data']['data'][$this->getDataKey()]['users'])) {

                $result = $this->storeUsers($data['data']['data'][$this->getDataKey()]['users']);

                $number += $result;

            }

            

            // 4. 处理分页

            $maxId = $data['data']['data'][$this->getDataKey()]['next_max_id'] ?? '';

            if (empty($maxId)) {

                break; // 没有更多数据

            }

        }

        

        return $number;

    }

    

    // 抽象方法:由子类实现具体逻辑

    abstract protected function getApiRoute(): string;

    abstract protected function getDataKey(): string;

    abstract protected function getCollectType(): int;

}

设计模式应用

  1. 模板方法模式:固定算法骨架,子类实现具体步骤

  2. 重试机制retryCoroutine实现自动重试,提高成功率

  3. 账号轮换:一个账号失败自动切换下一个

  4. 分页处理:支持大数据集的分页采集

四、协程使用场景与最佳实践

4.1 什么时候应该使用协程?

推荐使用协程的场景

  1. 高并发IO操作:API调用、数据库查询、文件读写

  2. 定时任务处理:需要并发执行多个独立任务

  3. 实时数据处理:消息队列消费、实时监控

  4. 长连接服务:WebSocket、聊天室

不推荐使用协程的场景

  1. 纯CPU密集型计算:协程不会提升性能,反而增加复杂度

  2. 简单的CRUD应用:如果没有高并发需求,传统方式更简单

  3. 对协程不熟悉的团队:学习成本需要考虑

4.2 Laravel协程开发最佳实践

实践1:数据库连接管理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20

// 错误的做法:在协程中直接使用Eloquent

Coroutine::create(function () {

    $users = User::all(); // 可能导致连接混用

});



// 正确的做法:使用连接池或重新获取连接

Coroutine::create(function () {

    DB::connection()->reconnect(); // 重新连接

    $users = User::all();

});

实践2:避免协程间共享状态

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

// 危险:协程间共享变量可能导致数据竞争

$count = 0;

for ($i = 0; $i < 1000; $i++) {

    Coroutine::create(function () use (&$count) {

        $count++; // 非原子操作,可能出错

    });

}



// 安全:使用通道或原子操作

$chan = new Channel(1);

$count = 0;

for ($i = 0; $i < 1000; $i++) {

    Coroutine::create(function () use ($chan, &$count) {

        $chan->push(true);

        $count++;

        $chan->pop();

    });

}

实践3:合理的并发数控制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16

// 根据业务调整并发数

$concurrency = min(

    $tenant->rate_limit,          // 租户API限制

    $this->getSystemMaxConcurrency(), // 系统最大并发

    count($tasks)                 // 任务数量

);



$pool = new CoroutinePool($concurrency);

4.3 调试与监控

协程的调试比传统代码复杂,需要专门的工具:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

// 1. 添加协程ID到日志

Log::info('处理任务', [

    'task_id' => $task->id,

    'coroutine_id' => Coroutine::getCid(),

    'memory_usage' => memory_get_usage(true)

]);



// 2. 协程异常捕获

Coroutine::create(function () {

    try {

        // 业务代码

    } catch (\Throwable $e) {

        Log::error('协程异常', [

            'message' => $e->getMessage(),

            'trace' => $e->getTraceAsString(),

            'coroutine_id' => Coroutine::getCid()

        ]);

        

        // 重新抛出或处理异常

        throw $e;

    }

});



// 3. 协程状态监控

$stats = Coroutine::stats();

Log::info('协程统计', [

    'peak_num' => $stats['peak_num'],     // 历史峰值

    'coroutine_num' => $stats['coroutine_num'], // 当前数量

    'coroutine_peak_num' => $stats['coroutine_peak_num']

]);

五、性能对比与收益分析

5.1 采集系统优化效果

优化前(传统同步方式)

  • 处理1000个账号:约1800秒(30分钟)

  • 内存使用:约500MB

  • 成功率:受单个账号失败影响大

优化后(协程方案)

  • 处理1000个账号:约120秒(2分钟)提升15倍

  • 内存使用:约100MB 减少80%

  • 成功率:自动重试和账号轮换,提升明显

5.2 系统资源利用率对比

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14

传统方案资源使用图:

CPU: ▁▁▃▅▇▇▇▅▃▁▁▁▁▁ (波动大,大量时间在等待IO)

内存: ██████████████ (持续较高,每个进程独立内存)



协程方案资源使用图:

CPU: ▃▅▇▇▇▇▇▇▇▅▃▃▃▃▃ (持续高效利用)

内存: ████▇▅▃▁▁▁▁▁▁▁ (峰值低,波动小)

六、总结

6.1 协程带来的范式转变

通过这个多租户采集系统的实践,我们看到了协程如何改变PHP开发范式:

  1. 从"一个请求一个进程"到"一个进程处理所有请求"

  2. 从"同步阻塞等待"到"异步非阻塞切换"

  3. 从"面向流程编程"到"面向状态机编程"

6.2 使用建议

  1. 循序渐进:从简单的定时任务开始,逐步应用到核心业务

  2. 充分测试:协程的并发特性可能暴露隐藏的bug

  3. 监控先行:在上线前建立完善的监控体系

  4. 团队学习:协程需要团队共同学习和实践

七、结语

协程不是银弹,但在IO密集型场景下,它是PHP性能提升的利器。通过上文我关于协程的实战案例,我们可以大概了解如何在Laravel中合理运用协程,构建高性能、可扩展的多租户系统。


作者:命中水

版权声明:转载请注明出处,欢迎技术交流