尝试使用Java Kafka客户端在[Oracle Cloud] 的Streaming中作为生产者

首先

在Oracle Cloud Infrastructure(OCI)中,提供了一项名为Streaming的服务,可以实时收集和处理流数据。Streaming具有与Apache Kafka兼容的API,因此可以从Kafka客户端连接并进行生产和消费操作。

本次我们将确认如何使用Java的Kafka客户端作为生产者输入数据。

虚拟机

我将创建一个适当的CentOS 7虚拟机。

生成授权令牌

要使用Kafka API连接进行流式传输,需要IAM用户的身份验证令牌。您可以在自己的IAM用户详细信息页面上生成该令牌。

1588474181611.png

我会给出一个合适的解释。

1588474211704.png

由于无法使用图像中的内容,因此会记录下令牌。

如果在Token中包含有分号等符号,我感觉它可能无法正常运行。如果出现故障而无法正常操作时,请考虑更换Token。

1588474230416.png

创建流

在OCI控制台上,选择Analytics > Streaming。然后选择Create Stream。

1588519801919.png

根据需要随意添加参数以创建。

1588519908911.png

当创建Stream时,如果没有Stream Pool,则会自动创建默认的Stream Pool。teststream01将自动分配到DefaultPool中。Stream Pool是用于管理多个Stream的概念,可以一起管理将Stream的Endpoint设为公共还是私有,以及使用什么密钥加密数据等事项。

这次我们选择了自动创建,因此端点是公开的,并且使用了由Oracle管理的加密密钥进行设置。

1588520017422.png

点击上面的屏幕上的“查看Kafka连接设置”。

1588521024252.png

点击“复制全部”,记下所有的设置值。

1588521056591.png

Open JDK 安装

为了在Java中运行Kafka客户端,需要安装OpenJDK。

sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel

Maven 3.6.3 安装

为了管理依赖关系,安装Maven。

在以下的URL上检查下载链接。
https://maven.apache.org/download.cgi

1587385604751.png

使用在下载页面复制的URL,在CentOS上下载tar.gz文件。

mkdir ~/maven
cd ~/maven
wget https://ftp.jaist.ac.jp/pub/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz

请按照以下步骤进行安装:
https://maven.apache.org/install.html

解压 tar.gz 文件

tar xfvz apache-maven-3.6.3-bin.tar.gz

在bashrc文件中追加环境变量。

echo 'export PATH=$PATH:$HOME/maven/apache-maven-3.6.3/bin' >> ~/.bashrc

重新加载 bashrc

source ~/.bashrc

确认mvn命令是否可以执行。

[opc@kafkaconsumer1 maven]$ mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /home/opc/maven/apache-maven-3.6.3
Java version: 1.8.0_252, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.252.b09-2.el7_8.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.18.1.el7.x86_64", arch: "amd64", family: "unix"
[opc@kafkaconsumer1 maven]$ 

Java构建

创建目录。

mkdir ~/kafkaproducer
mkdir -p ~/kafkaproducer/src/main/java/jp/test/sugi
cd ~/kafkaproducer

我要编写Java源代码。

cat <<'EOF' > ~/kafkaproducer/src/main/java/jp/test/sugi/KafkaProducerApps.java
package jp.test.sugi;

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerApp {
    public static void main(final String[] args) {
        System.out.println("Start.");

        // 接続時の設定値を Properties インスタンスとして構築する
        final Properties properties = new Properties();

        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
                "cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        properties.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla\" password=\"8t[shwUN}I-d+{}8Nx_a\";");
        properties.put(CommonClientConfigs.RETRIES_CONFIG, 5);
        properties.put("max.request.size", 1024 * 1024);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Producer を構築する
        final KafkaProducer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(),
                new StringSerializer());

        try {
            // トピックを指定してメッセージを送信する
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>("teststream01", String.format("message%02d", i)));
            }
        } catch (final Exception e) {
            System.out.println("例外が発生しました。");
            System.out.println(e);
        } finally {
            producer.close();
        }

        System.out.println("End.");
    }
}
EOF

在Java源代码中,我们将解释重要的要点。首先,在下面的部分中,我们指定了Kafka客户端的连接目标流的终点等信息。请指定从Stream Pool中复制来的值。将SaslConfigs.SASL_JAAS_CONFIG的密码改为OCI Auth Token的值。


        // 接続時の設定値を Properties インスタンスとして構築する
        final Properties properties = new Properties();

        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
                "cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        properties.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla\" password=\"8t[shwUN}I-d+{}8Nx_a\";");
        properties.put(CommonClientConfigs.RETRIES_CONFIG, 5);
        properties.put("max.request.size", 1024 * 1024);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

另外,下一个重要的要点是这个。在 Kafka 客户端中,名为 testsream01 的名称指定了主题名称。主题名称与 OCI 的流关联在一起,因此请将在创建流时指定的名称放入其中。

                producer.send(new ProducerRecord<String, String>("teststream01", String.format("message%02d", i)));

为了指定Maven的依赖关系,我们需要创建pom.xml文件。


cat <<'EOF' > ~/kafkaproducer/pom.xml
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>jp.test.sugi</groupId>
  <artifactId>KafkaProducerApp</artifactId>
  <version>1.0</version>
  <name>KafkaProducerApp</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.26</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.13.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.13.2</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>3.3.0</version>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
              <manifest>
                <mainClass>jp.test.sugi.KafkaProducerApp</mainClass>
              </manifest>
            </archive>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
          <configuration>
            <archive>
              <manifest>
                <mainClass>jp.test.sugi.KafkaProducerApp</mainClass>
              </manifest>
            </archive>
          </configuration>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
EOF

创建一个包含所有依赖关系的Fat Jar。首次运行需要等待几分钟来完成下载。

cd ~/kafkaproducer
mvn package assembly:single

运行已经编译好的Jar文件后,数据将被输入到OCI Streaming中。

java -jar target/KafkaProducerApp-1.0-jar-with-dependencies.jar

当您在OCI控制台上点击“加载消息”时,您将能够看到所插入的数据。

1588563837576.png

请提供URL参考

Azure
https://docs.microsoft.com/zh-cn/azure/hdinsight/kafka/apache-kafka-producer-consumer-api

广告
将在 10 秒后关闭
bannerAds