はじめに
マイクロサービスにおけるサービス間連携でnodejs × kafkaの利用シーンは増えています。
というわけでnodejs × kafkaの概念実証(PoC)メモ。
あくまでPoCなんで超簡易的です。
ディレクトリ構成
kafka/
├ docker-compose.yml
├ node_modules
├ package.json
├ test
│ └ kafka-test.js
└ src
  ├ consumer
  │ └ app.js
  └ producer
    └ app.js
| ファイル | 説明 | 実行コマンド |
|---|---|---|
| test/kafka-test.js | Unit Test | ava -v -u |
| consumer/app.js | Consumer実行ファイル | node src/consumer/app.js |
| producer/app.js | Producer実行ファイル | node src/producer/app.js |
セッティング
- docker-compose.yml
# Single-broker Kafka plus ZooKeeper for local experiments.
version: "2"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      # Host name the broker advertises to clients.
      # NOTE(review): 127.0.0.1 presumably only works for clients running on
      # the Docker host itself - confirm before connecting from another machine.
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    links:
      - zookeeper
- kafkaトピック作成
# Create the "test" and "test1" topics (1 partition, replication factor 1)
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test1
# List the topics registered in ZooKeeper
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
# Fetch messages from the topic, starting at the beginning
$ docker-compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test1 --from-beginning
- package.json
{
"name": "kafka",
"version": "1.0.0",
"description": "kafka-node",
"main": "index.js",
"dependencies": {
"ava": "^1.4.1",
"chai": "^4.2.0",
"kafka-node": "^4.1.0"
},
"devDependencies": {},
"scripts": {
"test": "ava -v -u"
},
"author": "",
"license": "ISC"
}
- test/kafka-test.js
import test from "ava";
import chai from "chai";
import kafka from "kafka-node";
// Producer test: publishes one message to "test1" and two messages to "test".
// The test fails if the client reports an error or the send request fails.
test.serial.cb("producer test", t => {
  const Producer = kafka.Producer;
  const client = new kafka.KafkaClient({
    kafkaHost: "192.168.33.50:9092"
  });
  const producer = new Producer(client, {
    // 1 selects kafka-node's random partitioner.
    partitionerType: 1
  });

  // End the test exactly once, closing the connection first.
  // AVA treats a truthy argument to t.end() as a failure.
  let finished = false;
  const finish = err => {
    if (finished) {
      return;
    }
    finished = true;
    producer.close(() => t.end(err));
  };

  // Without this listener a connection problem would leave the test hanging.
  producer.on("error", finish);

  producer.on("ready", () => {
    const payloads = [
      {
        topic: "test1",
        messages: JSON.stringify({name: "神崎・H・アリア", age: 16})
      },
      {
        topic: "test",
        messages: [
          JSON.stringify({name: "間宮あかり", age: 15}),
          JSON.stringify({name: "佐々木志乃", age: 15})
        ]
      }
    ];
    // Propagate the send result instead of ignoring it.
    producer.send(payloads, err => finish(err));
  });
});
// Consumer test: reads a message from partition 0 of "test1" and checks that
// its JSON body has a string `name` and a numeric `age`.
// The test ends only after a message has been validated (or an error occurs),
// so the assertions are actually exercised.
test.serial.cb("consumer test", t => {
  const Consumer = kafka.Consumer;
  const client = new kafka.KafkaClient({
    kafkaHost: "192.168.33.50:9092"
  });
  const consumer = new Consumer(
    client,
    [{topic: "test1", partition: 0}],
    {
      groupId: "simple-consumer1",
      autoCommit: true,
      // NOTE(review): no explicit offset is given in the payload, so this
      // presumably starts from offset 0 - confirm against kafka-node docs.
      fromOffset: true
    }
  );

  // End the test exactly once, closing the consumer first.
  // AVA treats a truthy argument to t.end() as a failure.
  let finished = false;
  const finish = err => {
    if (finished) {
      return;
    }
    finished = true;
    consumer.close(true, () => t.end(err));
  };

  consumer.on("error", finish);

  consumer.on("message", message => {
    try {
      // message.value is the raw JSON text written by the producer.
      const json = JSON.parse(message.value);
      chai.assert.isString(json.name);
      chai.assert.isNumber(json.age);
      if (json.age === 16) {
        chai.assert.propertyVal(json, "name", "神崎・H・アリア");
      }
      finish();
    } catch (err) {
      // Report parse/assertion failures through AVA instead of throwing
      // out of the event handler.
      finish(err);
    }
  });
});
Unit Test
$ npm test
> kafka@1.0.0 test /home/vagrant/kafka
> ava -v -u
✔ producer test (291ms)
✔ consumer test
2 tests passed
Kafka Producer
- src/producer/app.js
"use strict";
// Publishes sample JSON messages to "mytopic-1" and "mytopic-2", then exits.
// Exit status is 0 on success and 1 when the client or the send request fails.
const kafka = require("kafka-node");

const Producer = kafka.HighLevelProducer;
const client = new kafka.KafkaClient({
  kafkaHost: "192.168.33.50:9092"
});
const producer = new Producer(client, {
  // 1 selects kafka-node's random partitioner.
  partitionerType: 1
});

// Connection-level failures are emitted as "error" events; without a listener
// they would surface as an unhandled exception.
producer.on("error", err => {
  console.log(err);
  process.exit(1);
});

producer.on("ready", () => {
  const payloads = [
    {
      topic: "mytopic-1",
      messages: JSON.stringify({name: "tanaka takeshi", age: 24, sex: "M"})
    },
    {
      topic: "mytopic-2",
      messages: [
        JSON.stringify({name: "suzuki aiko", age: 20, sex: "F"}),
        JSON.stringify({name: "yamashita yuji", age: 28, sex: "M"})
      ]
    }
  ];

  // `messages` may be a single string or an array, so count both forms.
  const messageCount = payloads.reduce(
    (total, payload) =>
      total + (Array.isArray(payload.messages) ? payload.messages.length : 1),
    0
  );

  producer.send(payloads, (err, data) => {
    if (err) {
      console.log(err);
      process.exit(1);
      return;
    }
    console.log('send %d messages', messageCount);
    // Close the connection cleanly before terminating the process.
    producer.close(() => process.exit(0));
  });
});
Kafka Consumer
- src/consumer/app.js
'use strict';
// Consumes messages from partition 0 of "mytopic-2" and prints each JSON
// message's fields. Runs until the process is stopped.
const kafka = require('kafka-node');

const Consumer = kafka.Consumer;
const client = new kafka.KafkaClient({kafkaHost: "192.168.33.50:9092"});
const consumer = new Consumer(
  client,
  [{topic: "mytopic-2", partition: 0}],
  {
    groupId: "simple-consumer",
    autoCommit: true,
    // NOTE(review): no explicit offset is given in the payload, so this
    // presumably starts from offset 0 - confirm against kafka-node docs.
    fromOffset: true
  }
);

consumer.on("message", message => {
  let json;
  try {
    json = JSON.parse(message.value);
  } catch (err) {
    // Skip messages that are not valid JSON instead of crashing the consumer.
    console.log("error : " + err);
    return;
  }
  console.log("JSON:" + JSON.stringify(json));
  console.log("Name:" + json.name);
  console.log("Age:" + json.age);
  console.log("Sex Type:" + json.sex);
});

// Client and broker failures are delivered here, not to the message handler.
consumer.on('error', function (err) {
  console.log('error', err);
});
終わりに
本格的に使う機会がないので取りあえずNodejsで動かしてみただけっす。
参考
apache kafka quickstart(kafkaの基本的な使い方)
kafka-node