尝试使用ksqlDB从物联网数据中识别离线设备
简要概括
这次我们尝试使用ksqlDB来测试IoT的一个应用案例。
考虑到IoT的应用情况作为一个开始的契机
最近,IoT和AI的使用案例并不稀奇。然而,不久前我从一个客户那里听说,虽然IoT系统很方便,但过度依赖IoT设备可能会有危险。仔细询问后才得知,该客户现场的气体传感器配备的IoT设备出了问题(并非完全损坏),如果直接相信该设备收集到的数据并用AI/ML进行处理,可能会导致意想不到的麻烦。
我设想了一个简单的情况并尝试了一下,如果能够通过听到这些并使用ksqlDB从IoT数据中自动检测到设备是否离线或处于异常状态,可能对解决这些问题有一些帮助。
关于物联网的用例
中国的原生语言化的翻译如下:
提及物联网时,它广泛应用于车辆、制造设备和智能家电等领域。大部分由物联网设备产生的数据通常在特定时间生成和处理,数据量和速度也常常是重要的因素。在这种情况下,当数据传输(共享)的行为与正常情况不同时,可能需要判断是否需要进行设备维护。物联网资产追踪系统的重要功能之一是帮助进行这种判断。
在这里,我们简单介绍使用ksqlDB来确认哪些设备的数据不能被获取的方法。
使用ksqlDB
ksqlDB可以直接从云端常见的数据源和终端系统中导入和导出数据。在这个例子中,我们将使用ksqlDB的INSERT INTO功能来执行使用模拟数据的代码。
作为物联网(IoT)用例的一个例子,当我们从设备中获取数据时,我们可以通过观察是否有异常来判断设备是否存在故障的征兆,或者是否可能成为网络攻击的目标等。
创建一个流程:步骤1:创建STREAM。
执行以下的ksqlDB语法。
CREATE STREAM iot_telemetry(
device_id INT,
ts BIGINT
) WITH (
KAFKA_TOPIC = ‘iot_telemetry‘,
VALUE_FORMAT = 'JSON‘,
PARTITIONS = 6,
TIMESTAMP = 'ts’
);
步骤二:创建视图
CREATE TABLE iot_telemetry_lags WITH (KAFKA_TOPIC = 'iot_telemetry_lags') AS
SELECT
device_id,
WINDOWEND - LATEST_BY_OFFSET(ts) as lag_ms,
TIMESTAMPTOSTRING(WINDOWSTART, ‘yyyy-MM-dd HH:mm:ss’) as window_start,
TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss') as window_end
FROM iot_telemetry
WINDOW TUMBLING (SIZE 120 SECONDS)
GROUP BY device_id;
步骤三:输入模拟数据
在这里,我们将使用ksqlDB来投入模拟数据。
INSERT INTO iot_telemetry (device_id, ts) VALUES (1, 1655144403000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144403000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144423000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144443000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144463000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144483000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144503000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144523000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144543000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144563000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144583000);
步骤四:确认
在这个例子中,我们通过监控IoT设备在一定时间内未发送遥测数据的情况,并在该情况发生时(例如,lag_ms的值超过60000),输出相关的数据。通过非常简单的SQL语法,我们可以检查IoT设备的状态。
非常感谢您这次直到最后阅读完。