はじめに

NestJSのフレームワークの恩恵も受けつつもイベント駆動な非同期アプリケーションを作りたい!という方も多いんじゃないでしょうか。
今回はNestJSとkafkajsを組み合わせたKafkaConsumerの実装を紹介していきます。

Apache Kafkaとは

スクリーンショット 2023-04-17 15.56.51.png

オープンソースの分散ストリーミング送受信処理基盤で、イベント駆動システムのメッセージブローカーとしてもよく利用されています。
Kafkaの仕組みについては日立製作所が出している下記記事やCONFLUENTが出してる開発者ガイドがわかりやすいため一読されることをオススメします。
Apache Kafkaの概要とアーキテクチャ
CONFLUENT Introduction to Kafka

kafkajsとは

kafka-node_kafkajs比較.png

NestJSのマイクロサービスサポート機能について

NestJSの@nestjs/microservicesパッケージを利用すると
MQTT/Kafka/RabbitMQ/gRPCなど様々なミドルウェアに対応したシステムを構築できます。
筆者も最初は本機能を利用して開発をしていたのですが、1トピック複数Consumerを動かす
ユースケースで試してみた際に、1トピック1Consumerの制約からどうしても脱却できず
フレームワークの独自の仕様に固く縛られてしまったため、別手段での構築を検討しました。
今回はNestJSとkafkajsの双方の良さを活かしたカスタマイズ性の高いConsumerを作成してみます。

NestJSのプロジェクト作成

$ nest new tutorial-consumer
⚡  We will scaffold your app in a few seconds..

? Which package manager would you ❤️  to use? npm

パッケージマネージャーにはnpmを利用します

Consumer用のモジュールを作成する

$ nest g module hello
CREATE src/hello/hello.module.ts (82 bytes)
UPDATE src/app.module.ts (312 bytes)

KafkaConsumer用の基底クラスを作成する

各種具象のKafkaConsumerが独自のビジネスロジックに注力できるように、KafkaConsumer用の基底クラスを作成します。

import {
  Injectable,
  OnModuleInit,
  OnModuleDestroy,
  Logger,
} from '@nestjs/common';
import { Kafka, Consumer, KafkaMessage } from 'kafkajs';

/**
 * KafkaConsumerの基底クラス
 */
export abstract class KafkaConsumer implements OnModuleInit, OnModuleDestroy {
  /**
   * クラスメンバ
   */
  private readonly kafka: Kafka;
  private readonly logger: Logger = new Logger();
  private consumer: Consumer; // onModuleInitが呼び出された際に値を格納

  /**
   * 抽象メンバ
   */
  abstract readonly consumerGroupName: string;
  abstract readonly consumerTopicName: string;

  /**
   * コンストラクタ
   */
  constructor() {
    this.kafka = new Kafka({
      brokers: process.env['BROKER_ENDPOINTS']?.split(',') ?? [
        'localhost:9093',
      ],
    });
  }

  /**
   * ハンドラ処理
   */
  abstract handler(message: KafkaMessage): void;

  /**
   * ハンドラーの前処理
   */
  private actionBeforeHandler(): void {
    this.logger.log(`${this.consumerGroupName}の処理を開始します`);
    this.logger.log(`${this.consumerTopicName}からメッセージを取得します`);
  }

  /**
   * ハンドラーの後処理
   */
  private actionAfterHandler(): void {
    this.logger.log(`${this.consumerGroupName}の処理を終了します`);
  }

  /**
   * 冪等処理
   */
  private isUniqueProcess(): boolean {
    // TODO: アプリケーション固有の冪等チェックを記載する
    return true;
  }

  /**
   * 実行処理
   */
  private execute(partition: number, message: KafkaMessage): void {
    this.actionBeforeHandler();
    if (this.isUniqueProcess()) {
      this.handler(message);
    } else {
      this.logger.warn(`${this.consumerGroupName}の処理が重複しました`);
    }
    this.actionAfterHandler();
  }
}

NestJSではライフサイクルイベントという機能が存在ます
@Module、@Injectable、@Controllerデコレータが付与されたクラスにあらかじめライフサイクルメソッドを用意しておくと、ライフサイクルイベントが発生した際に該当コードを呼び出してくれる機能です。
ライフサイクルイベントには下記が存在し、consumer起動時にonModuleInit()を、停止時にonModuleDestroy()を今回は利用します

ライフサイクルメソッドフックメソッドコールのトリガとなるライフルサイクルイベントonModuleInit()ホストモジュールの依存関係が解決されたonModuleDestroy()終了シグナル(SIGTERM等)を受け取った後
  /**
   * ホストモジュールの依存関係が解決された直後の処理
   */
  async onModuleInit() {
    this.logger.log(`${this.consumerGroupName}を起動します`);
    this.consumer = this.kafka.consumer({
      groupId: this.consumerGroupName,
      heartbeatInterval: 20000,
      sessionTimeout: 60000,
    });

    await this.consumer.connect();
    await this.consumer.subscribe({
      topics: [this.consumerTopicName],
      fromBeginning: true,
    });
    await this.consumer.run({
      eachMessage: async ({ partition, message }) => {
        this.execute(partition, message);
      },
    });
    this.logger.log(`${this.consumerGroupName}が起動しました`);
  }

終了シグナルを受け取った際に、consumerをkafkaから切断するようにonModuleDestroy()メソッドに実装を行います。

  /**
   * 終了シグナルを受け取った時の処理
   */
  async onModuleDestroy() {
    this.logger.log(`${this.consumerGroupName}をkafkaから切断します`);
    await this.consumer.disconnect();
  }

具象のConsumerの作成

hello.consumer.tsを手動で作成します。

$ touch src/hello/hello.consumer.ts

先ほど作成したKafkaConsumerクラスを継承させて、具象のConsumerハンドラを作成します
@Injectable()デコレータをつけておくことでNestのプロバイダとして認識され、NestJSの独自のDIコンテナでインスタンスのライフサイクル管理がされるようになります。

import { Injectable } from '@nestjs/common';
import { KafkaMessage } from 'kafkajs';
import { KafkaConsumer } from 'src/kafka.consumer';

@Injectable()
export class HelloConsumer extends KafkaConsumer {
  readonly consumerGroupName: string;
  readonly consumerTopicName: string;
  constructor() {
    super();
    this.consumerGroupName = 'HelloWorldConsumer';
    this.consumerTopicName = 'TestTopic';
  }
  handler(message: KafkaMessage): void {
    /**
     * Consumer固有のロジックを書く
     */
    
    
  }
}

モジュールのprovidersに先ほど作成したHelloConsumerの登録を行います。

import { Module } from '@nestjs/common';
import { HelloConsumer } from './hello.consumer';
import { HelloService } from './hello.service';

@Module({
  providers: [HelloConsumer],
})
export class HelloModule {}

ApacheKafkaを起動する

ApacheKafkaを起動するためのdocker-compose.ymlをプロジェクト直下に配置し起動を行います

$ docker-compose up -d
version: '2'
services:
  # -------------------------- #
  # Zookeeper
  # -------------------------- #
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    container_name: zookeeper
    tty: true
    networks: 
      - kafka_network
  # -------------------------- #
  # Kafka(Broker)
  # -------------------------- #
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9093:9093'
      - '9094:9094'
    container_name: kafka
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,EXTERNAL_DOCKER:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093,EXTERNAL_DOCKER://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093,EXTERNAL_DOCKER://host.docker.internal:9094
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
    tty: true
    networks: 
      - kafka_network
  # -------------------------- #
  # Kafka UI
  # -------------------------- #
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
      - KAFKA_CLUSTERS_0_READONLY=false
    networks: 
      - kafka_network
networks:
  kafka_network:

kafka-uiをブラウザで開き、トピックを作成する

スクリーンショット 2023-01-03 16.50.38.png
スクリーンショット 2023-01-03 16.58.07.png

NestJSのKafkaConsumerを起動

下記コマンドでNestJSのConsumerアプリケーションを起動します

$ npm run start:dev
[Nest] 25626  - 2023/01/03 14:48:44     LOG [NestFactory] Starting Nest application...
[Nest] 25626  - 2023/01/03 14:48:44     LOG [InstanceLoader] HelloModule dependencies initialized +29ms
[Nest] 25626  - 2023/01/03 14:48:44     LOG [InstanceLoader] AppModule dependencies initialized +1ms
[Nest] 25626  - 2023/01/03 14:48:44     LOG [RoutesResolver] AppController {/}: +4ms
[Nest] 25626  - 2023/01/03 14:48:44     LOG [RouterExplorer] Mapped {/, GET} route +2ms
[Nest] 25626  - 2023/01/03 14:48:44     LOG HelloWorldConsumerを起動します
{"level":"INFO","timestamp":"2023-01-03T05:48:44.366Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"HelloWorldConsumer"}
{"level":"INFO","timestamp":"2023-01-03T05:48:44.403Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"HelloWorldConsumer","memberId":"kafkajs-1a203b6f-f9c3-4917-809f-849b6f4d7408","leaderId":"kafkajs-1a203b6f-f9c3-4917-809f-849b6f4d7408","isLeader":true,"memberAssignment":{"TestTopic":[0]},"groupProtocol":"RoundRobinAssigner","duration":36}
[Nest] 25626  - 2023/01/03 14:48:44     LOG HelloWorldConsumerが起動しました
[Nest] 25626  - 2023/01/03 14:48:44     LOG [NestApplication] Nest application successfully started +6ms
スクリーンショット 2023-01-03 15.02.46.png

KafkaUIからメッセージを送信する

スクリーンショット 2023-01-03 15.04.04.png

下記のようにLOG 受信したメッセージ:Testとログが表示されてることが確認できました。

[Nest] 26241  - 2023/01/03 14:53:00     LOG [NestFactory] Starting Nest application...
[Nest] 26241  - 2023/01/03 14:53:00     LOG [InstanceLoader] HelloModule dependencies initialized +30ms
[Nest] 26241  - 2023/01/03 14:53:00     LOG [InstanceLoader] AppModule dependencies initialized +1ms
[Nest] 26241  - 2023/01/03 14:53:00     LOG [RoutesResolver] AppController {/}: +4ms
[Nest] 26241  - 2023/01/03 14:53:00     LOG [RouterExplorer] Mapped {/, GET} route +2ms
[Nest] 26241  - 2023/01/03 14:53:00     LOG HelloWorldConsumerを起動します
{"level":"INFO","timestamp":"2023-01-03T05:53:00.930Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"HelloWorldConsumer"}
{"level":"INFO","timestamp":"2023-01-03T05:53:51.359Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"HelloWorldConsumer","memberId":"kafkajs-3d4ce9f2-23ce-4d09-96c2-7f0f2b76e3a2","leaderId":"kafkajs-3d4ce9f2-23ce-4d09-96c2-7f0f2b76e3a2","isLeader":true,"memberAssignment":{"TestTopic":[0]},"groupProtocol":"RoundRobinAssigner","duration":50428}
[Nest] 26241  - 2023/01/03 14:53:51     LOG HelloWorldConsumerが起動しました
[Nest] 26241  - 2023/01/03 14:53:51     LOG [NestApplication] Nest application successfully started +10ms
[Nest] 26241  - 2023/01/03 14:56:22     LOG HelloWorldConsumerの処理を開始します
[Nest] 26241  - 2023/01/03 14:56:22     LOG TestTopicからメッセージを取得します
[Nest] 26241  - 2023/01/03 14:56:22     LOG 受信したメッセージ:Test
[Nest] 26241  - 2023/01/03 14:56:22     LOG HelloWorldConsumerの処理を終了します

おわりに

本投稿ではNestJSとkafkajsを組み合わせてKafkaConsumerを作ってみました。
アダプター部分の実装においてNestJSのコントローラー機能を利用せずに作ってみましたが、
各種Consumerのライフサイクル管理はNestJSの機能に任せて、Consumer実装者がビジネスロジックをシンプルに実装できるようになってます。

广告
将在 10 秒后关闭
bannerAds