はじめに
Oracle SQL Access to Kafka(OSaK)は、SQLでKafkaトピックに動的にクエリを実行できるOracle Database 23c Freeの新機能です。
概要やユースケースはこちらを参照ください。
Oracle SQL Access to Kafkaには主に3つの機能があります。
本記事では、その中でもKafkaレコードをSQLやPL/SQLを使って順番にループ処理するストリーミング機能の使用手順を紹介します。
前提条件
-
- Oracle Database 23c環境
-
- Kafkaクラスタを作成していること
-
- 本記事ではOracle Cloud Infrastructure(OCI) 上で提供されるフルマネージドな分散メッセージングサービスである、OCI Streaming Service(OSS)を使ってKafkaクラスタを作成しています。
- OSSについては、OCI Streaming を動かしてみようというチュートリアルが参考になります。
Kafkaクラスタの登録
まずはKafkaクラスタの登録をする必要があります。
この手順については、こちらの記事を参照ください。
KafkaデータをPL/SQLで順番にループ処理するストリーミング・アプリケーションの作成
DBMS_KAFKAパッケージを使ってストリーミング・アプリケーションを作成することで、登録したKafkaクラスタのレコードをロードする一時表やビューが自動的に作成されます。
作成された一時表を使用したSQLクエリが、Kafkaレコードに順次アクセスし、ループ内で処理していくようなストリーミング・アプリケーションを実行することができます。
ロードするKafkaレコードの形式を決定する表を作成します。今回は1桁の数字がストリーミングするので、NUMBER型の1列のみを持つ表を作成します。
create table sample(col_num number);
ストリーミング・アプリケーションSampleStreamAppを作成します。
※OSaKsumStreamというトピック名を指定しておきます。(OCI Streaming Serviceのストリーム名)
DECLARE
v_options VARCHAR2(50);
BEGIN
v_options := ‘{“fmt” : “DSV”, “reftable” : “sample”}’;
DBMS_KAFKA.CREATE_STREAMING_APP (
‘KAFKACLUS1’,
‘SampleStreamApp’,
‘OSaKsumStream’,
v_options,
1);
END;
/
ここで内部的に作成された一時表を確認してみます。Oracle SQL Access to Kafkaで作成されたオブジェクトは、固有のORA$接頭辞を持っています。
ORA$DKX と ORA$DKV は Oracle SQL Access to Kafka が生成したビューと外部表の接頭辞で、Kafka からユーザー所有のテーブルまたはグローバル一時表にデータをロードするための DBMS_KAFKA の呼び出しに対応しています。ORA$DKVGTTは、ストリーミングまたはシーキングアプリからロードされるグローバル一時表であることを指定する接頭辞です。この一時表は、DBMS_KAFKA.LOAD_TEMP_TABLEを呼び出すと透過的にロードされます。
select object_name, object_type from user_objects where object_name like ‘%ORA$DK%’;
OBJECT_NAME OBJECT_TYPE
——————————– —————
ORA$DKVGTT_KAFKACLUS1_SAMPLESTREAMAPP_0 TABLE
ORA$DKV_KAFKACLUS1_PARTITIONS VIEW
ORA$DKV_KAFKACLUS1_SAMPLESTREAMAPP_0 VIEW
ORA$DKX_KAFKACLUS1_PARTITIONS TABLE
ORA$DKX_KAFKACLUS1_SAMPLESTREAMAPP TABLE
ORA$DKX_KAFKACLUS1_SAMPLESTREAMAPP TABLE PARTITION
6 rows selected.
実際にKafkaレコードを処理するストリーミング・アプリケーションを実行してみます。今回は、単純にロードしたKafkaレコードを順番に足していき、SUMが正しい値になったらOFFSETを更新し、COMMITするアプリケーションにします。
出力結果を確認するため、serveroutputをonにしておきます。
set serveroutput on
DECLARE
record_sum number := 0;
tmp_num number;
BEGIN
–Kafkaレコードをグローバル一時表にロード
DBMS_KAFKA.LOAD_TEMP_TABLE(‘ORA$DKVGTT_KAFKACLUS1_SampleStreamAPP_0’);
–ロードした行数(レコード数)だけループ
FOR kafka_record IN (SELECT KAFKA_OFFSET offset FROM ORA$DKVGTT_KAFKACLUS1_SampleStreamAPP_0) LOOP
DBMS_OUTPUT.PUT_LINE(‘Processing record offset:’|| kafka_record.offset);
SELECT COL_NUM INTO tmp_num FROM ORA$DKVGTT_KAFKACLUS1_SampleStreamAPP_0 WHERE KAFKA_OFFSET = kafka_record.offset;
record_sum := record_sum + tmp_num;
DBMS_OUTPUT.PUT_LINE(‘record_sum:’|| record_sum);
END LOOP;
IF record_sum = 45 THEN
DBMS_OUTPUT.PUT_LINE(record_sum);
DBMS_KAFKA.UPDATE_OFFSET(‘ORA$DKV_KAFKACLUS1_SampleStreamAPP_0’);
COMMIT;
–ELSE
–エラー処理を記述
END IF;
END;
/
本来は、LOAD_TEMP_TABLEもLOOPさせて常に発生しているレコードを処理し続けるアプリケーションが多いと思いますが、今回はrecord_sumが45になったら終了するようにしました。
メッセージをPublishします。
$KAFKA_HOME/bin/kafka-console-producer.sh \
–bootstrap-server cell-1.streaming.ap-Tokyo-1.oci.oraclecloud.com:9092 \
–topic OSaKsumStream \
–producer.config $KAFKA_HOME/config/producer.properties
>1
>2
>3
>4
>5
>6
>7
>8
>9
実行結果を確認してみます。
Processing record offset:0
record_sum:1
Processing record offset:1
record_sum:3
Processing record offset:2
record_sum:6
Processing record offset:3
record_sum:10
Processing record offset:4
record_sum:15
Processing record offset:5
record_sum:21
Processing record offset:6
record_sum:28
Processing record offset:7
record_sum:36
Processing record offset:8
record_sum:45
45
Number of Kafka records processed = 9
PL/SQL procedure successfully completed.
1レコードずつ足されてrecord_sumが45となり、9レコードを処理した、と表示がされています。今回はコンソールからの手入力で検証を行いましたが、実際にProducerアプリを作成してメッセージをPublish⇒Oracle SQL Access to Kafkaのストリーミング・アプリケーションで処理することもできます。
参考情報
Oracle SQL Access to Kafka