试试使用Spring Boot + Spring Kafka将消息发送到Kafka
我调研了使用 Spring Kafka 向 Kafka 发送消息的方法,并进行了总结。
以下是结合了 Spring Boot 使用的示例。
环境建设
请参考 Kafka 的官方 QUICK START,关于 Kafka 的安装与配置指南我们不再赘述。
https://kafka.apache.org/quickstart
Spring Boot 应用程序会在 pom.xml 文件中添加 Spring Kafka。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
此外,需要在 application.yml 文件中指定 Kafka 服务器的启动位置。
默认情况下,它是在 localhost:9092 上启动的,所以只有在需要更改时才需要指定。
spring:
kafka:
bootstrap-servers: hostname:9092
KafkaTemplate 卡夫卡模板。
你可以使用KafkaTemplate类来向Kafka发送消息。
你需要指定类型参数,并分别指定要发送的数据的键和值的类型。
默认情况下,key和value都会使用StringSerializer进行序列化,因此最好将类型参数设置为String。
寄信的方式
使用send方法将消息发送到Kafka。至少需要指定topic和value。
同时,使用sendDefault方法可以省略指定topic。
默认的topic可以在application.yml的spring.kafka.template.default-topic中设置。
无论哪种方法,返回值都是ListenableFuture,并且是异步处理的。
如果要进行同步处理,则需要调用get()。
如果只想定义成功时的回调,可以使用SuccessCallback代替ListenableFutureCallback。
@Component
public class HelloKafka {
@Autowired
KafkaTemplate<String, String> template;
public void helloKafka(){
// コールバックを設定しない非同期呼び出し
template.send("sample", "value");
// コールバックを設定する非同期呼び出し
template.send("sample", "value").addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result){
// 成功時の処理
}
@Override
public void onFailure(Throwable ex) {
// 失敗時の処理
}
});
// 同期呼び出し
template.send("sample", "value").get();
}
}
制片人监听者
KafkaTemplate 可以使用 ProducerListener 来定义共享的回调函数。默认情况下,设置了 LoggingProducerListener,当出现错误时会输出日志。
如果要自己创建,就要创建一个实现了 ProducerListener 接口的类,并将其作为 Bean 注册。
@Bean
public ProducerListener<Object, Object> listener() {
return new ProducerListener<Object, Object>() {
@Override
public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
log.info("topic: {}, partition: {}, key: {}, value: {}", topic, partition, key, value);
}
};
}
序列化器
如上所述,默认情况下使用 StringSerializer。
因此,只能发送 String 数据和可转换为 String 类型的数据。
要更改Serializer,需要编辑application.yml。
例如,要将其更改为将JSON序列化为JsonSerializer,可以按照以下方式进行设置。
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
键和值都需要设置。
交易
为了启用事务控制,需要在application.yml中设置transaction-id-prefix。
spring:
kafka:
producer:
transaction-id-prefix: tx
如果激活了事务,则在未处于事务管理状态下使用send方法时,会发生需要在事务管理状态下执行的错误。
@进行事务控制的Transactional
通过给予 @Transactional注解, 可以对每个方法进行事务控制。可以设置 propagation等参数,但由于Kafka不支持isolation参数,如果更改了默认值,将会抛出异常。
@Component
@Transactional
public class HelloKafka {
@Autowired
KafkaTemplate<String, String> template;
public void helloKafka(){
template.send("sample", "value");
// ロールバックする
throw new RuntimeException();
}
}
使用executeInTransaction方法
您也可以使用 KafkaTemplate 的 executeInTransaction 方法来进行事务控制。在这种情况下,通过指定的回调函数对应的处理将被视为同一个事务。
@Component
public class HelloKafka {
@Autowired
KafkaTemplate<String, String> template;
public void helloKafka(){
template.executeInTransaction(t -> {
// この中の処理が同一トランザクション
t.send("sample", "value");
t.send("sample", "vvvvv");
return false;
});
}
}
此外,在被标记为 @Transactional 的方法内部使用 executeInTransaction 方法时,不会参与原有的事务,而是会生成一个新的事务。(即使用 REQUIRES_NEW)。
进行动作确认时要注意事项。
我一直在使用kafka-console-consumer进行测试,但是在一段时间内一直陷入了事务无法按预期运行的困境中。问题的原因是该工具的isolation-level默认设置为read_uncommitted,这导致未提交的消息也会显示出来,我误以为事务控制无法正常进行。
如果您要使用此工具进行确认,请将隔离级别设置为“读取已提交”。
kafka-console-consumer --bootstrap-server localhost:9092 --topic sample --isolation-level read_committed
我认为,即使在使用其他工具进行确认时,也可以确认隔离级别设置为何。
总结
通过上述方法,可以方便地将消息发送到Kafka。
根据官方文档显示,可以与其他事务管理器(如DataSourceTransactionManager)同步进行事务管理,并且似乎还有一个名为ReplyingKafkaTemplate的东西,所以我想进一步调查一下。