在RabbitMQ中,有发布者和消费者(使用C#编程)
首先
我想起之前写过“下次用C#写”的事情,所以试着写一下。坦白说,和官方的教程差别不大。
※ 不会解释RabbitMQ本身
成为构成。
.Net Framework: 4.6.1(也可以使用4.5系列)
RabbitMQ.Client: 5.0
Newtonsoft.Json: 11.0
成果产物
消息模型
像往常一样,我会制作一个模型(将其分为发送和消费,没有特殊的原因)。
namespace DotNetRabbitMQExample
{
public class SendMessage
{
public string Message { get; set; }
public long Timestamp { get; set; }
}
public class ConsumedMessage
{
public string Message { get; set; }
public long Timestamp { get; set; }
}
}
出版商和消费者
因为感到烦恼,所以这一次我会同时发布给发布商和消费者。
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace DotNetRabbitMQExample
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("please input rabbitmq hostname");
// Get ホスト名
var hostname = Console.ReadLine();
// Taskキャンセルトークン
var tokenSource = new CancellationTokenSource();
Console.WriteLine($"start .Net RabbitMQ Example. Ctl+C to exit");
// ファクトリ生成
var factory = new ConnectionFactory()
{
HostName = hostname
};
// パブリッシャータスク
var pTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>(async(f, cancel) => {
// コネクション&チャンネル生成
using (var conn = f.CreateConnection())
using (var channel = conn.CreateModel())
{
// Exchange生成
channel.ExchangeDeclare("test", "fanout", false, true);
while(true)
{
// キャンセル待ち
if (cancel.IsCancellationRequested)
{
break;
}
var msg = new SendMessage()
{
Message = "Hello",
Timestamp = DateTime.UtcNow.ToBinary()
};
var body = JsonConvert.SerializeObject(msg);
// Publish!!
try
{
channel.BasicPublish("test", "", null, Encoding.UTF8.GetBytes(body));
Console.WriteLine($"success send. message: {msg.Message}, timestamp: {msg.Timestamp}");
} catch (Exception ex)
{
Console.WriteLine($"failer send. reason: {ex.Message}");
}
await Task.Delay(10000);
}
}
})(factory, tokenSource.Token), tokenSource.Token);
// コンシューマータスク
var cTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>((f, cancel) => {
// コネクション&チャンネル生成
using (var conn = f.CreateConnection())
using (var channel = conn.CreateModel())
{
// Exchange生成
channel.ExchangeDeclare("test", "fanout", false, true);
// Queue生成
var queueName = channel.QueueDeclare().QueueName;
// Bind Queue
channel.QueueBind(queueName, "test", "");
// コンシューマー生成
var consumer = new EventingBasicConsumer(channel);
// 受信イベント定義
consumer.Received += (_, ea) =>
{
var msg = JsonConvert.DeserializeObject<ConsumedMessage>(Encoding.UTF8.GetString(ea.Body));
Console.WriteLine($"success consumed. message: {msg.Message}, timestamp: {msg.Timestamp}");
};
// コンシューマー登録
channel.BasicConsume(queueName, true, consumer);
while(true)
{
// キャンセル待ち
if (cancel.IsCancellationRequested)
{
break;
}
}
}
})(factory, tokenSource.Token), tokenSource.Token);
// Ctl+C待機
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
tokenSource.Cancel(); // Taskキャンセル
};
Task.WaitAll(pTask, cTask);
Console.WriteLine("stop .Net RabbitMQ Example. press any key to close.");
Console.ReadKey();
}
}
}
这个内容和上一次的Java客户端没有太大的不同。conn.CreateModel()这个是什么,是通道还是模型,不太直观啊… 还有,源代码中有一个叫AsyncEventingBasicConsumer的东西,但是我不太明白它的作用,所以没有使用它。
执行
执行后的样子是这样的。
please input rabbitmq hostname
your.rabbitmq.address // 例
start .Net RabbitMQ Example. Ctl+C to exit
success send. message: Hello, timestamp: 5248299395184651547
success consumed. message: Hello, timestamp: 5248299395184651547
success send. message: Hello, timestamp: 5248299395284769260
success consumed. message: Hello, timestamp: 5248299395284769260
success send. message: Hello, timestamp: 5248299395384921978
success consumed. message: Hello, timestamp: 5248299395384921978
stop .Net RabbitMQ Example. press any key to close.
结束
总的来说,我对客户端的写作比起Kafka更加轻松一些,只有这样的感受。