在使用Kafka Streams的WindowStateStore进行聚合处理时,处理未来时间戳可能存在的危险陷阱
Kafka Streams有一个名为Window Aggregation的机制,可以对事件进行计数等操作,以特定的时间间隔为单位。
所以,我写了一个利用它来计算每个用户事件执行次数的处理方法,但奇怪的是结果却不符合预期。
在测试代码中无法复制,并且出现了概率性结果偏差的情况。
我非常沉迷于其中,但问题出在处理未来时间戳的原因上。
验证版本
- kafka-streams-2.4.0
問題的詳細
尽管数据已经成功写入并传递到后端的Kafka主题,但当尝试重新获取数据时,无法获取到现有的聚合计数。
因此计数被重置,并且其他地方也无法获取结果的状态。
-
- 確実にput処理は行われている
- 紐付いているKafkaのトピックには確実にデータが届いている
原因:RocksDB的Segment因未来时间戳的影响而过期。
Kafka Streams的WindowStore中存在着保留期限。超过保留期限的数据将会被删除。
以以下例子为准,保持期限为30天。
private StoreBuilder<WindowStore<String, Counter>> counterStore =
Stores.windowStoreBuilder(
Stores.persistentWindowStore(
"counter",
Duration.ofDays(30),
Duration.ofHours(24),
false),
Serdes.String(),
Serdes.Integer())
.withCachingEnabled();
另外,WindowStore在内部通过Segment这个单位对数据目录进行划分。
Segment的间隔是保留期的一半时间间隔。(最短为1分钟)
大致上表现出以下的动作。
-
- timestampと共にputを実行する
- timestampとsegment intervalからsegment idを算出する
@Override
public String segmentName(final long segmentId) {
// (1) previous format used - as a separator so if this changes in the future
// then we should use something different.
// (2) previous format used : as a separator (which did break KafkaStreams on Windows OS)
// so if this changes in the future then we should use something different.
return name + "." + segmentId * segmentInterval;
}
@Override
public long segmentId(final long timestamp) {
return timestamp / segmentInterval;
}
- 対象のSegmentを特定しそこにデータをputする
@Override
public void put(final Bytes key,
final byte[] value) {
final long timestamp = keySchema.segmentTimestamp(key);
observedStreamTime = Math.max(observedStreamTime, timestamp);
final long segmentId = segments.segmentId(timestamp);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
if (segment == null) {
expiredRecordSensor.record();
LOG.debug("Skipping record for expired segment.");
} else {
segment.put(key, value);
}
}
-
- その際、受け取ったことのある最も未来のtimestampをobservedStreamTimeとして利用し、そこを基準にgetOrCreateSegmentIfLiveを実行する。
-
- そしてその処理の中でobservedStreamTimeからretentioPeriodを引いた時間よりも前のSegmentをcleanupする。
- cleanup対象のsegmentは削除フラグを付与されてcloseされる。
@Override
public S getOrCreateSegmentIfLive(final long segmentId,
final InternalProcessorContext context,
final long streamTime) {
final long minLiveTimestamp = streamTime - retentionPeriod;
final long minLiveSegment = segmentId(minLiveTimestamp);
final S toReturn;
if (segmentId >= minLiveSegment) {
// The segment is live. get it, ensure it's open, and return it.
toReturn = getOrCreateSegment(segmentId, context);
} else {
toReturn = null;
}
cleanupEarlierThan(minLiveSegment);
return toReturn;
}
private void cleanupEarlierThan(final long minLiveSegment) {
final Iterator<Map.Entry<Long, S>> toRemove =
segments.headMap(minLiveSegment, false).entrySet().iterator();
while (toRemove.hasNext()) {
final Map.Entry<Long, S> next = toRemove.next();
toRemove.remove();
final S segment = next.getValue();
segment.close();
try {
segment.destroy();
} catch (final IOException e) {
log.error("Error destroying {}", segment, e);
}
}
}
如果这里收到了一条具有相当未来时间戳的记录,observedStreamTime将被固定在未来的时间点上,这样在从中减去保留期之后,过去的StateStore将全部关闭且无法使用!
结论 –
简而言之,如果一次写入超过保留期限的未来时间戳,那么就无法获取或写入比该时间戳更早的结果。即使随后出现了正确的当前时间戳记录,也无法使用它。
如果出现具有一个月以上未来时间戳的记录,并且这些记录使用相同的分区,那么与这些记录关联的计数将会被破坏。而且,在发生这种情况时,没有任何错误日志或警告信息可供参考。目前,如果无法信任正在进行聚合的时间戳偏移量,那么必须始终排除具有超过一定未来时间戳的记录。即使你信任时间戳,也最好做好防护措施。
我非常担心诚实和大量的努力,以及对整个功能的可靠性非常不满意,但好在终于找到了问题的原因。
由于Kafka Streams的处理涉及到时间戳的机制,因此对于可能出现严重时间偏差数据的系统,需要注意处理的方法。