在Apache Pulsar中介绍了复制订阅的功能
首先
在Apache Pulsar中,当Consumer开始订阅主题时,需要指定一个称为订阅(Subscription)的标识符。由Producer发送到主题的消息将分别逐个传递给每个订阅。换句话说,如果所有Consumer都使用不同的订阅订阅主题,那么消息将被广播出去。另一方面,如果所有Consumer都属于相同的订阅,那么消息将只被分发给其中一个,从而实现负载均衡和冗余。可以说,Pulsar的订阅类似于Kafka中的Consumer Group。
从这里开始,Pulsar 2.4.0版本开始可以创建特殊的订阅,称为复制订阅。这次我想从复制订阅的功能概述到架构进行解释。
在下述内容中,所涉及的是基于Pulsar 2.8.0最新版本的实施,但未来版本可能不适用。另外,本文未包含有关Pulsar本身的详细说明,请有兴趣的人参考官方网站或以往的文章。
复制技术是指
在解释复制订阅功能之前,首先需要介绍一下称为地理复制的功能。
Pulsar实例由一个或多个集群组成。通常情况下,每个地理位置分散的数据中心都对应一个集群。例如,在东日本的数据中心建立一个名为”jp-east”的集群,在西日本的数据中心建立一个名为”jp-west”的集群,以此类推。
在Pulsar中,话题需要属于某个命名空间。在创建这个命名空间时,可以将一个或多个集群与其关联起来。
# ネームスペース「tenant1/ns1」にクラスター「jp-east」と「jp-west」を紐づける
$ bin/pulsar-admin namespaces create --clusters jp-east,jp-west tenant1/ns1
与命名空间相关联的群集可以在后期进行更改。
# ネームスペース「tenant1/ns1」に紐づくクラスターを「jp-east」のみに変更する
$ bin/pulsar-admin namespaces set-clusters --clusters jp-east tenant1/ns1
在连接了jp-east和jp-west的命名空间中,当在jp-east中创建一个主题时,会自动在jp-west中生成一个同名的主题(反之亦然)。而且,在jp-east中产生的消息会自动被复制和转发,并且在jp-west中也可以消费。这就是所谓的地理复制功能。
使用地理复制实现故障转移
在考虑到地热复制的用例时,数据中心级别的故障切换是其中之一。
假设发生了大规模的灾害等情况导致东日本的数据中心无法使用。从业务连续计划(BCP)的角度来看,即使发生这种情况,将处理转移到西日本的数据中心并保持服务持续运行是理想的。
使用GSLB(全局服务器负载平衡)等技术,可以轻松实现在发生故障时自动切换Producer和Consumer连接的集群。问题在于Pulsar的订阅是集群间相互独立的。每个订阅都有一个称为”游标”的信息,它指示“将消息分发至Consumer的位置以及对多少消息进行了Ack响应”,但是这些游标在集群之间并没有同步。
假设有一个消费者通过名为“sub”的订阅来订阅jp-east端的主题。然后,生产者通过m0到m3的四条消息向该主题进行生产,并且消费者接收并确认了m0和m1。在这个时刻,如果jp-east集群发生故障并且连接切换到jp-west集群,会发生什么呢?
生产者端没有特别的问题,一旦与jp-west集群连接完成,将继续生产消息m4和m5。而在消费者端,如果jp-west端尚未存在名为”sub”的订阅,那么在消费者连接时会被创建。但在这种情况下,从m0到m3之前已经生产的消息将不会在jp-west端进行分发。因此,消费者将无法接收到m2和m3,并且会先接收到m4和m5的消息。
如果在事故发生之前,jp-west也创建了一个名为”sub”的订阅,那么情况会如何呢?在这种情况下,由于在事故发生之前jp-west端没有连接消费者,从m0到m3的消息将会堆积在队列中而不被接收。在事故发生后,消费者将接收到从m0到m3的消息,然后接收到m4和m5的消息。由于m0和m1在事故发生之前jp-east端已经接收过了,因此这些消息会发生重复。
复制订阅是什么意思?
为了解决诸如这些消息的缺失和重复等问题而实施的功能是复制订阅。
Consumer可以选择启用复制订阅。例如,如果使用Java客户端库,可以按以下方式启用复制订阅。
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://public/default/test")
.subscriptionName("sub")
.replicateSubscriptionState(true) // レプリケーテッドサブスクリプションを有効化するための指定
.subscribe();
如果这样的Consumer连接到jp-east端口,那么jp-west端口也会自动创建相同名称的订阅。在jp-east端口上被Consumer接收和确认的消息也被视为在jp-west端口上接收和确认,并从队列中删除。这样,即使在jp-east端口发生故障并且Consumer重新连接到jp-west端口,也可以从(几乎)相同的位置恢复接收消息。
建筑设计
尽管复制的订阅在表面上看起来很简单,但其架构实际上相当复杂。下面我们将来看看复制的订阅功能是如何实现的,以及其原理。
如果你想在多个集群之间同步光标,简单地想的话,可以将光标信息写入可读写的存储中,并在两个集群之间共享。但是,实现这一点是不现实的。
首先,Pulsar能够展现出卓越的性能,并且可从所有集群中获取可用的存储。配置存储(全局ZooKeeper)可以从所有集群中进行读写,但仅用于元数据管理,性能不算太高。BookKeeper在顺序写入方面表现出色,但只能处理单个封闭的集群中的数据。
另外,游标保存着已被消费者应答的最新消息的ID,但消息ID在每个集群中是完全独立的。也就是说,jp-east端的消息和它被地理复制到jp-west端的消息的ID是不匹配的。因此,即使将指向集群共享存储中某个集群游标所指位置的写入,其他集群也无法确定自己的游标应该移动到哪个位置。
在这种情况下提出的方法是引入一种被称为”标记”的特殊消息,通过在常规主题消息中插播它来传达消息。标记消息不会被传递给消费者,而是用于在代理服务器内部同步游标位置。
在复制的订阅中使用的主要标记消息是
-
- ReplicatedSubscriptionsSnapshotRequest
-
- ReplicatedSubscriptionsSnapshotResponse
-
- ReplicatedSubscriptionsSnapshot
- ReplicatedSubscriptionsUpdate
以下是四个。接下来我们将分别看一下每个标记的作用。
复制订阅快照请求
ReplicatedSubscriptionsSnapshotRequest 是用于获取一个群集在另一个群集中的最新消息(无论是否已经确认)的 ID 的消息。这个标记消息会在每个群集的主题中以固定间隔(默认设置为每秒一次)自动生成并发送给除自己以外的群集。
将ReplicatedSubscriptionsSnapshotRequest的结构用JSON表示如下(实际上不是JSON,而是被Protocol Buffers序列化为二进制数据)。
{
"snapshot_id": "3fb212fc-784e-44ce-bb1e-117af24efa5e", // スナップショットIDと呼ばれるUUID文字列
"source_cluster": "jp-east" // 送信元クラスター名
}
复制订阅快照响应
当集群收到ReplicatedSubscriptionsSnapshotRequest时,其将以ReplicatedSubscriptionsSnapshotResponse消息作为回复,向发送请求的集群发送本地集群主题中最新的消息ID。其结构如下所示。
{
"snapshot_id": "3fb212fc-784e-44ce-bb1e-117af24efa5e", // スナップショットID
"cluster": {
"cluster": "jp-west", // 送信元クラスター名
"message_id": { // 送信元クラスターの最新のメッセージID
"ledger_id": 200,
"entry_id": 67890
}
}
}
复制的订阅快照
在发送源集群中接收到来自其他集群的ReplicatedSubscriptionsSnapshotResponse的ReplicatedSubscriptionsSnapshotRequest,会对本地主题产生ReplicatedSubscriptionsSnapshot消息。该消息包含与流向本地主题的ReplicatedSubscriptionsSnapshotResponse消息对应的消息ID,以及ReplicatedSubscriptionsSnapshotResponse中包含的远程集群主题中最新的消息ID。
{
"snapshot_id": "3fb212fc-784e-44ce-bb1e-117af24efa5e", // スナップショットID
"local_message_id": { // ローカルのReplicatedSubscriptionsSnapshotResponseのメッセージID
"ledger_id": 100,
"entry_id": 12345
},
"clusters": [
{
"cluster": "jp-west", // ReplicatedSubscriptionsSnapshotResponseの送信元クラスター名
"message_id": { // ReplicatedSubscriptionsSnapshotResponseの送信元クラスターの最新のメッセージID
"ledger_id": 200,
"entry_id": 67890
}
}
]
}
在Broker内部,有一个被称为调度器的组件负责向Consumer分发消息。当ReplicatedSubscriptionsSnapshot消息到达时,它会将其缓存到内存中一定数量的副本(默认设置是最近的10个)。
复制订阅更新
然后,当从Consumer收到Ack响应后,生成ReplicatedSubscriptionsUpdate消息的时机是游标移动的时候。在缓存的ReplicatedSubscriptionsSnapshot消息中,包含有“本地游标所指位置之前具有最新local_message_id”的ReplicatedSubscriptionsUpdate消息将发送到除自己以外的集群。
{
"subscription_name": "sub", // サブスクリプション名
"clusters": [
{
"cluster": "jp-west", // ReplicatedSubscriptionsSnapshotResponseの送信元クラスター名
"message_id": { // ReplicatedSubscriptionsSnapshotResponseの送信元クラスターの(当時の)最新のメッセージID
"ledger_id": 200,
"entry_id": 67890
}
}
]
}
接收到此信息的远程集群会提取其中包含的自身集群的消息ID,并将其之前的所有消息都视作已经收到Ack响应的消息。这样一来,集群之间的光标位置在某种程度上会被同步。
学科
只需一個選項,請看以下所述的本地中文改述:
根據前述架構的解釋,目前的集群內游標位置的同步,通過複製訂閱的方式並不完美。ReplicatedSubscriptionsSnapshot只會以至少1秒的時間間隔進行創建,而且只有當本地游標移動到ReplicatedSubscriptionsSnapshot創建時的訊息ID時,才能使其他集群的游標移動。因此,當使用者連接的集群切換時,可能會發生少量訊息的重複。
另外,标记消息的存在会影响主题的统计信息。在Pulsar中,可以通过REST API或Prometheus获取主题上流动消息的平均大小、吞吐量以及尚未回复的消息数量等统计信息,并在监控等方面发挥作用。然而,在存在复制订阅的主题中,标记消息的部分将被添加到统计值中,这导致了难以准确了解实际情况的缺点。
总结
复制订阅是一项非常重要的功能,尽管在平时并不显眼,它能够在大规模灾害等情况下实现数据中心单位之间的无缝故障转移。此外,该体系结构也是相当有趣的特点之一。
目前仍存在一些未解决的问题,但对此我们希望能在今后的改进中加以解决。