特征引擎Kaskada与外部系统的集成和数据输出
首先
前几天,我发布了以下的文章。
学习新的编程语言(的范式)是一件特别有趣的事情。无论何时接触到新技术,都会感到愉快。
所以,我仍在努力学习中,作为公式文档学习过程的记录,我整理了以下文章。
请参考以下文件作为本稿情报的来源。
在特征引擎Kaskada中,与外部系统的集成和数据输出
总结
Kaskada 支持多种输入源和输出(sink)。以下是对其概述(本文特别关注输出)。
Apache Pulsarはソースおよびシンクとしてサポートされています。
Apache Cassandra は、 Apache Pulsar シンクおよび Pulsar to Cassandra コネクタを使用する宛先としてサポートされます。
Redis は、 Apache Pulsar シンクと Pulsar to Redis コネクタを使用した宛先としてサポートされています。
AWS Redshift は、Parquet シンクを使用した宛先としてサポートされています( Parquet ファイルを Redshift にロードします)。
Snowflake は、 Parquet シンクを使用した宛先としてサポートされています(Parquet ファイルを Snowflake にロードします)。
阿帕奇脉冲星
Apache Pulsar是一个开源的分布式流处理平台。
向Pulsar的写入
通过创建以下的materialization来实现将查询结果实时写入Pulsar。
tenant = "public"
namespace = "default"
topic_name = "model_features"
broker_service_url = "pulsar://127.0.0.1:6650"
destination = materialization.PulsarDestination(tenant, namespace, topic_name, broker_service_url)
materialization.create_materialization(
name = "MaterializedFeatures",
destination = destination,
query = "{
key: Purchase.customer_id,
max_amount: Purchase.amount | max(),
min_amount: Purchase.amount | min(),
}"
)
亚马逊云计算(AWS)的红移数据库
Redshift 是一种基于主机的数据仓库,可为大规模数据集提供可扩展的 SQL 查询。
将数据写入Redshift数据库
首先,将值以Parquet格式导出。
然后,通过使用COPY命令,可以将Parquet文件加载到Redshift中。
如果指定了–output parquet,可以将结果输出为Parquet文件并作为URL返回。
%%fenl --output parquet
{
key: Purchase.customer_id,
max_amount: Purchase.amount | max(),
min_amount: Purchase.amount | min(),
}
通过使用该URL,您可以将Parquet文件加载到Redshift表中,方法如下。
COPY feature_vectors
FROM '<file url>'
FORMAT AS PARQUET;
雪花
Snowflake是一个主机型数据仓库,为大规模数据集提供可扩展的SQL查询。
写给Snowflake的留言
首先,将值导出为Parquet格式。
然后,通过使用COPY命令,将Parquet文件加载到Snowflake中。
如果指定了–output parquet,可以将结果作为一个标识URL输出为Parquet文件。
%%fenl --output parquet
{
key: Purchase.customer_id,
max_amount: Purchase.amount | max(),
min_amount: Purchase.amount | min(),
}
最终生成的 Parquet 文件可以以以下方式加载到 Snowflake 的临时表中。
create or replace temporary table feature_vectors (
key varchar default null,
max_amount number,
min_amount number
);
create or replace file format feature_vector_parquet_format
type = 'parquet';
create or replace temporary stage feature_vector_stage
file_format = feature_vector_parquet_format;
put <file url> @sf_tut_stage;
copy into cities
from (select * from @sf_tut_stage/<filename>.parquet);