使用Apache Kafka进行简单的发布/订阅消息传递

这个文件的内容

让我们使用Apache Kafka来实现一个简单的发布/订阅消息传递系统。

    • 特定のトピックに対してメッセージを発行する Publisher アプリケーション

 

    特定のトピックのメッセージを購読する Subscriber アプリケーション
アプリケーション構成.png

建立 Kafka 的运行环境

我决定使用Docker来构建Kafka的运行环境,并且版本是2020年12月31日的2.6.0。

创建一个 docker-compose.yml 文件

创建一个 docker-compose.yml 文件。

    • docker image は wurstmeister/kafka を使用することにしました。

 

    • トピックの自動生成を有効にしました。

Confluent.Kafka ver 1.5.3 で確認したところ、メッセージの発行時にはトピックが自動生成されましたが、メッセージの購読時には自動生成されずに「トピックが存在しない」例外が発生しました。

version: '3'

services:

  test-zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  test-kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: test-zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

启动Docker容器。

在存放有docker-compose.yml的文件夹中,打开powershell控制台,并执行以下命令。

PS> docker-compose.yml up -d
Creating network "kafka_default" with the default driver
Creating kafka_test-zookeeper_1 ... done
Creating kafka_test-kafka_1     ... done

可以使用以下命令来检查是否成功启动。

PS> docker-compose.yml ps
         Name                       Command               State                         Ports
--------------------------------------------------------------------------------------------------------------------
kafka_test-kafka_1       start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp
kafka_test-zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

停止并删除Docker容器。

在存储docker-compose.yml的文件夹中打开powershell控制台,并执行以下命令。

PS> docker-compose.yml down

确认Kafka的状态

在Docker桌面的仪表盘上查看Kafka的状态。

Kafkaの起動ログ.png

如果启动正常,应该会显示以下日志。

test-kafka_1 | [2020-12-30 04:19:50,593] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)

可以打开 CLI。您可以从 Kafka 实例页面的图标处启动。

CLIの起動.png

要显示已注册主题的列表,请执行以下命令。如果主题不存在,则不会显示任何内容。

/# kafka-topics.sh --zookeeper test-zookeeper:2181 --list

在这里注册一个话题,需要执行以下命令。

/# kafka-topics.sh --zookeeper test-zookeeper:2181 --create --topic test-topic --replication-factor 1 --partitions 1

实现Pub/Sub应用程序

我使用.NET Core 3.1将其实现为控制台应用程序。我使用了Confluent.Kafka版本1.5.3的Kafka库。

完整的源代码已经上传到了 GitHub。

密钥和消息

我们将在应用程序之间协调以下密钥和消息。

/// <summary>
/// キー
/// </summary>
public readonly struct SampleMessageKey : IEquatable<SampleMessageKey>
{
    public SampleMessageKey(string key)
    {
        Key = key;
    }

    public string Key { get; }

    public override bool Equals(object obj)
    {
        return obj is SampleMessageKey key && Equals(key);
    }
    public bool Equals(SampleMessageKey other)
    {
        return Key == other.Key;
    }
    public override int GetHashCode()
    {
        return 990326508 + EqualityComparer<string>.Default.GetHashCode(Key);
    }
    public override string ToString()
    {
        return Key;
    }
}

/// <summary>
/// メッセージ
/// </summary>
public class SampleMessageBody
{
    public SampleMessageBody(DateTimeOffset time, string message)
    {
        Time = time;
        Message = message;
    }

    public DateTimeOffset Time { get; }
    public string Message { get; }

    public override string ToString()
    {
        return Message;
    }
}

出版者应用程序

这是一个控制台应用程序,它以固定间隔向Kafka发布消息。发布消息的目标引导服务器和主题会从控制台输入中接收到。

实现入口点

class Program
{
    static async Task Main(string[] args)
    {
        try
        {
            await RunAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        Console.ReadKey();
    }

    /// <summary>
    /// 処理を実行します。
    /// </summary>
    /// <returns></returns>
    static async Task RunAsync()
    {
        int process = System.Diagnostics.Process.GetCurrentProcess().Id;
        Console.WriteLine($"パブリッシャーをプロセス {process} で起動しました。");

        // コンソールからパラメーターを受け取る
        Console.WriteLine("bootstrap servers を入力してください(省略時 127.0.0.1):");
        var bootstrapServers = Console.ReadLine();
        if (string.IsNullOrEmpty(bootstrapServers)) { bootstrapServers = "127.0.0.1"; }

        Console.WriteLine($"トピックを入力してください(省略時 {Constants.DefaultTopic}):");
        var topic = Console.ReadLine();
        if (string.IsNullOrEmpty(topic)) { topic = Constants.DefaultTopic; }

        // キャンセルトークンを生成する
        using var cancelTokenSource = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;
            cancelTokenSource.Cancel();
        };

        // 動作設定を生成する
        var publisherSetting = new MessagePublisherSetting()
        {
            BootstrapServers = bootstrapServers
        };

        // キーを生成するメソッド
        static SampleMessageKey GenerateKey()
        {
            return new SampleMessageKey(Guid.NewGuid().ToString());
        }

        // パブリッシャーを生成する
        var factory = new SampleMessagePublisherFactory(publisherSetting, new SampleLogger());

        using IMessagePublisher<SampleMessageBody> publisher
            = factory.CreatePublisher<SampleMessageKey, SampleMessageBody>(topic, GenerateKey);

        Console.WriteLine("メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。");

        // 一定間隔でメッセージを発行する
        int sequence = 0;
        TimeSpan interval = TimeSpan.FromSeconds(5);

        while (true)
        {
            if (cancelTokenSource.Token.IsCancellationRequested) { break; }

            ++sequence;

            await publisher.PublishAsync(
                new SampleMessageBody(DateTimeOffset.UtcNow, $"{sequence}回目のメッセージ(プロセス{process})")
                , cancelTokenSource.Token
                ).ConfigureAwait(false);

            await Task.Delay(interval, cancelTokenSource.Token);
        }

        Console.WriteLine("メッセージの送信処理を終了しました。");
    }
}

出版商的实施

我们采用了简单工厂模式。

/// <summary>
/// パブリッシャーを生成します。
/// </summary>
internal class SampleMessagePublisherFactory : MessagePublisherFactoryBase
{
    /// <summary>
    /// インスタンスを生成します。
    /// </summary>
    /// <param name="publisherSetting">パブリッシャーの動作設定</param>
    /// <param name="logger">ロガー</param>
    internal SampleMessagePublisherFactory(MessagePublisherSetting publisherSetting, ILogger logger)
        : base(publisherSetting, logger)
    {
    }

    /// <summary>
    /// シリアライザを取得します。
    /// </summary>
    /// <typeparam name="T">シリアライズ対象オブジェクトの型</typeparam>
    /// <returns>シリアライザ</returns>
    protected override ISerializer<T> GetSerializer<T>()
    {
        return SampleSerializerFactory.Create<T>();
    }
}

/// <summary>
/// パブリッシャーの生成処理の基本実装。
/// </summary>
public abstract class MessagePublisherFactoryBase
{
    /// <summary>
    /// インスタンスを生成します。
    /// </summary>
    /// <param name="publisherSetting">パブリッシャーの動作設定</param>
    /// <param name="logger">ロガー</param>
    protected MessagePublisherFactoryBase(MessagePublisherSetting publisherSetting, ILogger logger)
    {
        PublisherSetting = publisherSetting;
        Logger = logger;
    }

    /// <summary>
    /// パブリッシャーの動作設定を取得します。
    /// </summary>
    protected MessagePublisherSetting PublisherSetting { get; }

    /// <summary>
    /// ロガーを取得します。
    /// </summary>
    private ILogger Logger { get; }

    /// <summary>
    /// パブリッシャーを生成します。
    /// </summary>
    /// <typeparam name="TKey">キーの型</typeparam>
    /// <typeparam name="TMessage">メッセージの型</typeparam>
    /// <param name="topic">トピック</param>
    /// <param name="keyGenerator">キーの生成処理</param>
    /// <returns>パブリッシャー</returns>
    public MessagePublisher<TKey, TMessage> CreatePublisher<TKey, TMessage>(string topic, Func<TKey> keyGenerator)
    {
        return new MessagePublisher<TKey, TMessage>(
            GetSerializer<TKey>()
            , GetSerializer<TMessage>()
            , PublisherSetting
            , topic
            , keyGenerator
            , Logger
            );
    }

    /// <summary>
    /// シリアライザを取得します。
    /// </summary>
    /// <typeparam name="T">シリアライズ対象オブジェクトの型</typeparam>
    /// <returns>シリアライザ</returns>
    protected abstract ISerializer<T> GetSerializer<T>();
}

出版商内部包含一个对 Kafka 的生产者(IProducer<TKey, TMessage>),并将指定的消息发送到 Kafka。

/// <summary>
/// Kafka にメッセージを送信します。
/// </summary>
/// <typeparam name="TKey">キーの型</typeparam>
/// <typeparam name="TMessage">メッセージの型</typeparam>
public class MessagePublisher<TKey, TMessage> : IMessagePublisher<TMessage>
{
    /// <summary>
    /// インスタンスを生成します。
    /// </summary>
    /// <param name="keySerializer">キーに対するシリアライザ</param>
    /// <param name="messageSerializer">メッセージに対するシリアライザ</param>
    /// <param name="setting">プロデューサーの動作設定</param>
    /// <param name="keyGenerator">キーの生成処理</param>
    /// <param name="topic">トピック</param>
    /// <param name="logger">ロガー</param>
    public MessagePublisher(ISerializer<TKey> keySerializer, ISerializer<TMessage> messageSerializer, MessagePublisherSetting setting, string topic, Func<TKey> keyGenerator, ILogger logger)
    {
        KeySerializer = keySerializer;
        MessageSerializer = messageSerializer;
        Topic = topic;
        KeyGenerator = keyGenerator;
        Logger = logger;
        Producer = BuildProducer(GetProducerConfig(setting));
    }

    /// <summary>
    /// 使用しているリソースを解放します。
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// 使用しているリソースを解放します。
    /// </summary>
    protected virtual void Dispose(bool disposing)
    {
        TerminateProducer();
    }

    /// <summary>
    /// キーに対するシリアライザを取得します。
    /// </summary>
    private ISerializer<TKey> KeySerializer { get; }

    /// <summary>
    /// メッセージに対するシリアライザを取得します。
    /// </summary>
    private ISerializer<TMessage> MessageSerializer { get; }

    /// <summary>
    /// ロガーを取得します。
    /// </summary>
    private ILogger Logger { get; }

    /// <summary>
    /// トピックを取得します。
    /// </summary>
    private string Topic { get; }

    /// <summary>
    /// キーの生成処理を取得します。
    /// </summary>
    private Func<TKey> KeyGenerator { get; }

    #region プロデューサー

    /// <summary>
    /// プロデューサーを取得します。
    /// </summary>
    private IProducer<TKey, TMessage> Producer { get; }

    /// <summary>
    /// プロデューサーを解放します。
    /// </summary>
    private void TerminateProducer()
    {
        if (Producer == null) { return; }
        Producer.Flush(TimeSpan.FromMilliseconds(10000));
        Producer.Dispose();
    }

    /// <summary>
    /// プロデューサーを生成します。
    /// </summary>
    /// <param name="config">動作設定</param>
    /// <returns>プロデューサー</returns>
    protected virtual IProducer<TKey, TMessage> BuildProducer(IEnumerable<KeyValuePair<string, string>> config)
    {
        var producerBuilder = new ProducerBuilder<TKey, TMessage>(config)
            .SetKeySerializer(KeySerializer)
            .SetValueSerializer(MessageSerializer)
            .SetErrorHandler(OnError)
            ;

        return producerBuilder.Build();
    }

    /// <summary>
    /// プロデューサーの動作設定を取得します。
    /// </summary>
    /// <param name="producerSetting">プロデューサーの動作設定</param>
    /// <returns>動作設定のキーと値の組み合わせ</returns>
    protected virtual IEnumerable<KeyValuePair<string, string>> GetProducerConfig(MessagePublisherSetting producerSetting)
    {
        if (producerSetting.BootstrapServers == null || producerSetting.BootstrapServers == "")
        {
            throw new NullReferenceException("ブートストラップサーバーが設定されていません。");
        }

        return new ProducerConfig()
        {
            BootstrapServers = producerSetting.BootstrapServers,
        };
    }

    /// <summary>
    /// エラーが発生したときの処理を行います。
    /// </summary>
    /// <param name="producer"></param>
    /// <param name="error"></param>
    protected virtual void OnError(IProducer<TKey, TMessage> producer, Error error)
    {
        WriteLog(LogLevel.Error, () => BuildLogMessage(error));
    }

    #endregion

    #region メッセージ発行

    /// <summary>
    /// 指定されたメッセージを発行します。
    /// </summary>
    /// <param name="message">メッセージ</param>
    /// <returns></returns>
    public Task PublishAsync(TMessage message, CancellationToken cancellationToken)
    {
        var kafkaMessage = new Message<TKey, TMessage>()
        {
            Key = GenerateNewKey(),
            Value = message,
            Timestamp = new Timestamp(DateTimeOffset.UtcNow)
        };

        return Producer.ProduceAsync(Topic, kafkaMessage, cancellationToken)
            .ContinueWith(t => OnPublished(t.Result));
    }

    /// <summary>
    /// メッセージを発行したときの処理を行います。
    /// </summary>
    /// <param name="result">発行の結果</param>
    protected virtual void OnPublished(DeliveryResult<TKey, TMessage> result)
    {
        WriteLog(LogLevel.Debug, () => BuildLogMessage(result));
    }

    /// <summary>
    /// 新しいキーを生成します。
    /// </summary>
    /// <returns>キー</returns>
    private TKey GenerateNewKey()
    {
        return KeyGenerator();
    }

    #endregion

    #region ロギング

    /// <summary>
    /// 指定されたログを出力します。
    /// </summary>
    /// <param name="level">ログレベル</param>
    /// <param name="messageBuilder">ログメッセージを生成するメソッド</param>
    /// <param name="exception">例外</param>
    private void WriteLog(LogLevel level, Func<string> messageBuilder, Exception? exception = null)
    {
        if (!Logger.IsEnabled(level)) { return; }

        if (exception == null)
        {
            Logger.Log(level, messageBuilder());
        }
        else
        {
            Logger.Log(level, exception, messageBuilder());
        }
    }

    private string BuildLogMessage(DeliveryResult<TKey, TMessage> result)
    {
        return $"メッセージを発行しました。[{result.Topic}:{result.Offset}] {result.Message.Value}";
    }

    private string BuildLogMessage(Error error)
    {
        return error.Reason;
    }

    #endregion
}

订阅者应用程序

这是一个控制台应用程序,用于订阅Kafka的消息。订阅的引导服务器和主题将从控制台输入中接收。我们采用了使用ReactiveExtensions(System.Reactive)的可观察模式。

实现入口点

class Program
{
    static async Task Main(string[] args)
    {
        try
        {
            await RunAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        Console.ReadKey();
    }

    static async Task RunAsync()
    {
        int process = System.Diagnostics.Process.GetCurrentProcess().Id;
        Console.WriteLine($"サブスクライバーをプロセス {process} で起動しました。");

        // コンソールからパラメーターを受け取る
        Console.WriteLine("bootstrap servers を入力してください(省略時 127.0.0.1):");
        var bootstrapServers = Console.ReadLine();
        if (string.IsNullOrEmpty(bootstrapServers)) { bootstrapServers = "127.0.0.1"; }

        Console.WriteLine($"コンシューマーグループIDを入力してください(省略時 {Constants.DefaultComsumerGroupID}):");
        var groupID = Console.ReadLine();
        if (string.IsNullOrEmpty(groupID)) { groupID = Constants.DefaultComsumerGroupID; }

        Console.WriteLine($"トピックを入力してください(省略時 {Constants.DefaultTopic}):");
        var topic = Console.ReadLine();
        if (string.IsNullOrEmpty(topic)) { topic = Constants.DefaultTopic; }

        // キャンセルトークンを生成する
        using var cancelTokenSource = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;
            cancelTokenSource.Cancel();
        };

        // 動作設定を生成する
        var subscriberSetting = new MessageSubscriberSetting()
        {
            BootstrapServers = bootstrapServers,
            ConsumerGroupID = groupID
        };

        // observable パターンでメッセージを監視する
        var factory = new SampleMessageSubscriberFactory(subscriberSetting, new SampleLogger());

        Console.WriteLine($"メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。");

        var subscriber = factory.CreateSubscriber<SampleMessageKey, SampleMessageBody>(topic);
        using var releaser = subscriber.Subscribe(new SampleMessageObserver());

        await subscriber.SubscribeAsync(cancelTokenSource.Token).ConfigureAwait(false);

        Console.WriteLine("メッセージの受信処理を終了しました。");
    }
}

订阅者实施

我们采用了简单工厂模式。

/// <summary>
/// サブスクライバーを生成します。
/// </summary>
internal class SampleMessageSubscriberFactory : MessageSubscriberFactoryBase
{
    /// <summary>
    /// インスタンスを生成します。
    /// </summary>
    /// <param name="subscriberSetting">サブスクライバーの動作設定</param>
    /// <param name="logger">ロガー</param>
    internal SampleMessageSubscriberFactory(MessageSubscriberSetting subscriberSetting, ILogger logger)
        : base(subscriberSetting, logger)
    {
    }

    /// <summary>
    /// シリアライザを取得します。
    /// </summary>
    /// <typeparam name="T">シリアライズ対象オブジェクトの型</typeparam>
    /// <returns>シリアライザ</returns>
    protected override IDeserializer<T> GetDeserializer<T>()
    {
        return SampleSerializerFactory.Create<T>();
    }
}

/// <summary>
/// サブスクライバーの生成処理の基本実装。
/// </summary>
public abstract class MessageSubscriberFactoryBase
{
    /// <summary>
    /// インスタンスを生成します。
    /// </summary>
    /// <param name="subscriberSetting">サブスクライバーの動作設定</param>
    /// <param name="logger">ロガー</param>
    protected MessageSubscriberFactoryBase(MessageSubscriberSetting subscriberSetting, ILogger logger)
    {
        SubscriberSetting = subscriberSetting;
        Logger = logger;
    }

    /// <summary>
    /// サブスクライバーの動作設定を取得します。
    /// </summary>
    private MessageSubscriberSetting SubscriberSetting { get; }

    /// <summary>
    /// ロガーを取得します。
    /// </summary>
    private ILogger Logger { get; }

    /// <summary>
    /// サブスクライバーを生成します。
    /// </summary>
    /// <typeparam name="TKey">キーの型</typeparam>
    /// <typeparam name="TMessage">メッセージの型</typeparam>
    /// <returns>サブスクライバー</returns>
    public MessageSubscriber<TKey, TMessage> CreateSubscriber<TKey, TMessage>(string topic)
    {
        return new MessageSubscriber<TKey, TMessage>(
            GetDeserializer<TKey>()
            , GetDeserializer<TMessage>()
            , SubscriberSetting
            , topic
            , Logger
            );
    }

    /// <summary>
    /// シリアライザを取得します。
    /// </summary>
    /// <typeparam name="T">シリアライズ対象オブジェクトの型</typeparam>
    /// <returns>シリアライザ</returns>
    protected abstract IDeserializer<T> GetDeserializer<T>();
}

订阅者包含对Kafka的消费者(IConsumer <TKey,TMessage>),从Kafka接收发布到指定主题的消息。

/// <summary>
/// Kafka からのメッセージを監視します。
/// </summary>
/// <typeparam name="TKey">メッセージのキーの型</typeparam>
/// <typeparam name="TMessage">メッセージの型</typeparam>
public class MessageSubscriber<TKey, TMessage> : System.Reactive.ObservableBase<TMessage>
{
    /// <summary>
    /// インスタンスを生成します。
    /// </summary>
    /// <param name="keyDeserializer">キーに対するデシリアライザ</param>
    /// <param name="messageDeserializer">メッセージに対するデシリアライザ</param>
    /// <param name="subscriberSetting">動作設定</param>
    /// <param name="topic">トピック</param>
    /// <param name="logger">ロガー</param>
    public MessageSubscriber(IDeserializer<TKey> keyDeserializer, IDeserializer<TMessage> messageDeserializer, MessageSubscriberSetting subscriberSetting, string topic, ILogger logger) : base()
    {
        KeyDeserializer = keyDeserializer;
        MessageDeserializer = messageDeserializer;
        SubscriberSetting = subscriberSetting;
        Topic = topic;
        Logger = logger ?? Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance;
    }

    /// <summary>
    /// ロガーを取得します。
    /// </summary>
    private ILogger Logger { get; }

    /// <summary>
    /// 動作設定を取得します。
    /// </summary>
    private MessageSubscriberSetting SubscriberSetting { get; }

    /// <summary>
    /// トピックを取得します。
    /// </summary>
    private string Topic { get; }

    #region 受信

    /// <summary>
    /// キーに対するデシリアライザを取得します。
    /// </summary>
    private IDeserializer<TKey> KeyDeserializer { get; }

    /// <summary>
    /// メッセージに対するデシリアライザを取得します。
    /// </summary>
    private IDeserializer<TMessage> MessageDeserializer { get; }

    /// <summary>
    /// メッセージ受信を開始します。
    /// </summary>
    /// <param name="cancellation">キャンセルトークン</param>
    public Task SubscribeAsync(CancellationToken cancellation)
    {
        Task.Yield();

        TimeSpan interval = SubscriberSetting.ConsumeInterval;

        using var consumer = BuildConsumer(GetConsumerConfig(SubscriberSetting));
        consumer.Subscribe(Topic);

        while (true)
        {
            if (cancellation.IsCancellationRequested) { break; }
            try
            {
                if (m_Observers.Count == 0) { continue; }

                ConsumeResult<TKey, TMessage> result = consumer.Consume(interval);

                if (result == null) { continue; }
                // TODO: 今回の確認では IsPartitionEOF を発生させることができなかった。
                if (result.IsPartitionEOF) { continue; }

                WriteLog(LogLevel.Debug, () => BuildLogMessage(result));
                NotifyMessage(result.Message.Value);

                consumer.Commit(result);
            }
            catch (Exception ex)
            {
                OnException(consumer, ex);
                break;
            }
        }

        NotifyComplated();

        return Task.CompletedTask;
    }

    /// <summary>
    /// コンシューマーを生成します。
    /// </summary>
    /// <param name="config">動作設定</param>
    /// <returns>コンシューマー</returns>
    protected IConsumer<TKey, TMessage> BuildConsumer(IEnumerable<KeyValuePair<string, string>> config)
    {
        var consumerBuilder = new ConsumerBuilder<TKey, TMessage>(config)
            .SetKeyDeserializer(KeyDeserializer)
            .SetValueDeserializer(MessageDeserializer)
            .SetErrorHandler(OnError)
            .SetLogHandler(OnLogging)
            ;

        return consumerBuilder.Build();
    }

    /// <summary>
    /// コンシューマーの動作設定を取得します。
    /// </summary>
    /// <param name="consumerSetting">コンシューマーの動作設定</param>
    /// <returns>動作設定のキーと値の組み合わせ</returns>
    protected IEnumerable<KeyValuePair<string, string>> GetConsumerConfig(MessageSubscriberSetting consumerSetting)
    {
        if (consumerSetting.BootstrapServers == null || consumerSetting.BootstrapServers == "")
        {
            throw new NullReferenceException("ブートストラップサーバーが設定されていません。");
        }

        if (consumerSetting.ConsumerGroupID == null || consumerSetting.ConsumerGroupID == "")
        {
            throw new NullReferenceException("コンシューマーグループIDが設定されていません。");
        }

        return new ConsumerConfig()
        {
            BootstrapServers = consumerSetting.BootstrapServers,
            GroupId = consumerSetting.ConsumerGroupID,
            EnableAutoCommit = false,
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
    }

    /// <summary>
    /// ログを出力します。
    /// </summary>
    /// <param name="consumer"></param>
    /// <param name="log"></param>
    private void OnLogging(IConsumer<TKey, TMessage> consumer, LogMessage log)
    {
        LogLevel logLevel = log.Level.ToLogLevel();
        WriteLog(logLevel, () => BuildLogMessage(log));
    }

    /// <summary>
    /// エラーが発生したときの処理を行います。
    /// </summary>
    /// <param name="consumer"></param>
    /// <param name="error"></param>
    private void OnError(IConsumer<TKey, TMessage> consumer, Error error)
    {
        WriteLog(LogLevel.Error, () => BuildLogMessage(error));
        NotifyError(new Exception(error.Reason));
    }

    /// <summary>
    /// 例外が発生したときの処理を行います。
    /// </summary>
    /// <param name="consumer"></param>
    /// <param name="exception"></param>
    private void OnException(IConsumer<TKey, TMessage> consumer, Exception exception)
    {
        WriteLog(LogLevel.Critical, () => BuildLogMessage(exception), exception);
        NotifyError(exception);
    }

    #endregion

    #region 通知

    /// <summary>
    /// 指定されたメッセージを通知します。
    /// </summary>
    /// <param name="message">メッセージ</param>
    private void NotifyMessage(TMessage message)
    {
        if (m_Observers.Count == 0) { return; }
        lock (m_Observers)
        {
            for (int i = 0; i < m_Observers.Count; ++i)
            {
                m_Observers[i].OnNext(message);
            }
        }
    }

    /// <summary>
    /// 指定されたメッセージを通知します。
    /// </summary>
    /// <param name="exception">例外</param>
    private void NotifyError(Exception exception)
    {
        if (m_Observers.Count == 0) { return; }
        lock (m_Observers)
        {
            for (int i = 0; i < m_Observers.Count; ++i)
            {
                m_Observers[i].OnError(exception);
            }
        }
    }

    /// <summary>
    /// 完了を通知します。
    /// </summary>
    private void NotifyComplated()
    {
        if (m_Observers.Count == 0) { return; }
        lock (m_Observers)
        {
            for (int i = 0; i < m_Observers.Count; ++i)
            {
                m_Observers[i].OnCompleted();
            }
        }
    }

    #endregion

    #region オブザーバー

    /// <summary>
    /// 指定されたオブザーバーによる購読を開始します。
    /// </summary>
    /// <param name="observer"></param>
    /// <returns></returns>
    protected override IDisposable SubscribeCore(IObserver<TMessage> observer)
    {
        AddObserver(observer);
        return System.Reactive.Disposables.Disposable.Create(() => RemoveObserver(observer));
    }

    private readonly List<IObserver<TMessage>> m_Observers = new List<IObserver<TMessage>>();

    /// <summary>
    /// 指定されたオブザーバーを追加します。
    /// </summary>
    /// <param name="observer"></param>
    private void AddObserver(IObserver<TMessage> observer)
    {
        lock (m_Observers)
        {
            m_Observers.Add(observer);
        }
    }

    /// <summary>
    /// 指定されたオブザーバーを削除します。
    /// </summary>
    /// <param name="observer"></param>
    private void RemoveObserver(IObserver<TMessage> observer)
    {
        if (m_Observers.Contains(observer))
        {
            lock (m_Observers)
            {
                m_Observers.Remove(observer);
            }
        }
    }

    #endregion

    #region ロギング

    /// <summary>
    /// 指定されたログを出力します。
    /// </summary>
    /// <param name="level">ログレベル</param>
    /// <param name="messageBuilder">ログメッセージを生成するメソッド</param>
    /// <param name="exception">例外</param>
    private void WriteLog(LogLevel level, Func<string> messageBuilder, Exception? exception = null)
    {
        if (!Logger.IsEnabled(level)) { return; }

        if (exception == null)
        {
            Logger.Log(level, messageBuilder());
        }
        else
        {
            Logger.Log(level, exception, messageBuilder());
        }
    }

    private string BuildLogMessage(ConsumeResult<TKey, TMessage> result)
    {
        return $"メッセージを受け取りました。[{result.Topic}:{result.Offset}] {result.Message.Value}";
    }

    private string BuildLogMessage(LogMessage log)
    {
        return log.Message;
    }

    private string BuildLogMessage(Error error)
    {
        return error.Reason;
    }

    private string BuildLogMessage(Exception exception)
    {
        return exception.Message;
    }

    #endregion
}

确认应用程序的操作。

使用消费者组和主题进行发送和接收控制

启动多个发布者和订阅者以确认消息的发送和接收方式。

パブリッシャー動作Publisher1″Topic-A” に対してメッセージを発行します。Publisher2″Topic-B” に対してメッセージを発行します。Publisher3″Topic-B” に対してメッセージを発行します。
サブスクライバー動作SubScriber1コンシューマーグループ “Group1” に属するコンシューマーで “Topic-A” のメッセージを購読します。SubScriber2コンシューマーグループ “Group1” に属するコンシューマーで “Topic-A” のメッセージを購読します。SubScriber3コンシューマーグループ “Group2” に属するコンシューマーで “Topic-A” のメッセージを購読します。SubScriber4コンシューマーグループ “Group1” に属するコンシューマーで “Topic-B” のメッセージを購読します。

在日志中输出的 “[Topic-A:0]” 表示了主题和该主题的偏移值(每次发行消息时递增的连续编号)。

パブリッシャーをプロセス 5040 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。
11:33:13.426 [Debug] メッセージを発行しました。[Topic-A:0] 1回目のメッセージ(プロセス5040)
11:33:18.453 [Debug] メッセージを発行しました。[Topic-A:1] 2回目のメッセージ(プロセス5040)
11:33:23.491 [Debug] メッセージを発行しました。[Topic-A:2] 3回目のメッセージ(プロセス5040)
11:33:28.508 [Debug] メッセージを発行しました。[Topic-A:3] 4回目のメッセージ(プロセス5040)
11:33:33.530 [Debug] メッセージを発行しました。[Topic-A:4] 5回目のメッセージ(プロセス5040)
11:33:38.546 [Debug] メッセージを発行しました。[Topic-A:5] 6回目のメッセージ(プロセス5040)
11:33:43.565 [Debug] メッセージを発行しました。[Topic-A:6] 7回目のメッセージ(プロセス5040)
11:33:48.595 [Debug] メッセージを発行しました。[Topic-A:7] 8回目のメッセージ(プロセス5040)
(以下割愛)

有两个进程向 Tobic-B 发出消息。可以看到偏移值在不重复的情况下递增。

パブリッシャーをプロセス 15532 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
トピックを入力してください(省略時 test-topic):
Topic-B
メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。
11:33:21.558 [Debug] メッセージを発行しました。[Topic-B:0] 1回目のメッセージ(プロセス15532)
11:33:26.592 [Debug] メッセージを発行しました。[Topic-B:1] 2回目のメッセージ(プロセス15532)
11:33:31.621 [Debug] メッセージを発行しました。[Topic-B:2] 3回目のメッセージ(プロセス15532)
11:33:36.640 [Debug] メッセージを発行しました。[Topic-B:4] 4回目のメッセージ(プロセス15532)
11:33:41.653 [Debug] メッセージを発行しました。[Topic-B:6] 5回目のメッセージ(プロセス15532)
11:33:46.672 [Debug] メッセージを発行しました。[Topic-B:8] 6回目のメッセージ(プロセス15532)
11:33:51.687 [Debug] メッセージを発行しました。[Topic-B:10] 7回目のメッセージ(プロセス15532)
11:33:56.714 [Debug] メッセージを発行しました。[Topic-B:12] 8回目のメッセージ(プロセス15532)
(以下割愛)
パブリッシャーをプロセス 4928 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
トピックを入力してください(省略時 test-topic):
Topic-B
メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。
11:33:33.667 [Debug] メッセージを発行しました。[Topic-B:3] 1回目のメッセージ(プロセス4928)
11:33:38.698 [Debug] メッセージを発行しました。[Topic-B:5] 2回目のメッセージ(プロセス4928)
11:33:43.718 [Debug] メッセージを発行しました。[Topic-B:7] 3回目のメッセージ(プロセス4928)
11:33:48.738 [Debug] メッセージを発行しました。[Topic-B:9] 4回目のメッセージ(プロセス4928)
11:33:53.764 [Debug] メッセージを発行しました。[Topic-B:11] 5回目のメッセージ(プロセス4928)
11:33:58.790 [Debug] メッセージを発行しました。[Topic-B:13] 6回目のメッセージ(プロセス4928)
11:34:03.800 [Debug] メッセージを発行しました。[Topic-B:15] 7回目のメッセージ(プロセス4928)
11:34:08.809 [Debug] メッセージを発行しました。[Topic-B:17] 8回目のメッセージ(プロセス4928)
(以下割愛)

在Consumer Group1中启动两个进程来订阅Topic-A的消息,只有其中一个进程会接收到消息。停止接收消息的订阅者后,另一个订阅者会开始接收消息。

サブスクライバーをプロセス 11948 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
11:33:13.493 [Debug] メッセージを受け取りました。[Topic-A:0] 1回目のメッセージ(プロセス5040)
11:33:18.450 [Debug] メッセージを受け取りました。[Topic-A:1] 2回目のメッセージ(プロセス5040)
11:33:23.487 [Debug] メッセージを受け取りました。[Topic-A:2] 3回目のメッセージ(プロセス5040)
11:33:28.510 [Debug] メッセージを受け取りました。[Topic-A:3] 4回目のメッセージ(プロセス5040)
11:33:33.531 [Debug] メッセージを受け取りました。[Topic-A:4] 5回目のメッセージ(プロセス5040)
11:33:38.547 [Debug] メッセージを受け取りました。[Topic-A:5] 6回目のメッセージ(プロセス5040)
11:33:43.566 [Debug] メッセージを受け取りました。[Topic-A:6] 7回目のメッセージ(プロセス5040)
11:33:48.595 [Debug] メッセージを受け取りました。[Topic-A:7] 8回目のメッセージ(プロセス5040)
11:33:53.615 [Debug] メッセージを受け取りました。[Topic-A:8] 9回目のメッセージ(プロセス5040)
11:33:58.633 [Debug] メッセージを受け取りました。[Topic-A:9] 10回目のメッセージ(プロセス5040)
11:34:03.660 [Debug] メッセージを受け取りました。[Topic-A:10] 11回目のメッセージ(プロセス5040)
11:34:08.685 [Debug] メッセージを受け取りました。[Topic-A:11] 12回目のメッセージ(プロセス5040)
11:34:13.701 [Debug] メッセージを受け取りました。[Topic-A:12] 13回目のメッセージ(プロセス5040)
11:34:18.719 [Debug] メッセージを受け取りました。[Topic-A:13] 14回目のメッセージ(プロセス5040)
11:34:23.743 [Debug] メッセージを受け取りました。[Topic-A:14] 15回目のメッセージ(プロセス5040)
11:34:28.765 [Debug] メッセージを受け取りました。[Topic-A:15] 16回目のメッセージ(プロセス5040)
メッセージの受信処理を終了しました。
サブスクライバーをプロセス 13144 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
11:34:42.843 [Debug] メッセージを受け取りました。[Topic-A:16] 17回目のメッセージ(プロセス5040)
11:34:42.854 [Debug] メッセージを受け取りました。[Topic-A:17] 18回目のメッセージ(プロセス5040)
11:34:43.810 [Debug] メッセージを受け取りました。[Topic-A:18] 19回目のメッセージ(プロセス5040)
11:34:48.821 [Debug] メッセージを受け取りました。[Topic-A:19] 20回目のメッセージ(プロセス5040)

只要消费者群体不同,就会接收所有消息。

サブスクライバーをプロセス 11192 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group2
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
11:33:13.493 [Debug] メッセージを受け取りました。[Topic-A:0] 1回目のメッセージ(プロセス5040)
11:33:18.451 [Debug] メッセージを受け取りました。[Topic-A:1] 2回目のメッセージ(プロセス5040)
11:33:23.484 [Debug] メッセージを受け取りました。[Topic-A:2] 3回目のメッセージ(プロセス5040)
11:33:28.511 [Debug] メッセージを受け取りました。[Topic-A:3] 4回目のメッセージ(プロセス5040)
11:33:33.530 [Debug] メッセージを受け取りました。[Topic-A:4] 5回目のメッセージ(プロセス5040)
11:33:38.547 [Debug] メッセージを受け取りました。[Topic-A:5] 6回目のメッセージ(プロセス5040)
11:33:43.565 [Debug] メッセージを受け取りました。[Topic-A:6] 7回目のメッセージ(プロセス5040)
11:33:48.594 [Debug] メッセージを受け取りました。[Topic-A:7] 8回目のメッセージ(プロセス5040)
11:33:53.616 [Debug] メッセージを受け取りました。[Topic-A:8] 9回目のメッセージ(プロセス5040)
11:33:58.633 [Debug] メッセージを受け取りました。[Topic-A:9] 10回目のメッセージ(プロセス5040)
11:34:03.661 [Debug] メッセージを受け取りました。[Topic-A:10] 11回目のメッセージ(プロセス5040)
11:34:08.684 [Debug] メッセージを受け取りました。[Topic-A:11] 12回目のメッセージ(プロセス5040)
11:34:13.702 [Debug] メッセージを受け取りました。[Topic-A:12] 13回目のメッセージ(プロセス5040)
11:34:18.717 [Debug] メッセージを受け取りました。[Topic-A:13] 14回目のメッセージ(プロセス5040)
11:34:23.743 [Debug] メッセージを受け取りました。[Topic-A:14] 15回目のメッセージ(プロセス5040)
11:34:28.765 [Debug] メッセージを受け取りました。[Topic-A:15] 16回目のメッセージ(プロセス5040)
11:34:33.786 [Debug] メッセージを受け取りました。[Topic-A:16] 17回目のメッセージ(プロセス5040)
11:34:38.797 [Debug] メッセージを受け取りました。[Topic-A:17] 18回目のメッセージ(プロセス5040)
11:34:43.809 [Debug] メッセージを受け取りました。[Topic-A:18] 19回目のメッセージ(プロセス5040)
11:34:48.820 [Debug] メッセージを受け取りました。[Topic-A:19] 20回目のメッセージ(プロセス5040)

主题B由两个进程发出了消息。可以按照发出的顺序接收。

サブスクライバーをプロセス 13912 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-B
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
11:33:21.625 [Debug] メッセージを受け取りました。[Topic-B:0] 1回目のメッセージ(プロセス15532)
11:33:26.589 [Debug] メッセージを受け取りました。[Topic-B:1] 2回目のメッセージ(プロセス15532)
11:33:31.623 [Debug] メッセージを受け取りました。[Topic-B:2] 3回目のメッセージ(プロセス15532)
11:33:33.650 [Debug] メッセージを受け取りました。[Topic-B:3] 1回目のメッセージ(プロセス4928)
11:33:36.639 [Debug] メッセージを受け取りました。[Topic-B:4] 4回目のメッセージ(プロセス15532)
11:33:38.698 [Debug] メッセージを受け取りました。[Topic-B:5] 2回目のメッセージ(プロセス4928)
11:33:41.655 [Debug] メッセージを受け取りました。[Topic-B:6] 5回目のメッセージ(プロセス15532)
11:33:43.718 [Debug] メッセージを受け取りました。[Topic-B:7] 3回目のメッセージ(プロセス4928)
11:33:46.675 [Debug] メッセージを受け取りました。[Topic-B:8] 6回目のメッセージ(プロセス15532)
11:33:48.735 [Debug] メッセージを受け取りました。[Topic-B:9] 4回目のメッセージ(プロセス4928)
11:33:51.689 [Debug] メッセージを受け取りました。[Topic-B:10] 7回目のメッセージ(プロセス15532)
(以下割愛)

消息的送达保证

确保能接收到在没有订阅者存在的情况下发布的消息。

パブリッシャー動作Publisher1″Topic-A” に対してメッセージを発行します。
サブスクライバー動作SubScriber1Publisher1 からいくつかメッセージが発行された後で起動します。メッセージを受信した後、終了させます。SubScriber2SubScriber1 の終了後、Publisher1 からいくつかメッセージが発行された後で起動します。

由于在前述的验证中,我们一直在运行Kafuka实例,所以Topic-A的偏移值从20开始。

パブリッシャーをプロセス 5096 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。
12:21:50.114 [Debug] メッセージを発行しました。[Topic-A:20] 1回目のメッセージ(プロセス5096)
12:21:55.149 [Debug] メッセージを発行しました。[Topic-A:21] 2回目のメッセージ(プロセス5096)
12:22:00.164 [Debug] メッセージを発行しました。[Topic-A:22] 3回目のメッセージ(プロセス5096)
12:22:05.183 [Debug] メッセージを発行しました。[Topic-A:23] 4回目のメッセージ(プロセス5096)
12:22:10.201 [Debug] メッセージを発行しました。[Topic-A:24] 5回目のメッセージ(プロセス5096)
12:22:15.215 [Debug] メッセージを発行しました。[Topic-A:25] 6回目のメッセージ(プロセス5096)
12:22:20.239 [Debug] メッセージを発行しました。[Topic-A:26] 7回目のメッセージ(プロセス5096)
12:22:25.255 [Debug] メッセージを発行しました。[Topic-A:27] 8回目のメッセージ(プロセス5096)
12:22:30.280 [Debug] メッセージを発行しました。[Topic-A:28] 9回目のメッセージ(プロセス5096)
12:22:35.295 [Debug] メッセージを発行しました。[Topic-A:29] 10回目のメッセージ(プロセス5096)
12:22:40.311 [Debug] メッセージを発行しました。[Topic-A:30] 11回目のメッセージ(プロセス5096)
12:22:45.327 [Debug] メッセージを発行しました。[Topic-A:31] 12回目のメッセージ(プロセス5096)
12:22:50.344 [Debug] メッセージを発行しました。[Topic-A:32] 13回目のメッセージ(プロセス5096)
12:22:55.360 [Debug] メッセージを発行しました。[Topic-A:33] 14回目のメッセージ(プロセス5096)

当订阅者刚启动时,会一次性接收尚未订阅的 Kafka 消息(20, 21)。

サブスクライバーをプロセス 17640 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
12:21:58.386 [Debug] メッセージを受け取りました。[Topic-A:20] 1回目のメッセージ(プロセス5096)
12:21:58.397 [Debug] メッセージを受け取りました。[Topic-A:21] 2回目のメッセージ(プロセス5096)
12:22:00.165 [Debug] メッセージを受け取りました。[Topic-A:22] 3回目のメッセージ(プロセス5096)
12:22:05.182 [Debug] メッセージを受け取りました。[Topic-A:23] 4回目のメッセージ(プロセス5096)
12:22:10.202 [Debug] メッセージを受け取りました。[Topic-A:24] 5回目のメッセージ(プロセス5096)
メッセージの受信処理を終了しました。

当订阅者启动后,Kafka 会一次性接收到未读的消息(25, 26, 27, 28)。

bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
12:22:32.537 [Debug] メッセージを受け取りました。[Topic-A:25] 6回目のメッセージ(プロセス5096)
12:22:32.548 [Debug] メッセージを受け取りました。[Topic-A:26] 7回目のメッセージ(プロセス5096)
12:22:32.550 [Debug] メッセージを受け取りました。[Topic-A:27] 8回目のメッセージ(プロセス5096)
12:22:32.551 [Debug] メッセージを受け取りました。[Topic-A:28] 9回目のメッセージ(プロセス5096)
12:22:35.298 [Debug] メッセージを受け取りました。[Topic-A:29] 10回目のメッセージ(プロセス5096)
12:22:40.314 [Debug] メッセージを受け取りました。[Topic-A:30] 11回目のメッセージ(プロセス5096)
12:22:58.543 [Debug] メッセージを受け取りました。[Topic-A:31] 12回目のメッセージ(プロセス5096)
12:22:58.551 [Debug] メッセージを受け取りました。[Topic-A:32] 13回目のメッセージ(プロセス5096)
12:22:58.560 [Debug] メッセージを受け取りました。[Topic-A:33] 14回目のメッセージ(プロセス5096)
メッセージの受信処理を終了しました。

总结

我能够很容易地实现Pub/Sub。
虽然在产品中使用还缺少一些功能,但我打算将其用于分区和副本的操作验证。

广告
将在 10 秒后关闭
bannerAds