使用Spring Boot + Protocol Buffers + Kafka发送和接收消息
最近有机会接触到Protocol Buffers,所以想尝试在Kafka应用程序中使用它。
协议缓冲区是什么?
Protocol Buffers是一个用于对象序列化的库。可用于序列化的数据格式有很多,JSON和XML也是其中之一,但由于它们是文本数据,容量往往会很大。使用Protocol Buffers可以减小序列化生成的字节序列的容量,从而改善在通过网络进行通信时的性能。此外,由于模式与编程语言无关,它可以在使用不同技术栈的微服务之间进行通信。
应用程序的整体概况
使用Protocol Buffers时,首先需要在proto文件中定义数据的模式,并在数据的发送方和接收方之间共享它。如果有proto文件,则可以根据它为各种语言自动生成用于生成和解析数据的源代码,因此即使模式发生变化,开发人员也不需要维护解析器,可以专注于业务逻辑。
如果将Protocol Buffers用于使用Kafka进行消息传输和接收,则应用程序将遵循以下流程。
以下是应用程序的存储库。
创建proto文件
首先,创建一个profo文件。这次我们会用Kafka来发送和接收由三个字段组成的数据,分别是电影名,地点和票房。
syntax = "proto3";
package proto;
option java_multiple_files = true;
option java_package = "com.udomomo.springbootkafkapractice.proto";
message MyTopicEntry {
string movie_name = 1;
enum LOCATION {
UNKNOWN = 0;
THEATER = 1;
STREAMING = 2;
}
LOCATION location = 2;
int32 box_office = 3;
}
java_multiple_files 可以用来设置是否将从proto文件生成的Java源代码拆分为多个文件。java_package 是生成的源代码的包名。
请参考以下的proto文件语法。
Proto文件应该放在哪里?
由于Proto文件是多个应用程序共享的,所以不应该将其放置在特定的应用程序中。常见的方法包括以下内容。
-
- 複数アプリケーションから参照されるcommonパッケージの中にprotoファイルと生成コードを入れ、各アプリケーションの依存関係として追加する
- Schema Registryを利用する(例: Confluent Schema Registry)
由于这次是一个本地的简易项目,所以只需创建一个proto/目录并将其放置其中即可。
从proto文件生成代码
在构建应用程序时,生成的代码将从最新的proto文件中生成。这次我们将使用Gradle。
以下是必需的三个依赖关系。
com.google.protobuf : Gradle用のプラグイン。protoファイルからコードを生成するためのタスクを提供する。
com.google.protobuf:protobuf-java : Java用のProtocol Buffersコアライブラリ。生成されたコードにアプリケーションからアクセスするためのメソッド等を提供する。Gradleでは dependency に追加すれば良い。
protoc : protoファイルをコンパイルするために必要なC言語パッケージ。CLIも提供されているが、ベストプラクティスはプリコンパイルされた com.google.protobuf:protoc パッケージをタスク内で使うこと。(下記参照)
另外,需要保持 protobuf-java 和 protoc 的版本一致。
定义Gradle任务如下。
sourceSets {
main {
proto {
srcDir "$rootDir/proto"
}
java {
srcDirs "$buildDir/generated/source/proto/main/java"
}
}
}
protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.17.3'
}
}
在protobuf任务中,使用protoc编译proto文件。
而sourceSets任务是为了使生成的代码可以被应用程序访问而设立的。其属性如下所示。
proto.srcDir: protoファイルの場所を指定している。
java.srcDirs: 生成されたコードが格納される場所を指定している。
通过在 main 以下定义它们,生成的代码将成为应用程序 main sourceSet 的一部分,并且可以被访问到。
当您定义完任务后,可以使用gradlew的–dry-run选项来确认在构建时是否真正编译了proto文件。下面是在构建producer应用程序时确认的示例。
$ ./gradlew :producer:build --dry-run
Starting a Gradle Daemon, 1 incompatible Daemon could not be reused, use --status for details
:producer:extractIncludeProto SKIPPED
:producer:extractProto SKIPPED
:producer:generateProto SKIPPED
:producer:compileJava SKIPPED
:producer:processResources SKIPPED
:producer:classes SKIPPED
:producer:bootJar SKIPPED
:producer:jar SKIPPED
:producer:assemble SKIPPED
:producer:extractIncludeTestProto SKIPPED
:producer:extractTestProto SKIPPED
:producer:generateTestProto SKIPPED
:producer:compileTestJava SKIPPED
:producer:processTestResources SKIPPED
:producer:testClasses SKIPPED
:producer:test SKIPPED
:producer:check SKIPPED
:producer:build SKIPPED
BUILD SUCCESSFUL in 16s
在执行compileJava任务之前,我们可以看到会执行相关的Proto任务。
使用Kafka发送和接收消息。
制片人 (zhì
在proto文件中定义的数据对象可以使用builder方法创建。例如,setMovieName()等方法是根据proto文件自动生成的。
MyTopicEntry myTopicEntry1 = MyTopicEntry.newBuilder()
.setMovieName("Titanic")
.setLocation(MyTopicEntry.LOCATION.STREAMING)
.setBoxOffice(340000)
.build();
请事先创建一个用于将这个数据对象通过生产者发送的序列化器。该序列化器的作用只是将数据转换为字节数组,但是为了后续添加实现时更容易限定影响范围,最好根据数据类型分开不同的序列化器。
public class MyTopicEntrySerializer implements Serializer<MyTopicEntry> {
public MyTopicEntrySerializer() {
}
public byte[] serialize(String topic, MyTopicEntry data) {
return data.toByteArray();
}
}
可以使用创建的序列化器来发送消息。
kafkaTemplate.send(kafkaSettings.getTopic(), myTopicEntry1);
消费者
创建一个MyTopicEntryDeserializer,从Consumer接收消息。通过使用parseFrom()方法,接收到的消息可以反序列化为在Proto文件中定义的数据对象。
MyTopicEntry entry = MyTopicEntry.parseFrom(record.value());