使用KafkaDirectStream时,使用正则表达式的方法和注意事项

创建 KafkaDirectStream 时,我认为需要传递要消费的主题信息,
可以设置正则表达式作为主题名称。

代码示例(KafkaDirectStream)

如果存在log20181201或log20181202等主题,则设置如下。

KafkaUtils.createDirectStream(
      sparkStreamingContext,
      PreferConsistent,
      SubscribePattern[String, String]("log.+".r.pattern, kafkaParams))

请注意

在需要顺序关键的处理中无法使用。

如果在上述例子中同时将数据放入log20181201和log20181202,并且需要忽略主题来消费数据,而需要考虑主题之间的顺序,那么就不能使用这种方法。

如果删除主题,则提交偏移量时会崩溃。

在KafkaDirectStream中,我们将已获取的偏移信息提交到Kafka,但同时也提交了没有任何活动的主题的信息。
因此,如果删除了未使用的旧主题,并且它们是正则表达式的匹配对象,可能会导致提交失败并导致程序崩溃。

如果遇到像上述问题这样的情况,可以将Kafka的auto.commit参数设置为false,然后明确地提交仅发生了偏移量变化的内容,如下所示。

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val commitOffsetRange = offsetRanges.filterNot(x => x.untilOffset == x.fromOffset)
kafkaDirectStream.asInstanceOf[CanCommitOffsets].commitAsync(commitOffsetRange)
广告
将在 10 秒后关闭
bannerAds