Avro、SchemaRegistry的起步

首先

我打算在30岁时加强我的输出意识。虽然第一个主题有点普通,但如果你感兴趣的话。

我正在尝试引入Hadoop、Spark、Kafka和Fluentd等工具来完善内部的日志基础设施。

对于使用Kafka进行序列化,主流方法是使用Avro和SchemaRegistry。然而,很难找到直接可运行的Avro源代码或更深入的解释,我一度感到困惑,无法理解。希望我的困扰能为其他人提供帮助。

我认为只需简要介绍SchemaRegistry,只要了解Avro即可应对其复杂性。

另外,代码已经上传到GitHub上,供参考。

另外,基本上如果已经有优质的内容,我将只提供链接,让您去阅读相关说明。

Avro是什么?

由于它只需一分钟就能阅读完,所以让我们阅读一下Avro官方的概述。

JSON不可以吗?

Confluent的文档中提到了Avro解决JSON缺点的问题。
JSON的优势是与语言无关,但缺点是缺乏严格的格式。
具体而言,

    • データ作成側が自由にフィールドを追加、削除等の変更ができるので、利用側が解釈できなくなる可能性がある。

 

    フィールドや型情報はファイル内では同一であるにもかかわらず、レコードごとに記載するので、冗長である。

在Avro中,只需将模式信息存储在一个文件的开头。此外,通过使用SchemaRegistry,可以对模式进行集中管理和互操作性检查。

模式定义

我认为最好是阅读Avro的文档,但我会简单解释一下。另外,还有一些容易出错或误解的地方,我会进行解释。

以下以Payment2.avsc为例进行说明。

{"namespace": "com.example.kafka",
 "type": "record",
 "name": "Payment2",
 "fields": [
     {"name": "id", "type": "string"},
     {"name": "amount", "type": "double"},
     {"name": "region", "type": ["string", "null"], "default": "NOWHERE"}
 ]
}

命名空间

直接采用命名空间。使用点分隔符进行编写。
据说这里指定的内容将成为自动生成的模式类的包名称。
所以如果是与客户端应用程序不同的包名称,必须明确导入才能引用。

输入类型

在Java中,有原始类型(null、boolean、int、long、float、double、bytes和string)和复杂类型(record、enum、array、map、union和fixed)。

关于[“string”, “null”],这是一个union类型,表示可选的字符串类型。如果不指定此选项,将被视为必填项,并且如果未显式指定region,则会导致与架构不匹配的错误。

姓名

这将成为自动生成的模式的类名。

默认属性

非常重要的属性,但似乎非常难以理解。如果直接引用Avro的规范文件,

默认值:此字段的默认值,在读取不包含此字段的实例时使用(可选)。允许的值取决于字段的模式类型,根据下表所示。联合字段的默认值与联合中的第一个模式相对应。

需要正确记住这句文字通顺的中文翻译:”当读取没有指定默认属性的字段的记录时,将使用默认属性值”。这不是在没有指定值的情况下进行序列化时采用的配置值。它只用于在执行反序列化时使用。它只会在字段不存在时才会被使用,而在值为null时则不会被使用。

具体来说,如同所示的GitHub上的表格所示,

id,amountのみ指定してシリアライズすると、regionはnullとして書き込まれます。

{“id”: “DEF”, “amount”: 200.0, “region”: null}のレコードをこのスキーマで読んでも、regionはnullのままです。

{“id”: “DEF”, “amount”: 200.0}のレコードをこのスキーマで読むと、regionはNOWHEREになります。

为什么第三条记录的id=GHI的记录在序列化时没有被指定,却会变成NOWHERE呢?我在这里纠结了2、3个小时,突然看到了GettingStarted页面上的以下说明。

如示例所示,Avro对象可以通过直接调用构造函数或使用构建器来创建。与构造函数不同的是,构建器将自动设置模式中指定的任何默认值。

明白了!在使用构建器模式从模式类生成时,未指定的字段将会设置为默认值。

请到此处查看样例代码,并且在avro文件中可以看到实际上写入了「NOWHERE」。

在程式中執行

在pom.xml文件中,指定了模式。默认情况下,sourceDirectory中的*.avsc格式的文件会被加载,但也可以明确指定,如此处所显示。

有两种方式来进行代码生成,一种是Specific方式,另一种是Generic方式,在这里称之为。

特定的方式

如果在使用Maven,它可以自动为我们生成Java代码的Schema。我们可以在pom.xml中指定生成的目标路径。由于有一些人可能想要查看生成后的源文件,我将它们放在这里,但一般情况下,这类文件应该是不需要被版本控制的。当我们浏览源代码时,可以确认,正如在GettingStarted中所提到的,我们可以使用构造函数方式、set方式和builder方式来生成Schema对象。
由于使用了这种方式,所以在更改Schema定义时,需要进行编译。

标准的方式

这个选项更简单。它在运行时不会生成代码,而是参考模式定义。因此,我认为在想要动态更改方案时应选择这个选项。另外,当反序列化时,如果要指定与序列化时相同的模式,则无需指定模式。这是因为avro文件本身就包含模式定义。很多人错误地认为需要指定模式,可能是因为GettingStarted示例代码中指定了模式(我自己也曾误解过,但经过实际验证已确认)。

Avro文件

您可以在这里查看文件的内容。您大概可以了解它是如何进行序列化的。

你可以在这附近找到详细的说明。
可以明确地看到对象以Obj开头,并且在avro.schema中只存在一个模式定义,之后紧跟着数据。

模式解析

当写入和读取的模式不同时,如何解决这个问题。

也许有人会认为「只需要一直使用相同的架构就好了」,但是在使用Kafka等流处理时,即使同时更新Producer和Consumer的应用程序,也可能会读取到老旧架构定义的主题数据;而且如果Producer和Consumer位于不同的服务器、团队或公司进行管理,同时进行完全的更改将变得更加困难。

具体的的解决方法请参考此处,但我了解到以下两点非常重要。

如果读者的记录模式包含一个包含默认值的字段,而写者的模式没有具有相同名称的字段,则读者应该使用其字段的默认值。
如果读者的记录模式中没有默认值的字段,而写者的模式没有具有相同名称的字段,则会发出错误信号。

另外,我已经在GitHub上发布了Schema Resolution的样本代码结果,供您参考。

模式注册表

对于SchemaRegistry而言,只要能够很好地理解Avro,就不会太难。
此外,Confluent的文档非常出色,并且在GitHub上有示例代码,与文档相互链接,所以我就很少需要自己写东西了,但我稍微总结一下。
顺便说一下,我读了以下两个引用,就大概明白了。

    • Confluent Schema Registry Tutorial

 

    Schema Evolution and Compatibility

功能

在Kafka中,使用Avro进行序列化时,它提供以下两种功能。

    • スキーマを一元管理できます。RESTAPI経由で、Kafkaのproducer,consumerを介さずに、スキーマ参照、作成等できます。

 

    スキーマの変更において、互換性チェックができます。

模式管理

在SchemaRegistry上管理着Schema信息,并为其分配了称为schema id的标识。对于Kafka的消息而言,不需要保存全部的Schema信息,只需保留schema id即可。其操作流程如下:

    1. 当producer有一个新的schema时,它会将其发送到SchemaRegistry,然后获取返回的schema id。

 

    1. producer会将schema和对应的schema id保存在缓存中。

 

    1. 当consumer有一个新的schema id时,它会将其发送到SchemaRegistry,然后获取返回的schema。

 

    consumer会将schema id和对应的schema保存在缓存中。

在生产者和消费者之间,如果在缓存中存在,则不会发生与SchemaRegistry的通信。
默认情况下,生产者会自动注册新的模式,但是在文档中建议在生产环境中不要自动注册,而是通过REST API来进行注册。

此外,我原本误解了一个问题。在consumer端以Specific格式消费消息时,我尝试指定了新的模式,它也能成功地进行反序列化。我原以为它已经在SchemaRegistry上注册了该模式,但实际上没有。看来只有producer和REST API能够在SchemaRegistry上进行注册。

进行互换性检查 (Carry out intercompatibility check)

我會將詳細資訊放在文件中。然而,默認設定是

反向:(默认)使用新架构的消费者可以读取由使用最新注册架构的生产者写入的数据。

请注意:根据某些规定,我们仅保证与最新版本的向后兼容性。例如,当模式从Schema1进化到Schema2再到Schema3时,可能存在以下情况:Schema1与Schema2之间以及Schema2与Schema3之间存在向后兼容性,但是Schema1与Schema3之间可能不存在向后兼容性。(已经过验证)

    • スキーマ3でcosumerがconsumeするときに、kafkaトピック上にスキーマ1が残っており、エラーになる

 

    スキーマ2の後にスキーマ1に戻し、その後にスキーマ3にエラーなく、あげることが可能(最新のバージョンはあくまでスキーマ2なので)。この場合、consumerはスキーマ3にあげると、スキーマ1をconsumeすることになり、エラーになります。

示例代码

建议您在这里尝试运行Specific方式。这是教程中使用的示例代码。
建议您在这里尝试运行Generic方式。这是Confluent的KafkaStreams示例代码的一部分,但无需了解KafkaStreams的知识,所以请放心尝试。

额外福利

可以在现有的Avro文件中添加数据。
具体来说,只需要将代码中的dataFileWriter.create(p1.getSchema(), file)一行改为dataFileWriter.appendTo(file)即可。

我对于能否向现有的Avro文件中添加具有不同模式记录的问题表示了一丝疑虑,所以尝试了一下。
当然,正常情况下是不会这么做的!

如果追加了包括类型在内的不同模式的对象

org.apache.avro.file.DataFileWriter$AppendWriteException: org.apache.avro.UnresolvedUnionException: 不在联合类型[“int”,”null”]中:900.0
编译通过,但在写入时出现“类型不匹配”的错误。

如果只是添加具有不同属性名称的对象,则包括格式在内的所有格式都是相同的。

正常进行。
然后,尝试以无模式读取记录,发现属性名称是基于源字段名加载的。

最后

听起来很简单,但实际上却比想象中困难,这是我的坦率感受。
是否要引入它还是一个犹豫不决的问题。。

由于我对Avro和模式注册表还只是一个刚开始学习的人,所以如果有任何错误,请温柔地提醒我。

广告
将在 10 秒后关闭
bannerAds