试试使用Spring Boot + Spring Kafka将消息发送到Kafka

我调研了使用 Spring Kafka 向 Kafka 发送消息的方法,并进行了总结。
以下是结合了 Spring Boot 使用的示例。

环境建设

请参考 Kafka 的官方 QUICK START,关于 Kafka 的安装与配置指南我们不再赘述。
https://kafka.apache.org/quickstart

Spring Boot 应用程序会在 pom.xml 文件中添加 Spring Kafka。

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.0.RELEASE</version>
        <relativePath/>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

此外,需要在 application.yml 文件中指定 Kafka 服务器的启动位置。
默认情况下,它是在 localhost:9092 上启动的,所以只有在需要更改时才需要指定。

spring:
    kafka:
        bootstrap-servers: hostname:9092

KafkaTemplate 卡夫卡模板。

你可以使用KafkaTemplate类来向Kafka发送消息。
你需要指定类型参数,并分别指定要发送的数据的键和值的类型。

默认情况下,key和value都会使用StringSerializer进行序列化,因此最好将类型参数设置为String。

寄信的方式

使用send方法将消息发送到Kafka。至少需要指定topic和value。
同时,使用sendDefault方法可以省略指定topic。
默认的topic可以在application.yml的spring.kafka.template.default-topic中设置。

无论哪种方法,返回值都是ListenableFuture,并且是异步处理的。
如果要进行同步处理,则需要调用get()。

如果只想定义成功时的回调,可以使用SuccessCallback代替ListenableFutureCallback。

@Component
public class HelloKafka {

    @Autowired
    KafkaTemplate<String, String> template;

    public void helloKafka(){
        // コールバックを設定しない非同期呼び出し
        template.send("sample", "value");

        // コールバックを設定する非同期呼び出し
        template.send("sample", "value").addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result){
                // 成功時の処理
            }

            @Override
            public void onFailure(Throwable ex) {
                // 失敗時の処理
            }
        });

        // 同期呼び出し
        template.send("sample", "value").get();
    }
}

制片人监听者

KafkaTemplate 可以使用 ProducerListener 来定义共享的回调函数。默认情况下,设置了 LoggingProducerListener,当出现错误时会输出日志。

如果要自己创建,就要创建一个实现了 ProducerListener 接口的类,并将其作为 Bean 注册。

@Bean
public ProducerListener<Object, Object> listener() {
    return new ProducerListener<Object, Object>() {
        @Override
        public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
            log.info("topic: {}, partition: {}, key: {}, value: {}", topic, partition, key, value);
        }
    };
}

序列化器

如上所述,默认情况下使用 StringSerializer。
因此,只能发送 String 数据和可转换为 String 类型的数据。

要更改Serializer,需要编辑application.yml。
例如,要将其更改为将JSON序列化为JsonSerializer,可以按照以下方式进行设置。

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer

键和值都需要设置。

交易

为了启用事务控制,需要在application.yml中设置transaction-id-prefix。

spring:
  kafka:
    producer:
      transaction-id-prefix: tx

如果激活了事务,则在未处于事务管理状态下使用send方法时,会发生需要在事务管理状态下执行的错误。

@进行事务控制的Transactional

通过给予 @Transactional注解, 可以对每个方法进行事务控制。可以设置 propagation等参数,但由于Kafka不支持isolation参数,如果更改了默认值,将会抛出异常。

@Component
@Transactional
public class HelloKafka {

    @Autowired
    KafkaTemplate<String, String> template;

    public void helloKafka(){
        template.send("sample", "value");

        // ロールバックする
        throw new RuntimeException();
    }
}

使用executeInTransaction方法

您也可以使用 KafkaTemplate 的 executeInTransaction 方法来进行事务控制。在这种情况下,通过指定的回调函数对应的处理将被视为同一个事务。

@Component
public class HelloKafka {

    @Autowired
    KafkaTemplate<String, String> template;

    public void helloKafka(){
        template.executeInTransaction(t -> {
            // この中の処理が同一トランザクション
            t.send("sample", "value");
            t.send("sample", "vvvvv");
            return false;
        });
    }
}

此外,在被标记为 @Transactional 的方法内部使用 executeInTransaction 方法时,不会参与原有的事务,而是会生成一个新的事务。(即使用 REQUIRES_NEW)。

进行动作确认时要注意事项。

我一直在使用kafka-console-consumer进行测试,但是在一段时间内一直陷入了事务无法按预期运行的困境中。问题的原因是该工具的isolation-level默认设置为read_uncommitted,这导致未提交的消息也会显示出来,我误以为事务控制无法正常进行。

如果您要使用此工具进行确认,请将隔离级别设置为“读取已提交”。

kafka-console-consumer --bootstrap-server localhost:9092 --topic sample --isolation-level read_committed

我认为,即使在使用其他工具进行确认时,也可以确认隔离级别设置为何。

总结

通过上述方法,可以方便地将消息发送到Kafka。
根据官方文档显示,可以与其他事务管理器(如DataSourceTransactionManager)同步进行事务管理,并且似乎还有一个名为ReplyingKafkaTemplate的东西,所以我想进一步调查一下。

广告
将在 10 秒后关闭
bannerAds