使用KafkaDirectStream时,使用正则表达式的方法和注意事项
创建 KafkaDirectStream 时,我认为需要传递要消费的主题信息,
可以设置正则表达式作为主题名称。
代码示例(KafkaDirectStream)
如果存在log20181201或log20181202等主题,则设置如下。
KafkaUtils.createDirectStream(
sparkStreamingContext,
PreferConsistent,
SubscribePattern[String, String]("log.+".r.pattern, kafkaParams))
请注意
在需要顺序关键的处理中无法使用。
如果在上述例子中同时将数据放入log20181201和log20181202,并且需要忽略主题来消费数据,而需要考虑主题之间的顺序,那么就不能使用这种方法。
如果删除主题,则提交偏移量时会崩溃。
在KafkaDirectStream中,我们将已获取的偏移信息提交到Kafka,但同时也提交了没有任何活动的主题的信息。
因此,如果删除了未使用的旧主题,并且它们是正则表达式的匹配对象,可能会导致提交失败并导致程序崩溃。
如果遇到像上述问题这样的情况,可以将Kafka的auto.commit参数设置为false,然后明确地提交仅发生了偏移量变化的内容,如下所示。
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val commitOffsetRange = offsetRanges.filterNot(x => x.untilOffset == x.fromOffset)
kafkaDirectStream.asInstanceOf[CanCommitOffsets].commitAsync(commitOffsetRange)