k8sベースのアプリケーションログをkafkaにpublishして、Vectorを介してDatadogへ転送する構成を試していた際に一部ハマりそうなポイントがあったため備忘録として記載

Vectorとは

公式ドキュメントにも記載されていますが、ログ/メトリクス/トレースデータの収集、加工、他ツールへの連携を行うためのデータパイプラインツールとなります。

Screenshot 2023-12-02 at 22-23-04 Data model.png

やりたいこと

    • k8s上で稼働しているアプリログをfluent-bit経由でkafkaへpublish

 

    kafkaに溜まったログに対して、vectorを経由してDatadogへ転送する

datadog agentを介して直接Datadogへログを転送することも可能ですが、
1. ログを基盤側でバッファリングしておくことで、Datadogに障害が起きた際のログの可用性を担保したかった
2. 基盤側で強制的に付与したいDD_TAGSがあった
ため上記の流れでログを転送していました。

構成

    • application podはEKSクラスタ上に構築

 

    • ログはstdoutへ出力し、fluent bit経由でMSK kafkaへpublishされる

 

    • MSK kafkaに溜まったログをvectorがsubscribe

 

    Vectorのsink設定としてdatadog_logsを指定し、Datadogへログを転送する
vector.drawio.png

kafka構築

terraformを利用して、kafkaクラスタとconfigurationの設定を定義
(プライベートサブネットとセキュリティグループは作成済みの前提)

locals {
    service_name = "vector-log-poc"
}

resource "aws_msk_cluster" "kafka" {
  cluster_name           = "${local.service_name}-cluster"
  kafka_version          = "3.5.1"
  number_of_broker_nodes = 3
  broker_node_group_info {
    instance_type = "kafka.t3.small"
    storage_info {
      ebs_storage_info {
        volume_size = 10 //GB
      }
    }
    client_subnets = [
      aws_subnet.private_ap_northeast_1a.id, //事前に作成しておく
      aws_subnet.private_ap_northeast_1b.id, //事前に作成しておく
      aws_subnet.private_ap_northeast_1d.id, //事前に作成しておく
    ]
    security_groups = [aws_security_group.kafka.id]  //事前に作成しておく
  }

  client_authentication {
    unauthenticated = true

    sasl {
      iam   = false
      scram = false
    }

    tls {}
  }
  encryption_info {
    encryption_in_transit {
      client_broker = "TLS_PLAINTEXT"
    }
  }

  configuration_info {
    arn      = aws_msk_configuration.kafka.arn
    revision = aws_msk_configuration.kafka.latest_revision
  }

  tags = {
    Name = "${local.service_name}-cluster"
  }
}

resource "aws_msk_configuration" "kafka" {
  kafka_versions = ["3.5.1"]
  name           = "${local.service_name}-cluster"

  server_properties = <<PROPERTIES
auto.create.topics.enable = true
delete.topic.enable = true
log.retention.hours = 24
PROPERTIES
}

Vector構築

Helm chartを利用してAggrigatorタイプのVector podをデプロイする

Quick startに従ってhelm chartをinstallする

helm install --name my-vector vector/vector

次にvalues.yamlを作成する

role: Aggregator

customConfig:
  api:
    enabled: true
    address: 127.0.0.1:8686
    playground: false

  sources:
    kafka:
      type: kafka
      bootstrap_servers: b-1.xxx:9092,b-2.xxx:9092,b-3.xxx:9092
      group_id: log-consumer-group
      topics: 
        - fluent-bit-logs # fluent bitがログを送っているtopicを指定する

    vector_logs:
      type: internal_logs

  transforms:
    parse_kafka:
      type: remap
      inputs: 
        - kafka
      source: >-
        .ddsource = "kafka"

        .ddtags = "env:dev,environment:dev,region:ap-northeast-1"

        .service = "vector-log-poc"

  sinks:
    ddlog:
      type: datadog_logs
      inputs: 
        - parse_kafka
        - vector_logs
      default_api_key: xxx # Datadog側で作成したapi keyを指定する
      site: ap1.datadoghq.com # 利用中のDatadogサイトを指定する https://docs.datadoghq.com/ja/getting_started/site/

autoscaling:
  enabled: true
  minReplicas: 1
  maxReplicas: 3

podDisruptionBudget:
  enabled: true
  minAvailable: 1

helm upgradeでvalues.yamlの変更を反映する

helm upgrade -f values.yaml my-vector vector/vector

Datadog上でのログの確認

14日間限定で無料トライアルも提供されています。もし、コストをかけずに試したい場合はこちらを利用して確認することも可能です。
以下の通り、ログがDatadogにて可視化できていました。

image.png

Vectorの設定でハマったポイント

当初transformsの設定が抜けており、body dataに必要なmessageやservice、ddtags等が渡せていませんでした。
Vector側で設定関連のエラーが出ていないものの、Datadogへログが転送できていない(ログの表示ができていない)状態となり、切り分けに時間がかかってしまいました。

対応策として、上記のvalues.yamlにある通り、transformsにて.ddsourceの設定を追加してdatadogのlog pipelineにおいてログのパースを行うようにしました。

※参考
https://vector.dev/docs/reference/configuration/sinks/datadog_logs/#attributes

まとめ

初めてvectorを触ってみましたが、source/transforms/sinkとログ処理の一連の設定を簡易に定義でき、初学者にも扱いやすいツールだなと感じました。
またドキュメントが充実しているため、設定に関しての不明点があれば公式ドキュメントを見れば良さそうでした。

广告
将在 10 秒后关闭
bannerAds