尝试阅读Avro规范书
简而言之。
CDC Replication Engine for Kafka是一款用于Kafka的工具产品,它自动为我注册了Avro Schema,但是我发现其中有一些问题,所以在查看Apache Avro™ 1.10.2规范的同时尝试进行解读并记录了以下内容。
首先,我要说明的是,我已经学会了如何阅读Avro Schema,但最终我对其中引起疑问的地方并没有完全解决。
源头
我正在验证将位于主机上的IMS层次型数据库复制到Kafka中。从IMS抽取数据使用了Infosphere Classic CDC for z/OS(以下简称CCDC),并使用CDC Replication Engine for Kafka(以下简称CDC for Kafka)将数据投入到Kafka中。
如果我们以CCDC的方式提供DB记录的逻辑视角或数据结构,即通过元数据的形式给予,那么CDC for Kafka将自动创建目标主题并将与元数据对应的数据结构以Avro格式注册为模式,这非常方便。
我已经测试了一系列的数据类型,确保从生成元数据到注册Avro模式,再到复制等整个流程都能够正常运行。然而,在这个过程中我还有一些疑问。
二进制数据的复制,我们分别尝试了1字节、2字节和4字节,但是在源头(即IMS)中是0xf0的地方,当在Kafka端使用avro-consume来消费时,它变成了d。而0xfff0变成了yd,0xfffffff0变成了yyyd。
我明白0xff代表y,0xf0代表d,但并不是说y是d的第15个字符,为什么会变成这样我不明白。
因此,我决定首先认真查看我之前完全交给CDC for Kafka处理的Avro模式注册。
调查对象为Avro模式
{
"type": "record",
"name": "RTSEG2",
"namespace": "value.SOURCEDB.IMS.HICKA1",
"fields": [
{
"name": "RT_KEY",
"type": [
{
"type": "string",
"logicalType": "CHARACTER",
"dbColumnName": "RT_KEY",
"length": 6
},
"null"
],
"doc": "",
"default": ""
},
{
"name": "RT_BIT8",
"type": [
{
"type": "bytes",
"logicalType": "BINARY",
"dbColumnName": "RT_BIT8",
"length": 1
},
"null"
],
"doc": "",
"default": ""
},
{
"name": "RT_BIT16",
"type": [
{
"type": "bytes",
"logicalType": "BINARY",
"dbColumnName": "RT_BIT16",
"length": 2
},
"null"
],
"doc": "",
"default": ""
},
{
"name": "RT_BIT32",
"type": [
{
"type": "bytes",
"logicalType": "BINARY",
"dbColumnName": "RT_BIT32",
"length": 4
},
"null"
],
"doc": "",
"default": ""
}
]
}
阅读Avro格式
以下是本次复制的主题的Avro架构起始描述:
{
"type": "record",
"name": "RTSEG2",
"namespace": "value.SOURCEDB.IMS.HICKA1",
"fields": [
值得注意的是,”type”: “record”这一部分,这使得该模式符合Avro规范中所定义的”Complex Types”。
复杂类型
Avro支持六种复杂类型:记录、枚举、数组、映射、联合和固定。
记录
记录使用类型名称“record”,并支持三个属性:
…
当看到Records类型的说明时,可以得出这个结果。
字段:一个JSON数组,列出字段(必需)。每个字段是一个带有以下属性的JSON对象:
根据现有的情况来看,我们需要将本次发送的每个字段定义视为’fields’后面括号中的JSON对象,并据此进行解读。
进一步了解有关’fields’的说明发现
名称:一个提供字段名称的JSON字符串(必填),和
说明:一个描述该字段给用户的JSON字符串(可选)。
类型:一个按照上述定义的模式。
默认值:一个用于此字段的默认值,仅在读取缺少此字段的实例时用于模式演化目的。默认值的存在并不使得字段在编码时可选。允许的值取决于字段的模式类型,根据下表。联合字段的默认值对应于联合中的第一个模式。字节和固定字段的默认值是JSON字符串,其中Unicode代码点0-255映射为无符号8位字节值0-255。即使字段的值等于其默认值,Avro也会对字段进行编码。
这都有。
{
"name": "RT_BIT8",
"type": [
{
"type": "bytes",
"logicalType": "BINARY",
"dbColumnName": "RT_BIT8",
"length": 1
},
"null"
],
"doc": "",
"default": ""
},
name为”RT_BIT8″,在CCDC端的元数据中使用相同的字段名称。没有doc和default。
在类型中,有一个被大括号括起来的[{…},”null”],其中有两个值。看起来这是一种被称为Union的形式。
工会
如上所述,工会是使用JSON数组来表示的。例如,[“null”, “string”] 声明了一个架构,可以是null或字符串。
在这里,前者和后者是指两个值,其中前者使用JSON格式进行描述。Avro规范中提供了以下模式作为描述该模式的方法。
模式声明
模式以以下形式之一在JSON中表示:
– 一个JSON字符串,命名一个已定义的类型。
– 一个JSON对象,格式如下:
{“type”: “typeName” …attributes…}
其中typeName是一个原始类型或派生类型的名称,如下所定义。在本文档中未定义的属性可以作为元数据,但不能影响序列化数据的格式。
– 一个表示嵌套类型联合的JSON数组。
这个第二项是以“JSON对象”的形式进行描述的。它将“bytes”作为typeName进行了指定。
作为一种类型,对于bytes的解释是什么。
原始类型
原始类型的名称集合包括:
…
– 字节:8位无符号字节的序列
这是如此。
而接下来的”logicalType”: “BINARY”或者”length”: 1似乎对应于属性。
关于logicalType
逻辑类型
逻辑类型是带有额外属性的Avro原始或复杂类型,用于表示衍生类型。属性logicalType对于逻辑类型必须始终存在,并且是一个字符串,其中包含本节稍后列出的逻辑类型之一的名称。针对特定逻辑类型,可以定义其他属性。
逻辑类型总是使用其底层Avro类型进行序列化,以便值以与不具有logicalType属性的等效Avro类型完全相同的方式进行编码。语言实现可以选择使用适当的本地类型表示逻辑类型,但不是必需的。
在读取时,语言实现必须忽略未知的逻辑类型,并应使用底层的Avro类型。如果逻辑类型无效,例如小数的比例大于其精度,则实现应忽略逻辑类型并使用底层的Avro类型。
「原始类型或复杂类型的附加属性」,
「在本节下方的列表中以字符串指定的项目」,
「编码始终与基础原始类型进行相同处理」,
「无效的logicalType指定将被忽略」,
但是,由Kafka生成的CDC逻辑类型BINARY并未在该规范的列表中列出。
然而,可以认为该逻辑类型无论其有效与否,都会被视为以字节形式编码的原始类型。
编码解密
由于确认二进制字段是以字节类型处理的,因此阅读编码解释并跟踪d和yd之类的谜题。
关于复杂类型的编码,已经写明如下。
复杂类型
复杂类型按照以下方式以二进制形式进行编码:
记录
记录通过按照声明顺序对其字段的值进行编码来进行编码。换句话说,记录只是字段编码的串联。字段的值按照其模式进行编码。
例如,记录模式为:{
“type”: “record”,
“name”: “test”,
“fields” : [
{“name”: “a”, “type”: “long”},
{“name”: “b”, “type”: “string”}
]
}假设有一个记录的实例,其a字段的值为27(十六进制编码为36),b字段的值为”foo”(十六进制字节编码为06 66 6f 6f),那么它的编码就是这些值的简单串联,即十六进制字节序列:
36 06 66 6f 6f
根据各个字段的类型编码,只是简单地将它们写在一起。也就是说,JSON格式中包含的”length”: 1并不是用于解码的。
对于长度不确定的数据类型,如字节型(bytes)和字符型(string),似乎会在开头带有一个字节的字段长度头部。
字节被编码为一个长整数,后面是相应数量的数据字节。
字符串被编码为一个长整数,后面是相应数量的UTF-8编码字符数据。例如,三个字符的字符串 “foo” 被编码为长整数值 3(以十六进制 06 编码),后面是字符 ‘f’、’o’ 和 ‘o’ 的 UTF-8 编码(十六进制字节 66 6f 6f):
06 66 6f 6f
当提及3字节的字符串数据”foo”转换为UTF-8,则分别为66 6f 6f。在此之前,加上代表3的长整型值06,变为06 66 6f 6f。尽管称之为长整型,但根据该网站的示例,所有的值都变为了1字节长。
如果有一个上一行是long类型字段和string类型字段的连续示例,对于实际数据36 06 66 6f 6f,首先是long类型,所以取1个字节,接下来是string类型,所以查看首字节来获取数据长度,发现长度为3个字节,因此继续解码接下来的3个字节作为string字段…一直以此类推。
实际数据解读
现在,我已经理解了编码/解码规则,所以我也可以像avro-consumer一样解释原始数据了。
我将使用不是avro的console-consumer来提取原始数据,然后将其通过xxd传输并添加16进制表示。
00000000: 0000 0000 0e00 0c30 3030 3030 3100 1454 .......000001..T
00000010: 6869 7349 7352 4f4f 5400 3cef bdb1 efbd hisIsROOT.<.....
(中略)
00000110: 6421 0002 f000 04ff f000 08ff ffff f00a d!..............
因为在途中有很多其他数据类型的测试,所以我选择省略了这部分。
我确实找到了可能与1字节、2字节和4字节的二进制数据对应的部分。0002 f000 04ff f000 08ff ffff f00a。
原始数据分别为f0、fff0、fffffff0,并且每个字段长度标头分别为02、04、08。看起来很正确。每个字段之间似乎都有一个00。
换句话说,原始数据层面是正确的,但是在通过avro-consumer输出时出现了d或y之类的乱码。…我知道了这一点,但到底为什么会变成d或y仍然是个谜。
由于确认了CDC for Kafka能够正确地将数据输入,从而实现了产品验证的目的,所以我们决定在这里暂时停止。
更改历史
-
- 21-11-04
各フィールドのtypeはUnion形式で指定されているのではないかと@tomotagworkさんより指摘いただき、本文に反映しました。