使用Spring Boot + Protocol Buffers + Kafka发送和接收消息

最近有机会接触到Protocol Buffers,所以想尝试在Kafka应用程序中使用它。

协议缓冲区是什么?

Protocol Buffers是一个用于对象序列化的库。可用于序列化的数据格式有很多,JSON和XML也是其中之一,但由于它们是文本数据,容量往往会很大。使用Protocol Buffers可以减小序列化生成的字节序列的容量,从而改善在通过网络进行通信时的性能。此外,由于模式与编程语言无关,它可以在使用不同技术栈的微服务之间进行通信。

 

应用程序的整体概况

使用Protocol Buffers时,首先需要在proto文件中定义数据的模式,并在数据的发送方和接收方之间共享它。如果有proto文件,则可以根据它为各种语言自动生成用于生成和解析数据的源代码,因此即使模式发生变化,开发人员也不需要维护解析器,可以专注于业务逻辑。

如果将Protocol Buffers用于使用Kafka进行消息传输和接收,则应用程序将遵循以下流程。

protobuf-kafka.png

以下是应用程序的存储库。

 

创建proto文件

首先,创建一个profo文件。这次我们会用Kafka来发送和接收由三个字段组成的数据,分别是电影名,地点和票房。

syntax = "proto3";

package proto;

option java_multiple_files = true;
option java_package = "com.udomomo.springbootkafkapractice.proto";

message MyTopicEntry {
  string movie_name = 1;
  enum LOCATION {
    UNKNOWN = 0;
    THEATER = 1;
    STREAMING = 2;
  }
  LOCATION location = 2;
  int32 box_office = 3;
}

java_multiple_files 可以用来设置是否将从proto文件生成的Java源代码拆分为多个文件。java_package 是生成的源代码的包名。
请参考以下的proto文件语法。

 

Proto文件应该放在哪里?

由于Proto文件是多个应用程序共享的,所以不应该将其放置在特定的应用程序中。常见的方法包括以下内容。

    • 複数アプリケーションから参照されるcommonパッケージの中にprotoファイルと生成コードを入れ、各アプリケーションの依存関係として追加する

 

    Schema Registryを利用する(例: Confluent Schema Registry)

由于这次是一个本地的简易项目,所以只需创建一个proto/目录并将其放置其中即可。

从proto文件生成代码

在构建应用程序时,生成的代码将从最新的proto文件中生成。这次我们将使用Gradle。

以下是必需的三个依赖关系。

com.google.protobuf : Gradle用のプラグイン。protoファイルからコードを生成するためのタスクを提供する。

com.google.protobuf:protobuf-java : Java用のProtocol Buffersコアライブラリ。生成されたコードにアプリケーションからアクセスするためのメソッド等を提供する。Gradleでは dependency に追加すれば良い。

protoc : protoファイルをコンパイルするために必要なC言語パッケージ。CLIも提供されているが、ベストプラクティスはプリコンパイルされた com.google.protobuf:protoc パッケージをタスク内で使うこと。(下記参照)

另外,需要保持 protobuf-java 和 protoc 的版本一致。

定义Gradle任务如下。

sourceSets {
    main {
        proto {
            srcDir "$rootDir/proto"
        }
        java {
            srcDirs "$buildDir/generated/source/proto/main/java"
        }
    }
}

protobuf {
    protoc {
        artifact = 'com.google.protobuf:protoc:3.17.3'
    }
}

在protobuf任务中,使用protoc编译proto文件。
而sourceSets任务是为了使生成的代码可以被应用程序访问而设立的。其属性如下所示。

proto.srcDir: protoファイルの場所を指定している。

java.srcDirs: 生成されたコードが格納される場所を指定している。

通过在 main 以下定义它们,生成的代码将成为应用程序 main sourceSet 的一部分,并且可以被访问到。

当您定义完任务后,可以使用gradlew的–dry-run选项来确认在构建时是否真正编译了proto文件。下面是在构建producer应用程序时确认的示例。

$ ./gradlew :producer:build --dry-run
Starting a Gradle Daemon, 1 incompatible Daemon could not be reused, use --status for details
:producer:extractIncludeProto SKIPPED
:producer:extractProto SKIPPED
:producer:generateProto SKIPPED
:producer:compileJava SKIPPED
:producer:processResources SKIPPED
:producer:classes SKIPPED
:producer:bootJar SKIPPED
:producer:jar SKIPPED
:producer:assemble SKIPPED
:producer:extractIncludeTestProto SKIPPED
:producer:extractTestProto SKIPPED
:producer:generateTestProto SKIPPED
:producer:compileTestJava SKIPPED
:producer:processTestResources SKIPPED
:producer:testClasses SKIPPED
:producer:test SKIPPED
:producer:check SKIPPED
:producer:build SKIPPED

BUILD SUCCESSFUL in 16s

在执行compileJava任务之前,我们可以看到会执行相关的Proto任务。

使用Kafka发送和接收消息。

制片人 (zhì

在proto文件中定义的数据对象可以使用builder方法创建。例如,setMovieName()等方法是根据proto文件自动生成的。

MyTopicEntry myTopicEntry1 = MyTopicEntry.newBuilder()
            .setMovieName("Titanic")
            .setLocation(MyTopicEntry.LOCATION.STREAMING)
            .setBoxOffice(340000)
            .build();

请事先创建一个用于将这个数据对象通过生产者发送的序列化器。该序列化器的作用只是将数据转换为字节数组,但是为了后续添加实现时更容易限定影响范围,最好根据数据类型分开不同的序列化器。

public class MyTopicEntrySerializer implements Serializer<MyTopicEntry> {
  public MyTopicEntrySerializer() {
  }

  public byte[] serialize(String topic, MyTopicEntry data) {
    return data.toByteArray();
  }
}

可以使用创建的序列化器来发送消息。

kafkaTemplate.send(kafkaSettings.getTopic(), myTopicEntry1);

消费者

创建一个MyTopicEntryDeserializer,从Consumer接收消息。通过使用parseFrom()方法,接收到的消息可以反序列化为在Proto文件中定义的数据对象。

MyTopicEntry entry = MyTopicEntry.parseFrom(record.value());
广告
将在 10 秒后关闭
bannerAds