改善 Kafka Trigger 的日志记录

我正在使用自己维护的 Azure Functions Kafka Extension 这个库。实际运行后发现,日志记录不足。

Azure Functions Kafka扩展是使用Confluent.Kafka的C#库,并在其中使用了本地库Libkafka。

通常的执行日志通常会在执行时记录,但当Libkafka出现问题时,例如出现 “Maximum application poll interval exceeded” 之后,尝试自动提交却失败了的消息会从LibKafka中输出,但同时也会从标准输出中显示出来。从运维的角度来看,这是不可接受的。

此外,我还注意到之前的Libkafka库的问题报告中,作者经常建议在出现问题时将Debug选项打开。我希望启用Debug模式。条件如下所述。

    • Libkafka のデバッグの有効化設定をできるようにしたい

 

    Libkafka 由来の Standard Output に出ているログをちゃんとしたログに流したい

我试着做了上述的事情。

開啟除錯功能

启用Libkafka的调试非常简单。只需按照以下步骤进行设置:设定debug。

消费者

debug = broker,topic,msg

制片人

debug = consumer,cgrp,topic,fetch

全套选择

有多种可怕的选项,甚至包括“全部”。

generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, all

使用Confluent.Kafka进行实现。

调试的添加

那么,在这里的Confluent.Kafka 应该怎么处理?

有一个名为 KafkaOptions 的类,它对应 Kafka 的 host.json 项目。如果我们在这里写入,它将自动从 host.json 中读取。由于默认为空字符串会导致问题,所以将其设为 null。

        /// <summary>
        /// Gets or sets the debug option for librdkafka library.
        /// Default = "" (disable)
        /// 
        /// Librdkafka: debug
        /// </summary>
        public string LibkafkaDebug { get; set; } = null; 

只需将此添加到ConsumerConfig和ProducerConfig中即可。这些类的超类是ClientConfig,在那里只需简单地分配给Debug变量即可。这很简单。它将传递设置给Libkafka库。

Kafka监听器 L152。

            ConsumerConfig conf = new ConsumerConfig()
            {
                // enable auto-commit 
                EnableAutoCommit = true,

                // disable auto storing read offsets since we need to store them after calling the trigger function
                EnableAutoOffsetStore = false,

                // Interval in which commits stored in memory will be saved
                AutoCommitIntervalMs = this.options.AutoCommitIntervalMs,

                // Librdkafka debug options               
                Debug = this.options.LibkafkaDebug,

制作方是 KafkaProducerFactory L131,但是它们是类似的,所以可以省略。

日志重定向

现在来看,我一直在想如何窃取正在标准输出中流淌的libkafka日志,但是并非只有我一个人需要它,所以库中很容易就找到了Hook。大致就是这样。

卡夫卡生产者工厂 L76

            var builder = new ProducerBuilder<byte[], byte[]>(producerConfig);
            ILogger logger = this.loggerProvider.CreateLogger("Kafka");
            builder.SetLogHandler((_, m) =>
            {
                logger.LogInformation($"Libkafka: {m?.Message}");
            });

            return builder.Build();

消费者也是类似的,就像 KafkaListener 的 L107 部分一样。

image.png
image.png

令人关注的日志新增内容

现在,当运行时,有时想知道用户是以哪种模式使用函数的。例如,Kafka触发器根据函数定义中的参数是数组还是其他来决定执行逻辑是单个还是多个。但是,我不想频繁地输出日志。因此,我尝试将其放入构造函数中,但实际上,我想要获取函数名。实际上,执行器对象具有函数名,但该库的作者将实现类设为内部类,并且是接口,因此通过反射获取可能是危险的。如果有合适的方法,请告诉我。

L22 单项功能执行器

        public SingleItemFunctionExecutor(ITriggeredFunctionExecutor executor, IConsumer<TKey, TValue> consumer, int channelCapacity, int channelFullRetryIntervalInMs, ICommitStrategy<TKey, TValue> commitStrategy, ILogger logger)
            : base(executor, consumer, channelCapacity, channelFullRetryIntervalInMs, commitStrategy, logger)
        {
            logger.LogInformation($"FunctionExecutor Loaded: {nameof(MultipleItemFunctionExecutor<TKey, TValue>)}");
        }

总结

现在初始计划的日志增强已经完成。顺便也将库进行升级了。

    Upgrade 1.5.0 and improve debugging
广告
将在 10 秒后关闭
bannerAds