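Trying Out Consumer Groups with Streaming on Oracle Cloud
Introduction
Oracle Cloud Infrastructure (OCI) provides a service called Streaming that collects and processes streaming data in real time. Streaming exposes an Apache Kafka-compatible API, so you can connect with a Kafka client to produce and consume data.
This time we will use a Consumer Group to read data with multiple consumers. A Consumer Group lets several consumers share the work of reading a stream by dividing its partitions among them. An illustrated explanation can be found in a separate article. Here we check whether OCI Streaming behaves the same way.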
Number of partitions in the stream
I created a stream named teststream01. It has 3 partitions.
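Java code
To start multiple consumers in a single Java program, I use an Executor. I will skip a detailed walkthrough, since the code should be mostly self-explanatory.
This is the main class. Change the value of int numConsumers = 1; to adjust the number of consumers.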
package jp.test.sugi;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class App {
    public static void main(String[] args) {
        int numConsumers = 1;
        ExecutorService executorService = Executors.newFixedThreadPool(numConsumers);
        IntStream.rangeClosed(1, numConsumers).forEach(i -> {
            ConsumeTask consumeTask = new ConsumeTask("myConsumer" + i, "teststream01");
            executorService.submit(consumeTask);
        });
    }
}
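The main class above submits tasks that loop forever and never shuts the pool down, so the program only ends when it is killed. As a rough sketch of how such a pool could be stopped cleanly, the example below uses only the standard library: `Thread.sleep` stands in for `consumer.poll(...)` and a counter stands in for `consumer.close()`. The class and method names are hypothetical and not part of the original program. With a real `KafkaConsumer`, calling `wakeup()` from another thread is the usual way to break out of a blocking `poll`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class WorkerPoolSketch {

    // Starts numWorkers looping tasks, lets them run for runMillis, then stops them.
    // Returns how many workers left their loop and ran their cleanup step.
    static int runAndStop(int numWorkers, long runMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(numWorkers);
        AtomicInteger stopped = new AtomicInteger();
        for (int i = 1; i <= numWorkers; i++) {
            pool.submit(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        Thread.sleep(10); // stand-in for consumer.poll(...)
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and exit
                } finally {
                    stopped.incrementAndGet(); // stand-in for consumer.close()
                }
            });
        }
        try {
            Thread.sleep(runMillis);
            pool.shutdownNow(); // interrupts every worker thread
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not stop in time");
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return stopped.get();
    }

    public static void main(String[] args) {
        System.out.println("stopped workers: " + runAndStop(3, 100));
    }
}
```

The `finally` block matters here for the same reason it does in the consumer class: each worker gets a chance to release its resources before the pool terminates.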
This is the class that actually consumes the data. The Consumer Group name is fixed to the string "myConsumerGroup".
package jp.test.sugi;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumeTask implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final String consumerGroupName;
    private final String consumerName;
    private final String topic;

    public ConsumeTask(String consumerName, String topic) {
        this("myConsumerGroup", consumerName, topic);
    }

    public ConsumeTask(String consumerGroupName, String consumerName, String topic) {
        this.consumerGroupName = consumerGroupName;
        this.consumerName = consumerName;
        this.topic = topic;
        // configuration
        Properties properties = new Properties();
        // Settings for connecting to OCI Streaming
        properties.put("bootstrap.servers", "cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla\" password=\"8t[shwUN}I-d+{}8Nx_a\";");
        properties.put("max.partition.fetch.bytes", 1024 * 1024);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupName);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(),
                new StringDeserializer());
        this.consumer = consumer;
    }

    @Override
    public void run() {
        consumer.subscribe(Collections.singletonList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));
                // print information about topic record
                records.forEach(record -> {
                    System.out.println("group: " + consumerGroupName + ", consumer: " + consumerName + ", partition: "
                            + record.partition() + ", topic: " + record.topic() + ", key: " + record.key() + ", value: "
                            + record.value());
                });
            }
        } finally {
            consumer.close();
        }
    }
}
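The constructor above hard-codes the endpoint and the SASL credentials. As a minimal sketch of an alternative, the connection settings could be assembled from externally supplied values such as environment variables. The class name and the three setting names (`STREAMING_BOOTSTRAP_SERVERS`, `STREAMING_SASL_USERNAME`, `STREAMING_SASL_PASSWORD`) are assumptions made for this illustration; the property keys and the JAAS string layout are the ones used in the code above. The sketch does not handle values that contain double quotes.

```java
import java.util.Map;
import java.util.Properties;

final class StreamingPropertiesSketch {

    // Builds the connection properties from a map of settings,
    // for example the map returned by System.getenv().
    static Properties fromSettings(Map<String, String> settings) {
        String username = require(settings, "STREAMING_SASL_USERNAME");
        String password = require(settings, "STREAMING_SASL_PASSWORD");

        Properties properties = new Properties();
        properties.put("bootstrap.servers", require(settings, "STREAMING_BOOTSTRAP_SERVERS"));
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                        + username + "\" password=\"" + password + "\";");
        return properties;
    }

    // Fails fast with a clear message when a setting is absent or empty.
    private static String require(Map<String, String> settings, String key) {
        String value = settings.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("missing setting: " + key);
        }
        return value;
    }
}
```

In the consumer class this would be called as `StreamingPropertiesSketch.fromSettings(System.getenv())`, after which the group ID and fetch size are added as before.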
Result: 1 consumer
Below is the result of running with one consumer. myConsumer1 is assigned all partitions and consumes the data.
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
Result: 2 consumers
Below is the result of running with two consumers.
The partitions were divided as follows.
- myConsumer1: partition 0, partition 1
- myConsumer2: partition 2
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=8
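The ownership list for a run can be read off the log mechanically. The following is a small sketch, assuming the exact line format printed by the consumer class (`consumer: <name>, partition: <n>,`); the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class OwnershipTallySketch {
    // Captures the consumer name and the partition number from one log line.
    private static final Pattern LINE = Pattern.compile("consumer: ([^,]+), partition: (\\d+),");

    // Returns, for each consumer, the sorted set of partitions it printed records from.
    static Map<String, Set<Integer>> tally(List<String> logLines) {
        Map<String, Set<Integer>> owned = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = LINE.matcher(line);
            if (m.find()) {
                owned.computeIfAbsent(m.group(1), k -> new TreeSet<>())
                        .add(Integer.parseInt(m.group(2)));
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0",
                "group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=1",
                "group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=4");
        System.out.println(tally(lines)); // prints {myConsumer1=[0, 1], myConsumer2=[2]}
    }
}
```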
Result: 3 consumers
Below is the result of running with three consumers.
The partitions were divided as follows.
- myConsumer1: partition 1
- myConsumer2: partition 2
- myConsumer3: partition 0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
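Result: 4 consumers
Below is the result of running with four consumers.
The partitions were divided as follows.
- myConsumer1: partition 2
- myConsumer3: partition 0
- myConsumer4: partition 1
myConsumer2 did not process anything. This shows that a consumer in excess of the number of partitions receives no partition and takes no share of the load.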
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=5
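Summary
Consumer Groups work as expected with the Kafka-compatible API of Streaming. As in Kafka, each partition is read by only one consumer in the group, so consumers beyond the partition count stay idle.
Before deploying to production, verify and decide the number of partitions and the number of consumers based on your performance requirements.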
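The partition counts seen in the four runs (3, then 2 and 1, then 1 each, then one idle consumer) match what a simple range-style split would produce. The sketch below is a simplified model written for illustration, not the assignment logic OCI Streaming or the Kafka client actually runs; the class and method names are hypothetical. Which consumer ends up with which partition depends on the member IDs assigned at runtime, so only the counts are expected to match the logs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RangeAssignmentSketch {

    // Splits partitions 0..numPartitions-1 into contiguous ranges, one per consumer.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int base = numPartitions / consumers.size();  // partitions every consumer gets
        int extra = numPartitions % consumers.size(); // the first `extra` consumers get one more
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            assignment.put(consumers.get(i), owned);
        }
        return assignment;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 4; n++) {
            List<String> consumers = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                consumers.add("myConsumer" + i);
            }
            System.out.println(n + " consumer(s): " + assign(consumers, 3));
        }
    }
}
```

With 3 partitions and 4 consumers, the last consumer in the list receives an empty range, which corresponds to the idle consumer observed in the fourth run.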