この記事は Kafka Advent Calendar 2021 の11日目の記事です。
前回の記事で紹介した Kafka Streams アプリケーションをClojureで書いてみました。
Jackdawという Kafka Streams の Clojure ラッパーライブラリを使っています。
テスト環境構築
手軽に手元で試したいので、Kafka の Quick start に従い環境構築します。
$ tar -xzf kafka_2.13-3.0.0.tgz
$ cd kafka_2.13-3.0.0
ZooKeeper と Kafka broker をそれぞれ別のターミナルから起動します。
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
bin/zookeeper-server-start.sh config/zookeeper.properties
# Start the Kafka broker service
bin/kafka-server-start.sh config/server.properties
必要なトピックを作成します。
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1 --topic event
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1 --topic entity
動作確認
kafka-streams-demo をクローンして下さい。
git clone https://github.com/223kazuki/kafka-streams-demo
cd kafka-streams-demo
dev プロファイルでnreplサーバを起動するよう設定しているので、好きな開発環境から接続して下さい。普通に repl を起動しても可。
clj -A:dev
kafka_streams_demo.clj の下部にテスト用のコードがあるので comment 内で eval して下さい。下記で Kafka Streams アプリケーションを起動します。
(def app (start-app (app-config)))
(stop-app app)
起動後、下記でトピックにメッセージを送信します。
(def p (jc/producer {:bootstrap.servers "localhost:9092"}
{:key-serde (jse/serde)
:value-serde (jse/serde)}))
@(jc/produce! p {:topic-name "event"} :id-1 {:type :update-entity
:data {:inc 1}})
@(jc/produce! p {:topic-name "event"} :id-1 {:type :update-entity
:data {:inc 10}})
kafka-streams-demo のログより、Input event と Updated entity が確認出来るので、エンティティが期待通りに更新されていくことを確認して下さい。トポロジー内で意図的に (Thread/sleep 3000) しており、メッセージを連続で生成した場合、前回の記事で紹介した競合状態を引き起こしえます。しかし、同様に説明した通り StateStore を導入することで、実際には競合状態を防げています。
INFO kafka-streams-demo - Input: {:key :id-1, :value {:type :update-entity, :data {:inc 1}, :entity nil}}
INFO kafka-streams-demo - Updated entity: {:count 1}
INFO kafka-streams-demo - Input: {:key :id-1, :value {:type :update-entity, :data {:inc 10}, :entity {:count 1}}}
INFO kafka-streams-demo - Updated entity: {:count 11}
トポロジー
Jackdaw により、Streams DSL が clojure 関数にラップされており、それらは builder か kstreams を第一引数に取るため、スレッディングマクロで簡潔ににトポロジーを書き下せます。また、変換自体も簡単なものであれば無名関数で記述することで見通しがよくなります。トポロジーの分岐もマクロをうまく利用すればある程度可読性を保てると思います。
(defn build-topology
[builder]
(let [event-topic (topic-config "event")
entity-topic (topic-config "entity")
store-name "entity-store"]
(-> builder
;; Add state store to the application
(j/with-kv-state-store {:store-name store-name})
;; Start streams
(j/kstream event-topic)
;; Filter by message type
(j/filter (fn [[_ {:keys [type]}]]
(= :update-entity type)))
;; Left join entity from ktable then state store
;; Event -> Event + Entity
(j/left-join (j/ktable builder entity-topic) #(assoc %1 :entity %2))
(j/transform (lambdas/transformer-with-ctx
(fn [ctx k v]
(let [store (.getStateStore ctx store-name)
stored-entity (.get store k)]
(key-value [k (update v :entity #(or stored-entity %))]))))
[store-name])
;; Process input message and existing entity to updated entity
;; Event + Entity -> Entity'
(j/peek (fn [[k v]]
(log/info "Input:" {:key k :value v})))
(j/map-values (fn [{:keys [data entity]}]
;; Simulate long processing time that can cause race condition
(Thread/sleep 3000)
(update entity :count (fnil + 0 0) (:inc data))))
(j/peek (fn [[_ v]]
(log/info "Updated entity:" v)))
;; Put updated entity to state store
(j/transform (lambdas/transformer-with-ctx
(fn [ctx k v]
(let [store (.getStateStore ctx store-name)]
(.put store k v)
(key-value [k v]))))
[store-name])
(doto #_branch
;; Publish updated entity to Entity topic
(j/to entity-topic)
;; Publish output event to Event topic
(-> (j/map-values (fn [v] {:type :entity-updated :data v}))
(j/to event-topic)))))
builder)
トポロジーが大規模になってくると、生の Streams DSL では全体のトポロジー構成を見通すのが難しくなってきますが、Clojure で書くとある程度全体の構造が見通し易いのではないでしょうか。そもそも Clojure の Immutable の思想が Kafka と相性がいいですし、Code as Data の面目躍如といったところですね。
まとめ
Clojure で Kafka Streams アプリケーションを書くと気持ちが良い。