将ALB日志收集并转化为prometheus的指标

我认为在AWS上运行服务时,ALB(ELB)是一个非常常用的服务。

我所操作的服务使用的是EKS集群的入口为ALB,所有的流量都通过ALB来访问服务,并发送请求。

動機 –> Motivation

在监控和测量服务时,负载均衡器的度量标准是一个明显的基础。
在ALB的度量标准中,包括各种状态的计数器和平均延迟的度量标准,对许多人来说也是有用的。
然而,这些度量标准可能在ALB的操作方式和所需数据方面有些不足。

    • バックエンドまで到達できずエラーになった場合、HTTPCode_ELB_*_Countというメトリクスになり、どのTG(ターゲットグループ)に向けたリクエストがエラーになったのかがわからない

バックエンドが障害で落ちていたり、が高負荷で処理不能に陥った時これになる
ひとつのALBに複数のTGを設定してトラフィックを分岐させていると困る

パスやリクエストメソッドなどの単位で集計できない

これは仕方ないでしょうけど…

我认为后者经常在应用程序中获取指标,但是由于无法忽视无法达到类似前者应用程序的错误,所以最终还是必须监控ALB的指标。

在这种情况下,有一个很方便的工具,那就是ALB的日志。

ALB日志

只要在负载均衡器上进行设置,它就会帮我们将日志文件保存到S3存储桶中。正如上述文档所描述的那样,基本的信息都已经齐备了。

    • 処理時間

 

    • ステータス

 

    • メソッド

 

    • ターゲットのドメイン

 

    URI

如果对这些内容进行统计,您可以获得比应用程序侧的ALB指标更有用的指标。虽然日志中没有记录请求是针对哪个TG的,但是如果将侦听器规则与日志进行对照,我想可以知道。

存储位置

我决定对ALB的日志进行汇总,但在选择汇总结果的保存位置时存在一些选择。
考虑到这是关于AWS资源的事情,似乎选择使用Cloudwatch是比较稳妥的,但对于我来说,由于我正在运营k8s集群,基本上使用prometheus进行监控。
虽然我不得不查看一些托管服务如RDS的度量指标,但如果可以的话,我希望能够完全使用prometheus来完成。

因此,这次我们将汇总结果保存到prometheus中。

方式

呈现而出

ALB的日志将每5分钟存储在S3中,每个ALB节点都有一个日志文件。
通过Lambda钩子解析并发送结果到pushgateway来处理此文件的创建。
然后将其传递到prometheus展示。

download.png

不过,我们希望能够创建一些计数器度量指标,所以这次我们使用了这个库:https://github.com/weaveworks/prom-aggregation-gateway
可以像使用pushgateway一样使用这个库。
如果使用gauge没有问题的话,我认为最好还是使用官方的pushgateway。

此外,由于lambda到pushgateway的连接是全局连接,我们将在pushgateway中添加基本身份验证来进行适配。

匿名函数

本次以host、request_method、status、path作为标签。

    • リクエスト数

alb_log_request_count

処理時間の合計

今回はbackend_processing_timeを集計する
alb_log_request_duration

albエラー数

backendに到達できかなった(backendがステータスを返さなかった)5xxの数
メトリクス名 alb_log_elb_5xx

backendエラー数

backendが返した5xxの数
メトリクス名 alb_log_target_5xx

因为是计数器,所以在度量名称后面会自动加上”_total”。
通常的统计只需上述两个就足够了,但由于其他事项上需要知道5xx是否达到后端,所以我取了它。
以下是在lambda中使用的python脚本(请原谅质量)。

# -*- coding: utf-8 -*-
# 使用する環境変数
# USERNAME: pushgatewayのベーシック認証username
# PASSWORD: pushgatewayのベーシック認証password
# PUSH_ENDPOINT: pushgatewayのエンドポイント 

from prometheus_client import CollectorRegistry, Counter, push_to_gateway
from prometheus_client.exposition import basic_auth_handler
import io, re, os, gzip, boto3

fields = [
    "type",
    "timestamp",
    "alb",
    "client_ip_port",
    "backend_ip_port",
    "request_processing_time",
    "backend_processing_time",
    "response_processing_time",
    "alb_status_code",
    "backend_status_code",
    "received_bytes",
    "sent_bytes",
    "request_verb",
    "request_url",
    "request_proto",
    "user_agent",
    "ssl_cipher",
    "ssl_protocol",
    "target_group_arn",
    "trace_id",
    "domain_name",
    "chosen_cert_arn",
    "matched_rule_priority",
    "request_creation_time",
    "actions_executed",
    "redirect_url",
    "error_reason",
    "target_ip_port_list",
    "target_status_code_list",
    "classification",
    "classification_reason",
    "new_field",
]

regex = r"([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([-.0-9]*) ([-.0-9]*) ([-.0-9]*) (|[-0-9]*) (-|[-0-9]*) ([-0-9]*) ([-0-9]*) \"([^ ]*) ([^ ]*) (- |[^ ]*)\" \"([^\"]*)\" ([A-Z0-9-]+) ([A-Za-z0-9.-]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\" \"([^\"]*)\" ([-.0-9]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\" \"([^\"]*)\" \"([^\"]*)\" \"([^\"]*)\" \"([^\"]*)\" \"([^\"]*)\"($|.*)"

def auth_handler(url, method, timeout, headers, data):
    username = os.getenv('USERNAME')
    password = os.getenv('PASSWORD')
    return basic_auth_handler(url, method, timeout, headers, data, username, password)

def purge_path(request_url):
    regex = r"https?://[^/]+([^\?]*).*"
    match = re.match(regex, request_url)
    uri = match.group(1)
    return uri

def register_metrics(registry, elb_5xx, target_5xx, requests_total, duration_total, file_obj):
    for row in file_obj.readlines():
        matches = re.findall(regex, row)
        if matches:
            log = dict(zip(fields, list(matches[0])))
            label_values = [log['domain_name'], log['request_verb'], log['alb_status_code'], purge_path(log['request_url'])]
            requests_total.labels(*label_values).inc()
            duration = float(log['backend_processing_time'])
            if duration < 0:
                duration = 0 # 404など、backend_processing_timeに-1が入ることがあり、そのままだとinc()でエラーになるので0にする
            duration_total.labels(*label_values).inc(duration)
            if int(log['alb_status_code']) >= 500:
                if log['backend_status_code'] == '-':
                    elb_5xx.labels(*label_values).inc()
                else:
                    target_5xx.labels(*label_values).inc()

    return registry

def get_log_file(bucket, key):
    s3_client = boto3.client('s3')
    obj = s3_client.get_object(
        Bucket=bucket,
        Key=key
    )['Body'].read()
    file = gzip.open(io.BytesIO(obj), 'rt')
    return file

def lambda_handler(event, context):
    registry = CollectorRegistry()
    label_keys = ['host','request_method','status','path']
    elb_5xx = Counter('alb_log_elb_5xx', "A counter of elb_status_code 5xx", registry=registry, labelnames=label_keys)
    target_5xx = Counter('alb_log_target_5xx', "A counter of target_status_code 5xx", registry=registry, labelnames=label_keys)
    requests_total = Counter('alb_log_request_count', "Rows of alb log", registry=registry, labelnames=label_keys)
    duration_total = Counter('alb_log_request_duration', "Total duration of alb log", registry=registry, labelnames=label_keys)

    records = event['Records']
    for record in records:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        file_obj = get_log_file(bucket, key)
        register_metrics(registry, elb_5xx, target_5xx, requests_total,duration_total, file_obj)

    push_to_gateway(os.getenv('PUSH_ENDPOINT') , job='dummy', registry=registry, handler=auth_handler)

根据评论设置环境变量。
如果不需要基本验证,请随意删除。
尽管此处省略了,但实际上会添加标签以将主机和Kubernetes内的应用程序关联起来,并进行路径规范化等处理,因此根据环境的不同,添加这些将会很方便。

在中国,只需要一个选项来释义以上内容:通过指定alb日志保存在S3存储桶中的前缀,来触发ObjectCreatedByPut事件类型。

推送网关

如前所述,我們將成為prom-aggregation-gateway,但這並不需要特別複雜的設定。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: prom-aggregation-gateway
  labels:
    app.kubernetes.io/name: prom-aggregation-gateway
spec:
  strategy:
    type: RollingUpdate 
  selector:
    matchLabels:
      app.kubernetes.io/name: prom-aggregation-gateway
  revisionHistoryLimit: 3
  replicas: 1
  template:
    metadata:
      labels:
        app.kubernetes.io/name: prom-aggregation-gateway
    spec:
      containers:
        - name: pushgateway
          image: weaveworks/prom-aggregation-gateway:latest
          imagePullPolicy: IfNotPresent
          args:
            - '-listen=:9999'
          ports:
            - containerPort: 9999
---
apiVersion: v1
kind: Service
metadata:
  name: prom-aggregation-gateway
  labels:
    app.kubernetes.io/name: prom-aggregation-gateway
spec:
  type: ClusterIP
  ports:
    - port: 9999
      name: internal
  selector:
    app.kubernetes.io/name: prom-aggregation-gateway

由于接入和基本认证部分受运营情况影响,因此请根据实际情况处理。

省略掉关于普罗米修斯的设定。

看见

スクリーンショット 2021-12-14 2.30.01.png

请注意以下事项

    • ログ記録はベストエフォートになっており、全てのリクエストが記録されることは約束されていない

そうは言ってもそんなごっそり欠けることはないのでは?

ログが出るのが5分ごとなので、リアルタイム監視にはあまり向かない…

ここまでやっておいて…

lambdaスクリプトは不具合や考慮漏れがあるかも

广告
将在 10 秒后关闭
bannerAds