尝试使用Java Kafka客户端在[Oracle Cloud] 的Streaming中作为生产者
首先
在Oracle Cloud Infrastructure(OCI)中,提供了一项名为Streaming的服务,可以实时收集和处理流数据。Streaming具有与Apache Kafka兼容的API,因此可以从Kafka客户端连接并进行生产和消费操作。
本次我们将确认如何使用Java的Kafka客户端作为生产者输入数据。
虚拟机
我将创建一个适当的CentOS 7虚拟机。
生成授权令牌
要使用Kafka API连接进行流式传输,需要IAM用户的身份验证令牌。您可以在自己的IAM用户详细信息页面上生成该令牌。
我会给出一个合适的解释。
由于无法使用图像中的内容,因此会记录下令牌。
如果在Token中包含有分号等符号,我感觉它可能无法正常运行。如果出现故障而无法正常操作时,请考虑更换Token。
创建流
在OCI控制台上,选择Analytics > Streaming。然后选择Create Stream。
根据需要随意添加参数以创建。
当创建Stream时,如果没有Stream Pool,则会自动创建默认的Stream Pool。teststream01将自动分配到DefaultPool中。Stream Pool是用于管理多个Stream的概念,可以一起管理将Stream的Endpoint设为公共还是私有,以及使用什么密钥加密数据等事项。
这次我们选择了自动创建,因此端点是公开的,并且使用了由Oracle管理的加密密钥进行设置。
点击上面的屏幕上的“查看Kafka连接设置”。
点击“复制全部”,记下所有的设置值。
Open JDK 安装
为了在Java中运行Kafka客户端,需要安装OpenJDK。
sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
Maven 3.6.3 安装
为了管理依赖关系,安装Maven。
在以下的URL上检查下载链接。
https://maven.apache.org/download.cgi
使用在下载页面复制的URL,在CentOS上下载tar.gz文件。
mkdir ~/maven
cd ~/maven
wget https://ftp.jaist.ac.jp/pub/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
请按照以下步骤进行安装:
https://maven.apache.org/install.html
解压 tar.gz 文件
tar xfvz apache-maven-3.6.3-bin.tar.gz
在bashrc文件中追加环境变量。
echo 'export PATH=$PATH:$HOME/maven/apache-maven-3.6.3/bin' >> ~/.bashrc
重新加载 bashrc
source ~/.bashrc
确认mvn命令是否可以执行。
[opc@kafkaconsumer1 maven]$ mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /home/opc/maven/apache-maven-3.6.3
Java version: 1.8.0_252, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.252.b09-2.el7_8.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.18.1.el7.x86_64", arch: "amd64", family: "unix"
[opc@kafkaconsumer1 maven]$
Java构建
创建目录。
mkdir ~/kafkaproducer
mkdir -p ~/kafkaproducer/src/main/java/jp/test/sugi
cd ~/kafkaproducer
我要编写Java源代码。
cat <<'EOF' > ~/kafkaproducer/src/main/java/jp/test/sugi/KafkaProducerApps.java
package jp.test.sugi;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaProducerApp {
public static void main(final String[] args) {
System.out.println("Start.");
// 接続時の設定値を Properties インスタンスとして構築する
final Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
properties.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla\" password=\"8t[shwUN}I-d+{}8Nx_a\";");
properties.put(CommonClientConfigs.RETRIES_CONFIG, 5);
properties.put("max.request.size", 1024 * 1024);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Producer を構築する
final KafkaProducer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(),
new StringSerializer());
try {
// トピックを指定してメッセージを送信する
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>("teststream01", String.format("message%02d", i)));
}
} catch (final Exception e) {
System.out.println("例外が発生しました。");
System.out.println(e);
} finally {
producer.close();
}
System.out.println("End.");
}
}
EOF
在Java源代码中,我们将解释重要的要点。首先,在下面的部分中,我们指定了Kafka客户端的连接目标流的终点等信息。请指定从Stream Pool中复制来的值。将SaslConfigs.SASL_JAAS_CONFIG的密码改为OCI Auth Token的值。
// 接続時の設定値を Properties インスタンスとして構築する
final Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
properties.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla\" password=\"8t[shwUN}I-d+{}8Nx_a\";");
properties.put(CommonClientConfigs.RETRIES_CONFIG, 5);
properties.put("max.request.size", 1024 * 1024);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
另外,下一个重要的要点是这个。在 Kafka 客户端中,名为 testsream01 的名称指定了主题名称。主题名称与 OCI 的流关联在一起,因此请将在创建流时指定的名称放入其中。
producer.send(new ProducerRecord<String, String>("teststream01", String.format("message%02d", i)));
为了指定Maven的依赖关系,我们需要创建pom.xml文件。
cat <<'EOF' > ~/kafkaproducer/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>jp.test.sugi</groupId>
<artifactId>KafkaProducerApp</artifactId>
<version>1.0</version>
<name>KafkaProducerApp</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.13.2</version>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>jp.test.sugi.KafkaProducerApp</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<archive>
<manifest>
<mainClass>jp.test.sugi.KafkaProducerApp</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
EOF
创建一个包含所有依赖关系的Fat Jar。首次运行需要等待几分钟来完成下载。
cd ~/kafkaproducer
mvn package assembly:single
运行已经编译好的Jar文件后,数据将被输入到OCI Streaming中。
java -jar target/KafkaProducerApp-1.0-jar-with-dependencies.jar
当您在OCI控制台上点击“加载消息”时,您将能够看到所插入的数据。
请提供URL参考
Azure
https://docs.microsoft.com/zh-cn/azure/hdinsight/kafka/apache-kafka-producer-consumer-api