JavaでKafkaを使用する方法
JavaでKafkaを使用するには、最初にKafkaの依存関係を追加する必要があります。Mavenのpom.xmlファイルに、下記の依存関係を追加できます。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
次に、次の短い例に示すように、KafkaのJavaクライアントを使ってコードを書くことができます。これは、Javaを使ってメッセージを送受信する方法を示したものです。
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
public class KafkaExample {
private static final String TOPIC = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "my-group";
public static void main(String[] args) {
// 创建生产者
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(producerProps);
// 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "key", "value");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.printf("Sent record to topic=%s, partition=%d, offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
// 关闭生产者
producer.close();
// 创建消费者
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
// 订阅主题并消费消息
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received record: key=%s, value=%s%n", record.key(), record.value());
}
}
}
}
上の例では、まずプロデューサーを作成し、ProducerConfigクラスの定数を使用して、Kafkaクラスタのアドレス、キーと値のシリアライズ方法などのプロデューサーのプロパティを設定します。次に、プロデューサーレコードオブジェクトを作成し、送信するトピック、キー、値を指定します。プロデューサーのsend()メソッドを呼び出してメッセージを送信し、コールバックを使用して送信結果を処理します。最後に、プロデューサーを閉じます。
そのあと、私たちはConsumerを作成し、ConsumerConfigクラスの定数を使用して、Kafkaクラスタのアドレス、キーと値の逆シリアル化方法、コンシューマグループなどのコンシューマーのプロパティを設定します。私たちは1つのトピックをサブスクライブし、無限ループの中でpoll()メソッドを呼び出してメッセージを取得します。私たちはメッセージを反復処理し、処理を行います。
この例は、KafkaをJavaで操作する方法を示す単純な例であることに注意してください。実際には、メッセージを処理し、パフォーマンスを最適化し、信頼性を確保するために、より複雑なロジックとより多くの構成オプションを使用する必要がある場合があります。