使用Micronaut访问Apache Kafka

微型凡人卡夫卡

由于Micronaut声称具有对Apache Kafka的支持,所以我决定试一试。

卡夫卡技术支持

据说可以使用Micronaut来创建Apache Kafka的Producer和Consumer。

微服务Kafka

环境

这次使用的Micronaut版本。

$ mn -V
| Micronaut Version: 1.0.4
| JVM Version: 1.8.0_191

在Micronaut 1.0.4版本中,似乎支持Apache Kafka 2.0.1版本。

升级到Kafka 2.0.1

微服务级别的Kafka

根据这个要求,我们下载了Apache Kafka 2.0.1。

$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz
$ tar xf kafka_2.12-2.0.1.tgz
$ cd kafka_2.12-2.0.1

按照Apache Kafka的快速入门,启动Apache ZooKeeper和Apache Kafka Broker。

快速启动/启动服务器

## Apache Zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties

## Apache Kafka(Broker)
$ bin/kafka-server-start.sh config/server.properties

这个主题是用“my-topic”作为名称创建的。

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic
Created topic "my-topic".

制作应用程序

那么,我来试着创建一个应用程序。创建了一个供应者和消费者 (这里以监听器表示)用的项目。

## Producer
$ mn create-app --features kafka --build maven hello-kafka-producer

## Listener(Consumer)
$ mn create-app --features kafka --build maven hello-kafka-listener

随后,将逐步向这两个项目添加和编辑源代码。

制造一个制作人

那么,让我们来编写一个向Apache Kafka Broker发送消息的Producer。

$ cd hello-kafka-producer

主类将保持自动生成的状态使用。

src/main/java/hello/kafka/producer/Application.java的中文翻译可以是:主要/主文件/入口文件/起始文件/应用程序.java

package hello.kafka.producer;

import io.micronaut.runtime.Micronaut;

public class Application {

    public static void main(String[] args) {
        Micronaut.run(Application.class);
    }
}

生产者似乎需要创建带有@KafkaClient注释的接口。

定义 @KafkaClient 方法

以这种方式创建,不需要创建与接口相对应的实现类。

src/main/java/hello/kafka/producer/MessageClient.java 的原文件。

package hello.kafka.producer;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;

@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {
    @Topic("my-topic")
    Single<String> send(@KafkaKey String key, Single<String> message);
}

配置可以在@KafkaClient注释和配置文件中进行。

在@KafkaClient注解中,只进行了ACK的设置。

@KafkaClient(acks = KafkaClient.Acknowledge.ALL)
public interface MessageClient {

设置文件在这里。

src/main/resources/application.yml 的原文件路径。

---
micronaut:
    application:
        name: hello-kafka-producer

---
kafka:
    bootstrap:
        servers: localhost:9092
    producers:
        default:
            retries: 5

在kafka.bootstrap.servers中进行连接配置,用于与Apache Kafka Broker建立连接。

Producer的行为是通过kafka.producers.[client-id]来进行的。

通过指定ID,可以对@KafkaClient单元进行设置。

根据@KafkaClient生产者属性

因为这次没有特别指定ID,所以默认使用了名为”default”的名称。

发送消息需要使用带有@Topic注解和指定主题名称的方法。

    @Topic("my-topic")
    Single<String> send(@KafkaKey String key, Single<String> message);

如果您要指定键,则可以使用@KafkaKey。如果不使用键,则无需指定(可以省略键参数本身)。

另外,由于可以使用RxJava等Reactive Streams相关的类型,因此我们选择了这个选项。

响应式和非阻塞方法定义

最后,使用 @KafkaClient 的 @Controller。

src/main/java/hello/kafka/producer/MessageController.java 的源代码文件位置

package hello.kafka.producer;

import javax.inject.Inject;

import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.reactivex.Single;

@Controller("/message")
public class MessageController {
    @Inject
    MessageClient messageClient;

    @Post(value = "/{key}", consumes = MediaType.TEXT_PLAIN, produces = MediaType.TEXT_PLAIN)
    public Single<String> message(String key, @Body Single<String> value) {
        return messageClient.send(key, value).map(v -> String.format("message [%s] sended", v));
    }
}

构建。

$ ./mvnw package

现在,制片方已经完成了。

听众(消费者)

接下来,让我们来谈谈消费者方面。

$ cd hello-kafka-listener

使用 @KafkaListener 的 Kafka 消费者

消费者可以使用 @KafkaListener 注解来创建。这是一个类的定义。

src/main/java/hello/kafka/listener/MessageListener.java:
主要的JAVA文件,监听Kafka消息的监听器文件。

package hello.kafka.listener;

import java.util.List;

import io.micronaut.configuration.kafka.Acknowledgement;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.reactivex.Single;

@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
    @Topic("my-topic")
    public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {
        message.subscribe(m -> {
            System.out.printf("Received key / message = %s / %s%n", key, m);
            acknowledgement.ack();
        });
    }
}

使用@KafkaListener注解来配置消费者,并在接收消息的方法上指定@Topic注解。

@KafkaListener(groupId = "message-group", offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED)
public class MessageListener {
    @Topic("my-topic")
    public void receive(@KafkaKey String key, Single<String> message, Acknowledgement acknowledgement) {

使用的类型是反应式的。

接收和返回反应式类型

此外,我们还使用ACK。

        message.subscribe(m -> {
            System.out.printf("Received key / message = %s / %s%n", key, m);
            acknowledgement.ack();
        });

这次,设置仅限于连接到经纪人。另外,由于在同一主机上启动了Producer,所以将micronaut.server.port设置为8080以外的值。

源代码/主要资源/应用程序配置文件/application.yml

---
micronaut:
    application:
        name: hello-kafka-listener
    server:
        port: 9080

---
kafka:
    bootstrap:
        servers: localhost:9092

这也是一个项目。

$ ./mvnw package

现在,消费者这方面也准备就绪了。

试着动一下。

那么,我们来进行动作确认吧。

## Producerを起動
$ java -jar target/hello-kafka-producer-0.1.jar


## Consumerを起動
$ java -jar target/hello-kafka-listener-0.1.jar

我会随便给制片人发一条消息试试看。

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key1 -d 'value1'
message [value1] sended

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key2 -d 'value2'
message [value2] sended

$ curl -H 'Content-Type: text/plain' localhost:8080/message/key3 -d 'value3'
message [value3] sended

在消费者端,接收到的消息将以这种方式显示。

Received key / message = key1 / value1
Received key / message = key2 / value2
Received key / message = key3 / value3

看起来你成功地移动了。

广告
将在 10 秒后关闭
bannerAds