祝東京成功搭陸!使用Kinesis Data Analytics進行CloudWatch Logs的實時分析

首先

亞馬遜 Kinesis 數據分析現在可在東京地區使用。
由於還宣布了 Apache Kafka 的托管服務,我們對流媒體服務的繁榮感到好奇,因此想要嘗試一下。

Kinesis数据分析是什么?

    • ストリーミングデータの分析/検索するサービス。

 

    • リアルタイムにフィルターをかけることができるらしい。

 

    SQLで書けるらしい。

试试看

我們將使用數據分析來實時篩選定時流入的Cloudwatch日誌。本次以VPCFlowLogs輸出到CwLogs作為例子。

流动

image.png

1. 移动流 liú)

image.png

订阅

要将CloudWatch Logs与Kinesis Stream进行集成,我们需要使用名为”订阅”的功能。
由于在管理控制台中无法完成此操作,我们需要使用CLI。

创建IAM角色

    Kinesisストリームにデータを置く権限をCWLogsに付与
% cat << EOF > TrustPolicyForCWL.json
> {
>   "Statement": {
>     "Effect": "Allow",
>     "Principal": { "Service": "logs.ap-northeast-1.amazonaws.com" },
>     "Action": "sts:AssumeRole"
>   }
> }
> EOF

% aws iam create-role --role-name CWLtoKinesisRole --assume-role-policy-document file://./TrustPolicyForCWL.json
{
    "Role": {
        "AssumeRolePolicyDocument": {
            "Statement": {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {
                    "Service": "logs.ap-northeast-1.amazonaws.com"
                }
            }
        },
        "RoleId": "AAAAAAAAAAAAAAAAAAA",
        "CreateDate": "2019-06-04T13:35:24Z",
        "RoleName": "CWLtoKinesisRole",
        "Path": "/",
        "Arn": "arn:aws:iam::123456789:role/CWLtoKinesisRole"
    }
}
    PutPolicy
% cat PermissionsForCWL.json
{
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "kinesis:PutRecord",
      "Resource": "arn:aws:kinesis:ap-northeast-1:123456789:stream/VPCFlowLogs"
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::123456789:role/CWLtoKinesisRole"
    }
  ]
}

% aws iam put-role-policy  --role-name CWLtoKinesisRole  --policy-name Permissions-Policy-For-CWL  --policy-document file://~/PermissionsForCWL.json
    サブスクリプションフィルタ作成
% aws logs put-subscription-filter \
    --log-group-name "/aws/vpcflowlogs/" \
    --filter-name "VPCFlowLogsAllFilter" \
    --filter-pattern "[version, account_id, interface_id, srcaddr != "-", dstaddr != "-", srcport != "-", dstport != "-", protocol, packets, bytes, start, end, action, log_status]" \
    --destination-arn "arn:aws:kinesis:ap-northeast-1:123456789:stream/VPCFlowLogs" \
    --role-arn "arn:aws:iam::123456789:role/CWLtoKinesisRole"
image.png

让我们确认一下是否与Kinesis Stream进行了协作。

% aws kinesis get-records --limit 10 --shard-iterator $(aws kinesis get-shard-iterator --stream-name VPCFlowLogs --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON | jq -r ."ShardIterator") | jq -r '.Records[].Data' | base64 -D | zcat

2. 亚马逊Kinesis数据分析

成功建立CwLogs和Kinesis Stream的连接后,接下来我们将创建Data Analytics。

image.png

创建预处理的Lambda函数

由于Cwlogs的输入记录已经被压缩,所以需要解压缩。
让我们使用预处理设置创建AWS Lambda函数,并尝试解压缩。

image.png

以前有一份设计草图,但现在已经不见了,因此我将参考这篇文章去创建。

'use strict';
console.log('Loading function');
const zlib = require('zlib');

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found
    /* Process the list of records */
    const output = event.records.map((record) => {
        /* Data is base64-encoded, so decode here */
        const compressedData = Buffer.from(record.data, 'base64');
        try {
            const decompressedData = zlib.unzipSync(compressedData);
            /* Encode decompressed JSON or CSV */
            const result = (Buffer.from(decompressedData, 'utf8')).toString('base64');
            success++;
            return {
                recordId: record.recordId,
                result: 'Ok',
                data: result,
            };
        } catch (err) {
            failure++;
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed',
                data: record.data,
            };
        }
    });
    console.log('Processing completed.  Successful records ${success}, Failed records ${failure}.');
    callback(null, {
        records: output,
    });
};
image.png
image.png
image.png

创建SQL

由于源设置已完成,我们将创建一个实际应用了过滤器的SQL查询。
选择实时分析。

image.png

我們將使用位元組數進行篩選。

-- ** Continuous Filter ** 
-- Performs a continuous filter based on a WHERE condition.
--          .----------.   .----------.   .----------.              
--          |  SOURCE  |   |  INSERT  |   |  DESTIN. |              
-- Source-->|  STREAM  |-->| & SELECT |-->|  STREAM  |-->Destination
--          |          |   |  (PUMP)  |   |          |              
--          '----------'   '----------'   '----------'               
-- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE
-- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM, and INSERT SQL results into an output STREAM
-- Create output stream, which can be used to send to a destination

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   "srcaddr"     varchar(16),
   "dstaddr"   varchar(16),
   "bytes"        DOUBLE
);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "srcaddr", "dstaddr", "bytes"
FROM "SOURCE_SQL_STREAM_001"
WHERE "bytes" > 50;
image.png

后续的合作

image.png
image.png
image.png

总结

我已经检查了Kinesis分析的行为。使用SQL可以很好地进行过滤!我想继续深入了解一下。

在这个例子中,使用CloudWatch Logs Insights应该已经足够了。

广告
将在 10 秒后关闭
bannerAds