使用Kafka Stream来向Line发送通知
首先
在开始IoT开发的ESP32上使用Kafka时,我打算稍微扩展一下,并尝试搭建一个将通知发送到Line的功能。
我这里所说的Kafka Stream -> Firebase -> Line
开发环境- 建立软件或应用程序时所用的工具、框架和平台。
-
- windows 10
-
- Software
node.js: 16.13.1(12.0以降であれば問題ないハズ)
结局
Line Messaging API方案
要做的事情是什么?
-
- Firebase Functions & Firestoreアプリを作成する
Line Developers ConsoleからMessaging APIのchannelを作る
Firebase FunctionsとLine Messaging APIを連動させる
自身の端末から認証する
创建Firebase Functions和Firestore应用程序
在Firebase控制台上创建一个新项目。
- プロジェクト名: esp32-iot-notificator
一旦项目完成,就创建Firestore数据库。
如果能够创建数据库,请先创建集合元素。集合名称为line-notificate-users。
接下来是对functions的设置。需要改变到按量计费,因此要鼓起勇气切换。
立即在本地创建项目并继续前进
firebase login --no-localhost --interactive
firebase init
在下面打勾
-
- firestore
- functions
暂时先修正functions/src/index.ts文件。
import * as functions from "firebase-functions";
export const callback = functions.https.onRequest((request, response) => {
functions.logger.info("response", request.body);
response.status(200).send("callback");
});
部署
firebase deploy
如果在Firebase的界面上注册了回调,就可以了。
在Line开发者控制台上创建Messaging API的频道。
首先,按照介绍中使用LINE的Messaging API来进行主动通知的方法创建一个频道。
将刚刚注册函数的回调URL设置为Webhook URL。通过设置的这个回调,可以在Line用户注册应用程序并向频道发送消息时等情况下调用。
将Firebase Functions与Line Messaging API进行连接。
首先,需要整理要做的事情。
因此,需要在以下时间段内编写触发函数。
-
- ユーザーがchannelを友達承認した
- メッセージを能動的に打つ
用户同意了频道或发送了消息
本次,当收到消息时会自动进行注册。顺便提一下,收到消息时回调接收的请求目前如下(截至2022年4月11日)。
{
"events":[
{
"message":{
"id":"15899930248440",
"type":"text",
"text":"おい"
},
"source":{
"type":"user",
"userId":"<lineユーザーのID>"
},
"type":"message",
"replyToken":"<返信などで必要となるトークン>",
"timestamp":1649672154163,
"mode":"active"
}
],
"destination":"<channelのユーザーID>"
}
在这里需要获取的是events[].source.userId。顺便说一下,这个userId不是显示在Line个人资料中的ID,而是在消息的发送和接收中需要使用的另一个ID。
将callback函数接收的数据存储到Firestore中。
import * as functions from "firebase-functions";
import * as admin from "firebase-admin";
admin.initializeApp();
const db = admin.firestore();
db.settings({
ignoreUndefinedProperties: true,
});
export const callback = functions.https.onRequest((request, response) => {
const userId = request.body.events[0].source.userId;
db.collection("line-notificate-users").doc(userId).set({})
.then(() => {
response.status(200).send("Complete");
})
.catch((error) => {
response.status(500).send(error);
});
});
主动发消息
假设消息部分由Kafka调用并存储在Firestore中,向存储在Firestore中的用户发送消息。
首先,在package.json的dependencies中添加所需的包。
"axios": "^0.26.1"
轻量级安装
npm install
在index.ts中添加一个函数。
import * as functions from "firebase-functions";
import * as admin from "firebase-admin";
import axios, {AxiosInstance} from "axios";
admin.initializeApp();
const db = admin.firestore();
db.settings({
ignoreUndefinedProperties: true,
});
const lineToken = "<line messaging apiのトークン>";
const lineEndpoint = "https://api.line.me/v2/bot/message/push";
export const notificate = functions.https.onRequest((request, response) => {
functions.logger.info("request.body.message", request.body.message);
const message = request.body.message;
db.collection("line-notificate-users").get()
.then((snapshot) => {
snapshot.forEach((doc) => {
const userId = doc.id;
const data = {
to: userId,
messages: [
{
type: "text",
text: message,
},
],
};
const apiClient: AxiosInstance = axios.create({
headers: {
"Content-type": "application/json",
"Authorization": "Bearer " + lineToken,
},
});
return apiClient.post(lineEndpoint, JSON.stringify(data))
.then((data) => {
console.log(data);
functions.logger.info("send succeed");
response.status(200).send("complete");
})
.catch((error) => {
functions.logger.error("send failure", error);
response.status(500).send(error);
});
});
})
.catch((error) => {
functions.logger.error("snapshot failure", error);
response.status(500).send(error);
});
});
从自己的设备进行身份认证
在自己的Line上扫描Message API中的QR码,以成为朋友。
发送邮票或者其他东西时,会收到默认的消息回复。
用下面的命令尝试发送消息。
curl -v -X POST https://us-central1-esp32-iot-notificator-42297.cloudfunctions.net/notificate \
-H 'Content-Type: application/json' \
-d '{ "message": "hello bun bun youtu" }'
确认收到通知的线路。
组建 Kafka Stream 应用程序
在活动中,我们会测量光照度和水分,因此这次我们将通过流媒体监测光照度突然上升的情况,并发送通知。
- Brokerクラスタの構築は割愛
立即定义拓扑结构。流程如下:
-
- handson0429-summaryのトピックを読み取り
-
- jsonパース
-
- key単位に異常を検知する(DeviceMetricAbnormalDetector)
- 異常検知したらfirebaseを叩く
class AbnormalDetectTopology {
private static final Logger log = LoggerFactory.getLogger(AbnormalDetectTopology.class);
public static Topology build() {
StreamsBuilder builder = new StreamsBuilder();
Consumed<String, String> consumerOption = Consumed.with(Serdes.String(), Serdes.String());
KStream<String, String> events = builder.stream("handson0429-summary", consumerOption);
events
.filter((key, value) -> value != null)
.filter((key, value) -> !value.equals(""))
.mapValues(value -> DeviceMetric.parse(value))
.filter((key, value) -> !value.getDeviceId().equals(""))
.map((key, value) -> KeyValue.pair(value.getDeviceId(), value))
.groupByKey(Grouped.with(Serdes.String(), JsonSerdes.DeviceMetric()))
.aggregate(
() -> new DeviceMetricAbnormalDetector(),
(key, value, detector) -> {
detector.detect(value.getMoist(), value.getLight());
return detector;
},
Materialized.as("metric-abnormal-detector")
.with(Serdes.String(), JsonSerdes.DeviceMetricAbnormalDetector()))
.filter((key, value) -> value.abnormal)
.toStream()
.foreach(
(key, value) -> {
try {
var client = HttpClient.newHttpClient();
var json = "{ \"message\" : \"" + key + "が眩しいってさ\" }";
var req =
HttpRequest.newBuilder()
.uri( URI.create( System.getenv("NOTIFICATE_ENDPOINT")))
.header("Content-Type", "application/json")
.POST(BodyPublishers.ofString(json))
.build();
var res = client.send(req, HttpResponse.BodyHandlers.ofString());
System.out.println(res.body());
} catch (Exception e) {
System.out.println(e.toString());
}
});
return builder.build();
}
}
DeviceMetricAbnormalDetector的样子就是这样的。
public class DeviceMetricAbnormalDetector {
public Long moist = 0L;
public Long light = 0L;
public Boolean abnormal = false;
public DeviceMetricAbnormalDetector() {}
public void detect(Long newMoist, Long newLight) {
if ((newLight - light) > 1000L) {
abnormal = true;
} else {
abnormal = false;
}
light = newLight;
moist = newMoist;
}
@Override
public String toString() {
return "{"
+ "moist='"
+ moist
+ "'"
+ ", light='"
+ light
+ "'"
+ ", abnormal='"
+ abnormal
+ "'"
+ "}";
}
}
通过应用程序指定拓扑结构
class App {
public static void main(String[] args) {
Topology topology = AbnormalDetectTopology.build();
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "device-metric-summarizer");
props.put(
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"kafka-broker-001:9092,kafka-broker-002:9092,kafka-broker-003:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(StreamsConfig.STATE_DIR_CONFIG, "state-store");
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
props.put(
StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
System.out.println(topology.describe());
KafkaStreams streams = new KafkaStreams(topology, props);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.cleanUp();
streams.start();
}
}
剩下的内容将在活动之前上传。
验证通过发送虚拟数据来接收通知的情况
尝试编写并运行脚本。
#!/bin/bash
message='{"deviceId":"1001","moist":0 ,"light":0}'
./bin/kafka-console-producer.sh \
--topic handson0429-summary \
--property key.separator=, \
--property parse.key=true \
--bootstrap-server localhost:9092 << EOF
1001,${message}
EOF
message='{"deviceId":"1001","moist":1001,"light":1001}'
./bin/kafka-console-producer.sh \
--topic handson0429-summary \
--property key.separator=, \
--property parse.key=true \
--bootstrap-server localhost:9092 << EOF
1001,${message}
EOF
结果 – jié guǒ
反思
如果在Hands-on会议中使用Grafana进行可视化,如果有时间的话就进行通知。我发现加入通知功能后,感觉像是变成了IoT,即使是被动接收信息也很棒。
得意になりすぎて、度を越してしまった。