使用Micronaut访问Apache Kafka
微型凡人卡夫卡
由于Micronaut声称具有对Apache Kafka的支持,所以我决定试一试。
卡夫卡技术支持
据说可以使用Micronaut来创建Apache Kafka的Producer和Consumer。
微服务Kafka
环境
这次使用的Micronaut版本。
$ mn -V
| Micronaut Version: 1.0.4
| JVM Version: 1.8.0_191
在Micronaut 1.0.4版本中,似乎支持Apache Kafka 2.0.1版本。
升级到Kafka 2.0.1
微服务级别的Kafka
根据这个要求,我们下载了Apache Kafka 2.0.1。
$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz
$ tar xf kafka_2.12-2.0.1.tgz
$ cd kafka_2.12-2.0.1
按照Apache Kafka的快速入门,启动Apache ZooKeeper和Apache Kafka Broker。
快速启动/启动服务器
## Apache Zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
## Apache Kafka(Broker)
$ bin/kafka-server-start.sh config/server.properties
这个主题是用“my-topic”作为名称创建的。
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic
Created topic "my-topic".
制作应用程序
那么,我来试着创建一个应用程序。创建了一个供应者和消费者 (这里以监听器表示)用的项目。
## Producer
$ mn create-app --features kafka --build maven hello-kafka-producer
## Listener(Consumer)
$ mn create-app --features kafka --build maven hello-kafka-listener
随后,将逐步向这两个项目添加和编辑源代码。
制造一个制作人
那么,让我们来编写一个向Apache Kafka Broker发送消息的Producer。
$ cd hello-kafka-producer
主类将保持自动生成的状态使用。
src/main/java/hello/kafka/producer/Application.java的中文翻译可以是:主要/主文件/入口文件/起始文件/应用程序.java
package hello.kafka.producer;
import io.micronaut.runtime.Micronaut;
public class Application {
public static void main(String[] args) {
Micronaut.run(Application.class);
}
}
生产者似乎需要创建带有@KafkaClient注释的接口。
定义 @KafkaClient 方法
以这种方式创建,不需要创建与接口相对应的实现类。
src/main/java/hello/kafka/producer/MessageClient.java 的原文件。
package hello.kafka.producer;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;
@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
@Topic("my-topic")
Single<String> send(@KafkaKey String key, Single<String> message);
}
配置可以在@KafkaClient注释和配置文件中进行。
在@KafkaClient注解中,只进行了ACK的设置。
@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
设置文件在这里。
src/main/resources/application.yml 的原文件路径。
---
micronaut:
application:
name: hello-kafka-producer
---
kafka:
bootstrap:
servers: localhost:9092
producers:
default:
retries: 5
在kafka.bootstrap.servers中进行连接配置,用于与Apache Kafka Broker建立连接。
Producer的行为是通过kafka.producers.[client-id]来进行的。
通过指定ID,可以对@KafkaClient单元进行设置。
根据@KafkaClient生产者属性
因为这次没有特别指定ID,所以默认使用了名为”default”的名称。
发送消息需要使用带有@Topic注解和指定主题名称的方法。
@Topic("my-topic")
Single<String> send(@KafkaKey String key, Single<String> message);
如果您要指定键,则可以使用@KafkaKey。如果不使用键,则无需指定(可以省略键参数本身)。
另外,由于可以使用RxJava等Reactive Streams相关的类型,因此我们选择了这个选项。
响应式和非阻塞方法定义
最后,使用 @KafkaClient 的 @Controller。
src/main/java/hello/kafka/producer/MessageController.java 的源代码文件位置
package hello.kafka.producer;
import javax.inject.Inject;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.reactivex.Single;
@Controller("/message")
public class MessageController {
@Inject
MessageClient messageClient;
@Post(value = "/{key}", consumes = MediaType.TEXT_PLAIN, produces = MediaType.TEXT_PLAIN)
public Single<String> message(String key, @Body Single<String> value) {
return messageClient.send(key, value).map(v -> String.format("message [%s] sended", v));
}
}
构建。
$ ./mvnw package
现在,制片方已经完成了。
听众(消费者)
接下来,让我们来谈谈消费者方面。
$ cd hello-kafka-listener
使用 @KafkaListener 的 Kafka 消费者
消费者可以使用 @KafkaListener 注解来创建。这是一个类的定义。
src/main/java/hello/kafka/listener/MessageListener.java:
主要的JAVA文件,监听Kafka消息的监听器文件。
package hello.kafka.listener;
import java.util.List;
import io.micronaut.configuration.kafka.Acknowledgement;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;
@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
@Topic("my-topic")
public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
message.subscribe(m -> {
System.out.printf("Received key / message = %s / %s%n", key, m);
acknowledgement.ack();
});
}
}
使用@KafkaListener注解来配置消费者,并在接收消息的方法上指定@Topic注解。
@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
@Topic("my-topic")
public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
使用的类型是反应式的。
接收和返回反应式类型
此外,我们还使用ACK。
message.subscribe(m -> {
System.out.printf("Received key / message = %s / %s%n", key, m);
acknowledgement.ack();
});
这次,设置仅限于连接到经纪人。另外,由于在同一主机上启动了Producer,所以将micronaut.server.port设置为8080以外的值。
源代码/主要资源/应用程序配置文件/application.yml
---
micronaut:
application:
name: hello-kafka-listener
server:
port: 9080
---
kafka:
bootstrap:
servers: localhost:9092
这也是一个项目。
$ ./mvnw package
现在,消费者这方面也准备就绪了。
试着动一下。
那么,我们来进行动作确认吧。
## Producerを起動
$ java -jar target/hello-kafka-producer-0.1.jar
## Consumerを起動
$ java -jar target/hello-kafka-listener-0.1.jar
我会随便给制片人发一条消息试试看。
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key1 -d 'value1'
message [value1] sended
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key2 -d 'value2'
message [value2] sended
$ curl -H 'Content-Type: text/plain' localhost:8080/message/key3 -d 'value3'
message [value3] sended
在消费者端,接收到的消息将以这种方式显示。
Received key / message = key1 / value1
Received key / message = key2 / value2
Received key / message = key3 / value3
看起来你成功地移动了。