在RabbitMQ中,有发布者和消费者(使用C#编程)

首先

我想起之前写过“下次用C#写”的事情,所以试着写一下。坦白说,和官方的教程差别不大。

※ 不会解释RabbitMQ本身

成为构成。

.Net Framework: 4.6.1(也可以使用4.5系列)
RabbitMQ.Client: 5.0
Newtonsoft.Json: 11.0

成果产物

消息模型

像往常一样,我会制作一个模型(将其分为发送和消费,没有特殊的原因)。

namespace DotNetRabbitMQExample
{
    public class SendMessage
    {
        public string Message { get; set; }

        public long Timestamp { get; set; }
    }

    public class ConsumedMessage
    {
        public string Message { get; set; }

        public long Timestamp { get; set; }
    }
}

出版商和消费者

因为感到烦恼,所以这一次我会同时发布给发布商和消费者。

using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DotNetRabbitMQExample
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("please input rabbitmq hostname");

            // Get ホスト名
            var hostname = Console.ReadLine();

            // Taskキャンセルトークン
            var tokenSource = new CancellationTokenSource();

            Console.WriteLine($"start .Net RabbitMQ Example. Ctl+C to exit");

            // ファクトリ生成
            var factory = new ConnectionFactory()
            {
                HostName = hostname
            };

            // パブリッシャータスク
            var pTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>(async(f, cancel) => {
                // コネクション&チャンネル生成
                using (var conn = f.CreateConnection())
                using (var channel = conn.CreateModel())
                {
                    // Exchange生成
                    channel.ExchangeDeclare("test", "fanout", false, true);

                    while(true)
                    {
                        // キャンセル待ち
                        if (cancel.IsCancellationRequested)
                        {
                            break;
                        }

                        var msg = new SendMessage()
                        {
                            Message = "Hello",
                            Timestamp = DateTime.UtcNow.ToBinary()
                        };
                        var body = JsonConvert.SerializeObject(msg);

                        // Publish!!
                        try
                        {
                            channel.BasicPublish("test", "", null, Encoding.UTF8.GetBytes(body));
                            Console.WriteLine($"success send. message: {msg.Message}, timestamp: {msg.Timestamp}");
                        } catch (Exception ex)
                        {
                            Console.WriteLine($"failer send. reason: {ex.Message}");
                        }

                        await Task.Delay(10000);
                    }
                }
            })(factory, tokenSource.Token), tokenSource.Token);

            // コンシューマータスク
            var cTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>((f, cancel) => {
                // コネクション&チャンネル生成
                using (var conn = f.CreateConnection())
                using (var channel = conn.CreateModel())
                {
                    // Exchange生成
                    channel.ExchangeDeclare("test", "fanout", false, true);

                    // Queue生成
                    var queueName = channel.QueueDeclare().QueueName;

                    // Bind Queue
                    channel.QueueBind(queueName, "test", "");

                    // コンシューマー生成
                    var consumer = new EventingBasicConsumer(channel);

                    // 受信イベント定義
                    consumer.Received += (_, ea) =>
                    {
                        var msg = JsonConvert.DeserializeObject<ConsumedMessage>(Encoding.UTF8.GetString(ea.Body));
                        Console.WriteLine($"success consumed. message: {msg.Message}, timestamp: {msg.Timestamp}");
                    };

                    // コンシューマー登録
                    channel.BasicConsume(queueName, true, consumer);

                    while(true)
                    {
                        // キャンセル待ち   
                        if (cancel.IsCancellationRequested)
                        {
                            break;
                        }
                    }
                }
            })(factory, tokenSource.Token), tokenSource.Token);


            // Ctl+C待機
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true;
                tokenSource.Cancel(); // Taskキャンセル
            };

            Task.WaitAll(pTask, cTask);

            Console.WriteLine("stop .Net RabbitMQ Example. press any key to close.");

            Console.ReadKey();
        }
    }
}

这个内容和上一次的Java客户端没有太大的不同。conn.CreateModel()这个是什么,是通道还是模型,不太直观啊… 还有,源代码中有一个叫AsyncEventingBasicConsumer的东西,但是我不太明白它的作用,所以没有使用它。

执行

执行后的样子是这样的。

please input rabbitmq hostname
your.rabbitmq.address // 例
start .Net RabbitMQ Example. Ctl+C to exit
success send. message: Hello, timestamp: 5248299395184651547
success consumed. message: Hello, timestamp: 5248299395184651547
success send. message: Hello, timestamp: 5248299395284769260
success consumed. message: Hello, timestamp: 5248299395284769260
success send. message: Hello, timestamp: 5248299395384921978
success consumed. message: Hello, timestamp: 5248299395384921978
stop .Net RabbitMQ Example. press any key to close.

结束

总的来说,我对客户端的写作比起Kafka更加轻松一些,只有这样的感受。

广告
将在 10 秒后关闭
bannerAds