はじめに

本記事は、Oracle Cloud Infrastructure Advent Calendar 2022 のDay 20の記事として書かれています。
Oracle Transactional Event Queues(TEQ)は、大規模なデータの収集、処理、保存、統合に使われるイベントストリーミング・プラットフォームです。

データベース内にあるため、分散トランザクションを必要とせずに、データベース・トランザクションにエンキュー・デキューを組み込むことができます。メッセージは標準的なSQLで問合せ可能ですし、Oracle Databaseの高可用性、拡張性、信頼性はキューデータにも適用されます。

今回は、Spring Bootで書かれた2つのマイクロサービス間のイベント駆動型通信を確立するために、Apache Kafka brokerをデプロイします。さらに、Kafkaライブラリの代わりに、Oracle TEQへのKafka Javaクライアントの互換性を持つOkafkaを使い、Kafka brokerの代わりにTEQをデータベースで使用します。そして最後に、KafkaとTEQ broker間を接続し、Kafka側でメッセージを生成し、TEQ側で消費されるのを確認します。

なお、本記事ではOracle Cloud Infrastructure(OCI)のCloud Shell上にアプリケーションをデプロイし、データベースはAutonomous Database(ADB)のAlways Freeを作成し使用します。ですので全ての手順をOCIのFree Tierアカウントで行うことができます。

アーキテクチャの全体像はこのようになります。

eventmesh-txeventq-kafka.drawio.png

1. 環境のセットアップ

まずは、使用する環境のセットアップを行います。

image.png

2. Kafkaを使ったマイクロサービス(Spring Boot)の構築

まずは、Kafkaを使い、通信するマイクロサービスを構築してみます。

1. Kafka brokerの実行

image.png

2. トピックの作成

Kafkaの準備ができたので、次にProducerから送り、Consumerで受け取るトピックを作成します。
以下のコマンドでTXEVENTQTOPIC1というトピックを作成します。

kafka-add-topic TXEVENTQTOPIC1

※実際は以下のコマンドを実行しています。

docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic "TXEVENTQTOPIC1"

3. ProducerとConsumerマイクロサービスのビルド

今回はSpring Boot+Spring Kafkaを使ってコード化したProducerとConsumerをMavenでビルドしています。

cd $LAB_HOME/springboot-kafka
./kafka-ms-build
image.png

4. Producerからイベントを生成

アプリケーションがビルドできたのでデプロイします。

以下のコマンドを実行してイメージをビルドし、Docker Engine(Kafkaクラスタが動作しているのと同じ)内にProducerをデプロイします。 docker build
cd $LAB_HOME/springboot-kafka
./kafka-ms-deploy-producer

イメージのビルドができたら、続いてコンテナを起動して、Producerのマイクロサービスを実行します。 docker run
cd $LAB_HOME/springboot-kafka
./kafka-ms-launch-producer

実行したら試しに一度テストしてみます。Producerにcurlで直接HTTPリクエストをし、イベントを送信してみます。
curl -s -X POST -H “Content-Type: application/json” \
-d ‘{ “id”: “id1”, “message”: “message1” }’ \
http://localhost:8080/placeMessage | jq

以下のように返ってくれば成功です。
{
“id”: “0”,
“statusMessage”: “Successful”
}

5. Consumerでイベントを消費

同様の操作をConsumerでも行います。

image.png

3. Transactional Event Queuesを使ったマイクロサービスの構築

今度は、Oracle Transactional Event Queues(TEQ)を使い、非同期に通信するマイクロサービスを構築してみます。

1. トピックの作成

以下のコマンドで、セットアップで作成したADBインスタンスに、TXEVENTQTOPIC1というトピックを作成します。

txeventq-add-topic TXEVENTQTOPIC1

ここでは、実際は以下のようなSQLを実行しています。

declare
  txeventq_topic      varchar2(30) := '&1' ;
  txeventq_subscriber varchar2(30) := '&2' ;
  subscriber     sys.aq$_agent;
begin
    if txeventq_topic is not null and txeventq_subscriber is not null
    then
        -- Creating a JMS type sharded queue:
        dbms_aqadm.create_sharded_queue(
            queue_name => txeventq_topic,
            multiple_consumers => TRUE);

        dbms_aqadm.start_queue(txeventq_topic);

        --- Create the subscriber agent
        subscriber := sys.aq$_agent(txeventq_subscriber, NULL, NULL);

        dbms_aqadm.add_subscriber(
            queue_name => txeventq_topic,
            subscriber => subscriber);
    else
        DBMS_OUTPUT.put_line('ERR : at least one of the variables is null !');
    end if;
end;
/
commit;

2. ProducerとConsumerマイクロサービスのビルド

先ほどと同様にProducerとConsumerをビルドします。一点、こちらではspring-kafkaではなく、TEQ用のKafkaJavaクライアントであるKafka Java Client for Oracle Transactional Event Queues Kafka (OKafka)を使用します。

cd $LAB_HOME/springboot-txeventq
./txeventq-ms-build

3. Producerからイベントを生成

以下のコマンドを実行してイメージをビルドし、Docker Engine内にProducerをデプロイします。docker build
cd $LAB_HOME/springboot-txeventq
./txeventq-ms-deploy-producer

コンテナを起動して、Producerのマイクロサービスを実行します。docker run
cd $LAB_HOME/springboot-txeventq
./txeventq-ms-launch-producer

一度テストしてみます。Producerにcurlで直接HTTPリクエストをし、イベントを送信してみます。
curl -s -X POST -H “Content-Type: application/json” \
-d ‘{ “id”: “id1”, “message”: “TxEventQ message1” }’ \
http://localhost:8090/placeMessage | jq

以下のように返ってくれば成功です。
{
“id”: “0”,
“statusMessage”: “Successful”
}

4. Consumerでイベントを消費

Consumerも同様にデプロイし、実行します。

以下のコマンドを実行してイメージをビルドし、Docker Engine内にConsumerをデプロイします。docker build
cd $LAB_HOME/springboot-txeventq
./txeventq-ms-deploy-consumer

コンテナを起動して、Consumerのマイクロサービスを実行します。docker run
cd $LAB_HOME/springboot-txeventq
./txeventq-ms-launch-consumer

テストしてみます。Consumerマイクロサービスは、起動後、TEQ brokerからメッセージをデキューします。デキューに成功すると、先ほどProducerによって送信されたイベントをログで見ることができます。以下のコマンドでコンテナログから後半の6行をリストアップします。
container-logs txeventq-consumer 6

{“id”: “0”, “message”: “TxEventQ message1”}.と表示されていたら成功です。
このように、Kafka brokerと同様に、TEQ brokerでもイベントの生成と消費を行うことができることがわかります。

4. KafkaとTEQの接続

ここでは、KafkaとTEQの互換性を確認してみます。TEQはKafkaとの双方向の通信が可能なので、変更をほぼリアルタイムで反映させることができます。

1. Kafka Connectのセットアップ

今回は、Apache Kafkaに含まれる、Kafkaと他のシステムを統合するフレームワークであるApache Kafka Connectを使用します。

ADBのパスワードを指定して、以下のコマンドを実行します。DBユーザーやホスト名などのパラメータが入力され、KafkaトピックとTEQの間にConnect Sinkが設定されます。
cd $LAB_HOME/kafka-connect-txeventq
./setup-kafka2txeventq-connect.sh

Connect Sinkの状態を以下のコマンドで確認します。
kafka-connect-status

正しく接続できていれば、以下のような結果になります。
{
“name”: “JmsConnectSink_txeventqlab”,
“connector”: {
“state”: “RUNNING”,
“worker_id”: “connect:8083”
},
“tasks”: [
{
“id”: 0,
“state”: “RUNNING”,
“worker_id”: “connect:8083”
}
],
“type”: “sink”
}

2. Kafka brokerからメッセージをエンキュー

Connectorを実行したので、メッセージを生成して転送をテストします。Kafka Producerからエンキューし、TEQからデキューしてみます。

Kafka Producerが動いている状態で、curlでProducer APIにメッセージを送信します。

curl -s -X POST -H "Content-Type: application/json"  \
     -d '{ "id": "Sink1", "message": "Sink Message from Kafka to TxEventQ #1" }'  \
     http://localhost:8080/placeMessage | jq

以下のように返ってくれば、送信は成功です。

{
    "id": "1",
    "statusMessage": "Successful"
}

3. PL/SQLを使ってTEQからメッセージをデキュー

Producerからメッセージをエンキューすると、その後Connect SinkエージェントがKafkaトピックからメッセージを消費し、TEQにエンキューします。そして、OKafka ConsumerマイクロサービスやPL/SQLプロシージャを使って、TEQからメッセージをデキューすることができます。

以下のコマンドでTEQからデキューします。

txeventq-dequeue

実際は以下のようなPL/SQLを実行しています。

declare
  txeventq_topic      varchar2(30) := '&1' ;
  txeventq_subscriber varchar2(30) := '&2' ;

  dequeue_options    DBMS_AQ.dequeue_options_t;
  message_properties DBMS_AQ.message_properties_t;
  message_id         RAW(2000);
  my_message         SYS.AQ$_JMS_TEXT_MESSAGE;
  msg_text           varchar2(32767);
begin
    DBMS_OUTPUT.ENABLE (20000);

    if txeventq_topic is not null and txeventq_subscriber is not null
    then
        -- Dequeue Options
        dequeue_options.dequeue_mode  := DBMS_AQ.REMOVE;
        dequeue_options.wait          := DBMS_AQ.NO_WAIT;
        dequeue_options.navigation    := DBMS_AQ.FIRST_MESSAGE;
        dequeue_options.wait          := 1;
        dequeue_options.consumer_name := txeventq_subscriber;

      DBMS_AQ.DEQUEUE(
        queue_name => txeventq_topic,
        dequeue_options => dequeue_options,
        message_properties => message_properties,
        payload => my_message,
        msgid => message_id);
        commit;
        my_message.get_text(msg_text);
        DBMS_OUTPUT.put_line('TxEventQ message: ' || msg_text);
    else
        DBMS_OUTPUT.put_line('ERR : at least one of the variables is null !');
    end if;
end;
/

以下のような結果が返ってくるはずです。

TxEventQ message: {"id": "1", "message": "Sink Message from Kafka to TxEventQ #1"}

PL/SQL procedure successfully completed.

まとめ

本記事では、Apache KafkaとOracle Transactional Event Queues(TEQ)という2つのメッセージ・ブローカーを使用したメッセージ・キューイングをご紹介しました。
TEQはコンバージド・データベースであるOracle Databaseの一機能でかつ、Kafkaとも互換性があるので、エンタープライズクラスのデータ中心のマイクロサービス・アーキテクチャを構築することができます。
本記事でご紹介したものは、全てOCIの無料のリソースで完結しているので、お時間があればぜひ試してみてください。

参考

    • kafka docker公式チュートリアル

 

    • Oracle Transactional Event Queues

 

    Kafka Java Client for Oracle Transactional Event Queues
广告
将在 10 秒后关闭
bannerAds