我尝试在minikube上部署MQTT-Broker和Fluentd
目标与概述 yǔ
让我们从大学的机械臂中收集数据吧!因此,我们考虑了一种试验性的实现方案,通过在机器人上安装Raspberry Pi并使用MQTT发送数据,然后将数据保存到数据湖中。
必须向Kafka发送数据
在确认服务器环境后,得知数据是由Kubernetes上的Kafka进行管理。因此,我们决定使用开源工具Fluentd创建连接器,以将MQTT和Kafka进行连接。虽然我们也考虑将MQTT Broker部署在边缘设备中,但为了尽量减少负荷,我们也将其部署在Kubernetes中。数据将通过设备 => MQTT Broker => Fluentd => Kafka 的流程进行传输。
这篇文章的内容
我将按照以下顺序解释使用Minikube在本地所完成的内容。
-
- 部署MQTT-Broker
-
- 添加Fluentd插件并创建Docker镜像
- 部署Fluentd
开发环境
-
- macOS Big Sur, version 11.2.1
Docker, version 20.10.5
minikube, v1.18.1
eclipse-mosquitto, v 2.0.9
fluentd-kubernetes-daemonset, v1.12.0-debian-kafka2-1.2
Fluent::Plugin::Mqtt::IO
fluent-plugin-kafka
事前准备
我使用minikube作为Kubernetes的本地开发环境。minikube在虚拟机管理器如Docker或Virtual Box上运行。这次我使用了Docker。
-
- Docker安装
minikube安装
根据每个页面的指示进行操作。为了进行操作确认,建议事先在本地安装mosquitto client。
MQTT-Broker的部署
从这里开始进入正题。
使用eclipse-mosquitto在minikube上部署了代理。由于我们直接使用docker镜像,所以从创建yaml文件开始。首先,为了编辑默认的mosquitto.conf文件,我们创建了ConfigMap。
创建ConfigMap
ConfigMap是处理配置信息的资源。将每个应用程序的配置作为ConfigMap保存在Kubernetes中,可以在容器部署时使用它来创建配置文件。如果手头有mosquitto.conf文件,则可以使用kubectl create configmap –from-file=命令进行创建。当从yaml文件创建时,可以使用如下描述。
apiVersion: v1
kind: ConfigMap
metadata:
name: mosquitto-config-file
data:
mosquitto.conf: |
listener 1883
allow_anonymous false
password_file /mosquitto/secret/passwd
下面的描述在data标签下说明了mosquitto.conf文件的内容将映射到后续的文本中(以保持换行)。由于设置了allow_anonymous false,因此连接到代理的用户名和密码是必需的。密码文件的保存位置设置为password_file /mosquitto/secret/passwd。
本來應該使用Secret來創建密碼文件的映射,但由於部署時無法解決錯誤,且對kafka的訪問需要使用ssl,因此我暫時使用configmap來創建。我參考了這個網站上有關mosquitto密碼加密的方法。
创建部署
请将以下描述添加到yaml文件中(如果与configmap文件位于同一文件中,请不要忘记添加分隔符—!)
apiVersion: apps/v1
kind: Deployment
metadata:
name: mqtt-broker
spec:
selector:
matchLabels:
app: mqtt-broker
replicas: 1 # レプリカの数の指定
template:
metadata:
labels:
app: mqtt-broker
spec:
containers:
- name: mosquitto
image: eclipse-mosquitto #レジストリのホスト名を指定しない場合は、KubernetesはDockerパブリックレジストリを意味していると見なします。
ports:
- containerPort: 1883
volumeMounts: # マウント先の指定
- name: mosquitto-config
mountPath: /mosquitto/config
- name: mosquitto-passwd
mountPath: /mosquitto/secret
readOnly: true
volumes: # マップのソースを指定
- name: mosquitto-config
configMap:
name: mosquitto-config-file #ConfigMapの名前を指定
- name: mosquitto-passwd
configMap:
name: mqtt-passwd
我会指定Docker镜像的获取仓库和设置的映射目标。这样一来,当部署时,将创建一个新的Pod,并在其中运行容器。
创建Service
接下来,将设备以及Fluentd与该Pod进行通信的服务描述也添加到yaml中。尽管使用kubectl expose命令也可以实现,但在下次部署时如果服务名称不同会造成困扰,所以我选择将其写入yaml文件中。
apiVersion: v1
kind: Service
metadata:
name: mosquitto-service
labels:
name: mosquitto
spec:
selector:
app: mqtt-broker
ports:
- port: 1883
name: mosquitto-port
protocol: TCP
部署
这样yaml的描述就完成了。整体来说是这种感觉。
# mosquitto.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: mosquitto-config-file
data:
mosquitto.conf: |
listener 1883
allow_anonymous false
password_file /mosquitto/secret/passwd
#---
## この部分はパスワードファイル用のマップです。記述せずにコマンドでsecretを作成するか、別ファイルからapplyすることをおすすめします
#apiVersion: v1
#kind: ConfigMap
#metadata:
# name: mqtt-passwd
#data:
# passwd: |
# <your-mosquitto-username>:<generated-password>
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: mqtt-broker
spec:
selector:
matchLabels:
app: mqtt-broker
replicas: 1
template:
metadata:
labels:
app: mqtt-broker
spec:
containers:
- name: mosquitto
image: eclipse-mosquitto
ports:
- containerPort: 1883
volumeMounts:
- name: mosquitto-config
mountPath: /mosquitto/config
- name: mosquitto-passwd
mountPath: /mosquitto/secret
readOnly: true
volumes:
- name: mosquitto-config
configMap:
name: mosquitto-config-file
- name: mosquitto-passwd
configMap:
name: mqtt-passwd
---
apiVersion: v1
kind: Service
metadata:
name: mosquitto-service
labels:
name: mosquitto
spec:
selector:
app: mqtt-broker
ports:
- port: 1883
name: mosquitto-port
protocol: TCP
启动Docker后,使用minikube start命令启动集群,然后使用kubectl apply -f命令将此yaml文件部署。可以使用minikube dashboard命令启动仪表板来确认部署是否成功。需要注意的是,创建的服务没有外部访问端口。在投稿主的本地环境中,无法通过NodePort进行访问,因此进行了端口转发。使用kubectl port-forward svc/mosquitto-service 1883:1883命令在本地端口和远程端口之间建立了隧道。现在可以通过localhost:1883访问代理服务器了。
在实际环境中,我认为会使用LoadBalancer或者NodePort来提供外部IP。
Fluentd的部署
接下来是Fluentd的登场。有许多针对Fluentd开发的插件,并且安装也非常简单。
(顺便说一下,在使用Raspberry Pi安装Fluentd时,我尝试使用fluentd-gem install来替代bundle和Gemfile,结果遇到了错误。希望这对你有所帮助。另外,如果要在Mac上安装td-agent并添加插件,通常的fluentd-gem是无法安装的。似乎需要使用/opt/td-agent/embedded/bin/fluent-gem install 来安装插件。)
在Fluentd中添加MQTT插件
我下载并编辑了 fluentd-kubernetes-daemonset 的 debian-kafka2-fluent。创建 Fluentd Docker 镜像的步骤如下所示。
-
- 将fluent-plugin-mqtt-io添加到Gemfile中。
- 然后使用docker build -t : .创建映像,并使用docker image push 将其推送到Docker Hub。
下一步是编辑fluent.conf文件。在标签内,添加从之前创建的mosquitto-service读取数据的配置。以下是添加的示例部分。
<source>
@type mqtt
host "#{ENV['FLUENT_MQTT_BROKER_HOST'] || 'mosquitto-service'}"
port "#{ENV['FLUENT_MQTT_BROKER_PORT'] || '1883'}"
topic "#{ENV['FLUENT_MQTT_SUB_TOPIC'] || 'topic'}"
<security>
username "#{ENV['FLUENT_MQTT_USER'] || 'user'}"
password "#{ENV['FLUENT_MQTT_PASSWORD'] || 'password'}"
</security>
<parse>
@type "#{ENV['FLUENT_MQTT_INPUT_FORMAT_TYPE'] || 'json'}"
</parse>
</source>
除了地址信息外,您还可以指定要订阅的主题和输入类型。有关其他设置,请参阅Fluent::Plugin::Mqtt::IO。
Kafka插件的设置
本次使用的仓库Dockerfile已经预先设置了Kafka插件的安装。(如果仅在Docker环境中使用,可以在Fluentd的Docker Hub上找到自定义方法的说明。)同时,环境变量也可以通过yaml文件进行配置。在这里,我们还添加了用于与Kafka进行通信的TLS/SSL配置。
为了进行客户端认证,我们准备了根证书、客户端证书和客户端密钥。这次我们使用kubectl create secret generic –from-file=ca_cert.pem –from-file=client_cert.pem –from-file=client_key.pem创建了一个密钥。
下一步是在包含@type kafka2的标签部分中的fluent.conf中。
ssl_ca_cert /fluentd/ssl/ca_cert.pem
ssl_client_cert /fluentd/ssl/client_cert.pem
ssl_client_cert_key /fluentd/ssl/client_cert_key.pem
在Kafka中添加一行以启用客户端认证。(请确保在Kafka端添加了客户端证书的根证书,以便进行认证)
参考Fluentd-Kafka用的yaml文件时,我创建了一个文件,参考了fluentd-kubernetes-daemonset页面上的文件。在secret的volumes设定中,我使用了name而不是secretName,但在控制台上收到了警告。
volumes:
- name: fluentd-config
configMap:
name: fluent-conf
- name: fluentd-kafka-tls
secret:
secretName: tls-secret
一下子設定完成了。之後可以使用kubectl apply -f命令部署並進行操作確認。
确认动作
我从本地的Mosquitto进行了发布。关于从Mosquitto发送数据到Fluentd,我在fluentd.conf文件中的Kafka 标签中将其替换为@stdout并进行了部署,然后通过终端或仪表板进行了日志确认。由于Kafka的输出已经有Grafana了,所以如果没有的话,您可以使用Kafka-Consumer等工具来确认日志。
这次实施的内容就是以上了。
我計劃在不久的將來實現Kafka => Telegraf / Fluentd => InfluxDB的路由,希望能在那方面進行發佈。
其他所参考的
Kubernetes 文档
Kubernetes Fluentd
Kubernetes 配置映射和密钥作为 Kubernetes 卷 | 演示 (YouTube)