k8sベースのアプリケーションログをkafkaにpublishして、Vectorを介してDatadogへ転送する構成を試していた際に一部ハマりそうなポイントがあったため備忘録として記載
Vectorとは
公式ドキュメントにも記載されていますが、ログ/メトリクス/トレースデータの収集、加工、他ツールへの連携を行うためのデータパイプラインツールとなります。
やりたいこと
-
- k8s上で稼働しているアプリログをfluent-bit経由でkafkaへpublish
- kafkaに溜まったログに対して、vectorを経由してDatadogへ転送する
datadog agentを介して直接Datadogへログを転送することも可能ですが、
1. ログを基盤側でバッファリングしておくことで、Datadogに障害が起きた際のログの可用性を担保したかった
2. 基盤側で強制的に付与したいDD_TAGSがあった
ため上記の流れでログを転送していました。
構成
-
- application podはEKSクラスタ上に構築
-
- ログはstdoutへ出力し、fluent bit経由でMSK kafkaへpublishされる
-
- MSK kafkaに溜まったログをvectorがsubscribe
- Vectorのsink設定としてdatadog_logsを指定し、Datadogへログを転送する
kafka構築
terraformを利用して、kafkaクラスタとconfigurationの設定を定義
(プライベートサブネットとセキュリティグループは作成済みの前提)
locals {
service_name = "vector-log-poc"
}
resource "aws_msk_cluster" "kafka" {
cluster_name = "${local.service_name}-cluster"
kafka_version = "3.5.1"
number_of_broker_nodes = 3
broker_node_group_info {
instance_type = "kafka.t3.small"
storage_info {
ebs_storage_info {
volume_size = 10 //GB
}
}
client_subnets = [
aws_subnet.private_ap_northeast_1a.id, //事前に作成しておく
aws_subnet.private_ap_northeast_1b.id, //事前に作成しておく
aws_subnet.private_ap_northeast_1d.id, //事前に作成しておく
]
security_groups = [aws_security_group.kafka.id] //事前に作成しておく
}
client_authentication {
unauthenticated = true
sasl {
iam = false
scram = false
}
tls {}
}
encryption_info {
encryption_in_transit {
client_broker = "TLS_PLAINTEXT"
}
}
configuration_info {
arn = aws_msk_configuration.kafka.arn
revision = aws_msk_configuration.kafka.latest_revision
}
tags = {
Name = "${local.service_name}-cluster"
}
}
resource "aws_msk_configuration" "kafka" {
kafka_versions = ["3.5.1"]
name = "${local.service_name}-cluster"
server_properties = <<PROPERTIES
auto.create.topics.enable = true
delete.topic.enable = true
log.retention.hours = 24
PROPERTIES
}
Vector構築
Helm chartを利用してAggrigatorタイプのVector podをデプロイする
Quick startに従ってhelm chartをinstallする
helm install --name my-vector vector/vector
次にvalues.yamlを作成する
role: Aggregator
customConfig:
api:
enabled: true
address: 127.0.0.1:8686
playground: false
sources:
kafka:
type: kafka
bootstrap_servers: b-1.xxx:9092,b-2.xxx:9092,b-3.xxx:9092
group_id: log-consumer-group
topics:
- fluent-bit-logs # fluent bitがログを送っているtopicを指定する
vector_logs:
type: internal_logs
transforms:
parse_kafka:
type: remap
inputs:
- kafka
source: >-
.ddsource = "kafka"
.ddtags = "env:dev,environment:dev,region:ap-northeast-1"
.service = "vector-log-poc"
sinks:
ddlog:
type: datadog_logs
inputs:
- parse_kafka
- vector_logs
default_api_key: xxx # Datadog側で作成したapi keyを指定する
site: ap1.datadoghq.com # 利用中のDatadogサイトを指定する https://docs.datadoghq.com/ja/getting_started/site/
autoscaling:
enabled: true
minReplicas: 1
maxReplicas: 3
podDisruptionBudget:
enabled: true
minAvailable: 1
helm upgradeでvalues.yamlの変更を反映する
helm upgrade -f values.yaml my-vector vector/vector
Datadog上でのログの確認
14日間限定で無料トライアルも提供されています。もし、コストをかけずに試したい場合はこちらを利用して確認することも可能です。
以下の通り、ログがDatadogにて可視化できていました。
Vectorの設定でハマったポイント
当初transformsの設定が抜けており、body dataに必要なmessageやservice、ddtags等が渡せていませんでした。
Vector側で設定関連のエラーが出ていないものの、Datadogへログが転送できていない(ログの表示ができていない)状態となり、切り分けに時間がかかってしまいました。
対応策として、上記のvalues.yamlにある通り、transformsにて.ddsourceの設定を追加してdatadogのlog pipelineにおいてログのパースを行うようにしました。
※参考
https://vector.dev/docs/reference/configuration/sinks/datadog_logs/#attributes
まとめ
初めてvectorを触ってみましたが、source/transforms/sinkとログ処理の一連の設定を簡易に定義でき、初学者にも扱いやすいツールだなと感じました。
またドキュメントが充実しているため、設定に関しての不明点があれば公式ドキュメントを見れば良さそうでした。