はじめに
本記事は、Oracle Cloud Infrastructure Advent Calendar 2022 のDay 20の記事として書かれています。
Oracle Transactional Event Queues(TEQ)は、大規模なデータの収集、処理、保存、統合に使われるイベントストリーミング・プラットフォームです。
データベース内にあるため、分散トランザクションを必要とせずに、データベース・トランザクションにエンキュー・デキューを組み込むことができます。メッセージは標準的なSQLで問合せ可能ですし、Oracle Databaseの高可用性、拡張性、信頼性はキューデータにも適用されます。
今回は、Spring Bootで書かれた2つのマイクロサービス間のイベント駆動型通信を確立するために、Apache Kafka brokerをデプロイします。さらに、Kafkaライブラリの代わりに、Oracle TEQへのKafka Javaクライアントの互換性を持つOkafkaを使い、Kafka brokerの代わりにTEQをデータベースで使用します。そして最後に、KafkaとTEQ broker間を接続し、Kafka側でメッセージを生成し、TEQ側で消費されるのを確認します。
なお、本記事ではOracle Cloud Infrastructure(OCI)のCloud Shell上にアプリケーションをデプロイし、データベースはAutonomous Database(ADB)のAlways Freeを作成し使用します。ですので全ての手順をOCIのFree Tierアカウントで行うことができます。
アーキテクチャの全体像はこのようになります。
1. 環境のセットアップ
まずは、使用する環境のセットアップを行います。
2. Kafkaを使ったマイクロサービス(Spring Boot)の構築
まずは、Kafkaを使い、通信するマイクロサービスを構築してみます。
1. Kafka brokerの実行
2. トピックの作成
Kafkaの準備ができたので、次にProducerから送り、Consumerで受け取るトピックを作成します。
以下のコマンドでTXEVENTQTOPIC1というトピックを作成します。
kafka-add-topic TXEVENTQTOPIC1
※実際は以下のコマンドを実行しています。
docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic "TXEVENTQTOPIC1"
3. ProducerとConsumerマイクロサービスのビルド
今回はSpring Boot+Spring Kafkaを使ってコード化したProducerとConsumerをMavenでビルドしています。
cd $LAB_HOME/springboot-kafka
./kafka-ms-build
4. Producerからイベントを生成
アプリケーションがビルドできたのでデプロイします。
以下のコマンドを実行してイメージをビルドし、Docker Engine(Kafkaクラスタが動作しているのと同じ)内にProducerをデプロイします。 docker build
cd $LAB_HOME/springboot-kafka
./kafka-ms-deploy-producer
イメージのビルドができたら、続いてコンテナを起動して、Producerのマイクロサービスを実行します。 docker run
cd $LAB_HOME/springboot-kafka
./kafka-ms-launch-producer
実行したら試しに一度テストしてみます。Producerにcurlで直接HTTPリクエストをし、イベントを送信してみます。
curl -s -X POST -H “Content-Type: application/json” \
-d ‘{ “id”: “id1”, “message”: “message1” }’ \
http://localhost:8080/placeMessage | jq
以下のように返ってくれば成功です。
{
“id”: “0”,
“statusMessage”: “Successful”
}
5. Consumerでイベントを消費
同様の操作をConsumerでも行います。
3. Transactional Event Queuesを使ったマイクロサービスの構築
今度は、Oracle Transactional Event Queues(TEQ)を使い、非同期に通信するマイクロサービスを構築してみます。
1. トピックの作成
以下のコマンドで、セットアップで作成したADBインスタンスに、TXEVENTQTOPIC1というトピックを作成します。
txeventq-add-topic TXEVENTQTOPIC1
ここでは、実際は以下のようなSQLを実行しています。
declare
txeventq_topic varchar2(30) := '&1' ;
txeventq_subscriber varchar2(30) := '&2' ;
subscriber sys.aq$_agent;
begin
if txeventq_topic is not null and txeventq_subscriber is not null
then
-- Creating a JMS type sharded queue:
dbms_aqadm.create_sharded_queue(
queue_name => txeventq_topic,
multiple_consumers => TRUE);
dbms_aqadm.start_queue(txeventq_topic);
--- Create the subscriber agent
subscriber := sys.aq$_agent(txeventq_subscriber, NULL, NULL);
dbms_aqadm.add_subscriber(
queue_name => txeventq_topic,
subscriber => subscriber);
else
DBMS_OUTPUT.put_line('ERR : at least one of the variables is null !');
end if;
end;
/
commit;
2. ProducerとConsumerマイクロサービスのビルド
先ほどと同様にProducerとConsumerをビルドします。一点、こちらではspring-kafkaではなく、TEQ用のKafkaJavaクライアントであるKafka Java Client for Oracle Transactional Event Queues Kafka (OKafka)を使用します。
cd $LAB_HOME/springboot-txeventq
./txeventq-ms-build
3. Producerからイベントを生成
以下のコマンドを実行してイメージをビルドし、Docker Engine内にProducerをデプロイします。docker build
cd $LAB_HOME/springboot-txeventq
./txeventq-ms-deploy-producer
コンテナを起動して、Producerのマイクロサービスを実行します。docker run
cd $LAB_HOME/springboot-txeventq
./txeventq-ms-launch-producer
一度テストしてみます。Producerにcurlで直接HTTPリクエストをし、イベントを送信してみます。
curl -s -X POST -H “Content-Type: application/json” \
-d ‘{ “id”: “id1”, “message”: “TxEventQ message1” }’ \
http://localhost:8090/placeMessage | jq
以下のように返ってくれば成功です。
{
“id”: “0”,
“statusMessage”: “Successful”
}
4. Consumerでイベントを消費
Consumerも同様にデプロイし、実行します。
以下のコマンドを実行してイメージをビルドし、Docker Engine内にConsumerをデプロイします。docker build
cd $LAB_HOME/springboot-txeventq
./txeventq-ms-deploy-consumer
コンテナを起動して、Consumerのマイクロサービスを実行します。docker run
cd $LAB_HOME/springboot-txeventq
./txeventq-ms-launch-consumer
テストしてみます。Consumerマイクロサービスは、起動後、TEQ brokerからメッセージをデキューします。デキューに成功すると、先ほどProducerによって送信されたイベントをログで見ることができます。以下のコマンドでコンテナログから後半の6行をリストアップします。
container-logs txeventq-consumer 6
{“id”: “0”, “message”: “TxEventQ message1”}.と表示されていたら成功です。
このように、Kafka brokerと同様に、TEQ brokerでもイベントの生成と消費を行うことができることがわかります。
4. KafkaとTEQの接続
ここでは、KafkaとTEQの互換性を確認してみます。TEQはKafkaとの双方向の通信が可能なので、変更をほぼリアルタイムで反映させることができます。
1. Kafka Connectのセットアップ
今回は、Apache Kafkaに含まれる、Kafkaと他のシステムを統合するフレームワークであるApache Kafka Connectを使用します。
ADBのパスワードを指定して、以下のコマンドを実行します。DBユーザーやホスト名などのパラメータが入力され、KafkaトピックとTEQの間にConnect Sinkが設定されます。
cd $LAB_HOME/kafka-connect-txeventq
./setup-kafka2txeventq-connect.sh
Connect Sinkの状態を以下のコマンドで確認します。
kafka-connect-status
正しく接続できていれば、以下のような結果になります。
{
“name”: “JmsConnectSink_txeventqlab”,
“connector”: {
“state”: “RUNNING”,
“worker_id”: “connect:8083”
},
“tasks”: [
{
“id”: 0,
“state”: “RUNNING”,
“worker_id”: “connect:8083”
}
],
“type”: “sink”
}
2. Kafka brokerからメッセージをエンキュー
Connectorを実行したので、メッセージを生成して転送をテストします。Kafka Producerからエンキューし、TEQからデキューしてみます。
Kafka Producerが動いている状態で、curlでProducer APIにメッセージを送信します。
curl -s -X POST -H "Content-Type: application/json" \
-d '{ "id": "Sink1", "message": "Sink Message from Kafka to TxEventQ #1" }' \
http://localhost:8080/placeMessage | jq
以下のように返ってくれば、送信は成功です。
{
"id": "1",
"statusMessage": "Successful"
}
3. PL/SQLを使ってTEQからメッセージをデキュー
Producerからメッセージをエンキューすると、その後Connect SinkエージェントがKafkaトピックからメッセージを消費し、TEQにエンキューします。そして、OKafka ConsumerマイクロサービスやPL/SQLプロシージャを使って、TEQからメッセージをデキューすることができます。
以下のコマンドでTEQからデキューします。
txeventq-dequeue
実際は以下のようなPL/SQLを実行しています。
declare
txeventq_topic varchar2(30) := '&1' ;
txeventq_subscriber varchar2(30) := '&2' ;
dequeue_options DBMS_AQ.dequeue_options_t;
message_properties DBMS_AQ.message_properties_t;
message_id RAW(2000);
my_message SYS.AQ$_JMS_TEXT_MESSAGE;
msg_text varchar2(32767);
begin
DBMS_OUTPUT.ENABLE (20000);
if txeventq_topic is not null and txeventq_subscriber is not null
then
-- Dequeue Options
dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
dequeue_options.wait := DBMS_AQ.NO_WAIT;
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
dequeue_options.wait := 1;
dequeue_options.consumer_name := txeventq_subscriber;
DBMS_AQ.DEQUEUE(
queue_name => txeventq_topic,
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => my_message,
msgid => message_id);
commit;
my_message.get_text(msg_text);
DBMS_OUTPUT.put_line('TxEventQ message: ' || msg_text);
else
DBMS_OUTPUT.put_line('ERR : at least one of the variables is null !');
end if;
end;
/
以下のような結果が返ってくるはずです。
TxEventQ message: {"id": "1", "message": "Sink Message from Kafka to TxEventQ #1"}
PL/SQL procedure successfully completed.
まとめ
本記事では、Apache KafkaとOracle Transactional Event Queues(TEQ)という2つのメッセージ・ブローカーを使用したメッセージ・キューイングをご紹介しました。
TEQはコンバージド・データベースであるOracle Databaseの一機能でかつ、Kafkaとも互換性があるので、エンタープライズクラスのデータ中心のマイクロサービス・アーキテクチャを構築することができます。
本記事でご紹介したものは、全てOCIの無料のリソースで完結しているので、お時間があればぜひ試してみてください。
参考
-
- kafka docker公式チュートリアル
-
- Oracle Transactional Event Queues
- Kafka Java Client for Oracle Transactional Event Queues