我在ksqlDB中尝试了客户忠诚度计划
摘要
這次,我們選擇客戶忠誠度作為使用案例,並嘗試使用 ksqlDB 去編寫程序。
考虑客户忠诚度计划
“在许多地方我们都能看到:「集齐10枚印章就能免费享用一杯咖啡」这种简单的活动。这些被称为顾客忠诚度计划。
这一次,我们将进一步演变这样的计划,以创建一个更加精致的奖励计划,吸引客户在适当的地点和时间。
为此,我们需要整合多个数据流,以在适当的时机应用适当的促销活动。”
使用ksqlDB
ksqlDB可以直接从云端常见的数据源和终端系统中导入和导出数据。在这里,我们将使用ksqlDB的INSERT INTO功能来运行包含模拟数据的代码。
本文旨在探讨各种方法,分析用户行为并决定为客户提供激励方案的方式。
步骤1:创建流。
我们将执行以下 ksqlDB 语法。
在这里,我们将创建三个 STREAM,分别为 users、products 和 purchases。
CREATE STREAM users (
user_id VARCHAR KEY,
name VARCHAR
) WITH (
KAFKA_TOPIC = 'USERS',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
CREATE STREAM products (
product_id VARCHAR KEY,
category VARCHAR,
price DECIMAL(10,2)
) WITH (
KAFKA_TOPIC = 'products',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
CREATE STREAM purchases (
user_id VARCHAR KEY,
product_id VARCHAR
) WITH (
KAFKA_TOPIC = 'purchases',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
创建物化视图的步骤2:
CREATE TABLE all_products AS
SELECT
product_id,
LATEST_BY_OFFSET(category) AS category,
LATEST_BY_OFFSET(CAST(price AS DOUBLE)) AS price
FROM products
GROUP BY product_id;
CREATE STREAM enriched_purchases AS
SELECT
purchases.user_id,
purchases.product_id AS product_id,
all_products.category,
all_products.price
FROM purchases
LEFT JOIN all_products ON purchases.product_id = all_products.product_id;
CREATE TABLE sales_totals AS
SELECT
user_id,
SUM(price) AS total,
CASE
WHEN SUM(price) > 400 THEN 'GOLD'
WHEN SUM(price) > 300 THEN 'SILVER'
WHEN SUM(price) > 200 THEN 'BRONZE'
ELSE 'CLIMBING'
END AS reward_level
FROM enriched_purchases
GROUP BY user_id;
CREATE TABLE caffeine_index AS
SELECT
user_id,
COUNT(*) AS total,
(COUNT(*) % 6) AS sequence,
(COUNT(*) % 6) = 5 AS next_one_free
FROM purchases
WHERE product_id = 'coffee'
GROUP BY user_id;
CREATE TABLE promotion_french_poodle
AS
SELECT
user_id,
collect_set(product_id) AS products,
'french_poodle' AS promotion_name
FROM purchases
WHERE product_id IN ('dog', 'beret')
GROUP BY user_id
HAVING ARRAY_CONTAINS( collect_set(product_id), 'dog' )
AND ARRAY_CONTAINS( collect_set(product_id), 'beret' )
EMIT changes;
CREATE TABLE promotion_loose_leaf AS
SELECT
user_id,
collect_set(product_id) AS products,
'loose_leaf' AS promotion_name
FROM enriched_purchases
WHERE product_id IN ('coffee', 'tea')
GROUP BY user_id
HAVING ARRAY_CONTAINS( collect_set(product_id), 'coffee' )
AND NOT ARRAY_CONTAINS( collect_set(product_id), 'tea' )
AND sum(price) > 20;
第三步:插入模拟数据。
在这里,我们将使用 ksqlDB 插入模拟数据。
我们将给用户、产品和购买数据分别设置模拟数据。
-- Some users.
INSERT INTO users ( user_id, name ) VALUES ( 'u2001', 'kris' );
INSERT INTO users ( user_id, name ) VALUES ( 'u2002', 'dave' );
INSERT INTO users ( user_id, name ) VALUES ( 'u2003', 'yeva' );
INSERT INTO users ( user_id, name ) VALUES ( 'u2004', 'rick' );
-- Some products.
INSERT INTO products ( product_id, category, price ) VALUES ( 'tea', 'beverages', 2.55 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'coffee', 'beverages', 2.99 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'dog', 'pets', 249.99 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'cat', 'pets', 195.00 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'beret', 'fashion', 34.99 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'handbag', 'fashion', 126.00 );
-- Some purchases.
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'beret' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'cat' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'tea' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'dave', 'dog' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'dave', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'beret' );
-- A price increase!
INSERT INTO products ( product_id, category, price ) VALUES ( 'coffee', 'beverages', 3.05 );
-- Some more purchases.
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'dog' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'cat' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'handbag' );
第四步:验证
在这里,我们执行以下查询来验证模拟数据是否正确输入。
SELECT * FROM promotion_loose_leaf;
如果获得了以下输出,那么至此该过程已经正确运行。
+-----------------------------+----------------------------------+--------------------------------------+
|USER_ID |PRODUCTS |PROMOTION_NAME |
+-----------------------------+----------------------------------+--------------------------------------+
|kris |[coffee] |loose_leaf |
Query terminated
验证的解释 de
因为还有其他材质化视图可供选择,所以下次我想继续尝试。
非常感谢您的阅读至最后。