いよいよKafka連携に挑戦!
前回まで検証を進めてきた、MemSQL標準機能のファイルシステム・パイプラインは、非常にシンプルな構成で自動的にデータ収集のバッチ処理が行われるので、例えばオンプレミスのS3的な活用が出来ると思いますが、より高度な処理を間に入れて、大量の処理要求(この領域でのスケールアウトが必用な場合)に対する対応が想定される場合には、Kafkaを間に挟んで処理を流す想定がかなり具体的に浮上してくるでしょう。
そこで、今回の検証では、MemSQLの標準機能として提供されているKafkaパイプラインに関する基本的な連携検証を行い、その次のより高度なKafka活用に繋げて行きたいと思います。
基本的な手順などについては、MemSQL社の公式ホームページを参考にして頂ければ良いかと思いますが、取り急ぎ実際に作業を行いながら、稼働の要点や周辺解説を行ないながら検証を進めて行きます。
検証環境の準備
今回の検証は、初歩的な連携検証になりますので、出来るだけシンプルかつ効率的に進める為に、必要な環境を単独のLinux上にDocker環境として構築して行いたいと思います。
OS:CentOS 7
Docker: 19.03.5
CentOSの導入は、ほぼほぼ問題無く行えるかと思いますので、その上で稼働させるDocker環境の構築についての参考情報を共有させて頂きます。
Docker環境の導入に関しては、既に多数の情報がネット上に提供されていますので、基本的にその手順を踏んで頂けば問題有りませんが、個人的には今回のDocker環境整備に関して下記の情報を参考にさせて頂きました。
参考情報:
Install Docker on CentOS 7
では、以降はCentOS+Dockerが上手く稼働している!という前提で作業を進めて行きます。
まずは、kafka環境の整備・・・
今回の目的は、MemSQLとKafkaを連携させる・・という基本的なステップの確認ですので、取り急ぎシンプルに検証可能な、MemSQL社から提案されている方式で実施する事にします。もちろん、腕に自信と時間の有る場合は、それぞれ素の状態から組み上げても問題ありませんが、取り急ぎ繋げて動く!という初歩的確認になりますので、今回はこの手順をお勧め致します。
Docker環境の導入が終わったら、ターミナルウィンドウを立ち上げて次のコマンドを実行してください。
docker run --name kafka memsql/kafka
この操作で導入されるコンテナ(memsql/Kafka)は、KafkaとZookeeperの両方が事前に設定されているので、後述のMemSQLのQuick Start版との連携確認を含めて、今回の様な事前検証テストに理想的な構成になります。
導入処理が進んでコンテナが初期化されると、ターミナルに多数のメッセージが出力されますが、基本的に最後の2行が下記の様に出力されれば、このコンテナ導入は成功していますのでご安心下さい。
INFO success: zookeeper entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
INFO success: kafka entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
この出力を確認出来れば、次の作業に移ります。
Kafkaへメッセージを送信してみる・・・
まず最初に、今回導入したDockerコンテナに接続して、Kafkaとの記念すべき第1回目の通信準備を行います。
(1)Kafka環境へ入る
新しいターミナルウィンドウを起動して(今回の基本検証では、大量のターミナルウインドウを使用しますので、オペレーションを間違えないようにお願い致します)、次のコマンドを実行します。
docker exec -it kafka /bin/bash
一般的なDockerコマンドになりますが、このコマンドを実行する事で現在実行中のコンテナ内で、各種のコマンドを使える様になります。
無事に処理されると、bashプロンプトが表示されます。(xxxxxxxxx_は、状況に応じて変わります)
root@xxxxxxxxxxxx:/#
(2)トピックを作る
ディレクトリを移動します。
cd /opt/kafka*/bin
通信を行う為に、Kafkaお約束の トピックを作成しなければなりませんが、今回はKafkaの中に入っている、新規のトピックを作成してメッセージの投稿を開始可能に出来る、便利なお助けスクリプトを使います。
./kafka-topics.sh --topic kafka_test --zookeeper 127.1:2181 --create --partitions 8 --replication-factor 1
このコマンドは、kafka-topics.shスクリプトを使用してkafka_testという名前の新しいトピックを作成します。
(3)プロデューサを作る
トピックができたので、トピックにメッセージを送信するために使用するできるプロデューサを作成します。
./kafka-console-producer.sh --topic kafka_test --broker-list 127.0.0.1:9092
このコマンドは、kafka-console-producer.shスクリプトを使用して、トピックkafka_testに関連付けられているプロデューサを構成します。
また、このスクリプトを使用して処理が正常に終了すると、自動的にトピック内のKafkaメッセージとして送信できる標準入力インターフェースになりますので、取り急ぎ以下の試験メッセージを入力してみてください。(この作業後にターミナルウインドウは、そのままの状態で確保しておくようにして下さい)
message-01
message-02
message-03
message-04
この段階では、今回構成したDocker上のKafkaクラスターには、幾つかのメッセージ(前述の作業で入力した)を含むkafka_testという名前のトピックが存在している状況になります。(ある意味、まだ宙ぶらりんな状態・・)
確認事項:此処までの作業で使用したターミナルウインドウは実行状態で確保されていますか?
(1)Kafkaにメッセージを入力するためのターミナルウインドウ
(2)Kafkaコンテナと遣り取りするためのターミナルウインドウ
MemSQL側の準備
ここまで順調に作業が進めば、最終段階のMemSQLとの連携に必要な環境整備を行います。
MemSQLにおけるKafkaパイプラインの作成と検証・・・
この段階まで来れば、既に検証用のDocker/Kafkaに作成したトピック上に幾つかのメッセージが準備されているはずなので、MemSQL上にKafka用のパイプラインを作成して、それらのメッセージをテーブルに挿入してみたいと思います。
検証用のMemSQLをDocker上に導入する
MemSQLのクイックスタート専用(ここ重要です)に特別に提供されている、学習・事前検証用のDockerコンテナ(memsql/quickstart)が有りますので、今回はそれを活用させて頂く事にします。
新しいターミナルウィンドウで、次のコマンドを実行します。
docker run --name memsql -p 3306:3306 -p 9000:9000 memsql/quickstart
これも、基本的にはDockerの基本的なコマンド列になりますが、動きとしてはDocker Hubからmemsql/quickstartという、Dockerイメージを自動的にダウンロードしてコンテナを作成し、コンテナに識別名(memsql)を割り当てて起動します。(このMemSQLはバージョン的には少し前(本稿時ではV5.5)のバージョンで、基本的に基本機能&CLI経由だけの利用になります)
MemSQLの初期化プロセスが完了したら、新しいターミナルウィンドウを開いて、次のコマンドを実行してください。
docker exec -it memsql memsql
簡単な初期化が無事に終了するとMemSQLのプロンプトが出てくるので、以後は基本的にSQLを打ち込んでいく形になります。
MemSQL上での設定
MemSQLのプロンプトが出てきたら、取り急ぎ今回の検証用の基本構成を設定します。
CREATE DATABASE kafka_db;
USE kafka_db;
CREATE TABLE messages (msg text);
次にファイルシステム・パイプライン同様に、パイプラインの作成を行います。
Kafkaパイプラインの場合は、NFSで共有されたディレクトリ情報ではなく、IPベースのネットワークアドレスが必用になりますので、その情報を確保しておく必要があります。
今回は、同一のDocker内で完結する検証になりますので、シンプルに次のコマンドを使ってDocker内のKafkaクラスタに関するIP情報を取得します。
新しいウィンドウで、次のコマンドを実行します。
docker inspect -f '{{ .NetworkSettings.IPAddress }}' kafka
このコマンドが正常に処理されると、172.17.0.2等のKafkaコンテナのIPアドレスが出力されますので、その情報を正確にコピーしてMemSQLを起動しているターミナルウインドウ上で以下のスクリプトを実行して下さい。
CREATE PIPELINE `Kafka_pipeline` AS LOAD DATA KAFKA '172.17.0.2/test' INTO TABLE `messages`;
# 今回のKafkaクラスタに関するIPアドレスが、172.17.0.2だったのでそれを使って入力しています。
このスクリプトが正常に実行されると、kafka_pipelineという名前のKafkaパイプラインが作成されます。またKafkaクラスタ(ここでは、IP:172.17.0.2)のkafka_testというトピックからメッセージを取り込んで、事前に定義されているmessagesというテーブルに挿入します。
また、この処理が正常に行われた場合に、作成したパイプラインのテストを行う事が出来るようになりますので、必要に応じて小さなデータセットを使用して疎通試験を行ってください。
テストの例:
以下の手順は、1つのバッチ処理を実行し(LIMIT 1)フォアグラウンド処理でmessagesテーブルにデータをコミットします。
TEST PIPELINE Kafka_pipeline LIMIT 1;
最初にトピックに送ったメッセージを取り込んでみる・・
特に問題が無いようであれば、最初にKafkaクラスタ上のトピックに送ってある、宙ぶらりん状態のメッセージをMemSQL上に設定したmessagesテーブルにながしこんでみます。
START PIPELINE Kafka_pipeline FOREGROUND LIMIT 1 BATCHES;
今回はファイルシステム・パイプラインでも出てきました、フォアグラウンド処理で実施していますが、当然Kafkaパイプラインでもバックグラウンド処理も可能ですので、取り急ぎこの処理結果を確認してから動作検証してみます。
messagesテーブルに無事に最初のメッセージ群が流し込まれたかは、お約束のスクリプトで確認できます。
SELECT * FROM messages;
パイプラインをバックグラウンドで動かしてみる・・・
では、今回の検証の最終段階として、連続したメッセージ取り込みをKafka経由で行いたいと思います・・・とは言っても此処までを無事にクリア出来ていれば、基本的にはさほど難しい話ではありませんのでご安心ください。
バックグラウンドでKafkaパイプラインを動かすには、以下のスクリプトを実行します。
START PIPELINE Kafka_pipeline;
既に、Kafkaパイプラインが稼働していると思いますので、メッセージをトピックに送信したターミナルウインドウから追加のメッセージを入力して行きます。Enterキーが押されえると、入力されたデータがトピックとして積み上がり、プロデューサ経由でMemSQLのKafkaパイプラインを通って所定のテーブルに追加されます。
message-05
message-06
message-07
message-08
追加状況は、お約束のスクリプトで随時確認が可能です。
SELECT * FROM messages;
参考情報
パイプラインの実行状況は、MemSQLが管理しているPIPELINES_BATCHES_SUMMARYテーブルを見れば、その時点でのステータス等を確認する事が出来ます。
SELECT * FROM information_schema.PIPELINES_BATCHES_SUMMARY;
またこの情報は、各パイプラインが実行されたバッチごとに整理して表示されるので、実際の利用局面における性能関連状況を容易に把握出来るようになり、パイプライン処理の監視等に対して非常に役立ちます。
今回のまとめ
今回は、MemSQLのパイプライン機能の一つである、Kafkaパイプラインの基本的な検証を行ってみました。実際的にはMemSQLとKafkaクラスタはそれぞれ単独の環境で稼働すると思いますので、可能であれば番外編2の続編として分離された環境下での、少し重ためな検証も実施してみようと考えておりますので、あまり期待しないでお待ちください・・・(汗)
今回の基本検証に関しては、概念的には実際のシナリオにも適用できると思いますので、ぜひ実際に試してみる事をお勧め致します。
MemSQLを使ってみようVol.12: 実践編5に続く
謝辞
本稿において、ymasaoka様が投稿されたInstall Docker on CentOS 7を、今回の検証環境の基本部分である、CentOS+Docker導入の参考とさせて頂きました。この場を借りて深く御礼申し上げます。
また、本解説に転載させて頂いているスクリーンショットは、一部を除いて現在MemSQL社が公開されている公式ホームページの画像を使わせて頂いており、本内容とMemSQL社の公式ホームページで公開されている内容が異なる場合は、MemSQL社の情報が優先する事をご了解ください。