はじめに

マイクロサービスにおけるサービス間連携でnodejs × kafkaの利用シーンは増えています。
というわけでnodejs × kafkaの概念実証(PoC)メモ。
あくまでPoCなんで超簡易的です。

ディレクトリ構成

kafka/
├ docker-compose.yml
├ node_modules
├ mackage.json
└ test
│ └ kafka-test.js
└ src
│ └ consumer
│ └ app.js
│ └ producer
│ └ app.js

ファイル説明実行コマンドtest/kafka-test.jsUnit Testava -v -uconsumer/app.jsConsumer実行ファイルnode src/consumer/app.jsproducer/app.jsProducer実行ファイルnode src/producer/app.js

セッティング

    docker-compose.yml
version: "2"

services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    links:
      - zookeeper
    kafkaトピック作成
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test1
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
# トピックからメッセージ取得
$ docker-compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test1 --from-beginning
    package.json
{
  "name": "kafka",
  "version": "1.0.0",
  "description": "kafka-node",
  "main": "index.js",
  "dependencies": {
    "ava": "^1.4.1",
    "chai": "^4.2.0",
    "kafka-node": "^4.1.0"
  },
  "devDependencies": {},
  "scripts": {
    "test": "ava -v -u"
  },
  "author": "",
  "license": "ISC"
}
    test/kafka-test.js
import test from "ava";
import chai from "chai";
import kafka from "kafka-node";

// producerテスト
test.serial.cb("producer test", t => {
    const Producer = kafka.Producer;
    const client = new kafka.KafkaClient({
        kafkaHost: "192.168.33.50:9092"
    });
    const producer = new Producer(client, {
        partitionerType: 1
    });

    producer.on("ready", () => {
        const payloads = [
            {
                topic: "test1",
                messages: JSON.stringify({name: "神崎・H・アリア", age: 16})
            }
            ,{
                topic: "test",
                messages: [
                    JSON.stringify({name: "間宮あかり", age: 15}),
                   JSON.stringify({name: "佐々木志乃", age: 15})
               ]
            }
        ];

        producer.send(payloads, (err, data) => {
            t.end();
        });
    });
});

// consumerテスト
test.serial.cb("consumer test", t => {
    const Consumer = kafka.Consumer;
    const client = new kafka.KafkaClient({
        kafkaHost: "192.168.33.50:9092"
    });
    const consumer = new Consumer(
        client,
        [{topic: "test1", partision:0}],
        {
            groupId: "simple-consumer1",
            autoCommit: true,
            fromOffset: true
        }
    );

    consumer.on("message", (message, err) => {
        const json = JSON.stringify(message.value);
            chai.assert.isString(json.name);
            chai.assert.isNumber(json.age);
            if (json.age === 16) {
               chai.assert.propertyVal(json.attr,'name','神崎・H・アリア');
            }
    });
    t.end();
});

Unit Test

$ npm test

> kafka@1.0.0 test /home/vagrant/kafka
> ava -v -u


  ✔ producer test (291ms)
  ✔ consumer test

  2 tests passed

Kafka Producer

    src/producer/app.js
"use strict";
var kafka = require("kafka-node");

const Producer = kafka.HighLevelProducer;
const client = new kafka.KafkaClient({
    kafkaHost: "192.168.33.50:9092"
});
const producer = new Producer(client, {
    partitionerType: 1
});

producer.on("ready", () => {
    const payloads = [
        {
            topic: "mytopic-1",
            messages: JSON.stringify({name: "tanaka takeshi", age: 24, sex: "M"})
        }
        ,{
            topic: "mytopic-2",
            messages: [
                JSON.stringify({name: "suzuki aiko", age: 20, sex: "F"}),
                JSON.stringify({name: "yamashita yuji", age: 28, sex: "M"})
            ]
        }
    ];

    let sends = 0;

    producer.send(payloads, (err, data) => {
        if (err) console.log(err);
        else console.log('send %d messages', ++sends);
        process.exit();
    });
});

Kafka Consumer

    src/consumer/app.js
'use strict';
var kafka = require('kafka-node');

const Consumer = kafka.Consumer;
const client = new kafka.KafkaClient({kafkaHost: "192.168.33.50:9092"});
const consumer = new Consumer(
    client,
    [{topic: "mytopic-2", partision:0}],
    {
        groupId: "simple-consumer",
        autoCommit: true,
        fromOffset: true
    }
);

consumer.on("message", (message, err) => {
    if (err) console.log("error : " + err);

    const json = JSON.parse(message.value);
    console.log("JSON:" + JSON.stringify(json));
    console.log("Name:" + json.name);
    console.log("Age:" + json.age);
    console.log("Sex Type:" + json.sex);
});

consumer.on('error', function (err) {
    console.log('error', err);
});

終わりに

本格的に使う機会がないので取りあえずNodejsで動かしてみただけっす。

参考

apache kafka quickstart(kafkaの基本的な使い方)
kafka-node

广告
将在 10 秒后关闭
bannerAds