使用Apache Kafka进行简单的发布/订阅消息传递
这个文件的内容
让我们使用Apache Kafka来实现一个简单的发布/订阅消息传递系统。
-
- 特定のトピックに対してメッセージを発行する Publisher アプリケーション
- 特定のトピックのメッセージを購読する Subscriber アプリケーション
建立 Kafka 的运行环境
我决定使用Docker来构建Kafka的运行环境,并且版本是2020年12月31日的2.6.0。
创建一个 docker-compose.yml 文件
创建一个 docker-compose.yml 文件。
-
- docker image は wurstmeister/kafka を使用することにしました。
-
- トピックの自動生成を有効にしました。
Confluent.Kafka ver 1.5.3 で確認したところ、メッセージの発行時にはトピックが自動生成されましたが、メッセージの購読時には自動生成されずに「トピックが存在しない」例外が発生しました。
version: '3'
services:
test-zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
test-kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: test-zookeeper:2181
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
启动Docker容器。
在存放有docker-compose.yml的文件夹中,打开powershell控制台,并执行以下命令。
PS> docker-compose.yml up -d
Creating network "kafka_default" with the default driver
Creating kafka_test-zookeeper_1 ... done
Creating kafka_test-kafka_1 ... done
可以使用以下命令来检查是否成功启动。
PS> docker-compose.yml ps
Name Command State Ports
--------------------------------------------------------------------------------------------------------------------
kafka_test-kafka_1 start-kafka.sh Up 0.0.0.0:9092->9092/tcp
kafka_test-zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
停止并删除Docker容器。
在存储docker-compose.yml的文件夹中打开powershell控制台,并执行以下命令。
PS> docker-compose.yml down
确认Kafka的状态
在Docker桌面的仪表盘上查看Kafka的状态。
如果启动正常,应该会显示以下日志。
test-kafka_1 | [2020-12-30 04:19:50,593] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)
可以打开 CLI。您可以从 Kafka 实例页面的图标处启动。
要显示已注册主题的列表,请执行以下命令。如果主题不存在,则不会显示任何内容。
/# kafka-topics.sh --zookeeper test-zookeeper:2181 --list
在这里注册一个话题,需要执行以下命令。
/# kafka-topics.sh --zookeeper test-zookeeper:2181 --create --topic test-topic --replication-factor 1 --partitions 1
实现Pub/Sub应用程序
我使用.NET Core 3.1将其实现为控制台应用程序。我使用了Confluent.Kafka版本1.5.3的Kafka库。
完整的源代码已经上传到了 GitHub。
密钥和消息
我们将在应用程序之间协调以下密钥和消息。
/// <summary>
/// キー
/// </summary>
public readonly struct SampleMessageKey : IEquatable<SampleMessageKey>
{
public SampleMessageKey(string key)
{
Key = key;
}
public string Key { get; }
public override bool Equals(object obj)
{
return obj is SampleMessageKey key && Equals(key);
}
public bool Equals(SampleMessageKey other)
{
return Key == other.Key;
}
public override int GetHashCode()
{
return 990326508 + EqualityComparer<string>.Default.GetHashCode(Key);
}
public override string ToString()
{
return Key;
}
}
/// <summary>
/// メッセージ
/// </summary>
public class SampleMessageBody
{
public SampleMessageBody(DateTimeOffset time, string message)
{
Time = time;
Message = message;
}
public DateTimeOffset Time { get; }
public string Message { get; }
public override string ToString()
{
return Message;
}
}
出版者应用程序
这是一个控制台应用程序,它以固定间隔向Kafka发布消息。发布消息的目标引导服务器和主题会从控制台输入中接收到。
实现入口点
class Program
{
static async Task Main(string[] args)
{
try
{
await RunAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
Console.ReadKey();
}
/// <summary>
/// 処理を実行します。
/// </summary>
/// <returns></returns>
static async Task RunAsync()
{
int process = System.Diagnostics.Process.GetCurrentProcess().Id;
Console.WriteLine($"パブリッシャーをプロセス {process} で起動しました。");
// コンソールからパラメーターを受け取る
Console.WriteLine("bootstrap servers を入力してください(省略時 127.0.0.1):");
var bootstrapServers = Console.ReadLine();
if (string.IsNullOrEmpty(bootstrapServers)) { bootstrapServers = "127.0.0.1"; }
Console.WriteLine($"トピックを入力してください(省略時 {Constants.DefaultTopic}):");
var topic = Console.ReadLine();
if (string.IsNullOrEmpty(topic)) { topic = Constants.DefaultTopic; }
// キャンセルトークンを生成する
using var cancelTokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cancelTokenSource.Cancel();
};
// 動作設定を生成する
var publisherSetting = new MessagePublisherSetting()
{
BootstrapServers = bootstrapServers
};
// キーを生成するメソッド
static SampleMessageKey GenerateKey()
{
return new SampleMessageKey(Guid.NewGuid().ToString());
}
// パブリッシャーを生成する
var factory = new SampleMessagePublisherFactory(publisherSetting, new SampleLogger());
using IMessagePublisher<SampleMessageBody> publisher
= factory.CreatePublisher<SampleMessageKey, SampleMessageBody>(topic, GenerateKey);
Console.WriteLine("メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。");
// 一定間隔でメッセージを発行する
int sequence = 0;
TimeSpan interval = TimeSpan.FromSeconds(5);
while (true)
{
if (cancelTokenSource.Token.IsCancellationRequested) { break; }
++sequence;
await publisher.PublishAsync(
new SampleMessageBody(DateTimeOffset.UtcNow, $"{sequence}回目のメッセージ(プロセス{process})")
, cancelTokenSource.Token
).ConfigureAwait(false);
await Task.Delay(interval, cancelTokenSource.Token);
}
Console.WriteLine("メッセージの送信処理を終了しました。");
}
}
出版商的实施
我们采用了简单工厂模式。
/// <summary>
/// パブリッシャーを生成します。
/// </summary>
internal class SampleMessagePublisherFactory : MessagePublisherFactoryBase
{
/// <summary>
/// インスタンスを生成します。
/// </summary>
/// <param name="publisherSetting">パブリッシャーの動作設定</param>
/// <param name="logger">ロガー</param>
internal SampleMessagePublisherFactory(MessagePublisherSetting publisherSetting, ILogger logger)
: base(publisherSetting, logger)
{
}
/// <summary>
/// シリアライザを取得します。
/// </summary>
/// <typeparam name="T">シリアライズ対象オブジェクトの型</typeparam>
/// <returns>シリアライザ</returns>
protected override ISerializer<T> GetSerializer<T>()
{
return SampleSerializerFactory.Create<T>();
}
}
/// <summary>
/// パブリッシャーの生成処理の基本実装。
/// </summary>
public abstract class MessagePublisherFactoryBase
{
/// <summary>
/// インスタンスを生成します。
/// </summary>
/// <param name="publisherSetting">パブリッシャーの動作設定</param>
/// <param name="logger">ロガー</param>
protected MessagePublisherFactoryBase(MessagePublisherSetting publisherSetting, ILogger logger)
{
PublisherSetting = publisherSetting;
Logger = logger;
}
/// <summary>
/// パブリッシャーの動作設定を取得します。
/// </summary>
protected MessagePublisherSetting PublisherSetting { get; }
/// <summary>
/// ロガーを取得します。
/// </summary>
private ILogger Logger { get; }
/// <summary>
/// パブリッシャーを生成します。
/// </summary>
/// <typeparam name="TKey">キーの型</typeparam>
/// <typeparam name="TMessage">メッセージの型</typeparam>
/// <param name="topic">トピック</param>
/// <param name="keyGenerator">キーの生成処理</param>
/// <returns>パブリッシャー</returns>
public MessagePublisher<TKey, TMessage> CreatePublisher<TKey, TMessage>(string topic, Func<TKey> keyGenerator)
{
return new MessagePublisher<TKey, TMessage>(
GetSerializer<TKey>()
, GetSerializer<TMessage>()
, PublisherSetting
, topic
, keyGenerator
, Logger
);
}
/// <summary>
/// シリアライザを取得します。
/// </summary>
/// <typeparam name="T">シリアライズ対象オブジェクトの型</typeparam>
/// <returns>シリアライザ</returns>
protected abstract ISerializer<T> GetSerializer<T>();
}
出版商内部包含一个对 Kafka 的生产者(IProducer<TKey, TMessage>),并将指定的消息发送到 Kafka。
/// <summary>
/// Kafka にメッセージを送信します。
/// </summary>
/// <typeparam name="TKey">キーの型</typeparam>
/// <typeparam name="TMessage">メッセージの型</typeparam>
public class MessagePublisher<TKey, TMessage> : IMessagePublisher<TMessage>
{
/// <summary>
/// インスタンスを生成します。
/// </summary>
/// <param name="keySerializer">キーに対するシリアライザ</param>
/// <param name="messageSerializer">メッセージに対するシリアライザ</param>
/// <param name="setting">プロデューサーの動作設定</param>
/// <param name="keyGenerator">キーの生成処理</param>
/// <param name="topic">トピック</param>
/// <param name="logger">ロガー</param>
public MessagePublisher(ISerializer<TKey> keySerializer, ISerializer<TMessage> messageSerializer, MessagePublisherSetting setting, string topic, Func<TKey> keyGenerator, ILogger logger)
{
KeySerializer = keySerializer;
MessageSerializer = messageSerializer;
Topic = topic;
KeyGenerator = keyGenerator;
Logger = logger;
Producer = BuildProducer(GetProducerConfig(setting));
}
/// <summary>
/// 使用しているリソースを解放します。
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// 使用しているリソースを解放します。
/// </summary>
protected virtual void Dispose(bool disposing)
{
TerminateProducer();
}
/// <summary>
/// キーに対するシリアライザを取得します。
/// </summary>
private ISerializer<TKey> KeySerializer { get; }
/// <summary>
/// メッセージに対するシリアライザを取得します。
/// </summary>
private ISerializer<TMessage> MessageSerializer { get; }
/// <summary>
/// ロガーを取得します。
/// </summary>
private ILogger Logger { get; }
/// <summary>
/// トピックを取得します。
/// </summary>
private string Topic { get; }
/// <summary>
/// キーの生成処理を取得します。
/// </summary>
private Func<TKey> KeyGenerator { get; }
#region プロデューサー
/// <summary>
/// プロデューサーを取得します。
/// </summary>
private IProducer<TKey, TMessage> Producer { get; }
/// <summary>
/// プロデューサーを解放します。
/// </summary>
private void TerminateProducer()
{
if (Producer == null) { return; }
Producer.Flush(TimeSpan.FromMilliseconds(10000));
Producer.Dispose();
}
/// <summary>
/// プロデューサーを生成します。
/// </summary>
/// <param name="config">動作設定</param>
/// <returns>プロデューサー</returns>
protected virtual IProducer<TKey, TMessage> BuildProducer(IEnumerable<KeyValuePair<string, string>> config)
{
var producerBuilder = new ProducerBuilder<TKey, TMessage>(config)
.SetKeySerializer(KeySerializer)
.SetValueSerializer(MessageSerializer)
.SetErrorHandler(OnError)
;
return producerBuilder.Build();
}
/// <summary>
/// プロデューサーの動作設定を取得します。
/// </summary>
/// <param name="producerSetting">プロデューサーの動作設定</param>
/// <returns>動作設定のキーと値の組み合わせ</returns>
protected virtual IEnumerable<KeyValuePair<string, string>> GetProducerConfig(MessagePublisherSetting producerSetting)
{
if (producerSetting.BootstrapServers == null || producerSetting.BootstrapServers == "")
{
throw new NullReferenceException("ブートストラップサーバーが設定されていません。");
}
return new ProducerConfig()
{
BootstrapServers = producerSetting.BootstrapServers,
};
}
/// <summary>
/// エラーが発生したときの処理を行います。
/// </summary>
/// <param name="producer"></param>
/// <param name="error"></param>
protected virtual void OnError(IProducer<TKey, TMessage> producer, Error error)
{
WriteLog(LogLevel.Error, () => BuildLogMessage(error));
}
#endregion
#region メッセージ発行
/// <summary>
/// 指定されたメッセージを発行します。
/// </summary>
/// <param name="message">メッセージ</param>
/// <returns></returns>
public Task PublishAsync(TMessage message, CancellationToken cancellationToken)
{
var kafkaMessage = new Message<TKey, TMessage>()
{
Key = GenerateNewKey(),
Value = message,
Timestamp = new Timestamp(DateTimeOffset.UtcNow)
};
return Producer.ProduceAsync(Topic, kafkaMessage, cancellationToken)
.ContinueWith(t => OnPublished(t.Result));
}
/// <summary>
/// メッセージを発行したときの処理を行います。
/// </summary>
/// <param name="result">発行の結果</param>
protected virtual void OnPublished(DeliveryResult<TKey, TMessage> result)
{
WriteLog(LogLevel.Debug, () => BuildLogMessage(result));
}
/// <summary>
/// 新しいキーを生成します。
/// </summary>
/// <returns>キー</returns>
private TKey GenerateNewKey()
{
return KeyGenerator();
}
#endregion
#region ロギング
/// <summary>
/// 指定されたログを出力します。
/// </summary>
/// <param name="level">ログレベル</param>
/// <param name="messageBuilder">ログメッセージを生成するメソッド</param>
/// <param name="exception">例外</param>
private void WriteLog(LogLevel level, Func<string> messageBuilder, Exception? exception = null)
{
if (!Logger.IsEnabled(level)) { return; }
if (exception == null)
{
Logger.Log(level, messageBuilder());
}
else
{
Logger.Log(level, exception, messageBuilder());
}
}
private string BuildLogMessage(DeliveryResult<TKey, TMessage> result)
{
return $"メッセージを発行しました。[{result.Topic}:{result.Offset}] {result.Message.Value}";
}
private string BuildLogMessage(Error error)
{
return error.Reason;
}
#endregion
}
订阅者应用程序
这是一个控制台应用程序,用于订阅Kafka的消息。订阅的引导服务器和主题将从控制台输入中接收。我们采用了使用ReactiveExtensions(System.Reactive)的可观察模式。
实现入口点
class Program
{
static async Task Main(string[] args)
{
try
{
await RunAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
Console.ReadKey();
}
static async Task RunAsync()
{
int process = System.Diagnostics.Process.GetCurrentProcess().Id;
Console.WriteLine($"サブスクライバーをプロセス {process} で起動しました。");
// コンソールからパラメーターを受け取る
Console.WriteLine("bootstrap servers を入力してください(省略時 127.0.0.1):");
var bootstrapServers = Console.ReadLine();
if (string.IsNullOrEmpty(bootstrapServers)) { bootstrapServers = "127.0.0.1"; }
Console.WriteLine($"コンシューマーグループIDを入力してください(省略時 {Constants.DefaultComsumerGroupID}):");
var groupID = Console.ReadLine();
if (string.IsNullOrEmpty(groupID)) { groupID = Constants.DefaultComsumerGroupID; }
Console.WriteLine($"トピックを入力してください(省略時 {Constants.DefaultTopic}):");
var topic = Console.ReadLine();
if (string.IsNullOrEmpty(topic)) { topic = Constants.DefaultTopic; }
// キャンセルトークンを生成する
using var cancelTokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cancelTokenSource.Cancel();
};
// 動作設定を生成する
var subscriberSetting = new MessageSubscriberSetting()
{
BootstrapServers = bootstrapServers,
ConsumerGroupID = groupID
};
// observable パターンでメッセージを監視する
var factory = new SampleMessageSubscriberFactory(subscriberSetting, new SampleLogger());
Console.WriteLine($"メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。");
var subscriber = factory.CreateSubscriber<SampleMessageKey, SampleMessageBody>(topic);
using var releaser = subscriber.Subscribe(new SampleMessageObserver());
await subscriber.SubscribeAsync(cancelTokenSource.Token).ConfigureAwait(false);
Console.WriteLine("メッセージの受信処理を終了しました。");
}
}
订阅者实施
我们采用了简单工厂模式。
/// <summary>
/// サブスクライバーを生成します。
/// </summary>
internal class SampleMessageSubscriberFactory : MessageSubscriberFactoryBase
{
/// <summary>
/// インスタンスを生成します。
/// </summary>
/// <param name="subscriberSetting">サブスクライバーの動作設定</param>
/// <param name="logger">ロガー</param>
internal SampleMessageSubscriberFactory(MessageSubscriberSetting subscriberSetting, ILogger logger)
: base(subscriberSetting, logger)
{
}
/// <summary>
/// シリアライザを取得します。
/// </summary>
/// <typeparam name="T">シリアライズ対象オブジェクトの型</typeparam>
/// <returns>シリアライザ</returns>
protected override IDeserializer<T> GetDeserializer<T>()
{
return SampleSerializerFactory.Create<T>();
}
}
/// <summary>
/// サブスクライバーの生成処理の基本実装。
/// </summary>
public abstract class MessageSubscriberFactoryBase
{
/// <summary>
/// インスタンスを生成します。
/// </summary>
/// <param name="subscriberSetting">サブスクライバーの動作設定</param>
/// <param name="logger">ロガー</param>
protected MessageSubscriberFactoryBase(MessageSubscriberSetting subscriberSetting, ILogger logger)
{
SubscriberSetting = subscriberSetting;
Logger = logger;
}
/// <summary>
/// サブスクライバーの動作設定を取得します。
/// </summary>
private MessageSubscriberSetting SubscriberSetting { get; }
/// <summary>
/// ロガーを取得します。
/// </summary>
private ILogger Logger { get; }
/// <summary>
/// サブスクライバーを生成します。
/// </summary>
/// <typeparam name="TKey">キーの型</typeparam>
/// <typeparam name="TMessage">メッセージの型</typeparam>
/// <returns>サブスクライバー</returns>
public MessageSubscriber<TKey, TMessage> CreateSubscriber<TKey, TMessage>(string topic)
{
return new MessageSubscriber<TKey, TMessage>(
GetDeserializer<TKey>()
, GetDeserializer<TMessage>()
, SubscriberSetting
, topic
, Logger
);
}
/// <summary>
/// シリアライザを取得します。
/// </summary>
/// <typeparam name="T">シリアライズ対象オブジェクトの型</typeparam>
/// <returns>シリアライザ</returns>
protected abstract IDeserializer<T> GetDeserializer<T>();
}
订阅者包含对Kafka的消费者(IConsumer <TKey,TMessage>),从Kafka接收发布到指定主题的消息。
/// <summary>
/// Kafka からのメッセージを監視します。
/// </summary>
/// <typeparam name="TKey">メッセージのキーの型</typeparam>
/// <typeparam name="TMessage">メッセージの型</typeparam>
public class MessageSubscriber<TKey, TMessage> : System.Reactive.ObservableBase<TMessage>
{
/// <summary>
/// インスタンスを生成します。
/// </summary>
/// <param name="keyDeserializer">キーに対するデシリアライザ</param>
/// <param name="messageDeserializer">メッセージに対するデシリアライザ</param>
/// <param name="subscriberSetting">動作設定</param>
/// <param name="topic">トピック</param>
/// <param name="logger">ロガー</param>
public MessageSubscriber(IDeserializer<TKey> keyDeserializer, IDeserializer<TMessage> messageDeserializer, MessageSubscriberSetting subscriberSetting, string topic, ILogger logger) : base()
{
KeyDeserializer = keyDeserializer;
MessageDeserializer = messageDeserializer;
SubscriberSetting = subscriberSetting;
Topic = topic;
Logger = logger ?? Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance;
}
/// <summary>
/// ロガーを取得します。
/// </summary>
private ILogger Logger { get; }
/// <summary>
/// 動作設定を取得します。
/// </summary>
private MessageSubscriberSetting SubscriberSetting { get; }
/// <summary>
/// トピックを取得します。
/// </summary>
private string Topic { get; }
#region 受信
/// <summary>
/// キーに対するデシリアライザを取得します。
/// </summary>
private IDeserializer<TKey> KeyDeserializer { get; }
/// <summary>
/// メッセージに対するデシリアライザを取得します。
/// </summary>
private IDeserializer<TMessage> MessageDeserializer { get; }
/// <summary>
/// メッセージ受信を開始します。
/// </summary>
/// <param name="cancellation">キャンセルトークン</param>
public Task SubscribeAsync(CancellationToken cancellation)
{
Task.Yield();
TimeSpan interval = SubscriberSetting.ConsumeInterval;
using var consumer = BuildConsumer(GetConsumerConfig(SubscriberSetting));
consumer.Subscribe(Topic);
while (true)
{
if (cancellation.IsCancellationRequested) { break; }
try
{
if (m_Observers.Count == 0) { continue; }
ConsumeResult<TKey, TMessage> result = consumer.Consume(interval);
if (result == null) { continue; }
// TODO: 今回の確認では IsPartitionEOF を発生させることができなかった。
if (result.IsPartitionEOF) { continue; }
WriteLog(LogLevel.Debug, () => BuildLogMessage(result));
NotifyMessage(result.Message.Value);
consumer.Commit(result);
}
catch (Exception ex)
{
OnException(consumer, ex);
break;
}
}
NotifyComplated();
return Task.CompletedTask;
}
/// <summary>
/// コンシューマーを生成します。
/// </summary>
/// <param name="config">動作設定</param>
/// <returns>コンシューマー</returns>
protected IConsumer<TKey, TMessage> BuildConsumer(IEnumerable<KeyValuePair<string, string>> config)
{
var consumerBuilder = new ConsumerBuilder<TKey, TMessage>(config)
.SetKeyDeserializer(KeyDeserializer)
.SetValueDeserializer(MessageDeserializer)
.SetErrorHandler(OnError)
.SetLogHandler(OnLogging)
;
return consumerBuilder.Build();
}
/// <summary>
/// コンシューマーの動作設定を取得します。
/// </summary>
/// <param name="consumerSetting">コンシューマーの動作設定</param>
/// <returns>動作設定のキーと値の組み合わせ</returns>
protected IEnumerable<KeyValuePair<string, string>> GetConsumerConfig(MessageSubscriberSetting consumerSetting)
{
if (consumerSetting.BootstrapServers == null || consumerSetting.BootstrapServers == "")
{
throw new NullReferenceException("ブートストラップサーバーが設定されていません。");
}
if (consumerSetting.ConsumerGroupID == null || consumerSetting.ConsumerGroupID == "")
{
throw new NullReferenceException("コンシューマーグループIDが設定されていません。");
}
return new ConsumerConfig()
{
BootstrapServers = consumerSetting.BootstrapServers,
GroupId = consumerSetting.ConsumerGroupID,
EnableAutoCommit = false,
AutoOffsetReset = AutoOffsetReset.Earliest
};
}
/// <summary>
/// ログを出力します。
/// </summary>
/// <param name="consumer"></param>
/// <param name="log"></param>
private void OnLogging(IConsumer<TKey, TMessage> consumer, LogMessage log)
{
LogLevel logLevel = log.Level.ToLogLevel();
WriteLog(logLevel, () => BuildLogMessage(log));
}
/// <summary>
/// エラーが発生したときの処理を行います。
/// </summary>
/// <param name="consumer"></param>
/// <param name="error"></param>
private void OnError(IConsumer<TKey, TMessage> consumer, Error error)
{
WriteLog(LogLevel.Error, () => BuildLogMessage(error));
NotifyError(new Exception(error.Reason));
}
/// <summary>
/// 例外が発生したときの処理を行います。
/// </summary>
/// <param name="consumer"></param>
/// <param name="exception"></param>
private void OnException(IConsumer<TKey, TMessage> consumer, Exception exception)
{
WriteLog(LogLevel.Critical, () => BuildLogMessage(exception), exception);
NotifyError(exception);
}
#endregion
#region 通知
/// <summary>
/// 指定されたメッセージを通知します。
/// </summary>
/// <param name="message">メッセージ</param>
private void NotifyMessage(TMessage message)
{
if (m_Observers.Count == 0) { return; }
lock (m_Observers)
{
for (int i = 0; i < m_Observers.Count; ++i)
{
m_Observers[i].OnNext(message);
}
}
}
/// <summary>
/// 指定されたメッセージを通知します。
/// </summary>
/// <param name="exception">例外</param>
private void NotifyError(Exception exception)
{
if (m_Observers.Count == 0) { return; }
lock (m_Observers)
{
for (int i = 0; i < m_Observers.Count; ++i)
{
m_Observers[i].OnError(exception);
}
}
}
/// <summary>
/// 完了を通知します。
/// </summary>
private void NotifyComplated()
{
if (m_Observers.Count == 0) { return; }
lock (m_Observers)
{
for (int i = 0; i < m_Observers.Count; ++i)
{
m_Observers[i].OnCompleted();
}
}
}
#endregion
#region オブザーバー
/// <summary>
/// 指定されたオブザーバーによる購読を開始します。
/// </summary>
/// <param name="observer"></param>
/// <returns></returns>
protected override IDisposable SubscribeCore(IObserver<TMessage> observer)
{
AddObserver(observer);
return System.Reactive.Disposables.Disposable.Create(() => RemoveObserver(observer));
}
private readonly List<IObserver<TMessage>> m_Observers = new List<IObserver<TMessage>>();
/// <summary>
/// 指定されたオブザーバーを追加します。
/// </summary>
/// <param name="observer"></param>
private void AddObserver(IObserver<TMessage> observer)
{
lock (m_Observers)
{
m_Observers.Add(observer);
}
}
/// <summary>
/// 指定されたオブザーバーを削除します。
/// </summary>
/// <param name="observer"></param>
private void RemoveObserver(IObserver<TMessage> observer)
{
if (m_Observers.Contains(observer))
{
lock (m_Observers)
{
m_Observers.Remove(observer);
}
}
}
#endregion
#region ロギング
/// <summary>
/// 指定されたログを出力します。
/// </summary>
/// <param name="level">ログレベル</param>
/// <param name="messageBuilder">ログメッセージを生成するメソッド</param>
/// <param name="exception">例外</param>
private void WriteLog(LogLevel level, Func<string> messageBuilder, Exception? exception = null)
{
if (!Logger.IsEnabled(level)) { return; }
if (exception == null)
{
Logger.Log(level, messageBuilder());
}
else
{
Logger.Log(level, exception, messageBuilder());
}
}
private string BuildLogMessage(ConsumeResult<TKey, TMessage> result)
{
return $"メッセージを受け取りました。[{result.Topic}:{result.Offset}] {result.Message.Value}";
}
private string BuildLogMessage(LogMessage log)
{
return log.Message;
}
private string BuildLogMessage(Error error)
{
return error.Reason;
}
private string BuildLogMessage(Exception exception)
{
return exception.Message;
}
#endregion
}
确认应用程序的操作。
使用消费者组和主题进行发送和接收控制
启动多个发布者和订阅者以确认消息的发送和接收方式。
在日志中输出的 “[Topic-A:0]” 表示了主题和该主题的偏移值(每次发行消息时递增的连续编号)。
パブリッシャーをプロセス 5040 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。
11:33:13.426 [Debug] メッセージを発行しました。[Topic-A:0] 1回目のメッセージ(プロセス5040)
11:33:18.453 [Debug] メッセージを発行しました。[Topic-A:1] 2回目のメッセージ(プロセス5040)
11:33:23.491 [Debug] メッセージを発行しました。[Topic-A:2] 3回目のメッセージ(プロセス5040)
11:33:28.508 [Debug] メッセージを発行しました。[Topic-A:3] 4回目のメッセージ(プロセス5040)
11:33:33.530 [Debug] メッセージを発行しました。[Topic-A:4] 5回目のメッセージ(プロセス5040)
11:33:38.546 [Debug] メッセージを発行しました。[Topic-A:5] 6回目のメッセージ(プロセス5040)
11:33:43.565 [Debug] メッセージを発行しました。[Topic-A:6] 7回目のメッセージ(プロセス5040)
11:33:48.595 [Debug] メッセージを発行しました。[Topic-A:7] 8回目のメッセージ(プロセス5040)
(以下割愛)
有两个进程向 Tobic-B 发出消息。可以看到偏移值在不重复的情况下递增。
パブリッシャーをプロセス 15532 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
トピックを入力してください(省略時 test-topic):
Topic-B
メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。
11:33:21.558 [Debug] メッセージを発行しました。[Topic-B:0] 1回目のメッセージ(プロセス15532)
11:33:26.592 [Debug] メッセージを発行しました。[Topic-B:1] 2回目のメッセージ(プロセス15532)
11:33:31.621 [Debug] メッセージを発行しました。[Topic-B:2] 3回目のメッセージ(プロセス15532)
11:33:36.640 [Debug] メッセージを発行しました。[Topic-B:4] 4回目のメッセージ(プロセス15532)
11:33:41.653 [Debug] メッセージを発行しました。[Topic-B:6] 5回目のメッセージ(プロセス15532)
11:33:46.672 [Debug] メッセージを発行しました。[Topic-B:8] 6回目のメッセージ(プロセス15532)
11:33:51.687 [Debug] メッセージを発行しました。[Topic-B:10] 7回目のメッセージ(プロセス15532)
11:33:56.714 [Debug] メッセージを発行しました。[Topic-B:12] 8回目のメッセージ(プロセス15532)
(以下割愛)
パブリッシャーをプロセス 4928 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
トピックを入力してください(省略時 test-topic):
Topic-B
メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。
11:33:33.667 [Debug] メッセージを発行しました。[Topic-B:3] 1回目のメッセージ(プロセス4928)
11:33:38.698 [Debug] メッセージを発行しました。[Topic-B:5] 2回目のメッセージ(プロセス4928)
11:33:43.718 [Debug] メッセージを発行しました。[Topic-B:7] 3回目のメッセージ(プロセス4928)
11:33:48.738 [Debug] メッセージを発行しました。[Topic-B:9] 4回目のメッセージ(プロセス4928)
11:33:53.764 [Debug] メッセージを発行しました。[Topic-B:11] 5回目のメッセージ(プロセス4928)
11:33:58.790 [Debug] メッセージを発行しました。[Topic-B:13] 6回目のメッセージ(プロセス4928)
11:34:03.800 [Debug] メッセージを発行しました。[Topic-B:15] 7回目のメッセージ(プロセス4928)
11:34:08.809 [Debug] メッセージを発行しました。[Topic-B:17] 8回目のメッセージ(プロセス4928)
(以下割愛)
在Consumer Group1中启动两个进程来订阅Topic-A的消息,只有其中一个进程会接收到消息。停止接收消息的订阅者后,另一个订阅者会开始接收消息。
サブスクライバーをプロセス 11948 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
11:33:13.493 [Debug] メッセージを受け取りました。[Topic-A:0] 1回目のメッセージ(プロセス5040)
11:33:18.450 [Debug] メッセージを受け取りました。[Topic-A:1] 2回目のメッセージ(プロセス5040)
11:33:23.487 [Debug] メッセージを受け取りました。[Topic-A:2] 3回目のメッセージ(プロセス5040)
11:33:28.510 [Debug] メッセージを受け取りました。[Topic-A:3] 4回目のメッセージ(プロセス5040)
11:33:33.531 [Debug] メッセージを受け取りました。[Topic-A:4] 5回目のメッセージ(プロセス5040)
11:33:38.547 [Debug] メッセージを受け取りました。[Topic-A:5] 6回目のメッセージ(プロセス5040)
11:33:43.566 [Debug] メッセージを受け取りました。[Topic-A:6] 7回目のメッセージ(プロセス5040)
11:33:48.595 [Debug] メッセージを受け取りました。[Topic-A:7] 8回目のメッセージ(プロセス5040)
11:33:53.615 [Debug] メッセージを受け取りました。[Topic-A:8] 9回目のメッセージ(プロセス5040)
11:33:58.633 [Debug] メッセージを受け取りました。[Topic-A:9] 10回目のメッセージ(プロセス5040)
11:34:03.660 [Debug] メッセージを受け取りました。[Topic-A:10] 11回目のメッセージ(プロセス5040)
11:34:08.685 [Debug] メッセージを受け取りました。[Topic-A:11] 12回目のメッセージ(プロセス5040)
11:34:13.701 [Debug] メッセージを受け取りました。[Topic-A:12] 13回目のメッセージ(プロセス5040)
11:34:18.719 [Debug] メッセージを受け取りました。[Topic-A:13] 14回目のメッセージ(プロセス5040)
11:34:23.743 [Debug] メッセージを受け取りました。[Topic-A:14] 15回目のメッセージ(プロセス5040)
11:34:28.765 [Debug] メッセージを受け取りました。[Topic-A:15] 16回目のメッセージ(プロセス5040)
メッセージの受信処理を終了しました。
サブスクライバーをプロセス 13144 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
11:34:42.843 [Debug] メッセージを受け取りました。[Topic-A:16] 17回目のメッセージ(プロセス5040)
11:34:42.854 [Debug] メッセージを受け取りました。[Topic-A:17] 18回目のメッセージ(プロセス5040)
11:34:43.810 [Debug] メッセージを受け取りました。[Topic-A:18] 19回目のメッセージ(プロセス5040)
11:34:48.821 [Debug] メッセージを受け取りました。[Topic-A:19] 20回目のメッセージ(プロセス5040)
只要消费者群体不同,就会接收所有消息。
サブスクライバーをプロセス 11192 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group2
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
11:33:13.493 [Debug] メッセージを受け取りました。[Topic-A:0] 1回目のメッセージ(プロセス5040)
11:33:18.451 [Debug] メッセージを受け取りました。[Topic-A:1] 2回目のメッセージ(プロセス5040)
11:33:23.484 [Debug] メッセージを受け取りました。[Topic-A:2] 3回目のメッセージ(プロセス5040)
11:33:28.511 [Debug] メッセージを受け取りました。[Topic-A:3] 4回目のメッセージ(プロセス5040)
11:33:33.530 [Debug] メッセージを受け取りました。[Topic-A:4] 5回目のメッセージ(プロセス5040)
11:33:38.547 [Debug] メッセージを受け取りました。[Topic-A:5] 6回目のメッセージ(プロセス5040)
11:33:43.565 [Debug] メッセージを受け取りました。[Topic-A:6] 7回目のメッセージ(プロセス5040)
11:33:48.594 [Debug] メッセージを受け取りました。[Topic-A:7] 8回目のメッセージ(プロセス5040)
11:33:53.616 [Debug] メッセージを受け取りました。[Topic-A:8] 9回目のメッセージ(プロセス5040)
11:33:58.633 [Debug] メッセージを受け取りました。[Topic-A:9] 10回目のメッセージ(プロセス5040)
11:34:03.661 [Debug] メッセージを受け取りました。[Topic-A:10] 11回目のメッセージ(プロセス5040)
11:34:08.684 [Debug] メッセージを受け取りました。[Topic-A:11] 12回目のメッセージ(プロセス5040)
11:34:13.702 [Debug] メッセージを受け取りました。[Topic-A:12] 13回目のメッセージ(プロセス5040)
11:34:18.717 [Debug] メッセージを受け取りました。[Topic-A:13] 14回目のメッセージ(プロセス5040)
11:34:23.743 [Debug] メッセージを受け取りました。[Topic-A:14] 15回目のメッセージ(プロセス5040)
11:34:28.765 [Debug] メッセージを受け取りました。[Topic-A:15] 16回目のメッセージ(プロセス5040)
11:34:33.786 [Debug] メッセージを受け取りました。[Topic-A:16] 17回目のメッセージ(プロセス5040)
11:34:38.797 [Debug] メッセージを受け取りました。[Topic-A:17] 18回目のメッセージ(プロセス5040)
11:34:43.809 [Debug] メッセージを受け取りました。[Topic-A:18] 19回目のメッセージ(プロセス5040)
11:34:48.820 [Debug] メッセージを受け取りました。[Topic-A:19] 20回目のメッセージ(プロセス5040)
主题B由两个进程发出了消息。可以按照发出的顺序接收。
サブスクライバーをプロセス 13912 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-B
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
11:33:21.625 [Debug] メッセージを受け取りました。[Topic-B:0] 1回目のメッセージ(プロセス15532)
11:33:26.589 [Debug] メッセージを受け取りました。[Topic-B:1] 2回目のメッセージ(プロセス15532)
11:33:31.623 [Debug] メッセージを受け取りました。[Topic-B:2] 3回目のメッセージ(プロセス15532)
11:33:33.650 [Debug] メッセージを受け取りました。[Topic-B:3] 1回目のメッセージ(プロセス4928)
11:33:36.639 [Debug] メッセージを受け取りました。[Topic-B:4] 4回目のメッセージ(プロセス15532)
11:33:38.698 [Debug] メッセージを受け取りました。[Topic-B:5] 2回目のメッセージ(プロセス4928)
11:33:41.655 [Debug] メッセージを受け取りました。[Topic-B:6] 5回目のメッセージ(プロセス15532)
11:33:43.718 [Debug] メッセージを受け取りました。[Topic-B:7] 3回目のメッセージ(プロセス4928)
11:33:46.675 [Debug] メッセージを受け取りました。[Topic-B:8] 6回目のメッセージ(プロセス15532)
11:33:48.735 [Debug] メッセージを受け取りました。[Topic-B:9] 4回目のメッセージ(プロセス4928)
11:33:51.689 [Debug] メッセージを受け取りました。[Topic-B:10] 7回目のメッセージ(プロセス15532)
(以下割愛)
消息的送达保证
确保能接收到在没有订阅者存在的情况下发布的消息。
由于在前述的验证中,我们一直在运行Kafuka实例,所以Topic-A的偏移值从20开始。
パブリッシャーをプロセス 5096 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの送信処理を開始します。終了するには Ctrl+C を押してください。
12:21:50.114 [Debug] メッセージを発行しました。[Topic-A:20] 1回目のメッセージ(プロセス5096)
12:21:55.149 [Debug] メッセージを発行しました。[Topic-A:21] 2回目のメッセージ(プロセス5096)
12:22:00.164 [Debug] メッセージを発行しました。[Topic-A:22] 3回目のメッセージ(プロセス5096)
12:22:05.183 [Debug] メッセージを発行しました。[Topic-A:23] 4回目のメッセージ(プロセス5096)
12:22:10.201 [Debug] メッセージを発行しました。[Topic-A:24] 5回目のメッセージ(プロセス5096)
12:22:15.215 [Debug] メッセージを発行しました。[Topic-A:25] 6回目のメッセージ(プロセス5096)
12:22:20.239 [Debug] メッセージを発行しました。[Topic-A:26] 7回目のメッセージ(プロセス5096)
12:22:25.255 [Debug] メッセージを発行しました。[Topic-A:27] 8回目のメッセージ(プロセス5096)
12:22:30.280 [Debug] メッセージを発行しました。[Topic-A:28] 9回目のメッセージ(プロセス5096)
12:22:35.295 [Debug] メッセージを発行しました。[Topic-A:29] 10回目のメッセージ(プロセス5096)
12:22:40.311 [Debug] メッセージを発行しました。[Topic-A:30] 11回目のメッセージ(プロセス5096)
12:22:45.327 [Debug] メッセージを発行しました。[Topic-A:31] 12回目のメッセージ(プロセス5096)
12:22:50.344 [Debug] メッセージを発行しました。[Topic-A:32] 13回目のメッセージ(プロセス5096)
12:22:55.360 [Debug] メッセージを発行しました。[Topic-A:33] 14回目のメッセージ(プロセス5096)
当订阅者刚启动时,会一次性接收尚未订阅的 Kafka 消息(20, 21)。
サブスクライバーをプロセス 17640 で起動しました。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
12:21:58.386 [Debug] メッセージを受け取りました。[Topic-A:20] 1回目のメッセージ(プロセス5096)
12:21:58.397 [Debug] メッセージを受け取りました。[Topic-A:21] 2回目のメッセージ(プロセス5096)
12:22:00.165 [Debug] メッセージを受け取りました。[Topic-A:22] 3回目のメッセージ(プロセス5096)
12:22:05.182 [Debug] メッセージを受け取りました。[Topic-A:23] 4回目のメッセージ(プロセス5096)
12:22:10.202 [Debug] メッセージを受け取りました。[Topic-A:24] 5回目のメッセージ(プロセス5096)
メッセージの受信処理を終了しました。
当订阅者启动后,Kafka 会一次性接收到未读的消息(25, 26, 27, 28)。
bootstrap servers を入力してください(省略時 127.0.0.1):
コンシューマーグループIDを入力してください(省略時 test-group):
Group1
トピックを入力してください(省略時 test-topic):
Topic-A
メッセージの受信処理を開始します。終了するには Ctrl+C を押してください。
12:22:32.537 [Debug] メッセージを受け取りました。[Topic-A:25] 6回目のメッセージ(プロセス5096)
12:22:32.548 [Debug] メッセージを受け取りました。[Topic-A:26] 7回目のメッセージ(プロセス5096)
12:22:32.550 [Debug] メッセージを受け取りました。[Topic-A:27] 8回目のメッセージ(プロセス5096)
12:22:32.551 [Debug] メッセージを受け取りました。[Topic-A:28] 9回目のメッセージ(プロセス5096)
12:22:35.298 [Debug] メッセージを受け取りました。[Topic-A:29] 10回目のメッセージ(プロセス5096)
12:22:40.314 [Debug] メッセージを受け取りました。[Topic-A:30] 11回目のメッセージ(プロセス5096)
12:22:58.543 [Debug] メッセージを受け取りました。[Topic-A:31] 12回目のメッセージ(プロセス5096)
12:22:58.551 [Debug] メッセージを受け取りました。[Topic-A:32] 13回目のメッセージ(プロセス5096)
12:22:58.560 [Debug] メッセージを受け取りました。[Topic-A:33] 14回目のメッセージ(プロセス5096)
メッセージの受信処理を終了しました。
总结
我能够很容易地实现Pub/Sub。
虽然在产品中使用还缺少一些功能,但我打算将其用于分区和副本的操作验证。