我试用了Cloudflow

今天是Advent Calendar的第17天。
我尝试了上个月由Lightbend公司发布的Cloudflow。我正在根据教程写一篇文章,列出我认为重要的要点。这篇文章是为那些希望快速了解概述的人准备的!
此外,本文的内容都可以在官方指南中找到。

Cloudflow是什么?

Cloudflow是由Lightbend公司开发的开源软件,旨在利用分布式流处理在Kubernetes上快速开发和运营应用程序。
它将流处理的每个处理步骤拆分成小的组件,并使用基于模式的方法将这些组件连接起来,以构建复杂的系统。

目前,Akka Streams、Apache Spark、Apache Flink的流处理引擎可集成到Kubernetes环境中,并且可以轻松进行必要的开发和操作,直到部署为止(它们提供了不需要样板文件的功能,还可以生成Docker镜像的命令等)。

简而言之,它是一个实现流处理方便的框架。

基本概念

首先,让我们介绍一下在Cloudflow中出现的术语和概念。

apps.png

图中的Ingress, Processor, Egress被称为Streamlet组件,组件之间是松散耦合的。Streamlet具有称为inlet和outlet的输入输出,并使用它们连接到各个组件。

Streamlet被分为进程、处理器、出口和扇出等形状,并具有以下不同之处。

    • Ingress: outletだけ持つ。入力はHTTPリクエスト等

 

    • Processor: inlet/outletを1つずつ持つ。メインの処理はここに書く

 

    • FanOut: Splitterとも呼ばれる。1つのinlet、複数のoutletを持つ。条件分岐するために使う。

 

    Egress: inletだけ持つ。Slackに通知、HDFSに書き込むといった何かしら副作用のある処理を書く。

此外,围绕Streamlet的Blueprint正是指设计图,用于基于架构模式表示各个Streamlet之间的连接。

Cloudflow中使用的术语大约是这些。概念本身很简单。因为它是由Akka Streams的开发人员开发的,所以概念有些相似。

试着开始做Getting Started。

那么,我们开始操作Cloudflow。
主题是https://cloudflow.io/docs/current/get-started/index.html。

作为一个类似于“Hello World”的应用程序,我找到了一个教程,其主题是创建一个能够处理从风力发电机发送的事件的管道处理。我已经尝试在本地启动该应用程序。
教程中也包含将应用程序部署到Kubernetes环境的步骤。如果你有兴趣,可以参考官方指南。

准备

请安装以下内容。

    • Java 8 (JDK)

 

    sbt(バージョン1.2.8以上)

如果要在Kubernetes环境中部署,需要使用Docker和kubectl。

整体的形象

pipe.001.jpeg

整体情况如上图所示。我认为您已经看到了,但我会解释一下过程,如下所示。

    1. 在Ingress上接收输入数据

 

    1. 在Processor中将输入转换为领域对象

 

    1. 在Splitter上对输入值进行验证,并进行分支处理

 

    在Egress上记录与每个分支相关的日志记录

好的,接下来我们将从下一节开始进行实际编码。

项目准备

项目的组成如下所示。

|-project
|--cloudflow-plugins.sbt
|-src
|---main
|-----avro
|-----blueprint
|-----resources
|-----scala
|-------sensordata
|-build.sbt

我认为这个构成几乎与sbt相同,但有一些我觉得值得关注的地方,我在下面进行讨论。

project/cloudflow-plugins.sbt: Cloudflowをsbtで有効化するためのプラグインを記述するファイル

avro: Xxx.avscといったavro形式でドメインオブジェクトを定義する。コンパイル時に動的にScalaファイルが生成される

blueprint: Streamletを接続するためのスキーマを定義するblueprint.confを配置する

在这种情况下,使用Avro格式定义领域对象并在blueprint.conf中定义管道是关键。

构建.sbt

在中文中,”build.sbt”可以这样定义。

import sbt._
import sbt.Keys._

lazy val sensorData =  (project in file("."))
  .enablePlugins(CloudflowAkkaStreamsApplicationPlugin)
  .settings(
    libraryDependencies ++= Seq(
      "com.lightbend.akka"     %% "akka-stream-alpakka-file"  % "1.1.2",
      "com.typesafe.akka"      %% "akka-http-spray-json"      % "10.1.10",
      "ch.qos.logback"         %  "logback-classic"           % "1.2.3",
      "com.typesafe.akka"      %% "akka-http-testkit"         % "10.1.10" % "test"
    ),
    name := "sensor-data",
    organization := "com.lightbend",

    scalaVersion := "2.12.10",
    crossScalaVersions := Vector(scalaVersion.value)
  )

以下为中文的同义表达:
重点是

.enablePlugins(CloudflowAkkaStreamsApplicationPlugin)

在使用Cloudflow和Akka Streams创建应用程序的情况下,您需要启用此插件。如果您想要使用Spark,则启用下面的选项。根据所使用的流处理引擎的不同进行选择。

.enablePlugins(CloudflowSparkApplicationPlugin)

另外,cloudflow-plugins.sbt的设置如下所示。

addSbtPlugin("com.lightbend.cloudflow" % "sbt-cloudflow" % "1.3.0-M5")

用Avro定义域对象。

这里的领域对象指的是在Streamlet之间传递的对象。
如果使用Avro格式进行定义,它会在编译时将其转换为Scala文件。

在这个教程中,我准备了几个选项,但为了避免文章太长,只展示其中一个。

{
    "namespace": "sensordata",
    "type": "record",
    "name": "SensorData",
    "fields":[
         {
            "name": "deviceId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
         },
         {
            "name": "timestamp",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
         },
         {
            "name": "measurements", "type": "sensordata.Measurements"
         }
    ]
}

传感器数据是第一个输入对象。假设我们不讨论Avro的细节,可以大致了解它具有deviceId、timestamp和measurements三个字段的模式。
此外,生成的Scala文件将如下所示。

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package sensordata

import scala.annotation.switch

case class SensorData(var deviceId: java.util.UUID, var timestamp: java.time.Instant, var measurements: Measurements) extends org.apache.avro.specific.SpecificRecordBase {
  ... //省略
}

我省略了一部分的内容,但是您应该可以看到我们确实拥有与avro文件定义相符的字段。

Streamlet的实现

我们将在 src/main/scala/sensordata 下实现 Streamlet。
由于本次实现基于 Akka Streams,所以写法也将采用类似 Akka Streams 的风格。

因为如果在这里列出所有的Streamlet,文章会变得很长,所以我将在输入部分列出对应的Ingress Streamlet和Akka Streams的API,并给出实际使用的代码。

Ingress的实施。

package sensordata

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._

import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._

import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import SensorDataJsonSupport._

class SensorDataHttpIngress extends AkkaServerStreamlet {
  // create an outlet that will use the default Kafka partitioning
  val out =
  // define the streamlet shapte
  AvroOutlet[SensorData]("out").withPartitioner(RoundRobinPartitioner)
  def shape = StreamletShape.withOutlets(out)
  // override createLogic to provide the streamlet behavior
  override def createLogic = HttpServerLogic.default(this, out)
}

继承AkkaServerStreamlet后,可以定义一个接收HTTP输入的Ingress。而且,只要定义了outlet,就可以了。在这里,我们直接将输入流向outlet。
忽略shape和createLogic的定义,重点在于以下内容。

AvroOutlet[SensorData]("out").withPartitioner(RoundRobinPartitioner)

定义了将SensorData流向out的操作。同时,Streamlet之间通过Kafka进行连接,将Kafka的记录方式设置为RoundRobinPartitioner。通过经过Kafka,就像下面的引用所示,旨在使Streamlet之间的生命周期独立。

不同组件的生命周期可以独立管理,因为在底层发布-订阅系统中,Streamlet 之间发送的数据是安全持久化的。

(引用来源:https://github.com/lightbend/cloudflow)
转述:

处理器的实现

接下来,让我们来看一下处理器的实现。

package sensordata

import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets.{ RoundRobinPartitioner, StreamletShape }
import cloudflow.streamlets.avro._

class SensorDataToMetrics extends AkkaStreamlet {
  // define inlets and outlets
  val in = AvroInlet[SensorData]("in")
  val out = AvroOutlet[Metric]("out").withPartitioner(RoundRobinPartitioner)
  // define the streamlet shape
  val shape = StreamletShape(in, out)
  // define a flow that makes it possible for cloudflow to commit reads
  def flow = {
    FlowWithOffsetContext[SensorData]
      .mapConcat { data 
        List(
          Metric(data.deviceId, data.timestamp, "power", data.measurements.power),
          Metric(data.deviceId, data.timestamp, "rotorSpeed", data.measurements.rotorSpeed),
          Metric(data.deviceId, data.timestamp, "windSpeed", data.measurements.windSpeed)
        )
      }
  }
  // override createLogic to provide streamlet behavior
  override def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph = sourceWithOffsetContext(in).via(flow).to(sinkWithOffsetContext(out))
  }
}

只是在进行将 SensorData 转换为 Metric 的处理,但我们希望关注调用 Akka Streams API 的部分。

def runnableGraph = sourceWithOffsetContext(in).via(flow).to(sinkWithOffsetContext(out))

sourceWithOffsetContext是Akka Streams的API,用于从Kafka消费记录。此外,还有一些Akka Streams常用的API,如via和to。通过使用Akka Streams,可以将其集成到Cloudflow框架中。

制定蓝图

当完成一系列的Streamlet实现后,将开始连接这些Streamlet。
我们定义下面的blueprint.conf。

blueprint {
  streamlets {
    http-ingress = sensordata.SensorDataHttpIngress
    metrics = sensordata.SensorDataToMetrics
    validation = sensordata.MetricsValidation
    valid-logger = sensordata.ValidMetricLogger
    invalid-logger = sensordata.InvalidMetricLogger
  }

  connections {
    http-ingress.out = [metrics.in]
    metrics.out = [validation.in]
    validation.invalid = [invalid-logger.in]
    validation.valid = [valid-logger.in]
  }
}

您可以指定已实现的Streamlet,并定义它与inlet/outlet的连接位置。从配置的内容中,我认为您大致可以感受到它的氛围和大致了解其意思。

关键是在配置文件中定义的。松散耦合的Streamlet在Scala文件中进行编写,并在配置文件中描述如何连接。这样就可以在运行时组装管道。

以上是编码完成。

我来试试实际启动一下。

在项目的根目录下执行以下命令,开始流处理。

$ sbt verifyBlueprint
$ sbt runLocal
[info] Streamlet 'sensordata.SensorDataToMetrics' found
[info] Streamlet 'sensordata.MetricsValidation' found
[info] Streamlet 'sensordata.SensorDataHttpIngress' found
[info] Streamlet 'sensordata.ValidMetricLogger' found
[info] Streamlet 'sensordata.InvalidMetricLogger' found
[success] /Users/mattsu/ideaProjects/cloudflow-test/src/main/blueprint/blueprint.conf verified.
[info] No local.conf file location configured. 
[info] Set 'runLocalConfigFile' in your build to point to your local.conf location 
---------------------------------- Streamlets ----------------------------------
http-ingress [sensordata.SensorDataHttpIngress]
        - HTTP port [3000]
invalid-logger [sensordata.InvalidMetricLogger]
metrics [sensordata.SensorDataToMetrics]
valid-logger [sensordata.ValidMetricLogger]iptors 1s
validation [sensordata.MetricsValidation]
--------------------------------------------------------------------------------

--------------------------------- Connections ---------------------------------
validation.valid -> valid-logger.in
http-ingress.out -> metrics.in
validation.invalid -> invalid-logger.in
metrics.out -> validation.in
--------------------------------------------------------------------------------

------------------------------------ Output ------------------------------------
Pipeline log output available in file: /var/folders/zc/zzk9s6251d108k6nnc542nv00000gn/T/local-cloudflow765191599949274043/local-cloudflow2906863708067165705.log
--------------------------------------------------------------------------------

Running sensor-data  
To terminate, press [ENTER]

只需查看启动时的日志,就能知道开放了哪些端口,因此只需要发送HTTP请求到这些端口,就能实际确认运行情况。

让我们按照以下方式发送HTTP请求。
如果您尚未安装httpie命令,也可以使用curl命令。

$ http POST http://localhost:3000 Content-Type:application/json < <(echo '{"deviceId":"c75cb448-df0e-4692-8e06-0321b7703992","timestamp":1495545346279,"measurements":{"power":1.7,"rotorSpeed":23.4,"windSpeed":100.1}}')

然后,可以确认以下日志已记录在日志文件中。

[INFO] [12/16/2019 19:34:27.403] [akka_streamlet-akka.actor.default-dispatcher-2] [akka.actor.ActorSystemImpl(akka_streamlet)] {"deviceId": "c75cb448-df0e-4692-8e06-0321b7703992", "timestamp": 1495545346279, "name": "power", "value": 1.7}

我已经可以在本地启动了。教程还写了如何在GKE环境中部署。

最后

我稍微尝试了一下Cloudflow。如果使用它,只要是主要的流处理引擎,都可以放在Cloudflow的框架上进行开发。关于能够减少多少开发成本和运营成本,还需要进一步的尝试才能知道,但我认为可以相对轻松地编写流处理。

然而,仅凭目前所能想到的,可能存在一些问题。例如,由于受限于Cloudflow的兼容性,可能无法使用最新版本的流处理引擎。因此,似乎还需要进行大量验证才能在生产环境中使用。

广告
将在 10 秒后关闭
bannerAds