尝试在 Oracle Cloud 中运行 Consumer Group 的 Streaming 示例

首先

在Oracle Cloud Infrastructure(OCI)中,提供了名为Streaming的服务,可以实时地收集和处理流数据。Streaming具有与Apache Kafka兼容的API,因此可以从Kafka Client进行连接,并进行数据的生产和消费操作。

这次我们将试着使用 Consumer Group 从多个 Consumer 中获取数据。Consumer Group 是一种能够在多个 Consumer 之间进行负载均衡并获取数据的功能。这篇文章中有配有图解的解释。我们也会确认 OCI 的 Streaming 是否有类似的行为。

流的分区数量

我正在使用名为teststream1的名称创建了一个流(Stream)。它有3个分区。

1588703929401.png

Java 代码

为了在一个Java程序中启动多个消费者,可以使用Executor。省略了详细的描述,通过查看代码我想您可能已经有所了解。

这是主要的类。当你更改int numConsumers = 1;的值时,你可以调整消费者的数量。

package jp.test.sugi;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class App {
    public static void main(String[] args) {
        int numConsumers = 1;
        ExecutorService executorService = Executors.newFixedThreadPool(numConsumers);

        IntStream.rangeClosed(1, numConsumers).forEach(i -> {
            ConsumeTask consumeTask = new ConsumeTask("myConsumer" + i, "teststream01");
            executorService.submit(consumeTask);
        });
    }
}

这是一个实际上将要进行消费的类。Consumer Group的名称被固定为字符串”myConsumerGroup”。

package jp.test.sugi;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumeTask implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final String consumerGroupName;
    private final String consumerName;
    private final String topic;

    public ConsumeTask(String consumerName, String topic) {
        this("myConsumerGroup", consumerName, topic);
    }

    public ConsumeTask(String consumerGroupName, String consumerName, String topic) {
        this.consumerGroupName = consumerGroupName;
        this.consumerName = consumerName;
        this.topic = topic;

        // configuration
        Properties properties = new Properties();

        // OCI Streaming に接続するための指定
        properties.put("bootstrap.servers", "cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla\" password=\"8t[shwUN}I-d+{}8Nx_a\";");
        properties.put("max.partition.fetch.bytes", 1024 * 1024);

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupName);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(),
                new StringDeserializer());

        this.consumer = consumer;
    }

    @Override
    public void run() {
        consumer.subscribe(Collections.singletonList(topic));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));

                // print information about topic record
                records.forEach(record -> {
                    System.out.println("group: " + consumerGroupName + ", consumer: " + consumerName + ", partition: "
                            + record.partition() + ", topic: " + record.topic() + ", key: " + record.key() + ", value: "
                            + record.value());
                });
            }
        } finally {
            consumer.close();
        }
    }
}

执行结果:消费者1

以下是以Consumer为1运行的结果。myConsumer1负责所有分区,并消费数据。

group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9

执行结果: 消费者2

这是以 Consumer 数为 2 运行的结果。
负责的分区分散如下。

    • myConsumer1 : パーティション0, パーティション1

 

    myConsumer2 : パーティション2
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=8

执行结果:消费者3

下面是对使用Consumer设置为3的结果。
负责的分区分散如下:

    • myConsumer1 : パーティション1

 

    • myConsumer2 : パーティション2

 

    myConsumer3 : パーティション0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9

运行结果:消费者4

以下是将Consumer设置为4个并运行的结果。
负责的分区如下所示,分布较广。

    • myConsumer1 : パーティション2

 

    • myConsumer3 : パーティション0

 

    myConsumer4 : パーティション1

我的Consumer2没有进行任何处理。对于超出分区的Consumer,我们可以看到它不会进行负载均衡。

group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=5

总结

在Kafka兼容的Streaming API中,Consumer Group正常工作。与Kafka类似,每个Consumer负责读取一个分区的数据。
在部署到生产环境时,建议根据性能需求来验证和决定分区数和Consumer数量。

请提供一个参考网址。

参考链接

请给出一个相关链接。

此处有一个参考链接。

广告
将在 10 秒后关闭
bannerAds