我尝试在minikube上部署MQTT-Broker和Fluentd

目标与概述 yǔ

让我们从大学的机械臂中收集数据吧!因此,我们考虑了一种试验性的实现方案,通过在机器人上安装Raspberry Pi并使用MQTT发送数据,然后将数据保存到数据湖中。

必须向Kafka发送数据

在确认服务器环境后,得知数据是由Kubernetes上的Kafka进行管理。因此,我们决定使用开源工具Fluentd创建连接器,以将MQTT和Kafka进行连接。虽然我们也考虑将MQTT Broker部署在边缘设备中,但为了尽量减少负荷,我们也将其部署在Kubernetes中。数据将通过设备 => MQTT Broker => Fluentd => Kafka 的流程进行传输。

这篇文章的内容

我将按照以下顺序解释使用Minikube在本地所完成的内容。

    1. 部署MQTT-Broker

 

    1. 添加Fluentd插件并创建Docker镜像

 

    部署Fluentd

开发环境

    • macOS Big Sur, version 11.2.1

Docker, version 20.10.5

minikube, v1.18.1

eclipse-mosquitto, v 2.0.9

fluentd-kubernetes-daemonset, v1.12.0-debian-kafka2-1.2
Fluent::Plugin::Mqtt::IO
fluent-plugin-kafka

事前准备

我使用minikube作为Kubernetes的本地开发环境。minikube在虚拟机管理器如Docker或Virtual Box上运行。这次我使用了Docker。

    1. Docker安装

minikube安装

根据每个页面的指示进行操作。为了进行操作确认,建议事先在本地安装mosquitto client。

MQTT-Broker的部署

从这里开始进入正题。
使用eclipse-mosquitto在minikube上部署了代理。由于我们直接使用docker镜像,所以从创建yaml文件开始。首先,为了编辑默认的mosquitto.conf文件,我们创建了ConfigMap。

创建ConfigMap

ConfigMap是处理配置信息的资源。将每个应用程序的配置作为ConfigMap保存在Kubernetes中,可以在容器部署时使用它来创建配置文件。如果手头有mosquitto.conf文件,则可以使用kubectl create configmap –from-file=命令进行创建。当从yaml文件创建时,可以使用如下描述。

apiVersion: v1
kind: ConfigMap
metadata:
  name: mosquitto-config-file
data:
  mosquitto.conf: |
    listener 1883
    allow_anonymous false
    password_file /mosquitto/secret/passwd

下面的描述在data标签下说明了mosquitto.conf文件的内容将映射到后续的文本中(以保持换行)。由于设置了allow_anonymous false,因此连接到代理的用户名和密码是必需的。密码文件的保存位置设置为password_file /mosquitto/secret/passwd。

本來應該使用Secret來創建密碼文件的映射,但由於部署時無法解決錯誤,且對kafka的訪問需要使用ssl,因此我暫時使用configmap來創建。我參考了這個網站上有關mosquitto密碼加密的方法。

创建部署

请将以下描述添加到yaml文件中(如果与configmap文件位于同一文件中,请不要忘记添加分隔符—!)

apiVersion: apps/v1 
kind: Deployment
metadata:
  name: mqtt-broker
spec:
  selector:
    matchLabels:
      app: mqtt-broker
  replicas: 1 # レプリカの数の指定
  template:
    metadata:
      labels:
        app: mqtt-broker
    spec:
      containers:
        - name: mosquitto
          image: eclipse-mosquitto #レジストリのホスト名を指定しない場合は、KubernetesはDockerパブリックレジストリを意味していると見なします。
          ports:
            - containerPort: 1883
          volumeMounts: # マウント先の指定
            - name: mosquitto-config
              mountPath: /mosquitto/config
            - name: mosquitto-passwd
              mountPath: /mosquitto/secret
              readOnly: true
      volumes: # マップのソースを指定
        - name: mosquitto-config
          configMap:
            name: mosquitto-config-file  #ConfigMapの名前を指定
        - name: mosquitto-passwd
          configMap:
            name: mqtt-passwd

我会指定Docker镜像的获取仓库和设置的映射目标。这样一来,当部署时,将创建一个新的Pod,并在其中运行容器。

创建Service

接下来,将设备以及Fluentd与该Pod进行通信的服务描述也添加到yaml中。尽管使用kubectl expose命令也可以实现,但在下次部署时如果服务名称不同会造成困扰,所以我选择将其写入yaml文件中。

apiVersion: v1
kind: Service
metadata:
  name: mosquitto-service
  labels:
    name: mosquitto
spec:
  selector:
    app: mqtt-broker
  ports:
  - port: 1883
    name: mosquitto-port
    protocol: TCP

部署

这样yaml的描述就完成了。整体来说是这种感觉。

# mosquitto.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: mosquitto-config-file
data:
  mosquitto.conf: |
    listener 1883
    allow_anonymous false
    password_file /mosquitto/secret/passwd
#---
## この部分はパスワードファイル用のマップです。記述せずにコマンドでsecretを作成するか、別ファイルからapplyすることをおすすめします
#apiVersion: v1
#kind: ConfigMap
#metadata:
#  name: mqtt-passwd
#data:
#  passwd: |
#    <your-mosquitto-username>:<generated-password>   
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mqtt-broker
spec:
  selector:
    matchLabels:
      app: mqtt-broker
  replicas: 1 
  template:
    metadata:
      labels:
        app: mqtt-broker
    spec:
      containers:
        - name: mosquitto
          image: eclipse-mosquitto
          ports:
            - containerPort: 1883
          volumeMounts:
            - name: mosquitto-config
              mountPath: /mosquitto/config
            - name: mosquitto-passwd
              mountPath: /mosquitto/secret
              readOnly: true
      volumes:
        - name: mosquitto-config
          configMap:
            name: mosquitto-config-file
        - name: mosquitto-passwd
          configMap:
            name: mqtt-passwd
---
apiVersion: v1
kind: Service
metadata:
  name: mosquitto-service
  labels:
    name: mosquitto
spec:
  selector:
    app: mqtt-broker
  ports:
  - port: 1883
    name: mosquitto-port
    protocol: TCP

启动Docker后,使用minikube start命令启动集群,然后使用kubectl apply -f命令将此yaml文件部署。可以使用minikube dashboard命令启动仪表板来确认部署是否成功。需要注意的是,创建的服务没有外部访问端口。在投稿主的本地环境中,无法通过NodePort进行访问,因此进行了端口转发。使用kubectl port-forward svc/mosquitto-service 1883:1883命令在本地端口和远程端口之间建立了隧道。现在可以通过localhost:1883访问代理服务器了。

在实际环境中,我认为会使用LoadBalancer或者NodePort来提供外部IP。

Fluentd的部署

接下来是Fluentd的登场。有许多针对Fluentd开发的插件,并且安装也非常简单。

(顺便说一下,在使用Raspberry Pi安装Fluentd时,我尝试使用fluentd-gem install来替代bundle和Gemfile,结果遇到了错误。希望这对你有所帮助。另外,如果要在Mac上安装td-agent并添加插件,通常的fluentd-gem是无法安装的。似乎需要使用/opt/td-agent/embedded/bin/fluent-gem install 来安装插件。)

在Fluentd中添加MQTT插件

我下载并编辑了 fluentd-kubernetes-daemonset 的 debian-kafka2-fluent。创建 Fluentd Docker 镜像的步骤如下所示。

    1. 将fluent-plugin-mqtt-io添加到Gemfile中。

 

    然后使用docker build -t : .创建映像,并使用docker image push 将其推送到Docker Hub。

下一步是编辑fluent.conf文件。在标签内,添加从之前创建的mosquitto-service读取数据的配置。以下是添加的示例部分。

<source>
  @type mqtt
  host "#{ENV['FLUENT_MQTT_BROKER_HOST'] || 'mosquitto-service'}"
  port "#{ENV['FLUENT_MQTT_BROKER_PORT'] || '1883'}"
  topic "#{ENV['FLUENT_MQTT_SUB_TOPIC'] || 'topic'}"
  <security>
     username "#{ENV['FLUENT_MQTT_USER'] || 'user'}"
     password "#{ENV['FLUENT_MQTT_PASSWORD'] || 'password'}"
  </security>
  <parse>
    @type "#{ENV['FLUENT_MQTT_INPUT_FORMAT_TYPE'] || 'json'}"
  </parse>
</source>

除了地址信息外,您还可以指定要订阅的主题和输入类型。有关其他设置,请参阅Fluent::Plugin::Mqtt::IO。

Kafka插件的设置

本次使用的仓库Dockerfile已经预先设置了Kafka插件的安装。(如果仅在Docker环境中使用,可以在Fluentd的Docker Hub上找到自定义方法的说明。)同时,环境变量也可以通过yaml文件进行配置。在这里,我们还添加了用于与Kafka进行通信的TLS/SSL配置。

为了进行客户端认证,我们准备了根证书、客户端证书和客户端密钥。这次我们使用kubectl create secret generic –from-file=ca_cert.pem –from-file=client_cert.pem –from-file=client_key.pem创建了一个密钥。

下一步是在包含@type kafka2的标签部分中的fluent.conf中。

ssl_ca_cert /fluentd/ssl/ca_cert.pem
ssl_client_cert /fluentd/ssl/client_cert.pem
ssl_client_cert_key /fluentd/ssl/client_cert_key.pem

在Kafka中添加一行以启用客户端认证。(请确保在Kafka端添加了客户端证书的根证书,以便进行认证)

参考Fluentd-Kafka用的yaml文件时,我创建了一个文件,参考了fluentd-kubernetes-daemonset页面上的文件。在secret的volumes设定中,我使用了name而不是secretName,但在控制台上收到了警告。

volumes:
- name: fluentd-config
        configMap:
            name: fluent-conf
- name: fluentd-kafka-tls
        secret:
            secretName: tls-secret

一下子設定完成了。之後可以使用kubectl apply -f命令部署並進行操作確認。

确认动作

我从本地的Mosquitto进行了发布。关于从Mosquitto发送数据到Fluentd,我在fluentd.conf文件中的Kafka 标签中将其替换为@stdout并进行了部署,然后通过终端或仪表板进行了日志确认。由于Kafka的输出已经有Grafana了,所以如果没有的话,您可以使用Kafka-Consumer等工具来确认日志。

这次实施的内容就是以上了。

我計劃在不久的將來實現Kafka => Telegraf / Fluentd => InfluxDB的路由,希望能在那方面進行發佈。

其他所参考的

Kubernetes 文档
Kubernetes Fluentd
Kubernetes 配置映射和密钥作为 Kubernetes 卷 | 演示 (YouTube)

广告
将在 10 秒后关闭
bannerAds