在Kafka中使用Scala编写的生产者和消费者
一开始
由于可能要使用Kafka,所以将尝试用几种语言编写生产者和消费者的代码。
上次我尝试了使用Golang。
本次演示将介绍Scala。在使用Scala时,似乎需要使用Java客户端,但由于Akka已经为Akka Stream开发了一个方便的包装器Akka-Stream-Kafka,因此我们将尝试使用它。
*不会解释Kafka本身或Akka-Stream本身。
组成
IntelliJ:2018年
Scala:2.12版本
sbt:1.1版本
Akka:2.5版本
Akka-Stream:2.5版本
Akka-Stream-Kafka:0.2版本
Spray-Json:1.3版本
产品或结果
“简称为sbt”
我会解决依存关系。除了Akka-Stream-Kafka相关,我会使用spray-json进行消息的序列化和反序列化。
name := "scala-kafka-example"
version := "0.1"
scalaVersion := "2.12.5"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.12",
"com.typesafe.akka" %% "akka-stream" % "2.5.12",
"com.typesafe.akka" %% "akka-stream-kafka" % "0.20",
"io.spray" %% "spray-json" % "1.3.3"
)
mainClass in Compile := Some("com.example.Main")
消息对象
我們首先定義了一個可以進行序列化和反序列化的消息。
package com.example
import spray.json.{DefaultJsonProtocol, RootJsonFormat}
object Message {
// 送信メッセージ
final case class SendMessage(message: String, timestamp: Long)
// 受信メッセージ
final case class ConsumedMessage(message: String, timestamp: Long)
}
// メッセージ <-> json
trait MessageJsonProtocol extends DefaultJsonProtocol {
import Message._
implicit val sendMessageFormat: RootJsonFormat[SendMessage] = jsonFormat2(SendMessage)
implicit val consumedMessageFormat: RootJsonFormat[ConsumedMessage] = jsonFormat2(ConsumedMessage)
}
制片人
我试着从制片人的角度写一下。
package com.example
// import省略
object Main extends App with MessageJsonProtocol {
import Message._
implicit val system: ActorSystem = ActorSystem("kafka-example")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executor: ExecutionContext = system.dispatcher
val conf = ConfigFactory.load()
val bootstrapServers = conf.getString("kafka.bootstrapServers")
// プロデューサー設定
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
// プロデューサーstream
val (pKillSwitch, pDone) = Source
.tick(1.seconds, 10.seconds, None)
.viaMat(KillSwitches.single)(Keep.right)
.map { _ =>
val msg = SendMessage("Hello", ZonedDateTime.now().toEpochSecond)
ProducerMessage.Message(new ProducerRecord[String, String]("test.B", msg.toJson.compactPrint),
None)
}
.via(Producer.flow(producerSettings))
.map { result =>
println(s"success send. message: ${result.message.record.value()}")
result
}
.toMat(Sink.ignore)(Keep.both)
.run()
pDone.onComplete {
case Success(_) =>
println("done producer.")
case Failure(ex) =>
println(s"fail send. reason: ${ex.getMessage}")
}
println("start")
StdIn.readLine()
pKillSwitch.shutdown()
println("end")
}
Kafka服务器地址是从”application.conf”中获取的。
每10秒,流会生成并发送消息。
根据这些参考,我们定义了流的KillSwitch。如果发生异常,应该会进入到onComplete中…。
消费者
接下来,我会写关于消费者的内容。
package com.example
// import省略
object Main extends App with MessageJsonProtocol {
import Message._
implicit val system: ActorSystem = ActorSystem("kafka-example")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executor: ExecutionContext = system.dispatcher
val conf = ConfigFactory.load()
val bootstrapServers = conf.getString("kafka.bootstrapServers")
// コンシューマー設定
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId("test")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// コンシューマーstream
val (cKillSwitch, cDone) = Consumer
.committableSource(consumerSettings, Subscriptions.topics("test.B"))
.viaMat(KillSwitches.single)(Keep.right)
.map { consumed =>
val msg = consumed.record.value().parseJson.convertTo[ConsumedMessage]
println(s"consumed. message: ${msg.message}")
consumed
}
.mapAsync(1) { msg =>
msg.committableOffset.commitScaladsl()
}
.toMat(Sink.ignore)(Keep.both)
.run()
cDone.onComplete {
case Success(_) =>
println("done consumer.")
case Failure(ex) =>
println(s"fail consume. reason: ${ex.getMessage}")
}
println("start")
StdIn.readLine()
cKillSwitch.shutdown()
println("end")
}
由于大体与制片人相同,所以没有特别需要说的。
只要流不停止,它就会自动连接到Kafka并接收指定主题的数据。
进行执行
将生产者和消费者组合在一起(Main.scala),构建和执行,应该会产生以下输出(已删除Actor系列的警告)。
start
success send. message: {"message":"Hello","timestamp":1523958246}
success consume. message: Hello, timestamp: 1523958246
success send. message: {"message":"Hello","timestamp":1523958256}
success consume. message: Hello, timestamp: 1523958256
success send. message: {"message":"Hello","timestamp":1523958266}
success consume. message: Hello, timestamp: 1523958266
end
结束
在与上次相同的封闭世界中进行,但是如果想让制片方的stream发声,可以考虑使用”Source.single”或者”Source.queue”(参考)。