在使用Kafka Streams的WindowStateStore进行聚合处理时,处理未来时间戳可能存在的危险陷阱

Kafka Streams有一个名为Window Aggregation的机制,可以对事件进行计数等操作,以特定的时间间隔为单位。

所以,我写了一个利用它来计算每个用户事件执行次数的处理方法,但奇怪的是结果却不符合预期。
在测试代码中无法复制,并且出现了概率性结果偏差的情况。

我非常沉迷于其中,但问题出在处理未来时间戳的原因上。

验证版本

    kafka-streams-2.4.0

問題的詳細

尽管数据已经成功写入并传递到后端的Kafka主题,但当尝试重新获取数据时,无法获取到现有的聚合计数。
因此计数被重置,并且其他地方也无法获取结果的状态。

    • 確実にput処理は行われている

 

    紐付いているKafkaのトピックには確実にデータが届いている

原因:RocksDB的Segment因未来时间戳的影响而过期。

Kafka Streams的WindowStore中存在着保留期限。超过保留期限的数据将会被删除。
以以下例子为准,保持期限为30天。

  private StoreBuilder<WindowStore<String, Counter>> counterStore =
      Stores.windowStoreBuilder(
              Stores.persistentWindowStore(
                  "counter",
                  Duration.ofDays(30),
                  Duration.ofHours(24),
                  false),
              Serdes.String(),
              Serdes.Integer())
          .withCachingEnabled();

另外,WindowStore在内部通过Segment这个单位对数据目录进行划分。
Segment的间隔是保留期的一半时间间隔。(最短为1分钟)

大致上表现出以下的动作。

    • timestampと共にputを実行する

 

    timestampとsegment intervalからsegment idを算出する
    @Override
    public String segmentName(final long segmentId) {
        // (1) previous format used - as a separator so if this changes in the future
        // then we should use something different.
        // (2) previous format used : as a separator (which did break KafkaStreams on Windows OS)
        // so if this changes in the future then we should use something different.
        return name + "." + segmentId * segmentInterval;
    }

    @Override
    public long segmentId(final long timestamp) {
        return timestamp / segmentInterval;
    }
    対象のSegmentを特定しそこにデータをputする
    @Override
    public void put(final Bytes key,
                    final byte[] value) {
        final long timestamp = keySchema.segmentTimestamp(key);
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        final long segmentId = segments.segmentId(timestamp);
        final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
        if (segment == null) {
            expiredRecordSensor.record();
            LOG.debug("Skipping record for expired segment.");
        } else {
            segment.put(key, value);
        }
    }
    • その際、受け取ったことのある最も未来のtimestampをobservedStreamTimeとして利用し、そこを基準にgetOrCreateSegmentIfLiveを実行する。

 

    • そしてその処理の中でobservedStreamTimeからretentioPeriodを引いた時間よりも前のSegmentをcleanupする。

 

    cleanup対象のsegmentは削除フラグを付与されてcloseされる。
    @Override
    public S getOrCreateSegmentIfLive(final long segmentId,
                                      final InternalProcessorContext context,
                                      final long streamTime) {
        final long minLiveTimestamp = streamTime - retentionPeriod;
        final long minLiveSegment = segmentId(minLiveTimestamp);

        final S toReturn;
        if (segmentId >= minLiveSegment) {
            // The segment is live. get it, ensure it's open, and return it.
            toReturn = getOrCreateSegment(segmentId, context);
        } else {
            toReturn = null;
        }

        cleanupEarlierThan(minLiveSegment);
        return toReturn;
    }

    private void cleanupEarlierThan(final long minLiveSegment) {
        final Iterator<Map.Entry<Long, S>> toRemove =
            segments.headMap(minLiveSegment, false).entrySet().iterator();

        while (toRemove.hasNext()) {
            final Map.Entry<Long, S> next = toRemove.next();
            toRemove.remove();
            final S segment = next.getValue();
            segment.close();
            try {
                segment.destroy();
            } catch (final IOException e) {
                log.error("Error destroying {}", segment, e);
            }
        }
    }

如果这里收到了一条具有相当未来时间戳的记录,observedStreamTime将被固定在未来的时间点上,这样在从中减去保留期之后,过去的StateStore将全部关闭且无法使用!

结论 –

简而言之,如果一次写入超过保留期限的未来时间戳,那么就无法获取或写入比该时间戳更早的结果。即使随后出现了正确的当前时间戳记录,也无法使用它。

如果出现具有一个月以上未来时间戳的记录,并且这些记录使用相同的分区,那么与这些记录关联的计数将会被破坏。而且,在发生这种情况时,没有任何错误日志或警告信息可供参考。目前,如果无法信任正在进行聚合的时间戳偏移量,那么必须始终排除具有超过一定未来时间戳的记录。即使你信任时间戳,也最好做好防护措施。

我非常担心诚实和大量的努力,以及对整个功能的可靠性非常不满意,但好在终于找到了问题的原因。

由于Kafka Streams的处理涉及到时间戳的机制,因此对于可能出现严重时间偏差数据的系统,需要注意处理的方法。