AVRO联合记录及其周边
在将JSON流传给Kafka时,AVRO JSON是否是最常用的选择?特别是当成员变量的结构体被嵌套保持,并且可以通过Union切换类型时,会变得非常复杂。
卡夫卡的主题最好是具有稳定的类型,并且可以根据数据集来设计类(有各种不同的观点)。这样做的结果是,联合类型也可能成为最佳选择。尽管这会导致稍微冗长的JSON表示形式,但可以作为通用JSON与其他工具进行兼容。
将AVRO json序列化
当使用 Union 处理记录时,key 中包含记录名称。
{"bulk":null}
{"bulk":{"string":"test"}}
{"bulk":{"name.space.Name":{"field1":"test"}}}
浮士德记录
在使用Faust的过程中,将类型信息嵌入到代码中。更多详情请参考:https://faust.readthedocs.io/en/latest/userguide/models.html
Avro 是用于通用类型转换的,而 faust.Record 还可以转换 UUID、datetime 等更特定的类型。.dump() 可以快速查看输出结果,非常方便。
ksql -> 基于流的SQL
双引号可用于转义。成员访问使用箭头。请参考此链接:https://docs.confluent.io/current/ksql/docs/developer-guide/query-with-structured-data.html
CREATE STREAM T (TYPE VARCHAR,
DATA STRUCT<
timestamp VARCHAR,
"field-a" INT,
"field-b" VARCHAR,
"field-c" INT,
"field-d" VARCHAR>)
WITH (KAFKA_TOPIC='raw-topic',
VALUE_FORMAT='JSON');
SELECT DATA->"field-a", DATA->"field-b" FROM T WHERE TYPE='key1' LIMIT 2;
阿弗罗-JSON序列化器
尽管它的运行相对顺利,但是控制串行化方法的 union 和 record 是一个挑战。依赖于 avro-python3。
URL:https://github.com/linkedin/python-avro-json-serializer
import avro.schema
from avro_json_serializer import *
schema_dict = {
"fields": [
{"name": "frec",
"type": [
"null",
"string",
{"fields": [{"name": "subfint", "type": "int"}],
"name": "Rec",
"type": "record"},
{"fields": [{"name": "subfint2", "type": "int"}],
"name": "Rec2",
"type": "record"}
]
}
],
"name": "all_field",
"namespace": "com.some.thing",
"type": "record"
}
avro_schema = avro.schema.SchemaFromJSONData(schema_dict, avro.schema.Names())
serializer = AvroJsonSerializer(avro_schema)
serializer.to_json({
"ffloat": 1.0,
"funion_null": None,
"flong": 1,
"fdouble": 2.0,
"fint": 1,
"fstring": "hi there",
"frec": {"subfint": 1}
})
# '{"frec":{"com.some.thing.Rec":{"subfint":1}}}'
serializer.to_json({
"ffloat": 1.0,
"funion_null": None,
"flong": 1,
"fdouble": 2.0,
"fint": 1,
"fstring": "hi there",
"frec": {"subfint2": 2}
})
# '{"frec":{"com.some.thing.Rec2":{"subfint2":2}}}'