※Kafkaに関する情報は下記のURLから参照してください。
https://kafka.apache.org/intro

必要ライブラリ

    • kafka-clients-3.2.0

 

    • slf4j-api-1.7.36

 

    slf4j-simple-1.7.36

事前準備

    • Kafkaの起動

 

    • 下記のコマンドで立ち上げを行います。

 

    • /bin/zookeeper-server-start.sh /config/zookeeper.properties

Kafka brokcerの起動
下記のコマンドで立ち上げを行います。
/bin/kafka-server-start.sh /config/server.properties

Coding

Kafkaのプロパティを定義します。
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
“my-transactional-id”);

ProducerConfig.BOOTSTRAP_SERVERS_CONFIGには接続するKafkaサーバの[URL:PORT]を記載します。
ProducerConfig.TRANSACTIONAL_ID_CONFIGにはトランザクションのIDを記載します。今回は任意の値で使います。

Producerを定義し、トランザクションを初期化します。
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();

トランザクション処理を開始します。
producer.beginTransaction();

Kafkaへイベントを送信します。
producer.send(new ProducerRecord<>(TOPIC, Integer.toString(i), “Java Producer Pub”));

今回のTOPIC、値については任意で設定し、行います。
5. 4の処理が問題なく、終わったらコミット、接続をクローズします。

	 producer.commitTransaction();
	 producer.close();		

参考

下記のレポジトリにソースコードを確認いただけます。
https://github.com/dlstjq7685/KafkaSimpleProducerTuttorial

ソースコードはkafkaのJavaDocの内容に沿って作成しております。
あくまで参考程度で書いたものですので、
その他の設定については別記事を参考にしてください。

广告
将在 10 秒后关闭
bannerAds