我试着通过使用Liberty来使用Event Streams(Kafka)

一开始

這篇文章是WebSphere 25週年、DB2/MQ30週年紀念活動的參與文章,為了參與Qiita文章投稿活動而寫成的。

我尝试创建了一个从Liberty发送和接收消息到Event Streams(Kafka)的简单应用程序。

我去过

    • IBM Cloud 無料版Event Streamsを利用

 

    • Spring Bootを利用したWebアプリを作成

 

    ローカル環境のLibertyからEvent Streamsに接続

使用IBM Cloud的免费版Event Streams。

undefined
undefined
undefined

这是连接所需的信息。

{
  "api_key": "xxxxxxxxxxxxxxxxxxxxx",
  "apikey": "xxxxxxxxxxxxxxxxxxxxx",
  "bootstrap_endpoints": "broker-xxxx.ibm.com:9093,broker-yyyy.ibm.com:9093,broker-zzzz.ibm.com:9093,broker-aaaa.ibm.com:9093,broker-bbbb.ibm.com:9093,broker-cccc.ibm.com:9093",
  "iam_apikey_description": "Auto-generated for key xxxxxxxxx,
  "iam_apikey_name": "cre1",
  "iam_role_crn": "xxxxxxxxx",
  "iam_serviceid_crn": "xxxxxxxxxxx",
  "instance_id": "aaaaaa-aaaa-bbbb-cccc-xxxxxxxxxx",
  "kafka_admin_url": "https://xxxxxxxxxxx.ibm.com",
  "kafka_brokers_sasl": [
    "broker-xxxx.ibm.com:9093",
    "broker-yyyy.ibm.com:9093",
    "broker-zzzz.ibm.com:9093",
    "broker-aaaa.ibm.com:9093",
    "broker-bbbb.ibm.com:9093",
    "broker-cccc.ibm.com:9093"
  ],
  "kafka_http_url": "https://xxxxxxxxxxx.ibm.com",
  "password": "XXXXXXXXXXXXXXXXXXXXXX",
  "user": "token"
}

使用Spring Boot创建Web应用程序

我們將使用Spring Boot創建下面這種簡單的Web應用程式。

    • index.htmlからEvent Streamsにメッセージを送信

 

    送信されたメッセージは、KafkaListnerでログに出力

将Spring Boot和Spring Kafka添加到pom.xml文件的依赖关系中。

-----略------
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-thymeleaf</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

在应用程序的`.properties`文件中写入刚刚复制的Event Streams连接信息。要连接Event Streams,需要进行安全设置。

# Kafkaブローカーリスト
spring.kafka.bootstrap-servers=broker-xxxx.ibm.com:9093,broker-yyyy.ibm.com:9093,broker-zzzz.ibm.com:9093,broker-aaaa.ibm.com:9093,broker-bbbb.ibm.com:9093,broker-cccc.ibm.com:9093

# Producer設定
spring.kafka.producer.acks=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#Consumer設定
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer


## Security options
spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="XXXXXXXXXXXXXXXXXXXXXX";

准备两个HTML文件,一个用于发送消息,另一个用于显示发送的内容。

<!DOCTYPE html><html>
<head>
    <meta charset="UTF-8">
    <title>Kafka Sender</title>
</head>
<body>
	<h1>Hello Event Streams!</h1>

    <form method="post" action="/EventApp/send">
        <label for="message">Message:</label>
        <input type="text" name="message" id="message">
        <button type="submit">Send</button>
    </form>
</body></html>
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>Kafka Sender</title>
</head>
<body>
    <p>Message sent: [[${message}]]</p> 
</body>
</html>

在Java中,创建一个Controller类。

    • コンテキスト・ルートへのアクセスをindex.htmlへ

 

    • Spring KafkaのKafkaTemplateを利用してメッセージを送信

 

    spring KafkaのKafkaListenrを利用してメッセージを受信し、SystemOutに出力
package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.ModelAndView;

@Controller
public class MessageController {

	@Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

	@RequestMapping(value = "/", method = RequestMethod.GET)
    public ModelAndView index(final ModelAndView mv) {
		mv.setViewName("index");
        return mv;
    }

    @RequestMapping(value = "/send", method = RequestMethod.POST)
    public ModelAndView send(@RequestParam("message") final String message, final ModelAndView mv) {
    	kafkaTemplate.send("libertyTopic", message);
    	mv.setViewName("result");
        mv.addObject("message", message);
        return mv;
    }

    @KafkaListener(topics = "libertyTopic", groupId = "")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }    
}

用中国语言进行本地环境的Liberty连接到Event Streams。

undefined
undefined
undefined

总结

我使用Liberty创建了一个使用Event Streams(Kafka)的简单应用程序。

广告
将在 10 秒后关闭
bannerAds