皆さん、こんにちは!
最近、プロジェクトにおいて、Kafkaという技術を活用したため、今日は本記事でサンプルを作りながら解説してみましょう。
一、Kafkaとは
Kafkaは大規模なストリームデータを扱うことができるオープンソースの分散メッセージングシステムです。レコードのストリームをリアルタイムで公開、サブスクライブ、保存、処理できます。
二、Kafkaの仕組み
Kafkaには4つのコアAPIが存在します。
KafkaはPublish-Subscribeメッセージモデルを利用して動作します。基本的に下記の通りです。
三、環境構築
1.インストールJava
ここは略です。
2.インストールKafka
3.インストールZookeeper
KafkaはZookeeperを依頼するので、必ず予めzookeeperをインストールし、起動します。
ただ、上記の「 2.インストールKafka」でインストールされたKafkaはzookeeperを付いたので、本記事は別途でzookeeperをインストールしない。
四、KafkaとZookeeperの起動
Kafkaを起動するように、必ずzookeeperを予め起動してください。
先ずはCMDを開いて\bin\windowsのしたに入ってください。
- zookeeperは下記のコマンドで起動してください。(zookeeper.propertiesのパスに合わせてください。)
zookeeper-server-start.bat C:\software\kafka_2.12-2.6.0\config\zookeeper.properties
- Kafkaは下記のコマンドで起動してください。(server.propertiesのパスに合わせてください。)
kafka-server-start.bat C:\software\kafka_2.12-2.6.0\config\server.properties
KafkaとZookeeper両方とも、無事に起動したら、下記のコマンドでKafkaのTopicを作成します。
本記事はTopicがtest-topic01となります。
kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test-topic01
五、サンプルを実装
1.先ず、Mavenプロジェクトを作って下記の依頼をpom.xmlに追加します。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
2.Producter クラスを作る
public class TestProducer {
public static void main(String[] args) throws Exception{
Properties properties = new Properties();
//Kafkaのホストとポートを設定する
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
//key Serialize
properties.setProperty("key.serializer", StringSerializer.class.getName());
//value Serialize
properties.setProperty("value.serializer",StringSerializer.class.getName());
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
// Messageを作る。
// messageのkey:testKey,messageのvalue:hello, this is Kafka producer!
ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<String, String>("test-topic01",0,"testKey","hello, this is Kafka producer!");
// ログ出力
System.out.println("kafka producer start....");
Future<RecordMetadata> send = kafkaProducer.send(stringStringProducerRecord);
// ログ出力
System.out.println("producer send data:" + stringStringProducerRecord.value());
RecordMetadata recordMetadata = send.get();
System.out.println(recordMetadata);
}
}
3.Consumerクラスを作る
public class TestConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
// Kafkaのホストとポートを設定する
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer",StringDeserializer.class.getName());
properties.setProperty("group.id","1111");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
// topicを設定する
consumer.subscribe(Collections.singletonList("test-topic01"));
// ログ出力
System.out.println("kafka consumer start...");
while (true){
ConsumerRecords<String, String> poll = consumer.poll(500);
for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
// Producerから取得されたデータを出力する
System.out.println("Consumer get data from kafka producer:" + stringStringConsumerRecord.value());
}
}
}
}
ここまで、Kafkaサンプルを作り終わりました。
六、動作確認
先ず、TestConsumerクラスをjava applicationの形で起動しておいてください。
下記のログ出力したら、OKです。
kafka consumer start...
上記のイメージを見ると、サンプルは正常に動けることがわかりました。
七、最後に
最後まで読んでいただき、ありがとうございます。