Laravel内置的并发控制机制

这是OPENLOGI 2020年度圣诞日历活动的第13篇文章。

首先

Laravel中有一个模块,利用Redis来控制可以同时执行多少次相同的操作。
我们目前使用的是Laravel的7.x版本,关于7.x的信息,基本上可以通过以下链接理解。
https://readouble.com/laravel/7.x/ja/queues.html

基本上, 如果閱讀文件大致上都能理解,但有一些語言不足之處。
這次我們將驗證這些模塊的同時執行控制實際上是如何運作的。
另外, 本文以 Laravel 7.x 為前提。

解释对象

まず、どこから利用するかですが、Illuminate\Support\Facades\Redisというfacadeが用意されており、そこから利用できます。
また、今回解説する関数については、Illuminate\Redis\Connections\Connectionに関数の定義があります。

所以,作为使用的函数主要有两个。
分别是funnel和throttle。
我们将逐一进行解释。

漏斗

Funnel是用于限制一次执行的作业数量的工具。
实际的实现定义在以下的类中。

    • Illuminate\Redis\Limiters\ConcurrencyLimiter

 

    Illuminate\Redis\Limiters\ConcurrencyLimiterBuilder

Redisあんまり詳しく無いのですが、redis操作する際にコマンドを打つのを、一連のまとめた処理として、luaスクリプトで実行できるようです。
RDBで言えば、ストアドプロシージャのようなものでしょうか?
php上にコードとして定義されているので、Redis側に登録済みの状態で実行されるのではなく、都度スクリプト自体をRedis上で実行しているっぽいので、厳密には違うでしょう。

具体的にはevalというコマンドを使っています。
ドキュメントは以下を見るとよさそうです。
https://redis.io/commands/eval

在funnel中,使用lua脚本进行内部调用的有以下两种情况。

锁脚本

for index, value in pairs(redis.call('mget', unpack(KEYS))) do
    if not value then
        redis.call('set', KEYS[index], ARGV[3], "EX", ARGV[2])
        return ARGV[1]..index
    end
end

发布脚本

if redis.call('get', KEYS[1]) == ARGV[1]
then
    return redis.call('del', KEYS[1])
else
    return 0
end

然后,我写道,虽然以前读过《Lua编程》,但只是大致了解,真的不太清楚。
根据我的猜测,关于lockScript,如果没有特定的密钥,就会注册一条带有特定期限的记录,然后返回注册结果。

如果releaseScript中有特定的键和与之匹配的特定值,则似乎具有将其删除的功能。请详细阅读代码以了解更多信息。

根据上述代码的推测,在执行开始时将键注册到Redis中,只要该键被注册,就可以控制同时运行相同的处理。
另外,当执行完成后,可以删除键并将处理恢复到可执行状态。

那么,我想要亲自试试看。

行为

漏斗功能可以使用limit、releaseAfter和block这三个函数来控制其行为,我们来分别看一下。我们将使用类似以下的代码。

<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Redis\LimiterTimeoutException;
use Illuminate\Support\Facades\Redis;

class LaravelRedisTest implements ShouldQueue
{
    /** @var int */
    public $tries = 1;

    /** @var string */
    private $name;

    public function __construct(string $name)
    {
        $this->name = $name;
    }

    public function handle()
    {
        $redis = Redis::connection();
        $redis->funnel(self::class)
            ->limit(2)
            ->block(0)
            ->then(function () {
                sleep(3);
                logger("{$this->name} is success");
            }, function (LimiterTimeoutException $e) {
                logger("{$this->name} is failed");
            });
    }
}

限制

首先是限制。这指定了您可以同时执行多少个。同时,还使用了block,但稍后会进行解释。

在此先写下执行以下处理时的期望值。

通过指定限制,可以执行最多指定数量的处理,并且当超过该数量时会失败。

    1. 执行处理 A(3秒)-> 成功

 

    1. 执行处理 B(3秒)-> 成功

 

    1. 执行处理 C(3秒)-> 失败

 

    1. 等待处理 A 完成(等待3秒)

 

    执行处理 D(3秒)-> 成功

执行脚本

function run()
{
    dispatch(new LaravelRedisTest('A'));
    dispatch(new LaravelRedisTest('B'));
    dispatch(new LaravelRedisTest('C'));
    sleep(3);
    dispatch(new LaravelRedisTest('D'));
}

执行部分

$redis = LaravelRedis::connection();
$redis->funnel(self::class)
    ->limit(2)
    ->block(0)
    ->then(function () {
        sleep(3);
        echo "{$this->name} is success" . PHP_EOL;
    }, function (LimiterTimeoutException $e) {
        echo "{$this->name} is failed" . PHP_EOL;
    });

结果 –

[2020-12-12 15:56:19] local.DEBUG: C is failed  
[2020-12-12 15:56:21] local.DEBUG: A is success  
[2020-12-12 15:56:21] local.DEBUG: B is success  
[2020-12-12 15:56:25] local.DEBUG: D is success  

发布之后

如果为releaseAfter指定值,那么当时间到达时,看起来会解锁并允许执行其他处理。因此,在执行较长时间的处理时,需要小心指定。另外,默认情况下是60秒,如果不指定,将自动解锁。

手順と期待値です。
releaseAfterを指定することで、実行中であってもロックが解除されて、実行可能になっていることがわかります。

    1. 执行处理A(3秒)

 

    1. 执行处理B(3秒)

 

    1. 等待1秒

 

    执行处理C(3秒)-> 成功
function run()
{
    dispatch(new LaravelRedisTest('A'));
    dispatch(new LaravelRedisTest('B'));
    sleep(1);
    dispatch(new LaravelRedisTest('C'));
}
$redis = LaravelRedis::connection();
$redis->funnel(self::class)
    ->limit(2)
    ->releaseAfter(1)
    ->block(0)
    ->then(function () {
        sleep(3);
        echo "{$this->name} is success" . PHP_EOL;
    }, function (LimiterTimeoutException $e) {
        echo "{$this->name} is failed" . PHP_EOL;
    });
[2020-12-12 16:10:52] local.DEBUG: A is success  
[2020-12-12 16:10:52] local.DEBUG: B is success  
[2020-12-12 16:10:53] local.DEBUG: C is success  

阻挡

区块是相对于无法解锁的情况下需要等待多久的方式。
由于默认时间设置为3秒,如果没有指定,则会等待3秒。
在limit和relaseAfter这些选项中,将block指定为0是因为行为会变得复杂。

這是步驟和期望值。
通過指定區塊,我們可以知道在等待解鎖。

    1. 执行操作A(3秒)

 

    1. 执行操作B(3秒)-> 成功

 

    执行操作C(3秒)-> 成功
function run()
{
    dispatch(new LaravelRedisTest('A'));
    dispatch(new LaravelRedisTest('B'));
    dispatch(new LaravelRedisTest('C'));
}
$redis = LaravelRedis::connection();
$redis->funnel(self::class)
    ->limit(2)
    ->block(3)
    ->then(function () {
        sleep(3);
        echo "{$this->name} is success" . PHP_EOL;
    }, function (LimiterTimeoutException $e) {
        echo "{$this->name} is failed" . PHP_EOL;
    });
[2020-12-12 16:16:03] local.DEBUG: A is success  
[2020-12-12 16:16:03] local.DEBUG: B is success  
[2020-12-12 16:16:06] local.DEBUG: C is success  

控制油门

对比漏斗,减速阀引入了时间概念。它似乎可以控制在单位时间内同时执行多少处理。

实现存在于以下类中。

    • Illuminate\Redis\Limiters\DurationLimiter

 

    Illuminate\Redis\Limiters\DurationLimiterBuilder

Redis脚本

现在让我们来阅读一下将Lua脚本发送到Redis的代码。

local function reset()
    redis.call('HMSET', KEYS[1], 'start', ARGV[2], 'end', ARGV[2] + ARGV[3], 'count', 1)
    return redis.call('EXPIRE', KEYS[1], ARGV[3] * 2)
end
if redis.call('EXISTS', KEYS[1]) == 0 then
    return {reset(), ARGV[2] + ARGV[3], ARGV[4] - 1}
end
if ARGV[1] >= redis.call('HGET', KEYS[1], 'start') and ARGV[1] <= redis.call('HGET', KEYS[1], 'end') then
    return {
        tonumber(redis.call('HINCRBY', KEYS[1], 'count', 1)) <= tonumber(ARGV[4]),
        redis.call('HGET', KEYS[1], 'end'),
        ARGV[4] - redis.call('HGET', KEYS[1], 'count')
    }
end
return {reset(), ARGV[2] + ARGV[3], ARGV[4] - 1}

Funnel有两个脚本,而throttle只有一个。
有各种各样的变量,如ARGV和KEYS等,有点难以理解。
这些变量可以通过阅读PHP代码来理解,但请您自行阅读代码。

簡単に説明すると、上記のreset関数が、有効期限つきで、登録時と削除時と、同時実行数を1として登録しています。
特定のキーが存在するとき、または存在していても有効期限外であれば、reset関数でキーを登録する。
対象のキーが存在し、その有効期限内であれば、新しくキーを登録したり更新はしない。
返却する値は、新しく実行可能か、現在いくつの処理を並行で処理しようとしているか、また現在の有効期限です。

感觉好像可以限制在单位时间内执行的数量。
不过,似乎不是对于任意时间段内同时执行的数量进行控制的形式。
仅仅是当特定的键没有执行时,从最初执行的时刻开始,可以限制同时执行的数量直到指定时间。
就像这样的感觉。

游戏手柄的油门控制反应

与漏斗不同的是,我计划从其自动重新执行部分开始验证,而不是指定部分的差异。

处理时间很短

首先,我们来看一下处理时间是否为3秒,是否与指定时间相对较短或者相当。这应该很容易理解。

在3秒的时间内,如果执行2次或更多次则失败,只需等待3秒即可确认成功。

    1. 执行操作A(3秒)

 

    1. 执行操作B(3秒)

 

    1. 执行操作C(3秒)-> 失败

 

    1. 等待操作A结束(3秒)

 

    执行操作D(3秒)-> 成功
function run()
{
    dispatch(new LaravelRedisTest('A'));
    dispatch(new LaravelRedisTest('B'));
    dispatch(new LaravelRedisTest('C'));
    sleep(3);
    dispatch(new LaravelRedisTest('D'));
}
$redis = LaravelRedis::connection();
$redis->throttle(self::class)
    ->allow(2)
    ->every(3)
    ->block(0)
    ->then(function () {
        sleep(3);
        echo "{$this->name} is success" . PHP_EOL;
    }, function (LimiterTimeoutException $e) {
        echo "{$this->name} is failed" . PHP_EOL;
    });
[2020-12-12 16:32:56] local.DEBUG: C is failed  
[2020-12-12 16:32:59] local.DEBUG: A is success  
[2020-12-12 16:32:59] local.DEBUG: B is success  
[2020-12-12 16:33:01] local.DEBUG: D is success  

如果处理时间很长

下一步,我們來看一下如果處理時間超過指定時間,會出現什麼樣的行為。

以下是步骤和预期结果。
即使时间较短,结果仍然不变,但只要经过指定的时间,即使初始处理未完成,也能够确定它已变为可执行状态。

    1. 执行处理A(10秒)

 

    1. 执行处理B(10秒)

 

    1. 执行处理C(10秒)-> 失败

 

    1. 等待3秒,但A和B都还未完成

 

    执行处理D(10秒)-> 成功
function run()
{
    dispatch(new LaravelRedisTest('A'));
    dispatch(new LaravelRedisTest('B'));
    dispatch(new LaravelRedisTest('C'));
    sleep(3);
    dispatch(new LaravelRedisTest('D'));
}
$redis = LaravelRedis::connection();
$redis->throttle(self::class)
    ->allow(2)
    ->every(3)
    ->block(0)
    ->then(function () {
        sleep(10);
        echo "{$this->name} is success" . PHP_EOL;
    }, function (LimiterTimeoutException $e) {
        echo "{$this->name} is failed" . PHP_EOL;
    });
[2020-12-12 16:35:57] local.DEBUG: C is failed  
[2020-12-12 16:36:06] local.DEBUG: A is success  
[2020-12-12 16:36:06] local.DEBUG: B is success  
[2020-12-12 16:36:10] local.DEBUG: D is success  

总结

我在工作中实际使用的是”throttle”,但与”funnel”相比,它的设置和行为更加难以理解,让我感到困惑。如果有兴趣,请试用一下。

广告
将在 10 秒后关闭
bannerAds