尝试使用Redis和Python来进行发布/订阅

尝试使用Redis和Python进行发布/订阅。

首先

我之前有接触过Redis, 以为它只是一个键值存储(KVS),但后来发现它还有发布订阅功能。为了好玩,我试着玩了一下发布订阅的特性。顺便一提,我还用Python3和redis.py库实现了并发操作。

这是示例,请点击此处查看:
https://github.com/ryoutoku/redis-sample

这次创建的环境如下所示。

    1. 使用Vagrant在VirtualBox(客户端)中启动Ubuntu14.04+Redis服务器。

 

    1. 如果Redis内的数据发生更改,则使用该数据的键作为通道,并使用pub通知键的数据的pub端脚本。

 

    将订阅(sub)注册到Redis并将数据推送到Redis服务器的sub端脚本。

在第3步中将推送的数据,通过第2步进行检测并通知,再由第3步接收。

archtecture.png

这次我们在主机端执行了所有的pub/sub脚本,但在客户端执行也没有问题。

必要的环境

    • Vagrant

 

    • Python3

 

    redis-py

有关使用Redis功能的事项

由于我个人对于Redis并不很熟悉,所以在本文中,我会简要介绍本次使用的功能。

    • Key Value Store(KVS)

いわゆるNoSQLというやつで、keyに対してValueを紐付けてデータを管理する
Pythonでいうdictのようなもの
今回はtestというkeyにlistのデータとして追加する

pub/sub

メッセージ通知のモデル
Publish(通知)とSubscribe(購読)という2つの役割がある:参考

channelという概念があり、channelを決めて購読、通知を行う事で、そのchannelに対してのみデータ通知などが行える

程序解释

我将解释以下代码的关键点(我在实现过程中花费了一些时间)。

    • Vagrantfile

 

    • Publisher.py

 

    Subscriber.py

Vagrantfile 虚拟机配置文件

在Vagrantfile中,主要执行以下任务。

    1. 将客户端的IP地址固定为192.168.56.101

 

    1. Redis的配置文件

 

    Redis配置文件的位置

Vagrant.configure("2") do |config|

  # ホストオンリーアダプタをipアドレス固定で追加
  config.vm.network "private_network", ip: "192.168.56.101"

 # 中略 ##########################################

  # redisの設定ファイルのコピー
  config.vm.provision "file", source: "./conf/redis.conf", destination: "/home/vagrant/redis.conf"

  config.vm.provision "shell", inline: <<-SHELL
    apt-get update

    # pip, redis.pyインストール
    apt-get install python-pip -y
    pip install redis

    # redisインストール
    sudo apt-get install redis-server -y

    # redis設定ファイルを移動
    mv -f /home/vagrant/redis.conf /etc/redis/redis.conf

    # redisの起動
    redis-server &
  SHELL
end

以下是令人让起毛的地方。

    • redisはデフォルトではローカルIP(127.0.0.)しか接続を受け付けないため、設定ファイルを上書きする必要がある

直接上書きできないので、/home/vagrant以下にredis.confをコピー
コピーしたredis.confを/etc/redisに移動した後redisを起動

出版商(出版者)

pub的操作如下所示。

    1. 在获取所有Redis的键之后,根据键进行如下操作:

 

    1. – 如果键中有数据添加,则将数据通知为key=channel。

 

    – 如果数据的最终值为end,则终止操作。

以下是出版商核心部分的示例。

class Publisher(object):
    def send_message(self):

        while True:
            keys = self._connection.keys()

            is_break = False
            for key in keys:
                old_data = self._data.setdefault(key, [])
                data = self._connection.lrange(key, 0, -1)

                if len(old_data) == len(data):
                    continue

                key_str = key.decode('utf-8')
                data_str = [x.decode('utf-8') for x in data]

                print("publish:", data_str)

                self._connection.publish(key_str, data_str)
                self._data[key] = data
                if data_str[-1] == "end":
                    self._connection.rpop(key)
                    is_break = True
            if is_break:
                break

以下是会让人感到恶心的地方:
* Redis的键和数据都是以字节型(byte)存在,因此需要进行解码。

订阅者

在sub中进行以下操作。

    1. 启动另一个进程,并订阅数据作为通道=’test’。

从标准输入获取字符串
将获取的字符串添加到Redis作为键值key=’test’
如果获取的字符串是’end’,则结束

以下是订阅者的核心部分示意:

class SubscriberSubject(object):
    _re_format = "\'(.*?)\'"

   # 中略 ##########################################

    def _receive_core(self, channel, end_word):
        pubsub = self._connection.pubsub()
        pubsub.subscribe([channel])
        for data in pubsub.listen():
            [x(data) for x in self._callbacks]
            if isinstance(data['data'], bytes):
                data_str = self._re.findall(data['data'].decode('utf-8'))
                if data_str[-1] == end_word:
                    break
        pubsub.unsubscribe()

    def start_receive(self, channel, end_word='end'):
        if self._job:
            return
        self._job = Process(target=self._receive_core,
                            args=(channel, end_word))
        self._job.start()

    def end_receive(self):
        self._job.join()

    def add_data(self, key, value):
        self._connection.rpush(key, value)

if __name__ == "__main__":

    # 中略 ##########################################
    # sub
    subject = SubscriberSubject(host, port)
    subject.add_callback(
        lambda x: print("callback:", xt)
    )
    subject.start_receive(channel)

    # 中略 ##########################################
    # 標準入力部
    while(True):
        data = sys.stdin.readline().strip()
        subject.add_data(channel, data)
        if data == "end":
            subject.end_receive()
            break

以下是让人不悦的地方:
* 在第一次pubsub.listen()时,将获得的字典设置为data:1。
* 在第二次及以后的pubsub.listen()时,将获得的字典设置为data:b[数据]。
* 因为是字节类型,所以需要解码后,使用正则表达式(_re_format)将其分割为字符串类型。

行动

打开两个终端并分别执行以下操作:
* 运行 python publisher.py
* 检查 Redis 数据并通知如有更改
* 运行 python subscriber.py
* 将从标准输入接收的数据添加到 Redis
* 订阅来自 publisher 的数据

操作就是这个样子。
终端上面是publisher.py,下面是subscriber.py。

qqded-bbfis.gif
    1. 将下面输入的值存储到Redis中。

 

    1. 从Redis中发布数据(显示数据)。

 

    显示从下方接收到的数据。