使用Kafka Streams对SensorTag进行窗口聚合

我們在Jupyter的運行環境中使用PySpark Streaming來試驗SensorTag的數據進行窗口聚合。儘管有幾個其他的流處理框架可供選擇,但下一步我們將嘗試使用Kafka Streams。與Spark不同的是,Kafka Streams不是一個集群,而是一個庫。目前,官方只支持Java作為開發語言。

Java环境

我打算使用Maven在Ubuntu 16.04上搭建的Eclim来编写代码。

项目

在项目目录中创建以下文件。完整的代码在这个存储库中。

$  tree
.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── github
        │           └── masato
        │               └── streams
        │                   └── kafka
        │                       ├── App.java
        │                       ├── SensorSumDeserializer.java
        │                       ├── SensorSum.java
        │                       └── SensorSumSerializer.java
        └── resources
            └── logback.xml

9 directories, 7 files

App.java的汉语翻译选项:主程序.java

将代码分成几个部分进行说明。

数值的设定

主题名称等将从pom.xml中定义的环境变量中获取。WINDOWS_MINUTES是窗口聚合的时间间隔。COMMIT_MINUTES是Kafka自动提交缓存的时间间隔,后面会详细说明。在这里以分钟为单位指定。

public class App {

    private static final Logger LOG = LoggerFactory.getLogger(App.class);

    private static final String SOURCE_TOPIC = System.getenv("SOURCE_TOPIC");
    private static final String SINK_TOPIC = System.getenv("SINK_TOPIC");
    private static final long WINDOWS_MINUTES = 2L;
    private static final long COMMIT_MINUTES = 3L;

序列化

创建记录的序列化器和反序列化器。在Kafka Streams应用程序中,将处理的中间结果保存到主题中,并实现流程。定义一个SerDe来同时处理从主题中读取记录的反序列化器和写入记录的序列化器。SerDe需要根据主题的键和值的类型来定义。

    • jsonSerde

 

    • SensorTagのレコードはキーは文字列、値はJacksonのJsonNodeオブジェクトです。

 

    • sensorSumSerde

 

    • SenroSumはカスタムで作成した周囲温度 (ambient)とウィンドウ集計の状態を保持するクラスです。

 

    • stringSerde

 

    • デフォルトのString用のSerDeです。今回メッセージのキーはすべてStringです。

 

    • doubleSerde

 

    デフォルトのdouble用のSerDeです。SensorTagの周囲温度 (ambient)はdoubleでウィンドウ集計します。
    public static void main(String[] args) throws Exception {

        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde =
            Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        final Serializer<SensorSum> sensorSumSerializer =
            new SensorSumSerializer();
        final Deserializer<SensorSum> sensorSumDeserializer =
            new SensorSumDeserializer();
        final Serde<SensorSum> sensorSumSerde =
            Serdes.serdeFrom(sensorSumSerializer,
                             sensorSumDeserializer);

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Double> doubleSerde = Serdes.Double();

创建KStream

首先调用KStreamBuilder的stream()函数来创建KStream。其中,topic的键是字符串,并指定了值为JsonNode的SerDe。

        final KStreamBuilder builder = new KStreamBuilder();

        LOG.info("Starting Sorting Job");

        final KStream<String, JsonNode> source =
            builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);

创建KGroupedStream

SensorTag的数据以JSON字符串的形式从树莓派3发送到Kafka主题。

{'bid': 'B0:B4:48:BE:5E:00', 'time': 1502152524, 'humidity': 27.26287841796875, 'objecttemp': 21.1875, 'ambient': 27.03125, 'rh': 75.311279296875}

KStream的记录是具有键和值的KeyValue对象。在示例中,为了仅对环境温度的平均值进行窗口聚合,我们调用map()生成一个新的KStream,它只包含键和环境温度的对。

然后调用groupByKey()方法,根据键进行分组并创建一个KGroupedStream。记录的键是字符串,值是周围温度的double,因此需要指定各自的SerDe。

        final KGroupedStream<String, Double> sensors =
            source
            .map((k, v) -> {
                    double ambient = v.get("ambient").asDouble();
                    return KeyValue.pair(k, ambient);
                })
            .groupByKey(stringSerde, doubleSerde);

使用KStram创建KTable

调用KGroupedStream的aggregate()方法将创建一个KTable。KTable将按键和指定的窗口间隔保持记录的总和和记录数的状态。

在`aggregate()`中,第一个参数`Initializer`被用于初始化在流聚合中使用的聚合器。在这里,我们初始化了用于窗口聚合状态的`SensorSum`。第二个参数实现了聚合器。它接收当前记录的键值、上一次记录处理中创建的`SensorSum`。每次数据到达时,它会将总和和记录数相加,并返回一个新的`SensorSum`。第三个参数定义了一个2分钟的`TimeWindows`。第四个参数是`SensorSum`的序列化和反序列化器,第五个参数是用于保持状态的主题名称。

        final KTable<Windowed<String>, SensorSum> sensorAgg =
            sensors
            .aggregate(() -> new SensorSum(0D, 0)
                       , (aggKey, value, agg) -> new SensorSum(agg.sum + value, agg.count + 1)
                       , TimeWindows.of(TimeUnit.MINUTES.toMillis(WINDOWS_MINUTES))
                       , sensorSumSerde,
                       "sensorSum");

从KTable创建KStream。

使用KTable的mapValues()函数计算平均值。将总和除以记录数得到的平均值是一个新的Double类型记录的KTable。然后调用toStream()函数创建KStream。将记录格式化为时间戳、键、平均值的JSON字符串并输出到流中。时间戳采用ISO 8601格式,以便在不同系统之间方便进行数据交换。最后将记录保存到指定的主题,并结束。

        final DateTimeFormatter fmt =
            DateTimeFormatter.ISO_OFFSET_DATE_TIME;

        sensorAgg
            .<Double>mapValues((v) -> ((double) v.sum / v.count))
            .toStream()
            .map((key, avg) -> {
                    long end = key.window().end();
                    ZonedDateTime zdt =
                        new Date(end).toInstant()
                        .atZone(ZoneId.systemDefault());
                    String time = fmt.format(zdt);
                    String bid = key.key();
                    String retval =
                        String.format("{\"time\": \"%s\", \"bid\": \"%s\", \"ambient\": %f}",
                                      time, bid, avg);
                    LOG.info(retval);
                    return new KeyValue<String,String>(bid, retval);
             })
            .to(SINK_TOPIC);

开始Kafka Streams

使用设置对象和构建器创建KafkaStreams,并启动Kafka Streams应用程序。还需要在SIGTERM时将Kafka Stream注册到关闭挂钩中以停止它。

        final StreamsConfig config = new StreamsConfig(getProperties());
        final KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

关于Kafka Streams的设置和超时问题

从环境变量等创建用于Kafka Streams设置的属性。

    private static Properties getProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,
                  System.getenv("APPLICATION_ID_CONFIG"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                  System.getenv("BOOTSTRAP_SERVERS_CONFIG"));
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
                  TimeUnit.MINUTES.toMillis(COMMIT_MINUTES));

        return props;
    }

COMMIT_INTERVAL_MS_CONFIG的中文释义为“提交间隔毫秒配置”。

在最初的时候,StreamsConfig.COMMIT_INTERVAL_MS_CONFIG没有进行更改。在将记录保存到主题之前,我们在KStream的map()中输出了日志。我想要在2分钟的窗口间隔内只输出一次最后的聚合结果,但结果却重复出现了4-5次,且次数不确定。

{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.343750}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.385417}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.410156}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.440341}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.450521}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}

根据以下参考文章,这似乎是期望从KTable的变更履历(changelog stream)这个特性中获得的行为。KTable没有窗口聚合的最终结果状态,更新后的值会按一定时间间隔进行提交更新到缓存中。在将KTable转换为KStream后,需要自己实现使用transform()或process()来去除记录重复的代码。

虽然无法完全消除重复的记录,但可以通过增加StreamsConfig.COMMIT_INTERVAL_MS_CONFIG的值来减少缓存提交的次数。默认值为30秒。

    • How to send final kafka-streams aggregation result of a time windowed KTable?

 

    • Immutable Record with Kafka Stream

 

    • Kafka KStreams – processing timeouts

 

    • Kafka Streams for Stream processing A few words about how Kafka works.

 

    Memory management

其他的班级

准备模型(SensorSum.java)、序列化器(SensorSumSerializer.java)和反序列化器(SensorSumDeserializer.java)的类。序列化器实现serialize()方法,将SensorSum的属性转换为字节数组。它将分配一个8字节的Double类型的字节缓冲区来存储周围温度总和,并使用4字节的Integer类型来存储记录数。

    public byte[] serialize(String topic, SensorSum data) {
        ByteBuffer buffer = ByteBuffer.allocate(8 + 4);
        buffer.putDouble(data.sum);
        buffer.putInt(data.count);

        return buffer.array();
    }

执行

用Exec Maven Plugin来执行Kafka Streams。

$ mvn clean install exec:exec@json

我尝试将窗口间隔设置为2分钟,并将缓存提交间隔指定为3分钟。虽然依然出现了几次重复的输出,但成功减少了重复输出的次数。

{"time": "2017-08-08T11:32:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414773}
{"time": "2017-08-08T11:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414063}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.453125}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.476563}
{"time": "2017-08-08T11:38:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.546875}
广告
将在 10 秒后关闭
bannerAds