How does Spark read data from Kafka?

To read data from Kafka in Spark, you can use Spark’s official Kafka integration library for Spark Streaming, spark-streaming-kafka-0-10.

First, you need to add the Spark Streaming Kafka dependency to your Spark project. In a Maven project, you can add the following dependency to the pom.xml file (the _2.12 suffix is the Scala binary version and should match the Scala version of your Spark build, and the artifact version should match your Spark version):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.2</version>
</dependency>

Next, you can create a JavaStreamingContext from a SparkConf object and specify the batch processing interval.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class KafkaStreamingExample {
    public static void main(String[] args) throws InterruptedException {
        // Create the SparkConf object
        SparkConf sparkConf = new SparkConf().setAppName("KafkaStreamingExample").setMaster("local[*]");

        // Create the JavaStreamingContext with a 1-second batch interval
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));

        // Set the Kafka consumer parameters
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "test-group");

        // Topics to subscribe to
        Collection<String> topics = Arrays.asList("topic1", "topic2");

        // Create the direct Kafka input stream
        JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        );

        // Process the Kafka data
        kafkaStream.foreachRDD(rdd -> {
            // Process each RDD here
            rdd.foreach(record -> {
                System.out.println("Key: " + record.key() + ", Value: " + record.value());
            });
        });

        // Start the streaming application
        streamingContext.start();

        // Wait for the streaming application to terminate
        streamingContext.awaitTermination();
    }
}

In the example above, we first create a SparkConf object and a JavaStreamingContext. Then we set the Kafka parameters, including the broker address (bootstrap.servers), the key and value deserializer classes, and the consumer group ID. Next, we create a direct Kafka input stream, specifying the topics to subscribe to and the Kafka parameters. Finally, we use the foreachRDD method to process each micro-batch RDD and print the key and value of every record.
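If you also need to track which offsets have been processed, the spark-streaming-kafka-0-10 integration exposes them per RDD. The following is a minimal sketch of the offset-commit pattern from the Spark Kafka integration guide, assuming enable.auto.commit has been set to false in kafkaParams; OffsetRange, HasOffsetRanges, and CanCommitOffsets all come from the org.apache.spark.streaming.kafka010 package:

kafkaStream.foreachRDD(rdd -> {
    // Offset ranges covered by this micro-batch
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // Process the records first
    rdd.foreach(record -> {
        System.out.println("Key: " + record.key() + ", Value: " + record.value());
    });

    // Commit the offsets back to Kafka after the batch has been processed
    ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
});

Committing offsets only after the batch completes gives you at-least-once processing semantics rather than losing data on restart.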

Please note that the spark-streaming-kafka-0-10 integration used above targets Kafka brokers version 0.10 and higher. For older brokers, the legacy spark-streaming-kafka-0-8 integration (available only for Spark 2.x and earlier) provides its own KafkaUtils API. Additionally, you can adjust other Kafka parameters and the processing logic in the example as needed.
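For example, two standard Kafka consumer settings you will often want to set explicitly are auto.offset.reset and enable.auto.commit. A minimal sketch, added to the kafkaParams map shown earlier:

// Optional consumer settings for the kafkaParams map
kafkaParams.put("auto.offset.reset", "latest");  // where to start when no committed offset exists ("earliest" or "latest")
kafkaParams.put("enable.auto.commit", false);    // disable auto-commit if you commit offsets yourself, as in the sketch above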
