使用Kafka Streams对SensorTag进行窗口聚合
我們在Jupyter的運行環境中使用PySpark Streaming來試驗SensorTag的數據進行窗口聚合。儘管有幾個其他的流處理框架可供選擇,但下一步我們將嘗試使用Kafka Streams。與Spark不同的是,Kafka Streams不是一個集群,而是一個庫。目前,官方只支持Java作為開發語言。
Java环境
我打算使用Maven在Ubuntu 16.04上搭建的Eclim来编写代码。
项目
在项目目录中创建以下文件。完整的代码在这个存储库中。
$ tree
.
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── github
│ └── masato
│ └── streams
│ └── kafka
│ ├── App.java
│ ├── SensorSumDeserializer.java
│ ├── SensorSum.java
│ └── SensorSumSerializer.java
└── resources
└── logback.xml
9 directories, 7 files
App.java的汉语翻译选项:主程序.java
将代码分成几个部分进行说明。
数值的设定
主题名称等将从pom.xml中定义的环境变量中获取。WINDOWS_MINUTES是窗口聚合的时间间隔。COMMIT_MINUTES是Kafka自动提交缓存的时间间隔,后面会详细说明。在这里以分钟为单位指定。
public class App {
private static final Logger LOG = LoggerFactory.getLogger(App.class);
private static final String SOURCE_TOPIC = System.getenv("SOURCE_TOPIC");
private static final String SINK_TOPIC = System.getenv("SINK_TOPIC");
private static final long WINDOWS_MINUTES = 2L;
private static final long COMMIT_MINUTES = 3L;
序列化
创建记录的序列化器和反序列化器。在Kafka Streams应用程序中,将处理的中间结果保存到主题中,并实现流程。定义一个SerDe来同时处理从主题中读取记录的反序列化器和写入记录的序列化器。SerDe需要根据主题的键和值的类型来定义。
-
- jsonSerde
-
- SensorTagのレコードはキーは文字列、値はJacksonのJsonNodeオブジェクトです。
-
- sensorSumSerde
-
- SenroSumはカスタムで作成した周囲温度 (ambient)とウィンドウ集計の状態を保持するクラスです。
-
- stringSerde
-
- デフォルトのString用のSerDeです。今回メッセージのキーはすべてStringです。
-
- doubleSerde
- デフォルトのdouble用のSerDeです。SensorTagの周囲温度 (ambient)はdoubleでウィンドウ集計します。
public static void main(String[] args) throws Exception {
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde =
Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
final Serializer<SensorSum> sensorSumSerializer =
new SensorSumSerializer();
final Deserializer<SensorSum> sensorSumDeserializer =
new SensorSumDeserializer();
final Serde<SensorSum> sensorSumSerde =
Serdes.serdeFrom(sensorSumSerializer,
sensorSumDeserializer);
final Serde<String> stringSerde = Serdes.String();
final Serde<Double> doubleSerde = Serdes.Double();
创建KStream
首先调用KStreamBuilder的stream()函数来创建KStream。其中,topic的键是字符串,并指定了值为JsonNode的SerDe。
final KStreamBuilder builder = new KStreamBuilder();
LOG.info("Starting Sorting Job");
final KStream<String, JsonNode> source =
builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
创建KGroupedStream
SensorTag的数据以JSON字符串的形式从树莓派3发送到Kafka主题。
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1502152524, 'humidity': 27.26287841796875, 'objecttemp': 21.1875, 'ambient': 27.03125, 'rh': 75.311279296875}
KStream的记录是具有键和值的KeyValue对象。在示例中,为了仅对环境温度的平均值进行窗口聚合,我们调用map()生成一个新的KStream,它只包含键和环境温度的对。
然后调用groupByKey()方法,根据键进行分组并创建一个KGroupedStream。记录的键是字符串,值是周围温度的double,因此需要指定各自的SerDe。
final KGroupedStream<String, Double> sensors =
source
.map((k, v) -> {
double ambient = v.get("ambient").asDouble();
return KeyValue.pair(k, ambient);
})
.groupByKey(stringSerde, doubleSerde);
使用KStram创建KTable
调用KGroupedStream的aggregate()方法将创建一个KTable。KTable将按键和指定的窗口间隔保持记录的总和和记录数的状态。
在`aggregate()`中,第一个参数`Initializer`被用于初始化在流聚合中使用的聚合器。在这里,我们初始化了用于窗口聚合状态的`SensorSum`。第二个参数实现了聚合器。它接收当前记录的键值、上一次记录处理中创建的`SensorSum`。每次数据到达时,它会将总和和记录数相加,并返回一个新的`SensorSum`。第三个参数定义了一个2分钟的`TimeWindows`。第四个参数是`SensorSum`的序列化和反序列化器,第五个参数是用于保持状态的主题名称。
final KTable<Windowed<String>, SensorSum> sensorAgg =
sensors
.aggregate(() -> new SensorSum(0D, 0)
, (aggKey, value, agg) -> new SensorSum(agg.sum + value, agg.count + 1)
, TimeWindows.of(TimeUnit.MINUTES.toMillis(WINDOWS_MINUTES))
, sensorSumSerde,
"sensorSum");
从KTable创建KStream。
使用KTable的mapValues()函数计算平均值。将总和除以记录数得到的平均值是一个新的Double类型记录的KTable。然后调用toStream()函数创建KStream。将记录格式化为时间戳、键、平均值的JSON字符串并输出到流中。时间戳采用ISO 8601格式,以便在不同系统之间方便进行数据交换。最后将记录保存到指定的主题,并结束。
final DateTimeFormatter fmt =
DateTimeFormatter.ISO_OFFSET_DATE_TIME;
sensorAgg
.<Double>mapValues((v) -> ((double) v.sum / v.count))
.toStream()
.map((key, avg) -> {
long end = key.window().end();
ZonedDateTime zdt =
new Date(end).toInstant()
.atZone(ZoneId.systemDefault());
String time = fmt.format(zdt);
String bid = key.key();
String retval =
String.format("{\"time\": \"%s\", \"bid\": \"%s\", \"ambient\": %f}",
time, bid, avg);
LOG.info(retval);
return new KeyValue<String,String>(bid, retval);
})
.to(SINK_TOPIC);
开始Kafka Streams
使用设置对象和构建器创建KafkaStreams,并启动Kafka Streams应用程序。还需要在SIGTERM时将Kafka Stream注册到关闭挂钩中以停止它。
final StreamsConfig config = new StreamsConfig(getProperties());
final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
关于Kafka Streams的设置和超时问题
从环境变量等创建用于Kafka Streams设置的属性。
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,
System.getenv("APPLICATION_ID_CONFIG"));
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
System.getenv("BOOTSTRAP_SERVERS_CONFIG"));
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
TimeUnit.MINUTES.toMillis(COMMIT_MINUTES));
return props;
}
COMMIT_INTERVAL_MS_CONFIG的中文释义为“提交间隔毫秒配置”。
在最初的时候,StreamsConfig.COMMIT_INTERVAL_MS_CONFIG没有进行更改。在将记录保存到主题之前,我们在KStream的map()中输出了日志。我想要在2分钟的窗口间隔内只输出一次最后的聚合结果,但结果却重复出现了4-5次,且次数不确定。
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.343750}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.385417}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.410156}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.440341}
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.450521}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}
{"time": "2017-08-08T10:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.562500}
根据以下参考文章,这似乎是期望从KTable的变更履历(changelog stream)这个特性中获得的行为。KTable没有窗口聚合的最终结果状态,更新后的值会按一定时间间隔进行提交更新到缓存中。在将KTable转换为KStream后,需要自己实现使用transform()或process()来去除记录重复的代码。
虽然无法完全消除重复的记录,但可以通过增加StreamsConfig.COMMIT_INTERVAL_MS_CONFIG的值来减少缓存提交的次数。默认值为30秒。
-
- How to send final kafka-streams aggregation result of a time windowed KTable?
-
- Immutable Record with Kafka Stream
-
- Kafka KStreams – processing timeouts
-
- Kafka Streams for Stream processing A few words about how Kafka works.
- Memory management
其他的班级
准备模型(SensorSum.java)、序列化器(SensorSumSerializer.java)和反序列化器(SensorSumDeserializer.java)的类。序列化器实现serialize()方法,将SensorSum的属性转换为字节数组。它将分配一个8字节的Double类型的字节缓冲区来存储周围温度总和,并使用4字节的Integer类型来存储记录数。
public byte[] serialize(String topic, SensorSum data) {
ByteBuffer buffer = ByteBuffer.allocate(8 + 4);
buffer.putDouble(data.sum);
buffer.putInt(data.count);
return buffer.array();
}
执行
用Exec Maven Plugin来执行Kafka Streams。
$ mvn clean install exec:exec@json
我尝试将窗口间隔设置为2分钟,并将缓存提交间隔指定为3分钟。虽然依然出现了几次重复的输出,但成功减少了重复输出的次数。
{"time": "2017-08-08T11:32:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414773}
{"time": "2017-08-08T11:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414063}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.453125}
{"time": "2017-08-08T11:36:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.476563}
{"time": "2017-08-08T11:38:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.546875}