ksql 简洁技术集

目标

以下是一篇只是从下面的指南中挑选并运行了一些有趣部分的文章,以供自学KSQL使用。如果查看原文,就会发现一切都解决了,但如果能给您提供一点参考,那就算是对您有所帮助了。

原始材料:如何指南

从”How-to guide”中找到的ksql表达集合

結構化資料 huà

CREATE ...
  hoge STRUCT <
       hoge_C1 VARCHAR,
       hoge_C2 INT
       >

INSERT INTO ...
       , STRUCT(hoge_C1 :='a', hoge_C2 :=2)

SELECT ...
       hoge->hoge_C1,
       hoge->hoge_C2,

在这个教程中也提到了。要在Stream/Table内创建结构化数据,在CREATE时需要使用STRUCT<>。在下面的例子中,我们使用Avro Schema格式,在b列中存储了VARCHAR类型的c和INT类型的d。

CREATE STREAM s2 (
    a VARCHAR KEY,
    b STRUCT<
        c VARCHAR,
        d INT
    >
) WITH (
    kafka_topic = 's2',
    partitions = 1,
    value_format = 'avro'
);

我们可以使用STRUCT()将数据插入到结构化数据中。

INSERT INTO s2 (
    a, b
) VALUES (
    'k1', STRUCT(c := 'v1', d := 5)
);

INSERT INTO s2 (
    a, b
) VALUES (
    'k2', STRUCT(c := 'v2', d := 6)
);

INSERT INTO s2 (
    a, b
) VALUES (
    'k3', STRUCT(c := 'v3', d := 7)
);

提取结构化数据需要使用->。

SELECT a,
       b,
       b->c,
       b->d
FROM s2
EMIT CHANGES;
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|A                                         |B                                         |C                                         |D                                         |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|k1                                        |{C=v1, D=5}                               |v1                                        |5                                         |
|k2                                        |{C=v2, D=6}                               |v2                                        |6                                         |
|k3                                        |{C=v3, D=7}                               |v3                                        |7                                         |

地图

CREATE ...
  hoge MAP <
       VARCHAR
       INT
       >

INSERT INTO ...
       , MAP('hoge_c1' :=1, 'hoge_c2=':=2)

SELECT ...
       hoge['hoge_c1'] AS HOGE_C1,
       hoge['hoge_c2'] AS HOGE_C2,

可以使用Map的形式。可以将任何东西放入key和value的组合中,但key和value的数据类型必须始终保持不变。要在Stream/Table中创建Map,可以在CREATE时使用MAP<>。在以下示例中,列b将成为一个包含VARCHAR和INT的Map。

CREATE STREAM s3 (
    a VARCHAR KEY,
    b MAP<VARCHAR, INT>
) WITH (
    kafka_topic = 's3',
    partitions = 1,
    value_format = 'avro'
);

在对Map进行INSERT操作时,需要使用MAP()函数。

INSERT INTO s3 (
    a, b
) VALUES (
    'k1', MAP('c' := 2, 'd' := 4)
);

INSERT INTO s3 (
    a, b
) VALUES (
    'k2', MAP('c' := 4, 'd' := 8)
);

INSERT INTO s3 (
    a, b
) VALUES (
    'k3', MAP('c' := 8, 'd' := 16)
);

从Map中获取值的方法是通过使用方括号指定键值。

SELECT a,
       b,
       b['c'] AS C,
       b['d'] AS D
FROM s3
EMIT CHANGES;
+---------------------------+---------------------------+---------------------------+---------------------------+
|A                          |B                          |C                          |D                          |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k1                         |{c=2, d=4}                 |2                          |4                          |
|k2                         |{c=4, d=8}                 |4                          |8                          |
|k3                         |{c=8, d=16}                |8                          |16                         |

数组

CREATE ...
  hoge ARRAY <INT>

INSERT INTO ...
       , ARRAY[10,20]

SELECT ...
       hoge[1] as hoge_1, hoge[2] as hoge_2

我们可以将单一数据类型的值排列成一个数组进行处理。要在Stream/Table中创建数组,可以在CREATE语句中使用ARRAY<>。在下面的示例中,列b将成为一个INT类型的数组。

CREATE STREAM s4 (
    a VARCHAR KEY,
    b ARRAY<INT>
) WITH (
    kafka_topic = 's4',
    partitions = 1,
    value_format = 'avro'
);

在插入数组中时要使用ARRAY[]。

INSERT INTO s4 (
    a, b
) VALUES (
    'k1', ARRAY[1]
);

INSERT INTO s4 (
    a, b
) VALUES (
    'k2', ARRAY[2, 3]
);

INSERT INTO s4 (
    a, b
) VALUES (
    'k3', ARRAY[4, 5, 6]
);

在中括号[]中使用索引来提取数组元素。[]内的索引指定从左到右按照元素的顺序进行。对于负数索引(-1),表示逆方向,因此在这个例子中将访问数组中的最后一个元素。

SELECT a,
       b,
       b[1] AS b_1,
       b[2] AS b_2,
       b[3] AS b_3,
       b[-1] AS b_minus_1
FROM s4
EMIT CHANGES;
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|A                |B                |B_1              |B_2              |B_3              |B_MINUS_1        |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|k1               |[1]              |1                |null             |null             |1                |
|k2               |[2, 3]           |2                |3                |null             |3                |
|k3               |[4, 5, 6]        |4                |5                |6                |6                |

提取最新的偏移数据

CREATE ... AS
    SELECT ...,
           LATEST_BY_OFFSET(hoge) AS hoge,

在这个教程里也提到过。通过使用SQL聚合函数LATEST_BY_OFFSET,可以将最后插入的数据反映到表中。

假设有以下的流:

CREATE STREAM s1 (
    k VARCHAR KEY,
    v1 INT,
    v2 VARCHAR,
    v3 BOOLEAN
) WITH (
    kafka_topic = 's1',
    partitions = 1,
    value_format = 'avro'
);

在以下情况下,消息密钥k1和k2分别进行了2次插入,密钥k3进行了1次插入:

INSERT INTO s1 (
    k, v1, v2, v3
) VALUES (
    'k1', 0, 'a', true
);

INSERT INTO s1 (
    k, v1, v2, v3
) VALUES (
    'k2', 1, 'b', false
);

INSERT INTO s1 (
    k, v1, v2, v3
) VALUES (
    'k1', 2, 'c', false
);

INSERT INTO s1 (
    k, v1, v2, v3
) VALUES (
    'k3', 3, 'd', true
);

INSERT INTO s1 (
    k, v1, v2, v3
) VALUES (
    'k2', 4, 'e', true
);

可以按照以下方式创建一个用于提取v1、v2、v3最新偏移数据的表。(在使用LATEST_BY_OFFSET时,通过GROUP BY指定了Message Key,应该是因为它是聚合函数)

CREATE TABLE t1 AS
    SELECT k,
           LATEST_BY_OFFSET(v1) AS v1,
           LATEST_BY_OFFSET(v2) AS v2,
           LATEST_BY_OFFSET(v3) AS v3
    FROM s1
    GROUP BY k
    EMIT CHANGES;

在使用 pull query 进行愉快的搜索时,将返回 v1、v2 和 v3 插入的最后值(≒Topic 中最新偏移量的信息)。

ksql> SELECT k, v1, v2, v3 FROM t1 WHERE k='k1';
+---------------------------+---------------------------+---------------------------+---------------------------+
|K                          |V1                         |V2                         |V3                         |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k1                         |2                          |c                          |false                      |
Query terminated

ksql> SELECT k, v1, v2, v3 FROM t1 WHERE k='k2';
+---------------------------+---------------------------+---------------------------+---------------------------+
|K                          |V1                         |V2                         |V3                         |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k2                         |4                          |e                          |true                       |
Query terminated

ksql> SELECT k, v1, v2, v3 FROM t1 WHERE k='k3';
+---------------------------+---------------------------+---------------------------+---------------------------+
|K                          |V1                         |V2                         |V3                         |
+---------------------------+---------------------------+---------------------------+---------------------------+
|k3                         |3                          |d                          |true                       |
Query terminated

对于表、列等的名称,使用英文小写字母。

ksql在默认情况下会将表和列的所有名称转换为大写英文字母,但如果想使用小写英文字母,只需用“括起来即可。

CREATE STREAM `s2_Case` (
    `foo` VARCHAR KEY,
    `BAR` INT,
    `Baz` VARCHAR,
    `grault` STRUCT<
        `Corge` VARCHAR,
        `garply` INT
    >,
    qux INT
) WITH (
    kafka_topic = 's2_Case',
    partitions = 1,
    value_format = 'avro'
);

尝试进行描述。

Name                 : s2_Case
 Field  | Type
--------------------------------------------------------
 foo    | VARCHAR(STRING)  (key)
 BAR    | INTEGER
 Baz    | VARCHAR(STRING)
 grault | STRUCT<Corge VARCHAR(STRING), garply INTEGER>
 QUX    | INTEGER
--------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

在使用INSERT或SELECT语句时,需要用“符号来包围流/表名和列名,这一点很麻烦。请注意,在未使用SELECT语句包围的列名qux和qux2在ksql回复时会被转换为大写。

INSERT INTO `s2_Case` (
    `foo`, `BAR`, `Baz`, `grault`, qux
) VALUES (
    'k1', 1, 'x', STRUCT(`Corge` := 'v1', `garply` := 5), 2
);
SELECT `foo`,
       `BAR`,
       `Baz`,
       `grault`->`Corge`,
       `grault`->`garply`,
       qux,
       QUX AS qux2
FROM `s2`
EMIT CHANGES;
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|foo           |BAR           |Baz           |Corge         |garply        |QUX           |QUX2          |
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|k1            |1             |x             |v1            |5             |2             |2             |

时间戳列的定义和使用

考虑到事件处理,处理Kafka消息中的时间戳会变得非常重要。例如,假设有一个包含时间戳信息的ts列的流:

CREATE STREAM s1_time (
    k VARCHAR KEY,
    ts VARCHAR,
    v1 INT,
    v2 VARCHAR
) WITH (
    kafka_topic = 's1_time',
    partitions = 1,
    value_format = 'avro'
);

假设将以下数据输入:

INSERT INTO s1_time (
    k, ts, v1, v2
) VALUES (
    'k1', '2020-05-04 01:00:00', 0, 'a'
);

INSERT INTO s1_time (
    k, ts, v1, v2
) VALUES (
    'k2', '2020-05-04 02:00:00', 1, 'b'
);

在Stream中有一个隐式的系统列名为ROWTIME,如果不指定任何内容,则保留了Kafka消息写入Topic的时间。我们可以使用TIMESTAMPTOSTRING函数将其转换为可读的格式。

SELECT k,
       ROWTIME,
       TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS rowtime_formatted,
       ts,
       v1,
       v2
FROM s1_time
EMIT CHANGES;
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|K                                    |ROWTIME                              |ROWTIME_FORMATTED                    |TS                                   |V1                                   |V2                                   |
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|k1                                   |1638949784646                        |2021-12-08 16:49:44.646              |2020-05-04 01:00:00                  |0                                    |a                                    |
|k2                                   |1638949802709                        |2021-12-08 16:50:02.709              |2020-05-04 02:00:00                  |1                                    |b                                    |

如果您希望将Kafka消息中的特定字段作为时间戳进行处理,而不是使用Kafka消息写入的时间,那么在CREATE时,在WITH内指定以下的timestamp和timestamp_format即可。

CREATE STREAM s2_time WITH (
    timestamp = 'ts',
    timestamp_format = 'yyyy-MM-dd HH:mm:ss'
)   AS
    SELECT *
    FROM s1_time
    EMIT CHANGES;

观察这个流程,可以发现ROWTIME的含义已经从消息写入时间变为存储在ts列中的时间。这看起来很方便。

SELECT k,
       ROWTIME,
       TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS rowtime_formatted,
       ts,
       v1,
       v2
FROM s2_time
EMIT CHANGES;
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|K                                    |ROWTIME                              |ROWTIME_FORMATTED                    |TS                                   |V1                                   |V2                                   |
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|k1                                   |1588521600000                        |2020-05-04 01:00:00.000              |2020-05-04 01:00:00                  |0                                    |a                                    |
|k2                                   |1588525200000                        |2020-05-04 02:00:00.000              |2020-05-04 02:00:00                  |1                                    |b                                    |

当然,你可以选择在创建Stream时直接指定要用作时间戳的列,而不是像上面的例子一样重新定义Stream。在这种情况下,插入后的ROWTIME将变为ts列的值。

CREATE STREAM s3_time (
    k VARCHAR KEY,
    ts VARCHAR,
    v1 INT
) WITH (
    kafka_topic = 's3_time',
    partitions = 1,
    value_format = 'avro',
    timestamp = 'ts',
    timestamp_format = 'yyyy-MM-dd HH:mm:ss'
);

注意:如果没有指定timestamp_format,则需要注意的是,用于时间戳的列应以与ROWTIME列相同的BIGINT Unix时间格式存储。

变量

在KSQL中可以使用DEFINE定义变量,并在KSQL中使用。在KSQL中调用变量需要使用${}进行标记。

DEFINE format = 'AVRO';
DEFINE replicas = '3';

CREATE STREAM str1_variables (
  id INT
) WITH (
  kafka_topic = 'str1_variables',
  value_format = '${format}',
  partitions = ${replicas}
);

可以通过SHOW VARIABLES来确认已定义的变量。

ksql> SHOW VARIABLES;

 Variable Name | Value
-----------------------
 format        | AVRO
 replicas      | 3
-----------------------

在删除变量定义时使用`UNDEFINE`,在转义时使用`$$`。(虽然转义的意义不太清楚,但是…)

UNDEFINE replicas;

DEFINE format = 'AVRO';
SELECT '$${format}' FROM stream;

可以指定变量的地方包括文本、文字、列名和流/表名,但不能在保留字中使用。下面是一个使用变量的ksql查询示例。

DEFINE streamName = 'str2_variables'
DEFINE colName1 = 'col1'
DEFINE colName2 = 'col2'
DEFINE format = 'AVRO'
DEFINE replicas = '3'
DEFINE topicName = 'str2_variables'
DEFINE val1 = '1'
DEFINE val2 = 'HOGE'

CREATE STREAM ${streamName} (
  ${colName1} INT,
  ${colName2} STRING
) WITH (
  kafka_topic = '${topicName}',
  value_format = '${format}',
  partitions = ${replicas}
);

INSERT INTO ${streamName} (
  ${colName1},
  ${colName2}
) VALUES (
  ${val1},
  '${val2}'
);

SELECT * FROM ${streamName}
WHERE ${colName1} = ${val1} and ${colName2} = '${val2}'
EMIT CHANGES; 

注意:不知为何,在SELECT语句中使用变量作为列名({colName1}和{colName2}),导致了非法参数的执行错误(confluent 6.2.1)。但并未深入追究该问题。

匿名函数

在ksqlDB中,可以使用Lambda表达式来处理结构化数据。操作符使用”=>”,参数最多可以有3个。可通过调用函数来实现变换(TRANSFORM)、聚合(REDUCE)和筛选(FILTER)三个功能。

转变

展示一个创建包含Map的Stream,并将其转化为应用TRANSFORM后的Stream的示例。转换条件如下所示。

    • Map中のKeyをUCASE関数で大文字化

 

    Map中のValueを+5
CREATE STREAM stream1_lambda (
  id INT,
  lambda_map MAP<STRING, INTEGER>
) WITH (
  kafka_topic = 'stream1_lambda',
  partitions = 1,
  value_format = 'avro'
);

CREATE STREAM output AS
  SELECT id, 
  TRANSFORM(lambda_map, (k, v) => UCASE(k), (k, v) => v + 5) 
  FROM stream1_lambda;

观察插入值并进行转换的Map(KSQL_COL_0是内部命名的转换列),可以看到值在Map内按照指定的方式进行了转换。

INSERT INTO stream1_lambda (
  id, lambda_map
) VALUES (
  3, MAP('hello':= 15, 'goodbye':= -5)
);

SELECT * FROM stream1_lambda EMIT CHAGES;

SELECT id, ksq_col_0 AS final_output 
  FROM output EMIT CHANGES;
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |LAMBDA_MAP                                              |
+--------------------------------------------------------+--------------------------------------------------------+
|3                                                       |{goodbye=-5, hello=15}                                  |
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |FINAL_OUTPUT                                            |
+--------------------------------------------------------+--------------------------------------------------------+
|3                                                       |{GOODBYE=0, HELLO=20}                                   |

减少

展示一个将包含数组的Stream转换为应用REDUCE函数后的Stream的例子。该例子通过CEIL函数对两个值进行除法并向上取整来对数组进行聚合。但是,我不明白第二个参数state的含义(在这个例子中是2),无法理解… 唉。

CREATE STREAM stream2_lambda (
  id INT,
  lambda_arr ARRAY<INTEGER>
) WITH (
  kafka_topic = 'stream2_lambda',
  partitions = 1,
  value_format = 'avro'
);

CREATE STREAM output2 AS
  SELECT id, 
  REDUCE(lambda_arr, 2, (s, x) => CEIL(x/s)) 
  FROM stream2_lambda
  EMIT CHANGES;

观察插入了实际值并进行REDUCE的数组(KSQL_COL_0是内部命名的REDUCE列)。 聚合结果变为了原始数据相同的5,但不理解其原因令人感到不舒服。

INSERT INTO stream2_lambda (
  id, lambda_arr
) VALUES (
  1, ARRAY[2, 3, 4, 5]
);

SELECT * FROM stream2_lambda EMIT CHAGES;

SELECT id, ksq_col_0 AS final_output 
  FROM output2 EMIT CHANGES;
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |LAMBDA_ARR                                              |
+--------------------------------------------------------+--------------------------------------------------------+
|1                                                       |[2, 3, 4, 5]                                            |
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |FINAL_OUTPUT                                            |
+--------------------------------------------------------+--------------------------------------------------------+
|1                                                       |5                                                       |

过滤器

展示一个将包含Map的Stream转换为应用了过滤器的Stream的示例。过滤器应满足以下AND条件。

    • Map中のKeyに関しnameという文字列が出現するかをINSTR関数で調査(>0なので必然的に出現有無のみの判別になる)

 

    Map中のValueに関しゼロ以外かを調査
CREATE STREAM stream3_lambda (
  id INT,
  lambda_map MAP<STRING, INTEGER>
) WITH (
  kafka_topic = 'stream3_lambda',
  partitions = 1,
  value_format = 'avro'
);

CREATE STREAM output3 AS
  SELECT id, 
  FILTER(lambda_map, (k, v) => instr(k, 'name') > 0 AND v != 0) 
  FROM stream3_lambda
  EMIT CHANGES;

观察插入了实际值并进行过过滤的Map(KSQL_COL_0是内部命名的过滤列),可以看到值按照指定的方式在Map中被选中。

INSERT INTO stream3_lambda (
  id, lambda_map
) VALUES (
  1, MAP('first name':= 15, 'middle':= 25, 'last name':= 0, 'alt name':= 33)
);

SELECT * FROM stream3_lambda EMIT CHAGES;

SELECT id, ksq_col_0 AS final_output 
  FROM output3 EMIT CHANGES;
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |LAMBDA_MAP                                              |
+--------------------------------------------------------+--------------------------------------------------------+
|1                                                       |{middle=25, last name=0, first name=15, alt name=33}    |
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |FINAL_OUTPUT                                            |
+--------------------------------------------------------+--------------------------------------------------------+
|1                                                       |{first name=15, alt name=33}                            |

总结

    • Kafkaメッセージのデータ構造をCREATEで指定する際には様々な構造化表現が使える

 

    • Kafkaメッセージ中のタイムスタンプ取り込みは非常に役立ちそう。

 

    ラムダ式としてTRANSFORM、REDUCE、FILTERの3つが使える。これをうまく使えればわざわざユーザー定義関数をJavaで書かなくても済むかもしれない。
广告
将在 10 秒后关闭
bannerAds