尝试使用ksqlDB从物联网数据中识别离线设备

简要概括

这次我们尝试使用ksqlDB来测试IoT的一个应用案例。

考虑到IoT的应用情况作为一个开始的契机

最近,IoT和AI的使用案例并不稀奇。然而,不久前我从一个客户那里听说,虽然IoT系统很方便,但过度依赖IoT设备可能会有危险。仔细询问后才得知,该客户现场的气体传感器配备的IoT设备出了问题(并非完全损坏),如果直接相信该设备收集到的数据并用AI/ML进行处理,可能会导致意想不到的麻烦。

我设想了一个简单的情况并尝试了一下,如果能够通过听到这些并使用ksqlDB从IoT数据中自动检测到设备是否离线或处于异常状态,可能对解决这些问题有一些帮助。

关于物联网的用例

中国的原生语言化的翻译如下:

提及物联网时,它广泛应用于车辆、制造设备和智能家电等领域。大部分由物联网设备产生的数据通常在特定时间生成和处理,数据量和速度也常常是重要的因素。在这种情况下,当数据传输(共享)的行为与正常情况不同时,可能需要判断是否需要进行设备维护。物联网资产追踪系统的重要功能之一是帮助进行这种判断。
在这里,我们简单介绍使用ksqlDB来确认哪些设备的数据不能被获取的方法。

使用ksqlDB

ksqlDB可以直接从云端常见的数据源和终端系统中导入和导出数据。在这个例子中,我们将使用ksqlDB的INSERT INTO功能来执行使用模拟数据的代码。
作为物联网(IoT)用例的一个例子,当我们从设备中获取数据时,我们可以通过观察是否有异常来判断设备是否存在故障的征兆,或者是否可能成为网络攻击的目标等。

创建一个流程:步骤1:创建STREAM。

执行以下的ksqlDB语法。

CREATE STREAM iot_telemetry( 
  device_id INT, 
  ts BIGINT 
) WITH ( 
  KAFKA_TOPIC = ‘iot_telemetry‘,     
  VALUE_FORMAT = 'JSON‘,  
  PARTITIONS = 6, 
  TIMESTAMP = 'ts’ 
);

步骤二:创建视图

CREATE TABLE iot_telemetry_lags WITH (KAFKA_TOPIC = 'iot_telemetry_lags') AS 
SELECT 
 device_id, 
 WINDOWEND - LATEST_BY_OFFSET(ts) as lag_ms,  
 TIMESTAMPTOSTRING(WINDOWSTART, ‘yyyy-MM-dd HH:mm:ss’) as window_start,  
 TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss') as window_end 
FROM iot_telemetry 
WINDOW TUMBLING (SIZE 120 SECONDS) 
GROUP BY device_id;

步骤三:输入模拟数据

在这里,我们将使用ksqlDB来投入模拟数据。

INSERT INTO iot_telemetry (device_id, ts) VALUES (1, 1655144403000); 
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144403000); 
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144423000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144443000); 
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144463000); 
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144483000); 
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144503000); 
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144523000); 
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144543000); 
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144563000); 
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144583000); 

步骤四:确认

image.png
image.png

在这个例子中,我们通过监控IoT设备在一定时间内未发送遥测数据的情况,并在该情况发生时(例如,lag_ms的值超过60000),输出相关的数据。通过非常简单的SQL语法,我们可以检查IoT设备的状态。

非常感谢您这次直到最后阅读完。

广告
将在 10 秒后关闭
bannerAds