ksql 简洁技术集
目标
以下是一篇只是从下面的指南中挑选并运行了一些有趣部分的文章,以供自学KSQL使用。如果查看原文,就会发现一切都解决了,但如果能给您提供一点参考,那就算是对您有所帮助了。
原始材料:如何指南
从”How-to guide”中找到的ksql表达集合
結構化資料 huà
CREATE ...
hoge STRUCT <
hoge_C1 VARCHAR,
hoge_C2 INT
>
INSERT INTO ...
, STRUCT(hoge_C1 :='a', hoge_C2 :=2)
SELECT ...
hoge->hoge_C1,
hoge->hoge_C2,
在这个教程中也提到了。要在Stream/Table内创建结构化数据,在CREATE时需要使用STRUCT<>。在下面的例子中,我们使用Avro Schema格式,在b列中存储了VARCHAR类型的c和INT类型的d。
CREATE STREAM s2 (
a VARCHAR KEY,
b STRUCT<
c VARCHAR,
d INT
>
) WITH (
kafka_topic = 's2',
partitions = 1,
value_format = 'avro'
);
我们可以使用STRUCT()将数据插入到结构化数据中。
INSERT INTO s2 (
a, b
) VALUES (
'k1', STRUCT(c := 'v1', d := 5)
);
INSERT INTO s2 (
a, b
) VALUES (
'k2', STRUCT(c := 'v2', d := 6)
);
INSERT INTO s2 (
a, b
) VALUES (
'k3', STRUCT(c := 'v3', d := 7)
);
提取结构化数据需要使用->。
SELECT a,
b,
b->c,
b->d
FROM s2
EMIT CHANGES;
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|A |B |C |D |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|k1 |{C=v1, D=5} |v1 |5 |
|k2 |{C=v2, D=6} |v2 |6 |
|k3 |{C=v3, D=7} |v3 |7 |
地图
CREATE ...
hoge MAP <
VARCHAR
INT
>
INSERT INTO ...
, MAP('hoge_c1' :=1, 'hoge_c2=':=2)
SELECT ...
hoge['hoge_c1'] AS HOGE_C1,
hoge['hoge_c2'] AS HOGE_C2,
可以使用Map的形式。可以将任何东西放入key和value的组合中,但key和value的数据类型必须始终保持不变。要在Stream/Table中创建Map,可以在CREATE时使用MAP<>。在以下示例中,列b将成为一个包含VARCHAR和INT的Map。
CREATE STREAM s3 (
a VARCHAR KEY,
b MAP<VARCHAR, INT>
) WITH (
kafka_topic = 's3',
partitions = 1,
value_format = 'avro'
);
在对Map进行INSERT操作时,需要使用MAP()函数。
INSERT INTO s3 (
a, b
) VALUES (
'k1', MAP('c' := 2, 'd' := 4)
);
INSERT INTO s3 (
a, b
) VALUES (
'k2', MAP('c' := 4, 'd' := 8)
);
INSERT INTO s3 (
a, b
) VALUES (
'k3', MAP('c' := 8, 'd' := 16)
);
从Map中获取值的方法是通过使用方括号指定键值。
SELECT a,
b,
b['c'] AS C,
b['d'] AS D
FROM s3
EMIT CHANGES;
+---------------------------+---------------------------+---------------------------+---------------------------+
|A |B |C |D |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k1 |{c=2, d=4} |2 |4 |
|k2 |{c=4, d=8} |4 |8 |
|k3 |{c=8, d=16} |8 |16 |
数组
CREATE ...
hoge ARRAY <INT>
INSERT INTO ...
, ARRAY[10,20]
SELECT ...
hoge[1] as hoge_1, hoge[2] as hoge_2
我们可以将单一数据类型的值排列成一个数组进行处理。要在Stream/Table中创建数组,可以在CREATE语句中使用ARRAY<>。在下面的示例中,列b将成为一个INT类型的数组。
CREATE STREAM s4 (
a VARCHAR KEY,
b ARRAY<INT>
) WITH (
kafka_topic = 's4',
partitions = 1,
value_format = 'avro'
);
在插入数组中时要使用ARRAY[]。
INSERT INTO s4 (
a, b
) VALUES (
'k1', ARRAY[1]
);
INSERT INTO s4 (
a, b
) VALUES (
'k2', ARRAY[2, 3]
);
INSERT INTO s4 (
a, b
) VALUES (
'k3', ARRAY[4, 5, 6]
);
在中括号[]中使用索引来提取数组元素。[]内的索引指定从左到右按照元素的顺序进行。对于负数索引(-1),表示逆方向,因此在这个例子中将访问数组中的最后一个元素。
SELECT a,
b,
b[1] AS b_1,
b[2] AS b_2,
b[3] AS b_3,
b[-1] AS b_minus_1
FROM s4
EMIT CHANGES;
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|A |B |B_1 |B_2 |B_3 |B_MINUS_1 |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|k1 |[1] |1 |null |null |1 |
|k2 |[2, 3] |2 |3 |null |3 |
|k3 |[4, 5, 6] |4 |5 |6 |6 |
提取最新的偏移数据
CREATE ... AS
SELECT ...,
LATEST_BY_OFFSET(hoge) AS hoge,
在这个教程里也提到过。通过使用SQL聚合函数LATEST_BY_OFFSET,可以将最后插入的数据反映到表中。
假设有以下的流:
CREATE STREAM s1 (
k VARCHAR KEY,
v1 INT,
v2 VARCHAR,
v3 BOOLEAN
) WITH (
kafka_topic = 's1',
partitions = 1,
value_format = 'avro'
);
在以下情况下,消息密钥k1和k2分别进行了2次插入,密钥k3进行了1次插入:
INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k1', 0, 'a', true
);
INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k2', 1, 'b', false
);
INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k1', 2, 'c', false
);
INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k3', 3, 'd', true
);
INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k2', 4, 'e', true
);
可以按照以下方式创建一个用于提取v1、v2、v3最新偏移数据的表。(在使用LATEST_BY_OFFSET时,通过GROUP BY指定了Message Key,应该是因为它是聚合函数)
CREATE TABLE t1 AS
SELECT k,
LATEST_BY_OFFSET(v1) AS v1,
LATEST_BY_OFFSET(v2) AS v2,
LATEST_BY_OFFSET(v3) AS v3
FROM s1
GROUP BY k
EMIT CHANGES;
在使用 pull query 进行愉快的搜索时,将返回 v1、v2 和 v3 插入的最后值(≒Topic 中最新偏移量的信息)。
ksql> SELECT k, v1, v2, v3 FROM t1 WHERE k='k1';
+---------------------------+---------------------------+---------------------------+---------------------------+
|K |V1 |V2 |V3 |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k1 |2 |c |false |
Query terminated
ksql> SELECT k, v1, v2, v3 FROM t1 WHERE k='k2';
+---------------------------+---------------------------+---------------------------+---------------------------+
|K |V1 |V2 |V3 |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k2 |4 |e |true |
Query terminated
ksql> SELECT k, v1, v2, v3 FROM t1 WHERE k='k3';
+---------------------------+---------------------------+---------------------------+---------------------------+
|K |V1 |V2 |V3 |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k3 |3 |d |true |
Query terminated
对于表、列等的名称,使用英文小写字母。
ksql在默认情况下会将表和列的所有名称转换为大写英文字母,但如果想使用小写英文字母,只需用“括起来即可。
CREATE STREAM `s2_Case` (
`foo` VARCHAR KEY,
`BAR` INT,
`Baz` VARCHAR,
`grault` STRUCT<
`Corge` VARCHAR,
`garply` INT
>,
qux INT
) WITH (
kafka_topic = 's2_Case',
partitions = 1,
value_format = 'avro'
);
尝试进行描述。
Name : s2_Case
Field | Type
--------------------------------------------------------
foo | VARCHAR(STRING) (key)
BAR | INTEGER
Baz | VARCHAR(STRING)
grault | STRUCT<Corge VARCHAR(STRING), garply INTEGER>
QUX | INTEGER
--------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
在使用INSERT或SELECT语句时,需要用“符号来包围流/表名和列名,这一点很麻烦。请注意,在未使用SELECT语句包围的列名qux和qux2在ksql回复时会被转换为大写。
INSERT INTO `s2_Case` (
`foo`, `BAR`, `Baz`, `grault`, qux
) VALUES (
'k1', 1, 'x', STRUCT(`Corge` := 'v1', `garply` := 5), 2
);
SELECT `foo`,
`BAR`,
`Baz`,
`grault`->`Corge`,
`grault`->`garply`,
qux,
QUX AS qux2
FROM `s2`
EMIT CHANGES;
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|foo |BAR |Baz |Corge |garply |QUX |QUX2 |
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|k1 |1 |x |v1 |5 |2 |2 |
时间戳列的定义和使用
考虑到事件处理,处理Kafka消息中的时间戳会变得非常重要。例如,假设有一个包含时间戳信息的ts列的流:
CREATE STREAM s1_time (
k VARCHAR KEY,
ts VARCHAR,
v1 INT,
v2 VARCHAR
) WITH (
kafka_topic = 's1_time',
partitions = 1,
value_format = 'avro'
);
假设将以下数据输入:
INSERT INTO s1_time (
k, ts, v1, v2
) VALUES (
'k1', '2020-05-04 01:00:00', 0, 'a'
);
INSERT INTO s1_time (
k, ts, v1, v2
) VALUES (
'k2', '2020-05-04 02:00:00', 1, 'b'
);
在Stream中有一个隐式的系统列名为ROWTIME,如果不指定任何内容,则保留了Kafka消息写入Topic的时间。我们可以使用TIMESTAMPTOSTRING函数将其转换为可读的格式。
SELECT k,
ROWTIME,
TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS rowtime_formatted,
ts,
v1,
v2
FROM s1_time
EMIT CHANGES;
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|K |ROWTIME |ROWTIME_FORMATTED |TS |V1 |V2 |
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|k1 |1638949784646 |2021-12-08 16:49:44.646 |2020-05-04 01:00:00 |0 |a |
|k2 |1638949802709 |2021-12-08 16:50:02.709 |2020-05-04 02:00:00 |1 |b |
如果您希望将Kafka消息中的特定字段作为时间戳进行处理,而不是使用Kafka消息写入的时间,那么在CREATE时,在WITH内指定以下的timestamp和timestamp_format即可。
CREATE STREAM s2_time WITH (
timestamp = 'ts',
timestamp_format = 'yyyy-MM-dd HH:mm:ss'
) AS
SELECT *
FROM s1_time
EMIT CHANGES;
观察这个流程,可以发现ROWTIME的含义已经从消息写入时间变为存储在ts列中的时间。这看起来很方便。
SELECT k,
ROWTIME,
TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS rowtime_formatted,
ts,
v1,
v2
FROM s2_time
EMIT CHANGES;
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|K |ROWTIME |ROWTIME_FORMATTED |TS |V1 |V2 |
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|k1 |1588521600000 |2020-05-04 01:00:00.000 |2020-05-04 01:00:00 |0 |a |
|k2 |1588525200000 |2020-05-04 02:00:00.000 |2020-05-04 02:00:00 |1 |b |
当然,你可以选择在创建Stream时直接指定要用作时间戳的列,而不是像上面的例子一样重新定义Stream。在这种情况下,插入后的ROWTIME将变为ts列的值。
CREATE STREAM s3_time (
k VARCHAR KEY,
ts VARCHAR,
v1 INT
) WITH (
kafka_topic = 's3_time',
partitions = 1,
value_format = 'avro',
timestamp = 'ts',
timestamp_format = 'yyyy-MM-dd HH:mm:ss'
);
注意:如果没有指定timestamp_format,则需要注意的是,用于时间戳的列应以与ROWTIME列相同的BIGINT Unix时间格式存储。
变量
在KSQL中可以使用DEFINE定义变量,并在KSQL中使用。在KSQL中调用变量需要使用${}进行标记。
DEFINE format = 'AVRO';
DEFINE replicas = '3';
CREATE STREAM str1_variables (
id INT
) WITH (
kafka_topic = 'str1_variables',
value_format = '${format}',
partitions = ${replicas}
);
可以通过SHOW VARIABLES来确认已定义的变量。
ksql> SHOW VARIABLES;
Variable Name | Value
-----------------------
format | AVRO
replicas | 3
-----------------------
在删除变量定义时使用`UNDEFINE`,在转义时使用`$$`。(虽然转义的意义不太清楚,但是…)
UNDEFINE replicas;
DEFINE format = 'AVRO';
SELECT '$${format}' FROM stream;
可以指定变量的地方包括文本、文字、列名和流/表名,但不能在保留字中使用。下面是一个使用变量的ksql查询示例。
DEFINE streamName = 'str2_variables'
DEFINE colName1 = 'col1'
DEFINE colName2 = 'col2'
DEFINE format = 'AVRO'
DEFINE replicas = '3'
DEFINE topicName = 'str2_variables'
DEFINE val1 = '1'
DEFINE val2 = 'HOGE'
CREATE STREAM ${streamName} (
${colName1} INT,
${colName2} STRING
) WITH (
kafka_topic = '${topicName}',
value_format = '${format}',
partitions = ${replicas}
);
INSERT INTO ${streamName} (
${colName1},
${colName2}
) VALUES (
${val1},
'${val2}'
);
SELECT * FROM ${streamName}
WHERE ${colName1} = ${val1} and ${colName2} = '${val2}'
EMIT CHANGES;
注意:不知为何,在SELECT语句中使用变量作为列名({colName1}和{colName2}),导致了非法参数的执行错误(confluent 6.2.1)。但并未深入追究该问题。
匿名函数
在ksqlDB中,可以使用Lambda表达式来处理结构化数据。操作符使用”=>”,参数最多可以有3个。可通过调用函数来实现变换(TRANSFORM)、聚合(REDUCE)和筛选(FILTER)三个功能。
转变
展示一个创建包含Map的Stream,并将其转化为应用TRANSFORM后的Stream的示例。转换条件如下所示。
-
- Map中のKeyをUCASE関数で大文字化
- Map中のValueを+5
CREATE STREAM stream1_lambda (
id INT,
lambda_map MAP<STRING, INTEGER>
) WITH (
kafka_topic = 'stream1_lambda',
partitions = 1,
value_format = 'avro'
);
CREATE STREAM output AS
SELECT id,
TRANSFORM(lambda_map, (k, v) => UCASE(k), (k, v) => v + 5)
FROM stream1_lambda;
观察插入值并进行转换的Map(KSQL_COL_0是内部命名的转换列),可以看到值在Map内按照指定的方式进行了转换。
INSERT INTO stream1_lambda (
id, lambda_map
) VALUES (
3, MAP('hello':= 15, 'goodbye':= -5)
);
SELECT * FROM stream1_lambda EMIT CHAGES;
SELECT id, ksq_col_0 AS final_output
FROM output EMIT CHANGES;
+--------------------------------------------------------+--------------------------------------------------------+
|ID |LAMBDA_MAP |
+--------------------------------------------------------+--------------------------------------------------------+
|3 |{goodbye=-5, hello=15} |
+--------------------------------------------------------+--------------------------------------------------------+
|ID |FINAL_OUTPUT |
+--------------------------------------------------------+--------------------------------------------------------+
|3 |{GOODBYE=0, HELLO=20} |
减少
展示一个将包含数组的Stream转换为应用REDUCE函数后的Stream的例子。该例子通过CEIL函数对两个值进行除法并向上取整来对数组进行聚合。但是,我不明白第二个参数state的含义(在这个例子中是2),无法理解… 唉。
CREATE STREAM stream2_lambda (
id INT,
lambda_arr ARRAY<INTEGER>
) WITH (
kafka_topic = 'stream2_lambda',
partitions = 1,
value_format = 'avro'
);
CREATE STREAM output2 AS
SELECT id,
REDUCE(lambda_arr, 2, (s, x) => CEIL(x/s))
FROM stream2_lambda
EMIT CHANGES;
观察插入了实际值并进行REDUCE的数组(KSQL_COL_0是内部命名的REDUCE列)。 聚合结果变为了原始数据相同的5,但不理解其原因令人感到不舒服。
INSERT INTO stream2_lambda (
id, lambda_arr
) VALUES (
1, ARRAY[2, 3, 4, 5]
);
SELECT * FROM stream2_lambda EMIT CHAGES;
SELECT id, ksq_col_0 AS final_output
FROM output2 EMIT CHANGES;
+--------------------------------------------------------+--------------------------------------------------------+
|ID |LAMBDA_ARR |
+--------------------------------------------------------+--------------------------------------------------------+
|1 |[2, 3, 4, 5] |
+--------------------------------------------------------+--------------------------------------------------------+
|ID |FINAL_OUTPUT |
+--------------------------------------------------------+--------------------------------------------------------+
|1 |5 |
过滤器
展示一个将包含Map的Stream转换为应用了过滤器的Stream的示例。过滤器应满足以下AND条件。
-
- Map中のKeyに関しnameという文字列が出現するかをINSTR関数で調査(>0なので必然的に出現有無のみの判別になる)
- Map中のValueに関しゼロ以外かを調査
CREATE STREAM stream3_lambda (
id INT,
lambda_map MAP<STRING, INTEGER>
) WITH (
kafka_topic = 'stream3_lambda',
partitions = 1,
value_format = 'avro'
);
CREATE STREAM output3 AS
SELECT id,
FILTER(lambda_map, (k, v) => instr(k, 'name') > 0 AND v != 0)
FROM stream3_lambda
EMIT CHANGES;
观察插入了实际值并进行过过滤的Map(KSQL_COL_0是内部命名的过滤列),可以看到值按照指定的方式在Map中被选中。
INSERT INTO stream3_lambda (
id, lambda_map
) VALUES (
1, MAP('first name':= 15, 'middle':= 25, 'last name':= 0, 'alt name':= 33)
);
SELECT * FROM stream3_lambda EMIT CHAGES;
SELECT id, ksq_col_0 AS final_output
FROM output3 EMIT CHANGES;
+--------------------------------------------------------+--------------------------------------------------------+
|ID |LAMBDA_MAP |
+--------------------------------------------------------+--------------------------------------------------------+
|1 |{middle=25, last name=0, first name=15, alt name=33} |
+--------------------------------------------------------+--------------------------------------------------------+
|ID |FINAL_OUTPUT |
+--------------------------------------------------------+--------------------------------------------------------+
|1 |{first name=15, alt name=33} |
总结
-
- Kafkaメッセージのデータ構造をCREATEで指定する際には様々な構造化表現が使える
-
- Kafkaメッセージ中のタイムスタンプ取り込みは非常に役立ちそう。
- ラムダ式としてTRANSFORM、REDUCE、FILTERの3つが使える。これをうまく使えればわざわざユーザー定義関数をJavaで書かなくても済むかもしれない。