Kafkaとは

メッセージングミドルウェア
– 公式サイト
– Apache Kafka, 他とは異なるメッセージングシステム
– Qiitaの記事

とりあえず、動くとこまで。

ClojureのKafkaクライアント

利用するライブラリ
– clj-kafka

利用するKafkaのバージョンは0.8.2。ダウンロードとコマンドラインからの動作確認は公式のquick startで確認。

サンプルアプリ

1.leiningenのインストール

公式サイトのインストールに従って、インストール

2.compojureテンプレートの利用

公式サイト

下記のコマンドを実行すると、自動でcompojureテンプレートが展開されプロジェクトの雛形が生成される(このときcompojureテンプレートの最新バージョンがローカルのMavenリポジトリに存在すればそれを利用し、存在しなければダウンロードされる)。

lein new compojure プロジェクト名

プロジェクト名はexwebにする。
下記のコマンドでサーバを起動

lein ring server

下記のURIにブラウザからアクセスすると「Hello World」と表示される。

http://localhost:3000/

3.project.cljの編集


(defproject exweb "0.1.0-SNAPSHOT"
  :description "compojure sample"
  :url "http://example.com/exweb"
  :min-lein-version "2.0.0"
  :dependencies [[org.clojure/clojure "1.7.0"]
                 [compojure "1.4.0"]
                 [ring/ring-defaults "0.1.5"]
                 [clj-kafka "0.3.4"]] ;; 追加
  :plugins [[lein-ring "0.9.7"]]
  :ring {:handler exweb.handler/app}
  :profiles {:dev {:dependencies [[javax.servlet/servlet-api "2.5"]
                                  [ring/ring-mock "0.3.0"]]}}
  )

4.kafkaクライアントの作成


(ns exweb.kafka
  (:require
    [clj-kafka.core :as kafka]
    [clj-kafka.new.producer :as kafka-pd]
    [clj-kafka.consumer.zk :as kafka-zk])
  (:import [kafka.serializer StringDecoder]
           [kafka.consumer KafkaStream]))

;;Producer用の記述
(def producer-config {"bootstrap.servers" "localhost:9092"})

(defn send-message-to-kafka [msg]
  (with-open [p (kafka-pd/producer producer-config (kafka-pd/string-serializer) (kafka-pd/string-serializer))]
    @(kafka-pd/send p (kafka-pd/record "test" msg))))


;;Consumer用の記述
(def consumer-config {"zookeeper.connect"  "localhost:2181"
                      "group.id"           "consumer-01"
                      "auto.offset.reset"  "smallest"
                      "auto.commit.enable" "false"})

(defn to-messages
  "ストリームから、値のリストを取り出し、返す。"
  [^KafkaStream stream]
  (->> (.iterator stream)
       iterator-seq
       (map kafka/to-clojure)
       (map :value)))

(defn string-decoder "文字列デコーダを返す" [] (StringDecoder. nil))

(defn consume-messages 
  "現在のTopicから全てのメッセージを読みこみ、返す"
  []
  (kafka/with-resource [c (kafka-zk/consumer consumer-config)]
                       kafka-zk/shutdown
                       (to-messages (kafka-zk/create-message-stream c "test" (string-decoder) (string-decoder)))))

5.handlerの修正

(ns exweb.handler
  (:require [compojure.core :refer :all]
            [compojure.route :as route]
            [ring.middleware.defaults :refer [wrap-defaults site-defaults]]
            [exweb.kafka :refer :all]))

(defroutes app-routes
           (GET "/" [] "Hello World")
           (GET "/send" [msg] (str (send-message-to-kafka msg)))
           (GET "/show" [] (consume-messages))
           (route/not-found "Not Found"))

(def app
  (wrap-defaults app-routes site-defaults))

6.動作確認

サーバを起動

lein ring server

下記のエンドポイントにメッセージを送信

http://localhost/send?msg=Kafka Message

ブラウザ上には「{:topic “test”, :partition 0, :offset 1}」と表示。

下記のエンドポイントでメッセージを取得し、表示

http://localhost/show

ブラウザ上に「Kafka Message」と表示される。

广告
将在 10 秒后关闭
bannerAds