Window Aggregation of Sensor Data with Apache Flink and Java 8

I previously tried window aggregation of SensorTag sensor data with Apache Flink and the Scala API. This time I rewrite that example using the Java 8 API as far as possible.

Maven Archetype

Create a Maven project from the Java API sample project using flink-quickstart-java. The Apache Flink version is 1.3.2, the same as in the earlier Scala version. Change the groupId and package to match your environment.

$ mkdir -p ~/java_apps && cd ~/java_apps
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.3.2 \
    -DgroupId=streams-flink-java-examples \
    -DartifactId=streams-flink-java-examples \
    -Dversion=0.1 \
    -Dpackage=com.github.masato.streams.flink \
    -DinteractiveMode=false

Change the maven-compiler-plugin configuration to Java 8 (1.8), and add the exec-maven-plugin so that the Flink application's main() method can be run from Maven.

    <build>
        <plugins>
...
              <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                  <source>1.8</source>
                  <target>1.8</target>
                </configuration>
              </plugin>

              <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.5.0</version>
                <executions>
                  <execution>
                    <id>App</id>
                    <goals>
                      <goal>exec</goal>
                    </goals>
                  </execution>
                </executions>
                <configuration>
                  <executable>java</executable>
                  <classpathScope>compile</classpathScope>
                  <arguments>
                    <argument>-cp</argument>
                    <classpath/>
                    <argument>com.github.masato.streams.flink.WordCount</argument>
                  </arguments>
                </configuration>
              </plugin>

Run the exec goal. The WordCount sample counts the words in a text and prints the result to standard output.

$ mvn clean package exec:exec@App
...
(is,1)
(a,1)
(in,1)
(mind,1)
(or,2)
(against,1)
(arms,1)
(not,1)
(sea,1)
(the,3)
(troubles,1)
(fortune,1)
(take,1)
(to,4)
(and,1)
(arrows,1)
(be,2)
(nobler,1)
(of,2)
(slings,1)
(suffer,1)
(outrageous,1)
(tis,1)
(whether,1)
(question,1)
(that,1)

Window Aggregation

First delete all the Java code generated by the flink-quickstart-java archetype, then write the new code.

$ rm src/main/java/com/github/masato/streams/flink/*.java

The source code is also available in the repository. The Scala example was written in a single file, but for readability the Java version is split into several classes.

$ tree streams-flink-java-examples
streams-flink-java-examples
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── github
        │           └── masato
        │               └── streams
        │                   └── flink
        │                       ├── Accumulator.java
        │                       ├── Aggregate.java
        │                       ├── App.java
        │                       ├── Average.java
        │                       └── Sensor.java
        └── resources
            └── log4j.properties

App.java

Below is the complete program implementing the main method. It is similar to the App.scala example written in Scala, but AggregateFunction and WindowFunction are each defined as separate classes.

package com.github.masato.streams.flink;

import java.util.Date;
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class App {
    private static DateTimeFormatter fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME;

    private static final String bootstrapServers = System.getenv("BOOTSTRAP_SERVERS_CONFIG");
    private static final String groupId = System.getenv("GROUP_ID");
    private static final String sourceTopic = System.getenv("SOURCE_TOPIC");

    private static final String sinkTopic = System.getenv("SINK_TOPIC");

    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);

        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        final DataStream<ObjectNode> events =
                                env.addSource(new FlinkKafkaConsumer010<>(
                                  sourceTopic,
                                  new JSONDeserializationSchema(),
                                  props)).name("events");

        final SingleOutputStreamOperator<ObjectNode> timestamped =
            events
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(ObjectNode element) {
                        return element.get("time").asLong() * 1000;
                    }
                });

        timestamped
            .map((v) -> {
                    String key =  v.get("bid").asText();
                    double ambient = v.get("ambient").asDouble();
                    return new Sensor(key, ambient);
            })
            .keyBy(v -> v.key)
            .timeWindow(Time.seconds(60))
            .aggregate(new Aggregate(), new Average())
            .map((v) -> {
                    ZonedDateTime zdt =
                        new Date(v.time).toInstant().atZone(ZoneId.systemDefault());
                    String time = fmt.format(zdt);

                    Map<String, Object> payload = new HashMap<String, Object>();
                    payload.put("time", time);
                    payload.put("bid", v.bid);
                    payload.put("ambient", v.sum);

                    String retval = new ObjectMapper().writeValueAsString(payload);
                    System.out.println(retval);
                    return retval;
                })
            .addSink(new FlinkKafkaProducer010<String>(
                         bootstrapServers,
                         sinkTopic,
                         new SimpleStringSchema())
                     ).name("kafka");

        env.execute();
    }
}
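One detail worth noting in extractTimestamp(): the incoming `time` field holds epoch seconds, while Flink event-time timestamps are measured in milliseconds, hence the multiplication by 1000. The same conversion in isolation (a minimal sketch; the sample value is purely illustrative):

```java
import java.time.Instant;

public class TimestampConversion {
    public static void main(String[] args) {
        long epochSeconds = 1503889020L;           // illustrative epoch-seconds value from a sensor payload
        long flinkTimestamp = epochSeconds * 1000; // Flink event time is in milliseconds
        Instant instant = Instant.ofEpochMilli(flinkTimestamp);
        System.out.println(flinkTimestamp + " -> " + instant);
    }
}
```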

Sensor.java

In Scala, the streaming sensor data was represented as a Scala tuple keyed by the BD address.

    timestamped
      .map { v =>
        val key =  v.get("bid").asText
        val ambient = v.get("ambient").asDouble
        (key, ambient)
      }

Even in Java 8, Tuple2 can be used just as in Scala. However, as described in the guide to using Apache Flink with Java 8, this requires compiling with the Eclipse JDT compiler. Moreover, unless you specify the element types as a type hint in returns() using TupleTypeInfo, an error occurs.

            .map((v) -> {
                double ambient = v.get("value").get("ambient").asDouble();
                String key =  v.get("sensor").get("bid").asText();
                return new Tuple2<>(key, ambient);
            })
            .returns(new TupleTypeInfo<>(TypeInformation.of(String.class),
                                         TypeInformation.of(Double.class)))

Since this is a bit cumbersome, it is easier to use a plain POJO.

package com.github.masato.streams.flink;

public class Sensor {
    public String key;
    public double ambient;

    public Sensor(String key, double ambient) {
        this.key = key;
        this.ambient = ambient;
    }
}
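Incidentally, Flink's POJO serializer requires a public no-argument constructor; Sensor as written above does not have one, so Flink falls back to its generic (Kryo-based) serializer, which still works for this pipeline. A variant that would qualify as a Flink POJO might look like this (a sketch; the package declaration is omitted here):

```java
public class Sensor {
    public String key;
    public double ambient;

    // A public no-argument constructor is required for Flink's POJO serializer.
    public Sensor() {}

    public Sensor(String key, double ambient) {
        this.key = key;
        this.ambient = ambient;
    }
}
```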

Aggregate.java

This implements the AggregateFunction interface. Unlike Scala, Accumulator is not a case class, but otherwise it is almost identical.

package com.github.masato.streams.flink;

import org.apache.flink.api.common.functions.AggregateFunction;

public class Aggregate implements AggregateFunction<Sensor, Accumulator, Accumulator> {

    private static final long serialVersionUID = 3355966737412029618L;

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator(0L, "", 0.0, 0);
    }

    @Override
    public Accumulator merge(Accumulator a, Accumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    @Override
    public void add(Sensor value, Accumulator acc) {
        acc.sum += value.ambient;
        acc.count++;
    }

    @Override
    public Accumulator getResult(Accumulator acc) {
        return acc;
    }
}
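Accumulator.java appears in the project tree but its source is not shown in this article. From its usage in Aggregate and Average (a four-argument constructor and public time, bid, sum, and count fields), a minimal reconstruction might look like this (an assumption based on usage, not the original source; the package declaration is omitted here):

```java
public class Accumulator {
    public long time;   // window end time in epoch milliseconds
    public String bid;  // BD address of the sensor, used as the key
    public double sum;  // running sum of ambient readings (holds the average in the window output)
    public int count;   // number of readings in the window

    public Accumulator(long time, String bid, double sum, int count) {
        this.time = time;
        this.bid = bid;
        this.sum = sum;
        this.count = count;
    }
}
```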

Average.java

This is the WindowFunction implementation.

package com.github.masato.streams.flink;

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.util.Collector;

public class Average implements WindowFunction<Accumulator,
                                               Accumulator, String, TimeWindow> {

    private static final long serialVersionUID = 5532466889638450746L;

    @Override
    public void apply(String key,
                      TimeWindow window,
                      Iterable<Accumulator> input,
                      Collector<Accumulator> out) {

        Accumulator in = input.iterator().next();
        out.collect(new Accumulator(window.getEnd(), key, in.sum/in.count, in.count));
    }
}
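Because aggregate() pre-aggregates incrementally with Aggregate, the Iterable handed to apply() contains exactly one Accumulator per key and window, which is why calling input.iterator().next() once is safe here. The window arithmetic itself is just an incremental sum followed by a single division at window end (a plain-Java sketch, no Flink runtime):

```java
public class WindowMath {
    public static void main(String[] args) {
        // Simulate Aggregate.add() over one 60-second window of ambient readings...
        double sum = 0.0;
        int count = 0;
        for (double ambient : new double[]{28.3, 28.5, 28.7}) {
            sum += ambient;
            count++;
        }
        // ...then Average.apply() divides once at window end.
        double average = sum / count;
        System.out.println(average);
    }
}
```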

In Scala, I wrote the WindowFunction's apply() implementation directly inline in aggregate().

      .aggregate(new Aggregate(),
        ( key: String,
          window: TimeWindow,
          input: Iterable[Accumulator],
          out: Collector[Accumulator] ) => {
            var in = input.iterator.next()
            out.collect(Accumulator(window.getEnd, key, in.sum/in.count, in.count))
          }
      )

pom.xml

The stream source is Kafka. The connection settings are given as environment variables in the exec-maven-plugin. See here for preparing the SensorTag and Raspberry Pi 3, and for building the Kafka cluster.

    <build>
        <plugins>
...
              <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                  <source>1.8</source>
                  <target>1.8</target>
                </configuration>
              </plugin>

              <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.5.0</version>
                <executions>
                  <execution>
                    <id>App</id>
                    <goals>
                      <goal>exec</goal>
                    </goals>
                  </execution>
                </executions>
                <configuration>
                  <executable>java</executable>
                  <classpathScope>compile</classpathScope>
                  <arguments>
                    <argument>-cp</argument>
                    <classpath/>
                    <argument>com.github.masato.streams.flink.App</argument>
                  </arguments>
                  <environmentVariables>
                    <APPLICATION_ID_CONFIG>sensortag</APPLICATION_ID_CONFIG>
                    <BOOTSTRAP_SERVERS_CONFIG>confluent:9092</BOOTSTRAP_SERVERS_CONFIG>
                    <SOURCE_TOPIC>sensortag</SOURCE_TOPIC>
                    <SINK_TOPIC>sensortag-sink</SINK_TOPIC>
                    <GROUP_ID>flinkGroup</GROUP_ID>
                  </environmentVariables>
                </configuration>
              </plugin>

Run

After sending the SensorTag data from the Raspberry Pi 3 to Kafka, run the exec goal of the exec-maven-plugin.

$ mvn clean install exec:exec@App

The ambient temperature is averaged over 60-second tumbling windows and the result is written to standard output.

{"ambient":28.395833333333332,"time":"2017-08-28T11:57:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.44375,"time":"2017-08-28T11:58:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.46875,"time":"2017-08-28T11:59:00+09:00","bid":"B0:B4:48:BD:DA:03"}
{"ambient":28.5,"time":"2017-08-28T12:00:00+09:00","bid":"B0:B4:48:BD:DA:03"}