使用Kafka Stream来向Line发送通知

首先

在开始IoT开发的ESP32上使用Kafka时,我打算稍微扩展一下,并尝试搭建一个将通知发送到Line的功能。

image.png

我这里所说的Kafka Stream -> Firebase -> Line

开发环境- 建立软件或应用程序时所用的工具、框架和平台。

    • windows 10

 

    • Software

node.js: 16.13.1(12.0以降であれば問題ないハズ)

结局

drawing

 

Line Messaging API方案

要做的事情是什么?

    • Firebase Functions & Firestoreアプリを作成する

Line Developers ConsoleからMessaging APIのchannelを作る
Firebase FunctionsとLine Messaging APIを連動させる
自身の端末から認証する

创建Firebase Functions和Firestore应用程序

在Firebase控制台上创建一个新项目。

    プロジェクト名: esp32-iot-notificator

一旦项目完成,就创建Firestore数据库。

image.png

如果能够创建数据库,请先创建集合元素。集合名称为line-notificate-users。

image.png

接下来是对functions的设置。需要改变到按量计费,因此要鼓起勇气切换。

image.png

立即在本地创建项目并继续前进

firebase login --no-localhost --interactive
firebase init

在下面打勾

    • firestore

 

    functions

暂时先修正functions/src/index.ts文件。

import * as functions from "firebase-functions";

export const callback = functions.https.onRequest((request, response) => {
  functions.logger.info("response", request.body);
  response.status(200).send("callback");
});

部署

firebase deploy

如果在Firebase的界面上注册了回调,就可以了。

image.png

在Line开发者控制台上创建Messaging API的频道。

首先,按照介绍中使用LINE的Messaging API来进行主动通知的方法创建一个频道。

image.png

将刚刚注册函数的回调URL设置为Webhook URL。通过设置的这个回调,可以在Line用户注册应用程序并向频道发送消息时等情况下调用。

image.png

将Firebase Functions与Line Messaging API进行连接。

首先,需要整理要做的事情。

image.png

因此,需要在以下时间段内编写触发函数。

    • ユーザーがchannelを友達承認した

 

    メッセージを能動的に打つ

用户同意了频道或发送了消息

本次,当收到消息时会自动进行注册。顺便提一下,收到消息时回调接收的请求目前如下(截至2022年4月11日)。

{
    "events":[
        {
            "message":{
                "id":"15899930248440",
                "type":"text",
                "text":"おい"
            },
            "source":{
                "type":"user",
                "userId":"<lineユーザーのID>"
            },
            "type":"message",
            "replyToken":"<返信などで必要となるトークン>",
            "timestamp":1649672154163,
            "mode":"active"
        }
    ],
    "destination":"<channelのユーザーID>"
}

在这里需要获取的是events[].source.userId。顺便说一下,这个userId不是显示在Line个人资料中的ID,而是在消息的发送和接收中需要使用的另一个ID。

将callback函数接收的数据存储到Firestore中。

import * as functions from "firebase-functions";
import * as admin from "firebase-admin";

admin.initializeApp();
const db = admin.firestore();
db.settings({
  ignoreUndefinedProperties: true,
});

export const callback = functions.https.onRequest((request, response) => {
  const userId = request.body.events[0].source.userId;
  db.collection("line-notificate-users").doc(userId).set({})
      .then(() => {
        response.status(200).send("Complete");
      })
      .catch((error) => {
        response.status(500).send(error);
      });
});

主动发消息

假设消息部分由Kafka调用并存储在Firestore中,向存储在Firestore中的用户发送消息。

首先,在package.json的dependencies中添加所需的包。

    "axios": "^0.26.1"

轻量级安装

npm install

在index.ts中添加一个函数。

import * as functions from "firebase-functions";
import * as admin from "firebase-admin";
import axios, {AxiosInstance} from "axios";

admin.initializeApp();
const db = admin.firestore();
db.settings({
  ignoreUndefinedProperties: true,
});

const lineToken = "<line messaging apiのトークン>";
const lineEndpoint = "https://api.line.me/v2/bot/message/push";

export const notificate = functions.https.onRequest((request, response) => {
  functions.logger.info("request.body.message", request.body.message);
  const message = request.body.message;
  db.collection("line-notificate-users").get()
      .then((snapshot) => {
        snapshot.forEach((doc) => {
          const userId = doc.id;
          const data = {
            to: userId,
            messages: [
              {
                type: "text",
                text: message,
              },
            ],
          };
          const apiClient: AxiosInstance = axios.create({
            headers: {
              "Content-type": "application/json",
              "Authorization": "Bearer " + lineToken,
            },
          });
          return apiClient.post(lineEndpoint, JSON.stringify(data))
              .then((data) => {
                console.log(data);
                functions.logger.info("send succeed");
                response.status(200).send("complete");
              })
              .catch((error) => {
                functions.logger.error("send failure", error);
                response.status(500).send(error);
              });
        });
      })
      .catch((error) => {
        functions.logger.error("snapshot failure", error);
        response.status(500).send(error);
      });
});

从自己的设备进行身份认证

在自己的Line上扫描Message API中的QR码,以成为朋友。

image.png

发送邮票或者其他东西时,会收到默认的消息回复。

drawing

用下面的命令尝试发送消息。

curl -v -X POST https://us-central1-esp32-iot-notificator-42297.cloudfunctions.net/notificate \
-H 'Content-Type: application/json' \
-d '{ "message": "hello bun bun youtu" }'

确认收到通知的线路。

drawing

组建 Kafka Stream 应用程序

在活动中,我们会测量光照度和水分,因此这次我们将通过流媒体监测光照度突然上升的情况,并发送通知。

    Brokerクラスタの構築は割愛

立即定义拓扑结构。流程如下:

    • handson0429-summaryのトピックを読み取り

 

    • jsonパース

 

    • key単位に異常を検知する(DeviceMetricAbnormalDetector)

 

    異常検知したらfirebaseを叩く
class AbnormalDetectTopology {
  private static final Logger log = LoggerFactory.getLogger(AbnormalDetectTopology.class);

  public static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();
    Consumed<String, String> consumerOption = Consumed.with(Serdes.String(), Serdes.String());
    KStream<String, String> events = builder.stream("handson0429-summary", consumerOption);

    events
        .filter((key, value) -> value != null)
        .filter((key, value) -> !value.equals(""))
        .mapValues(value -> DeviceMetric.parse(value))
        .filter((key, value) -> !value.getDeviceId().equals(""))
        .map((key, value) -> KeyValue.pair(value.getDeviceId(), value))
        .groupByKey(Grouped.with(Serdes.String(), JsonSerdes.DeviceMetric()))
        .aggregate(
            () -> new DeviceMetricAbnormalDetector(),
            (key, value, detector) -> {
              detector.detect(value.getMoist(), value.getLight());
              return detector;
            },
            Materialized.as("metric-abnormal-detector")
                .with(Serdes.String(), JsonSerdes.DeviceMetricAbnormalDetector()))
        .filter((key, value) -> value.abnormal)
        .toStream()
        .foreach(
            (key, value) -> {
              try {
                var client = HttpClient.newHttpClient();
                var json = "{ \"message\" : \"" + key + "が眩しいってさ\" }";
                var req =
                    HttpRequest.newBuilder()
                        .uri( URI.create( System.getenv("NOTIFICATE_ENDPOINT")))
                        .header("Content-Type", "application/json")
                        .POST(BodyPublishers.ofString(json))
                        .build();
                var res = client.send(req, HttpResponse.BodyHandlers.ofString());
                System.out.println(res.body());
              } catch (Exception e) {
                System.out.println(e.toString());
              }
            });

    return builder.build();
  }
}

DeviceMetricAbnormalDetector的样子就是这样的。

public class DeviceMetricAbnormalDetector {
  public Long moist = 0L;
  public Long light = 0L;
  public Boolean abnormal = false;

  public DeviceMetricAbnormalDetector() {}

  public void detect(Long newMoist, Long newLight) {
    if ((newLight - light) > 1000L) {
      abnormal = true;
    } else {
      abnormal = false;
    }
    light = newLight;
    moist = newMoist;
  }

  @Override
  public String toString() {
    return "{"
        + "moist='"
        + moist
        + "'"
        + ", light='"
        + light
        + "'"
        + ", abnormal='"
        + abnormal
        + "'"
        + "}";
  }
}

通过应用程序指定拓扑结构


class App {
  public static void main(String[] args) {
    Topology topology = AbnormalDetectTopology.build();

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "device-metric-summarizer");
    props.put(
        StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
        "kafka-broker-001:9092,kafka-broker-002:9092,kafka-broker-003:9092");
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(StreamsConfig.STATE_DIR_CONFIG, "state-store");
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
    props.put(
        StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

    System.out.println(topology.describe());
    KafkaStreams streams = new KafkaStreams(topology, props);
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    streams.cleanUp();
    streams.start();
  }
}

剩下的内容将在活动之前上传。

验证通过发送虚拟数据来接收通知的情况

尝试编写并运行脚本。

#!/bin/bash

message='{"deviceId":"1001","moist":0 ,"light":0}'
./bin/kafka-console-producer.sh \
    --topic handson0429-summary \
    --property key.separator=, \
    --property parse.key=true \
    --bootstrap-server localhost:9092 << EOF
1001,${message}
EOF


message='{"deviceId":"1001","moist":1001,"light":1001}'
./bin/kafka-console-producer.sh \
    --topic handson0429-summary \
    --property key.separator=, \
    --property parse.key=true \
    --bootstrap-server localhost:9092 << EOF
1001,${message}
EOF

结果 – jié guǒ

drawing

反思

如果在Hands-on会议中使用Grafana进行可视化,如果有时间的话就进行通知。我发现加入通知功能后,感觉像是变成了IoT,即使是被动接收信息也很棒。

drawing

得意になりすぎて、度を越してしまった。

广告
将在 10 秒后关闭
bannerAds