使用PHP进行Redis Streams操作的笔记
这是一篇记录,目的是想尝试使用Redis Streams在PHP中进行流消费。同时也尝试使用Swoole来在一个服务器上进行扩展。
由于只是粗略地阅读了相关文档并进行了尝试,所以可能存在错误的地方。
Redis Streams
Redis 流
Redis5实现了流媒体API,受到Kafka的启发而构建。
使用Redis Streams通过PHP
在phpredis 4.2版本中似乎已经提供了支持。以前由于安装方便的原因,很多人经常使用predis,但是predis并不支持。predis的开发似乎完全停止了。
Redis服务器
如果只是试一试的话,用Docker就可以了。
docker run -d --rm -p 6379:6379 redis:5.0.9
我想确保Streams API是否运作正常,所以使用了redis-cli等工具进行确认。
$ redis-cli
> XADD mystream * key value
> XREAD COUNT 1 STREAMS mystream 0-0
1) 1) "mystream"
2) 1) 1) "1596783209134-0"
2) 1) "key"
2) "value"
确认会得到类似的结果。这条命令很复杂,如果手动输入的话,通常会出现几次错误,所以我认为复制粘贴更容易。
有关XADD和XREAD命令的详细信息,请查看Redis网站。
Redis Streams的消费者没有像消息系统那样,当订阅时会通知产生的消息,而是进行了Produce通知的行为。
消费者 zhě)
大致上,用這樣的程式碼是執行成功的。
<?php
$redis = new Redis();
$redis->connect('redis-server', 6379);
$consumer = new Consumer($redis);
while (true) {
$values = $consumer->consume();
foreach ($values[$streamName] ?? [] as $consumed) {
foreach ($consumed as $key => $value) {
print_r($value);
}
}
}
class Consumer {
private $redis;
public function __construct(Redis $redis) {
$this->redis = $redis;
$this->createGroupIfNotExists();
}
private function createGroupIfNotExists() {
// XREADではなくXREADGROUPを使うため、事前にConsumerGroupが存在することを保証する
// わざわざxInfoで確認してるけど、XGROUPにはGROUPがなかったときに作成する機能もあり
// ライブラリもサポートしているので素直にxGroupのMKSTREAMオプションを有効にする方が良いでしょう
$groupInfo = $this->redis->xInfo('GROUP', 'mystream', 'mygroup');
if (!$groupInfo) {
$this->redis->xGroup('CREATE', 'mystream', 'mygroup', '>');
}
// これと等価
// '>' の意味は概要を後述しますが、Redis StreamsのIntroductionを読むと良いです。
// $this->redis->xGroup('CREATE', 'mystream', 'mygroup', '>', $mkStream = true);
}
public function consume() {
return $this->redis->xReadGroup('group1', 'consumer1', ['mystream' => '>'], $count = 10, $block = 2000);
}
}
为什么要使用XREADGROUP而不是XREAD呢,因为它具有可扩展性,并且管理读取到哪个ID的问题非常麻烦。因为XREAD似乎没有记录Redis服务器读取到哪个键的偏移量的机制,所以如果想要自动获取消费者单位中未获取部分的记录,可能需要使用XREADGROUP。
$this->redis->xReadGroup('group1', 'consumer1', ['mystream' => '>'], $count = 10, $block = 2000);
在Consumer Group group1的Consumer consumer1中,从名为mystream的键的流中获取最多10条未获取的记录。如果所有记录都已获取,则最多等待2,000毫秒直到有记录添加进来。为了实现获取最多10条未获取的记录,需要指定[‘mystream’ => ‘>’]。
可扩展性
让我们来看看消费者是否能够扩大规模。
我认为,为了扩展Kafka的消费者,需要在主题(Topic)上设置分区,并在消费者组(Consumer Group)中添加消费者或进行扇出操作(我没有实际操作过,所以不太确定)。
一方,Redis Streams似乎没有这种分区的概念。但是,XREADGROUP的”>”是针对GROUP的值,因此只需向GROUP添加Consumer并使用XREADGROUP命令,就可以自动在每个Consumer上获取数据。不需要担心重新平衡。
当服务器数量(进程数量)增加且BLOCK请求积压时,延迟会怎样呢?READ请求会排队并按顺序传送吗?由于特定的Consumer与特定的分区没有相关性,因此无需考虑热点问题,对吧。
我认为暂且来说,只要增加服务器数量(进程数),就可以实现扩展。但是,如果要在不同的进程中运行,确定Consumer的名称可能会很麻烦。名称必须在组内唯一这个条件限制下,要认真做可能会很困难。即使使用相同的名称,也可以正常运行,所以可能更多是出于管理消费者状态、监控指标等运维上的原因。
Swoole: Swoole
我认为Redis Streams在处理上是以同步的方式使用BLOCK,而不是传统的消息传递系统,这与Plain-PHP很搭配。但我想尝试使用Swoole在单一进程中进行扩展。
为了确保XREADGROUP在单个进程中具有足够的吞吐量,采取了在获取值后使Consumer的工作部分并行运行的方法。如果XREADGROUP无法跟上,就可以考虑将请求也多路复用到Redis上。
程式碼变成了这个样子。
<?php
$messageBufferChannel = new Co\Channel(3);
// Worker
Co\run(function () use ($messageBufferChannel) {
// Worker内の処理を多重度を制限するためのChannel
$concurrencyCapChannel = new Co\Channel(5);
while (true) {
$values = $messageBufferChannel->pop();
if ($values) {
$concurrencyCapChannel->push(true);
go(function() use ($values, $concurrencyCapChannel) {
echo "got values " . count($values['mystream']) . "\n";
$concurrencyCapChannel->pop();
});
}
}
});
// Consumer
Co\run(function () use ($messageBufferChannel) {
$redis = new Redis();
$redis->connect('redis-server', 6379);
while (true) {
$values = $redis->xReadGroup('group1', 'consumer10', ['mystream' => '>'], $count = 10, $block = 2000);
if ($values) {
$messageBufferChannel->push($values);
}
}
});
因为我有段时间没有写作,所以进展非常缓慢。现在记得好像是应该这样写的…
Swoole在Docker中运行,可以参考https://www.swoole.co.uk/docs/get-started/try-docker。但是,这里介绍的镜像中没有安装phpredis。
运行 pecl install redis && docker-php-ext-enable redis.
我会随意在Dockerfile中添加并进行构建。
尽管试着写了一下,但是我对于Swoole的了解还很浅,要在实际生产中使用还需要更多的调查研究,否则会感到不安而无法使用。
相关文件
-
- Introduction to Redis Streams
-
- Redis Stream Command reference
-
- Apache Kafka
- Swoole Cotroutine