在Kafka中使用Scala编写的生产者和消费者

一开始

由于可能要使用Kafka,所以将尝试用几种语言编写生产者和消费者的代码。

上次我尝试了使用Golang。

本次演示将介绍Scala。在使用Scala时,似乎需要使用Java客户端,但由于Akka已经为Akka Stream开发了一个方便的包装器Akka-Stream-Kafka,因此我们将尝试使用它。

*不会解释Kafka本身或Akka-Stream本身。

组成

IntelliJ:2018年
Scala:2.12版本
sbt:1.1版本
Akka:2.5版本
Akka-Stream:2.5版本
Akka-Stream-Kafka:0.2版本
Spray-Json:1.3版本

产品或结果

“简称为sbt”

我会解决依存关系。除了Akka-Stream-Kafka相关,我会使用spray-json进行消息的序列化和反序列化。

name := "scala-kafka-example"

version := "0.1"

scalaVersion := "2.12.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.5.12",

  "com.typesafe.akka" %% "akka-stream" % "2.5.12",
  "com.typesafe.akka" %% "akka-stream-kafka" % "0.20",

  "io.spray" %%  "spray-json" % "1.3.3"
)

mainClass in Compile := Some("com.example.Main")

消息对象

我們首先定義了一個可以進行序列化和反序列化的消息。

package com.example

import spray.json.{DefaultJsonProtocol, RootJsonFormat}

object Message {

  // 送信メッセージ
  final case class SendMessage(message: String, timestamp: Long)

  // 受信メッセージ
  final case class ConsumedMessage(message: String, timestamp: Long)

}

// メッセージ <-> json
trait MessageJsonProtocol extends DefaultJsonProtocol {

  import Message._

  implicit val sendMessageFormat: RootJsonFormat[SendMessage] = jsonFormat2(SendMessage)
  implicit val consumedMessageFormat: RootJsonFormat[ConsumedMessage] = jsonFormat2(ConsumedMessage)

}

制片人

我试着从制片人的角度写一下。

package com.example

// import省略

object Main extends App with MessageJsonProtocol {

  import Message._

  implicit val system: ActorSystem = ActorSystem("kafka-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executor: ExecutionContext = system.dispatcher

  val conf = ConfigFactory.load()
  val bootstrapServers = conf.getString("kafka.bootstrapServers")

  // プロデューサー設定
  val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

  // プロデューサーstream
  val (pKillSwitch, pDone) = Source
    .tick(1.seconds, 10.seconds, None)
    .viaMat(KillSwitches.single)(Keep.right)
    .map { _ =>
      val msg = SendMessage("Hello", ZonedDateTime.now().toEpochSecond)
      ProducerMessage.Message(new ProducerRecord[String, String]("test.B", msg.toJson.compactPrint),
                              None)
    }
    .via(Producer.flow(producerSettings))
    .map { result =>
      println(s"success send. message: ${result.message.record.value()}")
      result
    }
    .toMat(Sink.ignore)(Keep.both)
    .run()

  pDone.onComplete {
    case Success(_) =>
      println("done producer.")
    case Failure(ex) =>
      println(s"fail send. reason: ${ex.getMessage}")
  }

  println("start")

  StdIn.readLine()

  pKillSwitch.shutdown()

  println("end")
}

Kafka服务器地址是从”application.conf”中获取的。
每10秒,流会生成并发送消息。
根据这些参考,我们定义了流的KillSwitch。如果发生异常,应该会进入到onComplete中…。

消费者

接下来,我会写关于消费者的内容。

package com.example

// import省略

object Main extends App with MessageJsonProtocol {

  import Message._

  implicit val system: ActorSystem = ActorSystem("kafka-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executor: ExecutionContext = system.dispatcher

  val conf = ConfigFactory.load()
  val bootstrapServers = conf.getString("kafka.bootstrapServers")

  // コンシューマー設定
  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("test")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // コンシューマーstream
  val (cKillSwitch, cDone) = Consumer
    .committableSource(consumerSettings, Subscriptions.topics("test.B"))
    .viaMat(KillSwitches.single)(Keep.right)
    .map { consumed =>
      val msg = consumed.record.value().parseJson.convertTo[ConsumedMessage]
      println(s"consumed. message: ${msg.message}")
      consumed
    }
    .mapAsync(1) { msg =>
      msg.committableOffset.commitScaladsl()
    }
    .toMat(Sink.ignore)(Keep.both)
    .run()

  cDone.onComplete {
    case Success(_) =>
      println("done consumer.")
    case Failure(ex) =>
      println(s"fail consume. reason: ${ex.getMessage}")
  }

  println("start")

  StdIn.readLine()

  cKillSwitch.shutdown()

  println("end")

}

由于大体与制片人相同,所以没有特别需要说的。
只要流不停止,它就会自动连接到Kafka并接收指定主题的数据。

进行执行

将生产者和消费者组合在一起(Main.scala),构建和执行,应该会产生以下输出(已删除Actor系列的警告)。

start
success send. message: {"message":"Hello","timestamp":1523958246}
success consume. message: Hello, timestamp: 1523958246
success send. message: {"message":"Hello","timestamp":1523958256}
success consume. message: Hello, timestamp: 1523958256
success send. message: {"message":"Hello","timestamp":1523958266}
success consume. message: Hello, timestamp: 1523958266
end

结束

在与上次相同的封闭世界中进行,但是如果想让制片方的stream发声,可以考虑使用”Source.single”或者”Source.queue”(参考)。

广告
将在 10 秒后关闭
bannerAds