AVRO联合记录及其周边

在将JSON流传给Kafka时,AVRO JSON是否是最常用的选择?特别是当成员变量的结构体被嵌套保持,并且可以通过Union切换类型时,会变得非常复杂。

卡夫卡的主题最好是具有稳定的类型,并且可以根据数据集来设计类(有各种不同的观点)。这样做的结果是,联合类型也可能成为最佳选择。尽管这会导致稍微冗长的JSON表示形式,但可以作为通用JSON与其他工具进行兼容。

将AVRO json序列化

当使用 Union 处理记录时,key 中包含记录名称。

{"bulk":null}
{"bulk":{"string":"test"}}
{"bulk":{"name.space.Name":{"field1":"test"}}}

浮士德记录

在使用Faust的过程中,将类型信息嵌入到代码中。更多详情请参考:https://faust.readthedocs.io/en/latest/userguide/models.html

Avro 是用于通用类型转换的,而 faust.Record 还可以转换 UUID、datetime 等更特定的类型。.dump() 可以快速查看输出结果,非常方便。

ksql -> 基于流的SQL

双引号可用于转义。成员访问使用箭头。请参考此链接:https://docs.confluent.io/current/ksql/docs/developer-guide/query-with-structured-data.html

CREATE STREAM T (TYPE VARCHAR,
                DATA STRUCT<
                      timestamp VARCHAR,
                      "field-a" INT,
                      "field-b" VARCHAR,
                      "field-c" INT,
                      "field-d" VARCHAR>)
        WITH (KAFKA_TOPIC='raw-topic',
              VALUE_FORMAT='JSON');

SELECT DATA->"field-a", DATA->"field-b" FROM T WHERE TYPE='key1' LIMIT 2;

阿弗罗-JSON序列化器

尽管它的运行相对顺利,但是控制串行化方法的 union 和 record 是一个挑战。依赖于 avro-python3。
URL:https://github.com/linkedin/python-avro-json-serializer

import avro.schema
from avro_json_serializer import *

schema_dict = {
  "fields": [
    {"name": "frec",
      "type": [
        "null",
        "string",
        {"fields": [{"name": "subfint", "type": "int"}],
                   "name": "Rec",
                  "type": "record"},
        {"fields": [{"name": "subfint2", "type": "int"}],
                  "name": "Rec2",
                  "type": "record"}
      ]
    }
  ],
  "name": "all_field",
  "namespace": "com.some.thing",
  "type": "record"
}

avro_schema = avro.schema.SchemaFromJSONData(schema_dict, avro.schema.Names())
serializer = AvroJsonSerializer(avro_schema)

serializer.to_json({
    "ffloat": 1.0,
    "funion_null": None,
    "flong": 1,
    "fdouble": 2.0,
    "fint": 1,
    "fstring": "hi there",
    "frec": {"subfint": 1}
})
# '{"frec":{"com.some.thing.Rec":{"subfint":1}}}'
serializer.to_json({
    "ffloat": 1.0,
    "funion_null": None,
    "flong": 1,
    "fdouble": 2.0,
    "fint": 1,
    "fstring": "hi there",
    "frec": {"subfint2": 2}
})
# '{"frec":{"com.some.thing.Rec2":{"subfint2":2}}}'
广告
将在 10 秒后关闭
bannerAds