Kafkaとは
メッセージングミドルウェア
– 公式サイト
– Apache Kafka, 他とは異なるメッセージングシステム
– Qiitaの記事
とりあえず、動くとこまで。
ClojureのKafkaクライアント
利用するライブラリ
– clj-kafka
利用するKafkaのバージョンは0.8.2。ダウンロードとコマンドラインからの動作確認は公式のquick startで確認。
サンプルアプリ
1.leiningenのインストール
公式サイトのインストールに従って、インストール
2.compojureテンプレートの利用
公式サイト
下記のコマンドを実行すると、自動でcompojureテンプレートが展開されプロジェクトの雛形が生成される(このときcompojureテンプレートの最新バージョンがローカルのMavenリポジトリに存在すればそれを利用し、存在しなければダウンロードされる)。
lein new compojure プロジェクト名
プロジェクト名はexwebにする。
下記のコマンドでサーバを起動
lein ring server
下記のURIにブラウザからアクセスすると「Hello World」と表示される。
http://localhost:3000/
3.project.cljの編集
(defproject exweb "0.1.0-SNAPSHOT"
:description "compojure sample"
:url "http://example.com/exweb"
:min-lein-version "2.0.0"
:dependencies [[org.clojure/clojure "1.7.0"]
[compojure "1.4.0"]
[ring/ring-defaults "0.1.5"]
[clj-kafka "0.3.4"]] ;; 追加
:plugins [[lein-ring "0.9.7"]]
:ring {:handler exweb.handler/app}
:profiles {:dev {:dependencies [[javax.servlet/servlet-api "2.5"]
[ring/ring-mock "0.3.0"]]}}
)
4.kafkaクライアントの作成
(ns exweb.kafka
(:require
[clj-kafka.core :as kafka]
[clj-kafka.new.producer :as kafka-pd]
[clj-kafka.consumer.zk :as kafka-zk])
(:import [kafka.serializer StringDecoder]
[kafka.consumer KafkaStream]))
;;Producer用の記述
(def producer-config {"bootstrap.servers" "localhost:9092"})
(defn send-message-to-kafka [msg]
(with-open [p (kafka-pd/producer producer-config (kafka-pd/string-serializer) (kafka-pd/string-serializer))]
@(kafka-pd/send p (kafka-pd/record "test" msg))))
;;Consumer用の記述
(def consumer-config {"zookeeper.connect" "localhost:2181"
"group.id" "consumer-01"
"auto.offset.reset" "smallest"
"auto.commit.enable" "false"})
(defn to-messages
"ストリームから、値のリストを取り出し、返す。"
[^KafkaStream stream]
(->> (.iterator stream)
iterator-seq
(map kafka/to-clojure)
(map :value)))
(defn string-decoder "文字列デコーダを返す" [] (StringDecoder. nil))
(defn consume-messages
"現在のTopicから全てのメッセージを読みこみ、返す"
[]
(kafka/with-resource [c (kafka-zk/consumer consumer-config)]
kafka-zk/shutdown
(to-messages (kafka-zk/create-message-stream c "test" (string-decoder) (string-decoder)))))
5.handlerの修正
(ns exweb.handler
(:require [compojure.core :refer :all]
[compojure.route :as route]
[ring.middleware.defaults :refer [wrap-defaults site-defaults]]
[exweb.kafka :refer :all]))
(defroutes app-routes
(GET "/" [] "Hello World")
(GET "/send" [msg] (str (send-message-to-kafka msg)))
(GET "/show" [] (consume-messages))
(route/not-found "Not Found"))
(def app
(wrap-defaults app-routes site-defaults))
6.動作確認
サーバを起動
lein ring server
下記のエンドポイントにメッセージを送信
http://localhost/send?msg=Kafka Message
ブラウザ上には「{:topic “test”, :partition 0, :offset 1}」と表示。
下記のエンドポイントでメッセージを取得し、表示
http://localhost/show
ブラウザ上に「Kafka Message」と表示される。