使用Ruby(Bunny)时,使用Helm安装的RabbitMQ的笔记
首先
在应用程序之间进行协作时,并非总是适合通过HTTP调用API。特别是当需要依次处理放入队列的任务时,如果应用程序直接接收请求,那么就需要考虑在应用程序端进行持久化处理,并且负载平衡也会变得困难。
我计划在RabbitMQ中实现比Pub/Sub更简单的队列,所以我会把关于这个任务的记录留下来。
前提 – 提前设定的条件或假设,是进行一项事务或讨论的基础。
-
- Kubernetes v1.15.11 (by Kubespray v2.11.2)
-
- Rook/Cephが導入されている
- MetalLBが導入されている
参考文献
我参考了下面的资料。
-
- https://github.com/ruby-amqp/bunny
-
- http://rubybunny.info/articles/getting_started.html
-
- https://qiita.com/suketa/items/bb4a8c294351cb7b9025
-
- https://qiita.com/suketa/items/147aad2f0f583b1871c2
- https://stackoverflow.com/questions/39646442/using-bunny-how-to-set-x-max-length-when-connecting-to-existing-queue
使用Helm安装RabbitMQ
考虑重新开始并为处理做准备,我将准备一个Makefile。
REPO_NAME = stable/rabbitmq
NAMESPACE = rabbitmq
REL_NAME = myrabbitmq
RMQ_OPTIONS = --set persistence.storageClass=rook-ceph-block \
--set replicas=2 \
--set service.type=LoadBalancer \
--set service.loadBalancerIP=192.168.1.120 \
--set persistence.storageClass=rook-ceph-block \
--set rabbitmq.erlangCookie=9a63d47049016fd933371a76af08fc8f \
--set rabbitmq.password=70550b0ac43a2e5c \
--set metrics.enabled=true
.PHONY: init update fetch install upgrade
init:
kubectl create ns $(NAMESPACE)
update:
helm repo update
fetch:
helm fetch $(REPO_NAME)
install:
helm install $(REPO_NAME) --name $(REL_NAME) --namespace $(NAMESPACE) $(RMQ_OPTIONS)
upgrade:
helm upgrade ${REL_NAME} $(REPO_NAME) --namespace $(NAMESPACE) $(RMQ_OPTIONS)
一般情况下,导入时的步骤大致如下。
$ make fetch
$ ls
Makefile rabbitmq-6.18.2.tgz
## tgzファイルを展開し、values.yamlの内容を確認する
$ tar xvzf rabbitmq-6.18.2.tgz
$ less rabbitmq/values.yaml
## 他に変更する点がなければ導入する、あればMakefileのRMQ_OPTIONSに追記
$ make install
我认为需要根据环境进行更改的部分是在RMQ_OPTIONS中设置的”service.type=LoadBalancer”。请查看rabbitmq/values.yaml以了解可配置的选项。
由于在service.type中指定了LoadBalancer,所以4369、5672和15672端口都被公开了。如果这样没有问题,那就没事了。但如果想要选择性地限制端口,可以继续指定service.type=ClusterIP并指定以下类型的Service。
apiVersion: v1
kind: Service
metadata:
name: my-release-rabbitmq-lb
labels:
app: rabbitmq
namespace: rabbitmq
spec:
ports:
- name: amqp
port: 5672
protocol: TCP
targetPort: amqp
- name: stats
port: 15672
protocol: TCP
targetPort: stats
selector:
app: rabbitmq
release: myrabbitmq
type: LoadBalancer
RabbitMQ的准备工作
目前的服務已如下方式公開。
$ kubectl -n rabbitmq get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
myrabbitmq LoadBalancer 10.233.25.181 192.168.1.120 4369:31028/TCP,5672:32031/TCP,25672:31175/TCP,15672:32214,9419:32022/TCP 35m
myrabbitmq-headless ClusterIP None <none> 4369/TCP,5672/TCP,25672/TCP,15672/TCP 35m
从 http://192.168.1.120:15672/ 访问RabbitMQ的Web UI。
如果要登录由Helm安装的RabbitMQ,可以使用–set命令指定的rabbitmq.username(默认为user)和rabbitmq.password。这次我们将使用(username, password) = (user, 70550b0ac43a2e5c)。
创建队列 (Chuangjian duiLie)
点击WebUI的队列选项卡,添加一个新的队列。
-
- Name: testq
- Durable: yes
创建用于连接的用户
从管理员选项卡中添加新用户。
当你追加了用户后,点击该用户名并授予权限。
-
- username: user01
- password: secret
这次是考试,所以我们会给创建的用户默认授予权限。
-
- vhost: /
-
- Configure regexp: .*
-
- Write regexp: .*
- Read regexp: .*
兔子的连接
在创建Ruby应用程序的目录中准备一个Gemfile,并从bundle将其放置到lib目录中。
source 'https://rubygems.org'
gem "bunny"
$ bundle install --path lib
进行Put/Get测试
由于准备已就绪,现在我们要部署以下的Ruby脚本。
#!/usr/bin/ruby
#
require 'bundler/setup'
Bundler.require
require 'bunny'
conn = Bunny.new(host: "192.168.1.120", vhost: "/", user: "user01", password: "secret")
conn.start
ch = conn.create_channel
## x-max-lengthなどをQueueに設定している場合には、次のように:argumentsに設定を加える
q = ch.queue("testq",
durable: true,
arguments: { 'x-max-length' => 1024 ,
'x-max-length-bytes' => 1048576,
'x-queue-type' => 'classic' } ) ## arguments:の設定はQueue定義に応じて要変更
q.publish("Hello", persistent: true)
ch.close
conn.close
在这里,只发送并不能接收结果。为了接收结果,需要创建以下脚本。
#!/usr/bin/ruby
#
require 'bundler/setup'
Bundler.require
require 'bunny'
conn = Bunny.new(host: "192.168.1.120", vhost: "/", user: "user01", password: "zaq12wsx")
conn.start
ch = conn.create_channel
q = ch.queue("testq",
durable: true,
arguments: { 'x-max-length' => 1024,
'x-max-length-bytes' => 1048576,
'x-queue-type' => 'classic' } ) ## arguments:の設定はQueue定義に応じて要変更
puts "Message Count: #{q.message_count}"
delivery_info, metadata, payload = q.pop
puts "Received: #{payload}"
ch.close
conn.close
订阅测试
由于我想要在应用程序端等待消息的到达,所以我稍微修改了get.rb的代码。
#!/usr/bin/ruby
#
require 'bundler/setup'
Bundler.require
require 'bunny'
conn = Bunny.new(host: "192.168.1.120", vhost: "/", user: "user01", password: "zaq12wsx")
conn.start
ch = conn.create_channel
q = ch.queue("testq",
durable: true,
arguments: { 'x-max-length' => 1024,
'x-max-length-bytes' => 1048576,
'x-queue-type' => 'classic' } ) ## arguments:の設定はQueue定義に応じて要変更
q.subscribe(manual_ack: true) do |delivery_info, metadata, payload|
puts "-------"
puts "Message Count: #{q.message_count}"
puts "routing_key: #{delivery_info.routing_key}"
puts "Received: #{payload}"
ch.ack(delivery_info.delivery_tag)
sleep 10
end
## waiting for never
loop { sleep 5 }
ch.close
conn.close
来自普罗米修斯的监视
由于设置了metrics.enabled=true,可以访问9419端口。
$ curl http://192.168.1.120:9419/metrics
# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.000149472
go_gc_duration_seconds{quantile="0.25"} 0.000268979
...
# HELP rabbitmq_sockets_used File descriptors used as sockets.
# TYPE rabbitmq_sockets_used gauge
rabbitmq_sockets_used{node="rabbit@myrabbitmq-0.myrabbitmq-headless.rabbitmq.svc.cluster.local"} 0
rabbitmq_sockets_used{node="rabbit@myrabbitmq-1.myrabbitmq-headless.rabbitmq.svc.cluster.local"} 0
# HELP rabbitmq_up Was the last scrape of rabbitmq successful.
# TYPE rabbitmq_up gauge
rabbitmq_up 1
- job_name: rabbitmq-prod
scheme: http
metrics_path: /metrics
static_configs:
- targets:
- 192.168.1.120:9419
labels:
group: "rabbitmq"
如果要通过Grafana进行确认,请注意以下已注册的Dashboard(Easy RabbitMQ)。
- https://grafana.com/grafana/dashboards/10982
以上