Laravel内置的并发控制机制
这是OPENLOGI 2020年度圣诞日历活动的第13篇文章。
首先
Laravel中有一个模块,利用Redis来控制可以同时执行多少次相同的操作。
我们目前使用的是Laravel的7.x版本,关于7.x的信息,基本上可以通过以下链接理解。
https://readouble.com/laravel/7.x/ja/queues.html
基本上, 如果閱讀文件大致上都能理解,但有一些語言不足之處。
這次我們將驗證這些模塊的同時執行控制實際上是如何運作的。
另外, 本文以 Laravel 7.x 為前提。
解释对象
まず、どこから利用するかですが、Illuminate\Support\Facades\Redisというfacadeが用意されており、そこから利用できます。
また、今回解説する関数については、Illuminate\Redis\Connections\Connectionに関数の定義があります。
所以,作为使用的函数主要有两个。
分别是funnel和throttle。
我们将逐一进行解释。
漏斗
Funnel是用于限制一次执行的作业数量的工具。
实际的实现定义在以下的类中。
-
- Illuminate\Redis\Limiters\ConcurrencyLimiter
- Illuminate\Redis\Limiters\ConcurrencyLimiterBuilder
Redisあんまり詳しく無いのですが、redis操作する際にコマンドを打つのを、一連のまとめた処理として、luaスクリプトで実行できるようです。
RDBで言えば、ストアドプロシージャのようなものでしょうか?
php上にコードとして定義されているので、Redis側に登録済みの状態で実行されるのではなく、都度スクリプト自体をRedis上で実行しているっぽいので、厳密には違うでしょう。
具体的にはevalというコマンドを使っています。
ドキュメントは以下を見るとよさそうです。
https://redis.io/commands/eval
在funnel中,使用lua脚本进行内部调用的有以下两种情况。
锁脚本
for index, value in pairs(redis.call('mget', unpack(KEYS))) do
if not value then
redis.call('set', KEYS[index], ARGV[3], "EX", ARGV[2])
return ARGV[1]..index
end
end
发布脚本
if redis.call('get', KEYS[1]) == ARGV[1]
then
return redis.call('del', KEYS[1])
else
return 0
end
然后,我写道,虽然以前读过《Lua编程》,但只是大致了解,真的不太清楚。
根据我的猜测,关于lockScript,如果没有特定的密钥,就会注册一条带有特定期限的记录,然后返回注册结果。
如果releaseScript中有特定的键和与之匹配的特定值,则似乎具有将其删除的功能。请详细阅读代码以了解更多信息。
根据上述代码的推测,在执行开始时将键注册到Redis中,只要该键被注册,就可以控制同时运行相同的处理。
另外,当执行完成后,可以删除键并将处理恢复到可执行状态。
那么,我想要亲自试试看。
行为
漏斗功能可以使用limit、releaseAfter和block这三个函数来控制其行为,我们来分别看一下。我们将使用类似以下的代码。
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Redis\LimiterTimeoutException;
use Illuminate\Support\Facades\Redis;
class LaravelRedisTest implements ShouldQueue
{
/** @var int */
public $tries = 1;
/** @var string */
private $name;
public function __construct(string $name)
{
$this->name = $name;
}
public function handle()
{
$redis = Redis::connection();
$redis->funnel(self::class)
->limit(2)
->block(0)
->then(function () {
sleep(3);
logger("{$this->name} is success");
}, function (LimiterTimeoutException $e) {
logger("{$this->name} is failed");
});
}
}
限制
首先是限制。这指定了您可以同时执行多少个。同时,还使用了block,但稍后会进行解释。
在此先写下执行以下处理时的期望值。
通过指定限制,可以执行最多指定数量的处理,并且当超过该数量时会失败。
-
- 执行处理 A(3秒)-> 成功
-
- 执行处理 B(3秒)-> 成功
-
- 执行处理 C(3秒)-> 失败
-
- 等待处理 A 完成(等待3秒)
- 执行处理 D(3秒)-> 成功
执行脚本
function run()
{
dispatch(new LaravelRedisTest('A'));
dispatch(new LaravelRedisTest('B'));
dispatch(new LaravelRedisTest('C'));
sleep(3);
dispatch(new LaravelRedisTest('D'));
}
执行部分
$redis = LaravelRedis::connection();
$redis->funnel(self::class)
->limit(2)
->block(0)
->then(function () {
sleep(3);
echo "{$this->name} is success" . PHP_EOL;
}, function (LimiterTimeoutException $e) {
echo "{$this->name} is failed" . PHP_EOL;
});
结果 –
[2020-12-12 15:56:19] local.DEBUG: C is failed
[2020-12-12 15:56:21] local.DEBUG: A is success
[2020-12-12 15:56:21] local.DEBUG: B is success
[2020-12-12 15:56:25] local.DEBUG: D is success
发布之后
如果为releaseAfter指定值,那么当时间到达时,看起来会解锁并允许执行其他处理。因此,在执行较长时间的处理时,需要小心指定。另外,默认情况下是60秒,如果不指定,将自动解锁。
手順と期待値です。
releaseAfterを指定することで、実行中であってもロックが解除されて、実行可能になっていることがわかります。
-
- 执行处理A(3秒)
-
- 执行处理B(3秒)
-
- 等待1秒
- 执行处理C(3秒)-> 成功
function run()
{
dispatch(new LaravelRedisTest('A'));
dispatch(new LaravelRedisTest('B'));
sleep(1);
dispatch(new LaravelRedisTest('C'));
}
$redis = LaravelRedis::connection();
$redis->funnel(self::class)
->limit(2)
->releaseAfter(1)
->block(0)
->then(function () {
sleep(3);
echo "{$this->name} is success" . PHP_EOL;
}, function (LimiterTimeoutException $e) {
echo "{$this->name} is failed" . PHP_EOL;
});
[2020-12-12 16:10:52] local.DEBUG: A is success
[2020-12-12 16:10:52] local.DEBUG: B is success
[2020-12-12 16:10:53] local.DEBUG: C is success
阻挡
区块是相对于无法解锁的情况下需要等待多久的方式。
由于默认时间设置为3秒,如果没有指定,则会等待3秒。
在limit和relaseAfter这些选项中,将block指定为0是因为行为会变得复杂。
這是步驟和期望值。
通過指定區塊,我們可以知道在等待解鎖。
-
- 执行操作A(3秒)
-
- 执行操作B(3秒)-> 成功
- 执行操作C(3秒)-> 成功
function run()
{
dispatch(new LaravelRedisTest('A'));
dispatch(new LaravelRedisTest('B'));
dispatch(new LaravelRedisTest('C'));
}
$redis = LaravelRedis::connection();
$redis->funnel(self::class)
->limit(2)
->block(3)
->then(function () {
sleep(3);
echo "{$this->name} is success" . PHP_EOL;
}, function (LimiterTimeoutException $e) {
echo "{$this->name} is failed" . PHP_EOL;
});
[2020-12-12 16:16:03] local.DEBUG: A is success
[2020-12-12 16:16:03] local.DEBUG: B is success
[2020-12-12 16:16:06] local.DEBUG: C is success
控制油门
对比漏斗,减速阀引入了时间概念。它似乎可以控制在单位时间内同时执行多少处理。
实现存在于以下类中。
-
- Illuminate\Redis\Limiters\DurationLimiter
- Illuminate\Redis\Limiters\DurationLimiterBuilder
Redis脚本
现在让我们来阅读一下将Lua脚本发送到Redis的代码。
local function reset()
redis.call('HMSET', KEYS[1], 'start', ARGV[2], 'end', ARGV[2] + ARGV[3], 'count', 1)
return redis.call('EXPIRE', KEYS[1], ARGV[3] * 2)
end
if redis.call('EXISTS', KEYS[1]) == 0 then
return {reset(), ARGV[2] + ARGV[3], ARGV[4] - 1}
end
if ARGV[1] >= redis.call('HGET', KEYS[1], 'start') and ARGV[1] <= redis.call('HGET', KEYS[1], 'end') then
return {
tonumber(redis.call('HINCRBY', KEYS[1], 'count', 1)) <= tonumber(ARGV[4]),
redis.call('HGET', KEYS[1], 'end'),
ARGV[4] - redis.call('HGET', KEYS[1], 'count')
}
end
return {reset(), ARGV[2] + ARGV[3], ARGV[4] - 1}
Funnel有两个脚本,而throttle只有一个。
有各种各样的变量,如ARGV和KEYS等,有点难以理解。
这些变量可以通过阅读PHP代码来理解,但请您自行阅读代码。
簡単に説明すると、上記のreset関数が、有効期限つきで、登録時と削除時と、同時実行数を1として登録しています。
特定のキーが存在するとき、または存在していても有効期限外であれば、reset関数でキーを登録する。
対象のキーが存在し、その有効期限内であれば、新しくキーを登録したり更新はしない。
返却する値は、新しく実行可能か、現在いくつの処理を並行で処理しようとしているか、また現在の有効期限です。
感觉好像可以限制在单位时间内执行的数量。
不过,似乎不是对于任意时间段内同时执行的数量进行控制的形式。
仅仅是当特定的键没有执行时,从最初执行的时刻开始,可以限制同时执行的数量直到指定时间。
就像这样的感觉。
游戏手柄的油门控制反应
与漏斗不同的是,我计划从其自动重新执行部分开始验证,而不是指定部分的差异。
处理时间很短
首先,我们来看一下处理时间是否为3秒,是否与指定时间相对较短或者相当。这应该很容易理解。
在3秒的时间内,如果执行2次或更多次则失败,只需等待3秒即可确认成功。
-
- 执行操作A(3秒)
-
- 执行操作B(3秒)
-
- 执行操作C(3秒)-> 失败
-
- 等待操作A结束(3秒)
- 执行操作D(3秒)-> 成功
function run()
{
dispatch(new LaravelRedisTest('A'));
dispatch(new LaravelRedisTest('B'));
dispatch(new LaravelRedisTest('C'));
sleep(3);
dispatch(new LaravelRedisTest('D'));
}
$redis = LaravelRedis::connection();
$redis->throttle(self::class)
->allow(2)
->every(3)
->block(0)
->then(function () {
sleep(3);
echo "{$this->name} is success" . PHP_EOL;
}, function (LimiterTimeoutException $e) {
echo "{$this->name} is failed" . PHP_EOL;
});
[2020-12-12 16:32:56] local.DEBUG: C is failed
[2020-12-12 16:32:59] local.DEBUG: A is success
[2020-12-12 16:32:59] local.DEBUG: B is success
[2020-12-12 16:33:01] local.DEBUG: D is success
如果处理时间很长
下一步,我們來看一下如果處理時間超過指定時間,會出現什麼樣的行為。
以下是步骤和预期结果。
即使时间较短,结果仍然不变,但只要经过指定的时间,即使初始处理未完成,也能够确定它已变为可执行状态。
-
- 执行处理A(10秒)
-
- 执行处理B(10秒)
-
- 执行处理C(10秒)-> 失败
-
- 等待3秒,但A和B都还未完成
- 执行处理D(10秒)-> 成功
function run()
{
dispatch(new LaravelRedisTest('A'));
dispatch(new LaravelRedisTest('B'));
dispatch(new LaravelRedisTest('C'));
sleep(3);
dispatch(new LaravelRedisTest('D'));
}
$redis = LaravelRedis::connection();
$redis->throttle(self::class)
->allow(2)
->every(3)
->block(0)
->then(function () {
sleep(10);
echo "{$this->name} is success" . PHP_EOL;
}, function (LimiterTimeoutException $e) {
echo "{$this->name} is failed" . PHP_EOL;
});
[2020-12-12 16:35:57] local.DEBUG: C is failed
[2020-12-12 16:36:06] local.DEBUG: A is success
[2020-12-12 16:36:06] local.DEBUG: B is success
[2020-12-12 16:36:10] local.DEBUG: D is success
总结
我在工作中实际使用的是”throttle”,但与”funnel”相比,它的设置和行为更加难以理解,让我感到困惑。如果有兴趣,请试用一下。