我试用了Cloudflow
今天是Advent Calendar的第17天。
我尝试了上个月由Lightbend公司发布的Cloudflow。我正在根据教程写一篇文章,列出我认为重要的要点。这篇文章是为那些希望快速了解概述的人准备的!
此外,本文的内容都可以在官方指南中找到。
Cloudflow是什么?
Cloudflow是由Lightbend公司开发的开源软件,旨在利用分布式流处理在Kubernetes上快速开发和运营应用程序。
它将流处理的每个处理步骤拆分成小的组件,并使用基于模式的方法将这些组件连接起来,以构建复杂的系统。
目前,Akka Streams、Apache Spark、Apache Flink的流处理引擎可集成到Kubernetes环境中,并且可以轻松进行必要的开发和操作,直到部署为止(它们提供了不需要样板文件的功能,还可以生成Docker镜像的命令等)。
简而言之,它是一个实现流处理方便的框架。
基本概念
首先,让我们介绍一下在Cloudflow中出现的术语和概念。
图中的Ingress, Processor, Egress被称为Streamlet组件,组件之间是松散耦合的。Streamlet具有称为inlet和outlet的输入输出,并使用它们连接到各个组件。
Streamlet被分为进程、处理器、出口和扇出等形状,并具有以下不同之处。
-
- Ingress: outletだけ持つ。入力はHTTPリクエスト等
-
- Processor: inlet/outletを1つずつ持つ。メインの処理はここに書く
-
- FanOut: Splitterとも呼ばれる。1つのinlet、複数のoutletを持つ。条件分岐するために使う。
- Egress: inletだけ持つ。Slackに通知、HDFSに書き込むといった何かしら副作用のある処理を書く。
此外,围绕Streamlet的Blueprint正是指设计图,用于基于架构模式表示各个Streamlet之间的连接。
Cloudflow中使用的术语大约是这些。概念本身很简单。因为它是由Akka Streams的开发人员开发的,所以概念有些相似。
试着开始做Getting Started。
那么,我们开始操作Cloudflow。
主题是https://cloudflow.io/docs/current/get-started/index.html。
作为一个类似于“Hello World”的应用程序,我找到了一个教程,其主题是创建一个能够处理从风力发电机发送的事件的管道处理。我已经尝试在本地启动该应用程序。
教程中也包含将应用程序部署到Kubernetes环境的步骤。如果你有兴趣,可以参考官方指南。
准备
请安装以下内容。
-
- Java 8 (JDK)
- sbt(バージョン1.2.8以上)
如果要在Kubernetes环境中部署,需要使用Docker和kubectl。
整体的形象
整体情况如上图所示。我认为您已经看到了,但我会解释一下过程,如下所示。
-
- 在Ingress上接收输入数据
-
- 在Processor中将输入转换为领域对象
-
- 在Splitter上对输入值进行验证,并进行分支处理
- 在Egress上记录与每个分支相关的日志记录
好的,接下来我们将从下一节开始进行实际编码。
项目准备
项目的组成如下所示。
|-project
|--cloudflow-plugins.sbt
|-src
|---main
|-----avro
|-----blueprint
|-----resources
|-----scala
|-------sensordata
|-build.sbt
我认为这个构成几乎与sbt相同,但有一些我觉得值得关注的地方,我在下面进行讨论。
project/cloudflow-plugins.sbt: Cloudflowをsbtで有効化するためのプラグインを記述するファイル
avro: Xxx.avscといったavro形式でドメインオブジェクトを定義する。コンパイル時に動的にScalaファイルが生成される
blueprint: Streamletを接続するためのスキーマを定義するblueprint.confを配置する
在这种情况下,使用Avro格式定义领域对象并在blueprint.conf中定义管道是关键。
构建.sbt
在中文中,”build.sbt”可以这样定义。
import sbt._
import sbt.Keys._
lazy val sensorData = (project in file("."))
.enablePlugins(CloudflowAkkaStreamsApplicationPlugin)
.settings(
libraryDependencies ++= Seq(
"com.lightbend.akka" %% "akka-stream-alpakka-file" % "1.1.2",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.1.10",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"com.typesafe.akka" %% "akka-http-testkit" % "10.1.10" % "test"
),
name := "sensor-data",
organization := "com.lightbend",
scalaVersion := "2.12.10",
crossScalaVersions := Vector(scalaVersion.value)
)
以下为中文的同义表达:
重点是
.enablePlugins(CloudflowAkkaStreamsApplicationPlugin)
在使用Cloudflow和Akka Streams创建应用程序的情况下,您需要启用此插件。如果您想要使用Spark,则启用下面的选项。根据所使用的流处理引擎的不同进行选择。
.enablePlugins(CloudflowSparkApplicationPlugin)
另外,cloudflow-plugins.sbt的设置如下所示。
addSbtPlugin("com.lightbend.cloudflow" % "sbt-cloudflow" % "1.3.0-M5")
用Avro定义域对象。
这里的领域对象指的是在Streamlet之间传递的对象。
如果使用Avro格式进行定义,它会在编译时将其转换为Scala文件。
在这个教程中,我准备了几个选项,但为了避免文章太长,只展示其中一个。
{
"namespace": "sensordata",
"type": "record",
"name": "SensorData",
"fields":[
{
"name": "deviceId",
"type": {
"type": "string",
"logicalType": "uuid"
}
},
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "measurements", "type": "sensordata.Measurements"
}
]
}
传感器数据是第一个输入对象。假设我们不讨论Avro的细节,可以大致了解它具有deviceId、timestamp和measurements三个字段的模式。
此外,生成的Scala文件将如下所示。
/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package sensordata
import scala.annotation.switch
case class SensorData(var deviceId: java.util.UUID, var timestamp: java.time.Instant, var measurements: Measurements) extends org.apache.avro.specific.SpecificRecordBase {
... //省略
}
我省略了一部分的内容,但是您应该可以看到我们确实拥有与avro文件定义相符的字段。
Streamlet的实现
我们将在 src/main/scala/sensordata 下实现 Streamlet。
由于本次实现基于 Akka Streams,所以写法也将采用类似 Akka Streams 的风格。
因为如果在这里列出所有的Streamlet,文章会变得很长,所以我将在输入部分列出对应的Ingress Streamlet和Akka Streams的API,并给出实际使用的代码。
Ingress的实施。
package sensordata
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import SensorDataJsonSupport._
class SensorDataHttpIngress extends AkkaServerStreamlet {
// create an outlet that will use the default Kafka partitioning
val out =
// define the streamlet shapte
AvroOutlet[SensorData]("out").withPartitioner(RoundRobinPartitioner)
def shape = StreamletShape.withOutlets(out)
// override createLogic to provide the streamlet behavior
override def createLogic = HttpServerLogic.default(this, out)
}
继承AkkaServerStreamlet后,可以定义一个接收HTTP输入的Ingress。而且,只要定义了outlet,就可以了。在这里,我们直接将输入流向outlet。
忽略shape和createLogic的定义,重点在于以下内容。
AvroOutlet[SensorData]("out").withPartitioner(RoundRobinPartitioner)
定义了将SensorData流向out的操作。同时,Streamlet之间通过Kafka进行连接,将Kafka的记录方式设置为RoundRobinPartitioner。通过经过Kafka,就像下面的引用所示,旨在使Streamlet之间的生命周期独立。
不同组件的生命周期可以独立管理,因为在底层发布-订阅系统中,Streamlet 之间发送的数据是安全持久化的。
(引用来源:https://github.com/lightbend/cloudflow)
转述:
处理器的实现
接下来,让我们来看一下处理器的实现。
package sensordata
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets.{ RoundRobinPartitioner, StreamletShape }
import cloudflow.streamlets.avro._
class SensorDataToMetrics extends AkkaStreamlet {
// define inlets and outlets
val in = AvroInlet[SensorData]("in")
val out = AvroOutlet[Metric]("out").withPartitioner(RoundRobinPartitioner)
// define the streamlet shape
val shape = StreamletShape(in, out)
// define a flow that makes it possible for cloudflow to commit reads
def flow = {
FlowWithOffsetContext[SensorData]
.mapConcat { data ⇒
List(
Metric(data.deviceId, data.timestamp, "power", data.measurements.power),
Metric(data.deviceId, data.timestamp, "rotorSpeed", data.measurements.rotorSpeed),
Metric(data.deviceId, data.timestamp, "windSpeed", data.measurements.windSpeed)
)
}
}
// override createLogic to provide streamlet behavior
override def createLogic = new RunnableGraphStreamletLogic() {
def runnableGraph = sourceWithOffsetContext(in).via(flow).to(sinkWithOffsetContext(out))
}
}
只是在进行将 SensorData 转换为 Metric 的处理,但我们希望关注调用 Akka Streams API 的部分。
def runnableGraph = sourceWithOffsetContext(in).via(flow).to(sinkWithOffsetContext(out))
sourceWithOffsetContext是Akka Streams的API,用于从Kafka消费记录。此外,还有一些Akka Streams常用的API,如via和to。通过使用Akka Streams,可以将其集成到Cloudflow框架中。
制定蓝图
当完成一系列的Streamlet实现后,将开始连接这些Streamlet。
我们定义下面的blueprint.conf。
blueprint {
streamlets {
http-ingress = sensordata.SensorDataHttpIngress
metrics = sensordata.SensorDataToMetrics
validation = sensordata.MetricsValidation
valid-logger = sensordata.ValidMetricLogger
invalid-logger = sensordata.InvalidMetricLogger
}
connections {
http-ingress.out = [metrics.in]
metrics.out = [validation.in]
validation.invalid = [invalid-logger.in]
validation.valid = [valid-logger.in]
}
}
您可以指定已实现的Streamlet,并定义它与inlet/outlet的连接位置。从配置的内容中,我认为您大致可以感受到它的氛围和大致了解其意思。
关键是在配置文件中定义的。松散耦合的Streamlet在Scala文件中进行编写,并在配置文件中描述如何连接。这样就可以在运行时组装管道。
以上是编码完成。
我来试试实际启动一下。
在项目的根目录下执行以下命令,开始流处理。
$ sbt verifyBlueprint
$ sbt runLocal
[info] Streamlet 'sensordata.SensorDataToMetrics' found
[info] Streamlet 'sensordata.MetricsValidation' found
[info] Streamlet 'sensordata.SensorDataHttpIngress' found
[info] Streamlet 'sensordata.ValidMetricLogger' found
[info] Streamlet 'sensordata.InvalidMetricLogger' found
[success] /Users/mattsu/ideaProjects/cloudflow-test/src/main/blueprint/blueprint.conf verified.
[info] No local.conf file location configured.
[info] Set 'runLocalConfigFile' in your build to point to your local.conf location
---------------------------------- Streamlets ----------------------------------
http-ingress [sensordata.SensorDataHttpIngress]
- HTTP port [3000]
invalid-logger [sensordata.InvalidMetricLogger]
metrics [sensordata.SensorDataToMetrics]
valid-logger [sensordata.ValidMetricLogger]iptors 1s
validation [sensordata.MetricsValidation]
--------------------------------------------------------------------------------
--------------------------------- Connections ---------------------------------
validation.valid -> valid-logger.in
http-ingress.out -> metrics.in
validation.invalid -> invalid-logger.in
metrics.out -> validation.in
--------------------------------------------------------------------------------
------------------------------------ Output ------------------------------------
Pipeline log output available in file: /var/folders/zc/zzk9s6251d108k6nnc542nv00000gn/T/local-cloudflow765191599949274043/local-cloudflow2906863708067165705.log
--------------------------------------------------------------------------------
Running sensor-data
To terminate, press [ENTER]
只需查看启动时的日志,就能知道开放了哪些端口,因此只需要发送HTTP请求到这些端口,就能实际确认运行情况。
让我们按照以下方式发送HTTP请求。
如果您尚未安装httpie命令,也可以使用curl命令。
$ http POST http://localhost:3000 Content-Type:application/json < <(echo '{"deviceId":"c75cb448-df0e-4692-8e06-0321b7703992","timestamp":1495545346279,"measurements":{"power":1.7,"rotorSpeed":23.4,"windSpeed":100.1}}')
然后,可以确认以下日志已记录在日志文件中。
[INFO] [12/16/2019 19:34:27.403] [akka_streamlet-akka.actor.default-dispatcher-2] [akka.actor.ActorSystemImpl(akka_streamlet)] {"deviceId": "c75cb448-df0e-4692-8e06-0321b7703992", "timestamp": 1495545346279, "name": "power", "value": 1.7}
我已经可以在本地启动了。教程还写了如何在GKE环境中部署。
最后
我稍微尝试了一下Cloudflow。如果使用它,只要是主要的流处理引擎,都可以放在Cloudflow的框架上进行开发。关于能够减少多少开发成本和运营成本,还需要进一步的尝试才能知道,但我认为可以相对轻松地编写流处理。
然而,仅凭目前所能想到的,可能存在一些问题。例如,由于受限于Cloudflow的兼容性,可能无法使用最新版本的流处理引擎。因此,似乎还需要进行大量验证才能在生产环境中使用。