使用.NET 6和Dapr进行分布式服务开发,第7部分Rabbit MQ和输入绑定

通过输入绑定实现协作

迄今为止,我們一直按照以下流程進行工作。這次,我們想要探討在Dapr中的Input Binding(輸入綁定)中,當將消息傳遞到Rabbit MQ時,會觸發的執行動作。

在此基础上,我们将添加输入绑定功能,因此,如果您正在阅读此页面,请先阅读以下内容作为前提条件。


 

关于输入绑定

以下是之前文章的重新发布。Input Binding(输入绑定)可以用于一些场景,比如每隔5分钟(事件)调用一次服务(触发器),在Azure Storage Queue / Amazon SQS / Apache Kafka等队列中enqueue(事件)时调用服务(触发器)。

可以构建与Azure Functions和AWS Lambda等类似的机制,以对事件进行处理。

image.png

绑定清单包括输入绑定和输出绑定,详细信息如下所示。

 

尝试使用Rabbit MQ绑定

在这些输入绑定中,我们将尝试一种机制,当消息被添加到队列时,指定的API将被触发。

准备RabbitMQ

首先,在本地环境中准备能够运行Rabbit MQ的环境。使用以下命令进行加载。已暴露TCP端口5672、5673和15672。

docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 5673:5673 -p 15672:15672 rabbitmq:3-management

如果Docker启动并加载了以下内容,则可以放心。这次是加载了带有管理控制台的版本。

$ docker ps
CONTAINER ID   IMAGE                   COMMAND                  CREATED         STATUS                  PORTS                                                                                                                   NAMES
097a36fc518b   rabbitmq:3-management   "docker-entrypoint.s…"   9 minutes ago   Up 9 minutes            4369/tcp, 5671/tcp, 0.0.0.0:5672-5673->5672-5673/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   some-rabbit
ba8a9bdb62ab   daprio/dapr:1.6.0       "./placement"            2 weeks ago     Up 12 hours             0.0.0.0:6050->50005/tcp                                                                                                 dapr_placement
808b6358b4e7   openzipkin/zipkin       "start-zipkin"           2 weeks ago     Up 12 hours (healthy)   9410/tcp, 0.0.0.0:9411->9411/tcp                                                                                        dapr_zipkin
bed085e54a77   redis                   "docker-entrypoint.s…"   2 weeks ago     Up 12 hours             0.0.0.0:6379->6379/tcp                                                                                                  dapr_redis

访问Rabbit MQ管理控制台并准备队列。

加载完成后,请访问http://localhost:15672/的管理控制台。您可以使用默认的用户名和密码guest/guest登录。

image.png

接下来创建一个Exchange。确认一下,目前这个组件的前提似乎是要创建一个Exchange。这是一个容易被卡住的地方,但是不要选择队列,从上面的选项卡中选择Exchange,然后以daprexchange的名称创建一个新的Exchange。

image.png

在创建Exchange后,您可以从队列选项卡中确认所创建的Exchange。

image.png

将rabbitmq.yaml文件添加到component文件夹中。

在处理statestore时,我认为你需要添加statestore.yaml文件,同样需要添加rabbitmq.yaml文件。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: Rmq
spec:
  type: bindings.rabbitmq
  version: v1
  metadata:
  - name: queueName
    value: daprexchange
  - name: host
    value: amqp://guest:guest@localhost:5672
  - name: durable
    value: true
  - name: deleteWhenUnused
    value: false
  - name: ttlInSeconds
    value: 60
  - name: prefetchCount
    value: 0
  - name: exclusive
    value: false
  - name: maxPriority
    value: 5
  - name: contentType
    value: "application/json"
scopes:
- worker-service

此外,根据我的确认,调用方式在POST方法上被固定(可能有修改的方法,但暂时默认)。也就是说,在这种情况下,只需在RmqController中实现POST到/Rmq即可。

我們將直接使用上次使用的項目。

这次我们将在上一次使用的WorkerService项目中添加一个控制器。
请点击这里查看上一篇文章。

 

项目的变更

我会在项目中创建一个请求模型。我会将RmqRequest.cs添加到项目中。

namespace WorkerService;

public class RmqRequest
{
    public String Message { get; set; }

}

将RmqController.cs添加到Controllers中。当收到POST请求时,这段代码只会将请求显示在控制台上。

using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;

namespace WorkerService.Controllers;

[ApiController]
[Route("[controller]")]
public class RmqController : ControllerBase
{
    private readonly ILogger<RmqController> _logger;

    public RmqController(ILogger<RmqController> logger)
    {
        _logger = logger;
    }

    [HttpPost(Name = "PostRmqQueue")]
    public void Post(RmqRequest request)
    {
        Console.WriteLine((string)request.Message);

        return;
    }
}

启动和确认

编辑完成后,让我们使用命令 “tye run” 启动它。

$ tye run
Loading Application Details...
Launching Tye Host...

[14:44:48 INF] Executing application from tye.yaml
[14:44:48 INF] Dashboard running on http://127.0.0.1:8000
[14:44:48 INF] Build Watcher: Watching for builds...
...

请用相同的方式,在浏览器中打开 Tye 控制面板 http://127.0.0.1:8000 。首先,打开 worker-service 的日志。

首先,启动具有映射工作者服务的Swagger,进行项目单独的功能测试。

image.png

在这个截图示例中,它是在 http://localhost:57810/swagger 上启动的。从这里尝试进行POST操作……

image.png

从Tye的仪表板中,日志中也显示了消息字符串。

image.png

让我们从Rabbiq MQ的管理控制台中添加消息。在管理控制台上,您还可以确认来自Dapr的消费者连接,如下所示。

 

image.png

以Json形式发布以下消息。

{
  "message": "DaprDaprDaprDaprDaprDaprDaprDaprDaprDaprDapr"
}

在Tye的控制面板上,worker-service的日志也会显示消息。

image.png
广告
将在 10 秒后关闭
bannerAds