我试着通过使用Liberty来使用Event Streams(Kafka)
一开始
這篇文章是WebSphere 25週年、DB2/MQ30週年紀念活動的參與文章,為了參與Qiita文章投稿活動而寫成的。
我尝试创建了一个从Liberty发送和接收消息到Event Streams(Kafka)的简单应用程序。
我去过
-
- IBM Cloud 無料版Event Streamsを利用
-
- Spring Bootを利用したWebアプリを作成
- ローカル環境のLibertyからEvent Streamsに接続
使用IBM Cloud的免费版Event Streams。
这是连接所需的信息。
{
"api_key": "xxxxxxxxxxxxxxxxxxxxx",
"apikey": "xxxxxxxxxxxxxxxxxxxxx",
"bootstrap_endpoints": "broker-xxxx.ibm.com:9093,broker-yyyy.ibm.com:9093,broker-zzzz.ibm.com:9093,broker-aaaa.ibm.com:9093,broker-bbbb.ibm.com:9093,broker-cccc.ibm.com:9093",
"iam_apikey_description": "Auto-generated for key xxxxxxxxx,
"iam_apikey_name": "cre1",
"iam_role_crn": "xxxxxxxxx",
"iam_serviceid_crn": "xxxxxxxxxxx",
"instance_id": "aaaaaa-aaaa-bbbb-cccc-xxxxxxxxxx",
"kafka_admin_url": "https://xxxxxxxxxxx.ibm.com",
"kafka_brokers_sasl": [
"broker-xxxx.ibm.com:9093",
"broker-yyyy.ibm.com:9093",
"broker-zzzz.ibm.com:9093",
"broker-aaaa.ibm.com:9093",
"broker-bbbb.ibm.com:9093",
"broker-cccc.ibm.com:9093"
],
"kafka_http_url": "https://xxxxxxxxxxx.ibm.com",
"password": "XXXXXXXXXXXXXXXXXXXXXX",
"user": "token"
}
使用Spring Boot创建Web应用程序
我們將使用Spring Boot創建下面這種簡單的Web應用程式。
-
- index.htmlからEvent Streamsにメッセージを送信
- 送信されたメッセージは、KafkaListnerでログに出力
将Spring Boot和Spring Kafka添加到pom.xml文件的依赖关系中。
-----略------
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
在应用程序的`.properties`文件中写入刚刚复制的Event Streams连接信息。要连接Event Streams,需要进行安全设置。
# Kafkaブローカーリスト
spring.kafka.bootstrap-servers=broker-xxxx.ibm.com:9093,broker-yyyy.ibm.com:9093,broker-zzzz.ibm.com:9093,broker-aaaa.ibm.com:9093,broker-bbbb.ibm.com:9093,broker-cccc.ibm.com:9093
# Producer設定
spring.kafka.producer.acks=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#Consumer設定
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
## Security options
spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="XXXXXXXXXXXXXXXXXXXXXX";
准备两个HTML文件,一个用于发送消息,另一个用于显示发送的内容。
<!DOCTYPE html><html>
<head>
<meta charset="UTF-8">
<title>Kafka Sender</title>
</head>
<body>
<h1>Hello Event Streams!</h1>
<form method="post" action="/EventApp/send">
<label for="message">Message:</label>
<input type="text" name="message" id="message">
<button type="submit">Send</button>
</form>
</body></html>
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>Kafka Sender</title>
</head>
<body>
<p>Message sent: [[${message}]]</p>
</body>
</html>
在Java中,创建一个Controller类。
-
- コンテキスト・ルートへのアクセスをindex.htmlへ
-
- Spring KafkaのKafkaTemplateを利用してメッセージを送信
- spring KafkaのKafkaListenrを利用してメッセージを受信し、SystemOutに出力
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.ModelAndView;
@Controller
public class MessageController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping(value = "/", method = RequestMethod.GET)
public ModelAndView index(final ModelAndView mv) {
mv.setViewName("index");
return mv;
}
@RequestMapping(value = "/send", method = RequestMethod.POST)
public ModelAndView send(@RequestParam("message") final String message, final ModelAndView mv) {
kafkaTemplate.send("libertyTopic", message);
mv.setViewName("result");
mv.addObject("message", message);
return mv;
}
@KafkaListener(topics = "libertyTopic", groupId = "")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
用中国语言进行本地环境的Liberty连接到Event Streams。
总结
我使用Liberty创建了一个使用Event Streams(Kafka)的简单应用程序。