使用.NET 6和Dapr进行分布式服务开发,第7部分Rabbit MQ和输入绑定
通过输入绑定实现协作
迄今为止,我們一直按照以下流程進行工作。這次,我們想要探討在Dapr中的Input Binding(輸入綁定)中,當將消息傳遞到Rabbit MQ時,會觸發的執行動作。
在此基础上,我们将添加输入绑定功能,因此,如果您正在阅读此页面,请先阅读以下内容作为前提条件。
关于输入绑定
以下是之前文章的重新发布。Input Binding(输入绑定)可以用于一些场景,比如每隔5分钟(事件)调用一次服务(触发器),在Azure Storage Queue / Amazon SQS / Apache Kafka等队列中enqueue(事件)时调用服务(触发器)。
可以构建与Azure Functions和AWS Lambda等类似的机制,以对事件进行处理。
绑定清单包括输入绑定和输出绑定,详细信息如下所示。
尝试使用Rabbit MQ绑定
在这些输入绑定中,我们将尝试一种机制,当消息被添加到队列时,指定的API将被触发。
准备RabbitMQ
首先,在本地环境中准备能够运行Rabbit MQ的环境。使用以下命令进行加载。已暴露TCP端口5672、5673和15672。
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 5673:5673 -p 15672:15672 rabbitmq:3-management
如果Docker启动并加载了以下内容,则可以放心。这次是加载了带有管理控制台的版本。
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
097a36fc518b rabbitmq:3-management "docker-entrypoint.s…" 9 minutes ago Up 9 minutes 4369/tcp, 5671/tcp, 0.0.0.0:5672-5673->5672-5673/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp some-rabbit
ba8a9bdb62ab daprio/dapr:1.6.0 "./placement" 2 weeks ago Up 12 hours 0.0.0.0:6050->50005/tcp dapr_placement
808b6358b4e7 openzipkin/zipkin "start-zipkin" 2 weeks ago Up 12 hours (healthy) 9410/tcp, 0.0.0.0:9411->9411/tcp dapr_zipkin
bed085e54a77 redis "docker-entrypoint.s…" 2 weeks ago Up 12 hours 0.0.0.0:6379->6379/tcp dapr_redis
访问Rabbit MQ管理控制台并准备队列。
加载完成后,请访问http://localhost:15672/的管理控制台。您可以使用默认的用户名和密码guest/guest登录。
接下来创建一个Exchange。确认一下,目前这个组件的前提似乎是要创建一个Exchange。这是一个容易被卡住的地方,但是不要选择队列,从上面的选项卡中选择Exchange,然后以daprexchange的名称创建一个新的Exchange。
在创建Exchange后,您可以从队列选项卡中确认所创建的Exchange。
将rabbitmq.yaml文件添加到component文件夹中。
在处理statestore时,我认为你需要添加statestore.yaml文件,同样需要添加rabbitmq.yaml文件。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: Rmq
spec:
type: bindings.rabbitmq
version: v1
metadata:
- name: queueName
value: daprexchange
- name: host
value: amqp://guest:guest@localhost:5672
- name: durable
value: true
- name: deleteWhenUnused
value: false
- name: ttlInSeconds
value: 60
- name: prefetchCount
value: 0
- name: exclusive
value: false
- name: maxPriority
value: 5
- name: contentType
value: "application/json"
scopes:
- worker-service
此外,根据我的确认,调用方式在POST方法上被固定(可能有修改的方法,但暂时默认)。也就是说,在这种情况下,只需在RmqController中实现POST到/Rmq即可。
我們將直接使用上次使用的項目。
这次我们将在上一次使用的WorkerService项目中添加一个控制器。
请点击这里查看上一篇文章。
项目的变更
我会在项目中创建一个请求模型。我会将RmqRequest.cs添加到项目中。
namespace WorkerService;
public class RmqRequest
{
public String Message { get; set; }
}
将RmqController.cs添加到Controllers中。当收到POST请求时,这段代码只会将请求显示在控制台上。
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
namespace WorkerService.Controllers;
[ApiController]
[Route("[controller]")]
public class RmqController : ControllerBase
{
private readonly ILogger<RmqController> _logger;
public RmqController(ILogger<RmqController> logger)
{
_logger = logger;
}
[HttpPost(Name = "PostRmqQueue")]
public void Post(RmqRequest request)
{
Console.WriteLine((string)request.Message);
return;
}
}
启动和确认
编辑完成后,让我们使用命令 “tye run” 启动它。
$ tye run
Loading Application Details...
Launching Tye Host...
[14:44:48 INF] Executing application from tye.yaml
[14:44:48 INF] Dashboard running on http://127.0.0.1:8000
[14:44:48 INF] Build Watcher: Watching for builds...
...
请用相同的方式,在浏览器中打开 Tye 控制面板 http://127.0.0.1:8000 。首先,打开 worker-service 的日志。
首先,启动具有映射工作者服务的Swagger,进行项目单独的功能测试。
在这个截图示例中,它是在 http://localhost:57810/swagger 上启动的。从这里尝试进行POST操作……
从Tye的仪表板中,日志中也显示了消息字符串。
让我们从Rabbiq MQ的管理控制台中添加消息。在管理控制台上,您还可以确认来自Dapr的消费者连接,如下所示。
以Json形式发布以下消息。
{
"message": "DaprDaprDaprDaprDaprDaprDaprDaprDaprDaprDapr"
}
在Tye的控制面板上,worker-service的日志也会显示消息。