消息传递平台「Apache Pulsar」的使用方法(入门篇)

通过获得公司许可,我将投稿给Yahoo! JAPAN技术博客的文章转载到这里。
Apache Pulsar消息传递平台的使用方法(入门篇)- Yahoo! JAPAN技术博客

你好。我是雅虎股份有限公司系统总括本部云平台本部的水岛。目前我所属的团队致力于为公司内部提供排队、发布-订阅、流式处理等消息平台。

在这个团队中,我们使用开源软件Apache Pulsar(以下简称Pulsar)作为消息平台。团队已经开发和运营Pulsar数年,并积累了丰富的经验。我们希望分享这些经验,并希望大家对Pulsar产生兴趣。因此,我们决定以系列文章的形式发布Pulsar的用法、运营方法以及在Yahoo的案例等内容。不仅限于本文,我们也希望您能阅读我们今后预计发布的文章。

好吧,这次是连载的第一回,所以我将以”脉冲星入门”为题,大致介绍以下两点内容。

    • Pulsarの基本的な解説

さまざまな機能
Pulsarを構成するコンポーネント

Pulsarの基本的な使い方

簡易的なサーバー起動
サンプルコード実行

在Yahoo内部对Pulsar的利用情况

我所在的团队自2016年9月左右开始参与将Pulsar开源化的工作,同时也有多名成员担任Pulsar的核心开发者。除了在公司内部稳定地运营Pulsar,以便用户能够持续使用外,我们还通过根据公司内部需求和案例进行功能扩展、错误修复等开发工作,积极贡献于开源社区。

通过内部产品使用在这里运营的Pulsar,开发产品的工程师将不再需要单独运营消息平台。结果,产品可以专注于提供本应提供的服务和相应的用户价值创造。有关雅虎和Pulsar的关系,请参阅我们最近发布的另一篇文章。

    Message Queueを社内プラットフォームとして提供してみた 〜 Apache Pulsar活用事例 〜

基本的知識

在对Pulsar进行解释之前,我们先解释一下在消息传递系统中使用的术语以及Pulsar中使用的术语。

生产者/消费者

制作人是消息的发送方,消费者是消息的接收方。在Pub-Sub消息传递中,也可以将它们分别称为发布者/订阅者。在Pulsar中,这些术语也被用于相同的意思。消息传递系统接收来自生产者的消息并将其发送给消费者(或消费者接收)。这分别被称为Produce / Consume或Publish / Subscribe。

通过消息传递系统的中继,可以期望发送方系统和接收方系统的解耦。

承租者/命名空间/主题/订阅

Pulsar对常用的消息传递系统进行了更深层次的分层,并具有自己的术语和含义。图1展示了它们之间的大致嵌套关系。

pulsar_topics_tree.png

首先,将话题视为产生/消费消息时的终点。其次,命名空间是指将话题捆绑在一起的空间。通过在命名空间单位上为话题设置权限和限制等配置,可以轻松管理话题。再次,租户是指捆绑命名空间的空间。

另外,在Pulsar中,您可以使用以下形式指定消息发送和接收的终端,即主题。

persistent://${テナント}/${ネームスペース}/${トピック}

最后,”订阅”是消费者在消费主题时使用的标识符。图2展示了消息和订阅之间的关系。通过指定特定的订阅,消费者可以管理消息的消费进度。

pulsar_subscription.png

Pulsar的功能

Pulsar拥有基本功能和高级功能。在这里,我们将挑选一些进行介绍。

Pulsar的基本功能

作为Pulsar的基本功能,可以列举以下事项。

    • 水平スケール可能

 

    • 低レイテンシ

 

    • マルチテナンシー

システムやソフトウェアなどを複数の利用者で共有して利用できるような設計・構造であること

永続メッセージ
配信保証

在使用方面,特别是在水平扩展方面具有可扩展性的特点,通过简单地增加服务器可以提高集群的处理性能。为什么水平扩展容易的原因将在后面的架构部分进行解释。在利用方面,特别是在多租户方面具有特点,采用了可以与多个用户共享使用的设计和结构。实际上,这是通过租户管理员能够管理其下的命名空间,并设置可对命名空间的所有主题进行Produce/Consume的权限等实现的。

脉冲星的进化功能

我将介绍一些Pulsar的进阶功能。

订阅类型

前面提到的订阅有几种类型。目前(Pulsar v2.5.0)有独占、故障转移、共享、键共享四种类型,它们分别具有以下功能:

Exclusive

1つのサブスクリプションを1つのConsumerだけがConsumeできる

Failover

1つのサブスクリプションに複数のConsumerが接続できる
最も優先度の高いConsumerだけがConsumeできるが、このConsumerがダウンしたとき次の優先度のConsumerがConsumeできるようになる

Shared

1つのサブスクリプションを複数のConsumerがConsumeできる
メッセージがラウンドロビンに近い形でConsumerに配信される

Key_Shared

1つのサブスクリプションを複数のConsumerがConsumeできる
ProducerはメッセージにKeyと呼ばれる識別子を付与して、同じKeyを持つメッセージは同じConsumerに配信される
同じKeyを持つメッセージの順序保証ができる

通过这些,只需引入Pulsar就能满足消息传递系统的多个需求。

通过使用多个服务器来提高话题处理能力 (分区话题)

Pulsar常规情况下,一个主题通过一个Broker服务器来处理(Broker是Pulsar的组件,负责Producer和Consumer的消息发送和接收。详细内容见后文)。这意味着,对于主题的生产/消费处理能力不会超过一个Broker服务器的性能。而分区主题可以将主题作为多个内部主题(通常主题的集合)来处理,通过多个Broker处理内部主题,可以提高对主题的处理能力。此外,Producer/Consumer可以使用与常规主题相同的接口进行处理。

请参考图3,该图展示了分区主题的概念。更多详细信息请见下方链接。

    Partitioned Topics
pulsar_partitioned_topics.png

跨集群复制 (地理复制)

Pulsar在集群之间具有复制(Replication)消息的功能。通过启用此功能,可以在多个集群中消费相同的消息。这样一来,生产者就不需要将相同的消息发布到其他集群,消费者也可以在离消费者最近的Pulsar集群中消费消息,这是一个优点。此外,还可以创建集群之间的冗余配置。但是,需要注意的是,消息是异步复制的,因此并不完全实现了Pulsar上的冗余备份。

图4显示了Geo Replication的图像。您可以像蓝色箭头那样,将消息复制到Pulsar的另一个集群中的主题中。更多详细信息,请参阅此处。

    Pulsar geo-replication
pulsar_geo_replication.png

“持久化政策”

前一節介紹的指定形式中的持久 (persistent),指的是消息的持久化策略。此外还有非持久化 (non-persistent) 的策略可供选择。
持久化策略可以确保消息被持久化,而非持久化策略则不会持久化消息。相反,它可以实现快速传送给消费者。根据需求,用户可以灵活选择这些策略。更多详情请参考这里。

    Non-persistent messaging

通过代码处理消息(Pulsar Functions)

Pulsar具有对主题消息进行代码编写和处理的功能。用户只需编写必要的代码就可以使用这个功能。

在内部,会按照下面的流程进行处理。

    1. 根据主题消费消息

 

    1. 根据用户编写的代码处理消息

 

    将结果发布到其他主题

图5是Pulsar Functions的图像。您可以在不需要Pulsar以外的任何系统的情况下实现此类流处理。请查看这里以获取更多详细信息。

    Pulsar Functions overview
pulsar_functions.png

通过消息查询进行抽取(Pulsar SQL)

这里没有介绍,但是Pulsar有一种机制可以定义消息的模式。通过定义这个模式,可以使用SQL来进行消息的抽取处理。该机制使用了Presto。请参阅详细信息。

    Pulsar SQL Overview

与Pulsar和其他系统的连接(Pulsar IO)

您可以将Pulsar的消息传递系统与其他系统连接起来,以发送和接收消息。这样,您可以将消息制成Pulsar并将其存储在数据库中,或者从其他消息队列向Pulsar制作消息。更多详细信息,请参阅此处。

    Pulsar connector overview

脉冲星的架构

脉冲星可以在各种情况下作为消息平台使用。请查看以下是在Yahoo经常使用的情况。

    Message Queueを社内プラットフォームとして提供してみた 〜 Apache Pulsar活用事例 〜#社内MQ活用事例

Pulsar实现这样的平台是通过什么样的架构来实现的?图6总结了构成Pulsar的基本组件和架构的设想。但是,除此之外还存在各种各样的组件用于不同的功能。

pulsar_architecture.png

从图6中,作为一个组件

    • Broker

 

    • Bookie

 

    ZK (ZooKeeper)

可以发现有一种被称为Producer/Consumer的存在。此外,Producer/Consumer似乎通过Broker来交换消息,但我认为Broker还向Bookie发送消息。我将解释这些的含义以及每个组件的角色。

经纪人

经纪人在名称上起着中介生产者/消费者消息交互的角色。此外,它还提供用于实现上述地理复制功能以及创建和授权租户、命名空间等的REST API。

博彩者

作为软件名称,我们使用了名为Apache BookKeeper的存储系统。在Apache BookKeeper中,将负责保存消息的服务器称为Bookie。Bookie还会保存与消息一起的用于管理订阅将消息消费到哪个位置的信息,称为游标。通过存储消息,即使由于故障等原因导致消费者无法接收消息,也可以重新发送消息并进行处理。 看起来消息是由Broker发送给多个Bookie的,这是因为Bookie可以将相同的数据保存在多个Bookie上。根据Broker端设置的消息(准确说是Bookie的条目)写入副本数,消息将被写入其中一台Bookie。因此,即使发生了一台Bookie的故障,仍然可以继续读取消息。

动物园管理系统(ZooKeeper)

在软件名称方面,我们使用了一个名为Apache ZooKeeper(以下简称为ZooKeeper)的元数据存储系统。在Pulsar中,我们将ZooKeeper分为以下两种用法,并在不同的进程中启动它们。

    • Configuration Store

Pulsarのすべてのクラスタで共通する設定情報(例: テナント、ネームスペースなど)を保持します

Local ZooKeeper

Pulsarのそれぞれのクラスタで独立した設定情報(例: Broker、Bookieのメタデータなど)を保持します

水平刻度

如前所述,Pulsar的功能之一是它具备水平扩展的能力。其中一个原因是Broker和Bookie的角色分开。

举个例子,当消息写入的负载增加时,只需增加Bookie并进行部署即可。Bookie作为一个可扩展的特性,可以在增加Bookie时不需要更改其他Bookie的配置进行部署。同样地,当来自Producer/Consumer的访问增加导致负载增加时,只需增加Broker并进行部署即可实现负载均衡。这是因为Broker不会持久化数据且无状态。

这个机制的对比例子是 Apache Kafka(以下简称 Kafka)。Kafka的基本组件包括 Broker 和 ZooKeeper。Kafka也有消息持久化机制,但是消息是保存在 Broker 上的。因此,即使添加了新的 Broker 服务器,负载不会自动地从现有的 Broker 转移到新的 Broker 上。

你好,脉冲星。

下面将介绍一系列使用Pulsar的实际步骤。首先需要了解一点,Pulsar有一种称为standalone的模式,可以将Broker、Bookie和ZooKeeper作为一个JVM进程一起启动。这次我们将使用standalone模式来启动Pulsar,并尝试使用Java客户端创建Producer和Consumer。

前提 tí)

首先,本次环境建立假设是在macOS或Linux操作系统上进行。另外,独立模式默认需要2GB内存。顺便一提,这个步骤基本上是按照官方文档来进行的。

安装Java11

从JDK 11开始,安装Java11。

Pulsar的下载和解压

使用下列命令下载Pulsar v2.5.0:

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.5.0/apache-pulsar-2.5.0-bin.tar.gz

完成下载后,使用下面的命令解压并移动Pulsar。

$ tar xfz apache-pulsar-2.5.0-bin.tar.gz
$ cd apache-pulsar-2.5.0

启动Pulsar并创建租户和命名空间。

使用以下命令启动独立模式的Pulsar,其紧随上述内容。

$ bin/pulsar standalone

请准备另一个终端,并移动到与上述相同的目录。使用下面的命令创建一个用于示例代码的租户命名空间。默认设置下,在Produce/Consume主题时,主题会自动创建。

$ bin/pulsar-admin tenants create my-tenant

$ bin/pulsar-admin namespaces create my-tenant/my-namespace

示例代码

制片人 (zhì

你好HelloProducer.java

package pulsar.hello;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;

class HelloProducer {

    public static void main(String[] args) throws Exception {

        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic("persistent://my-tenant/my-namespace/my-topic")
                .create();

        for (int i = 0; i < 10; i++) {
            // メッセージを送信する
            producer.send(String.format("my-message-%d", i).getBytes());
        }

        pulsarClient.close();

    }
}

消费者

你好,消费者.java

package pulsar.hello;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;

class HelloConsumer {

    public static void main(String[] args) throws Exception {

        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic("persistent://my-tenant/my-namespace/my-topic")
                .subscriptionName("my-subscription-name")
                .subscribe();

        for (int i = 0; i < 10; i++) {
            // メッセージを受信する
            Message msg = consumer.receive();
            System.out.println("Received message: " + new String(msg.getData()));

            // BrokerにACKを返す
            consumer.acknowledge(msg);
        }

        pulsarClient.close();

    }
}

pom.xml 可以用中文翻译为「项目对象模型文件」。

假设你正在使用Maven,那么请在代码块中添加以下内容。

<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>2.5.0</version>
</dependency>

运行代码

将上述的示例代码进行编译

    1. 消费者

 

    生产者

在不同的终端中按照以下顺序启动。然后,可以在Consumer的标准输出中确认是否输出了如下消息。

Received message: my-message-0
Received message: my-message-1
Received message: my-message-2
Received message: my-message-3
Received message: my-message-4
Received message: my-message-5
Received message: my-message-6
Received message: my-message-7
Received message: my-message-8
Received message: my-message-9

通过这样的方式,由于客户需求相对简明,可以轻松进行初始引入。

终点

第1回到此为止。总结一下,我写了以下内容。

    • Pulsarの基本的な解説

さまざまな機能
Pulsarを構成するコンポーネント

Pulsarの基本的な使い方

簡易的なサーバー起動
サンプルコード実行

也许内容有点多,但如果您有想要深入探讨的话题,可以参考适当的链接中提供的详细信息。下次我们将缩减一些内容,计划介绍更多关于客户端代码的应用功能。由于这次无法解释前半部分介绍的功能的实际使用方法,我们考虑要补充这方面的内容。

这里介绍一下,Pulsar社区似乎正在计划举办Pulsar Summit峰会。我个人认为这将是了解社区发展状况的机会。在GitHub上持续出现积极的问题和拉取请求,可以感受到社区正在蓬勃发展。顺便说一下,虽然我只在Pulsar开发和运维团队工作了大约5个月,但每天都能看到社区的活跃,感受到Pulsar相较以前的改善。我希望继续关注未来的动态。

我觉得下一次可能会变为不定期发布,敬请期待!

广告
将在 10 秒后关闭
bannerAds