祝東京成功搭陸!使用Kinesis Data Analytics進行CloudWatch Logs的實時分析
首先
亞馬遜 Kinesis 數據分析現在可在東京地區使用。
由於還宣布了 Apache Kafka 的托管服務,我們對流媒體服務的繁榮感到好奇,因此想要嘗試一下。
Kinesis数据分析是什么?
-
- ストリーミングデータの分析/検索するサービス。
-
- リアルタイムにフィルターをかけることができるらしい。
- SQLで書けるらしい。
试试看
我們將使用數據分析來實時篩選定時流入的Cloudwatch日誌。本次以VPCFlowLogs輸出到CwLogs作為例子。
流动
1. 移动流 liú)
订阅
要将CloudWatch Logs与Kinesis Stream进行集成,我们需要使用名为”订阅”的功能。
由于在管理控制台中无法完成此操作,我们需要使用CLI。
创建IAM角色
- Kinesisストリームにデータを置く権限をCWLogsに付与
% cat << EOF > TrustPolicyForCWL.json
> {
> "Statement": {
> "Effect": "Allow",
> "Principal": { "Service": "logs.ap-northeast-1.amazonaws.com" },
> "Action": "sts:AssumeRole"
> }
> }
> EOF
% aws iam create-role --role-name CWLtoKinesisRole --assume-role-policy-document file://./TrustPolicyForCWL.json
{
"Role": {
"AssumeRolePolicyDocument": {
"Statement": {
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "logs.ap-northeast-1.amazonaws.com"
}
}
},
"RoleId": "AAAAAAAAAAAAAAAAAAA",
"CreateDate": "2019-06-04T13:35:24Z",
"RoleName": "CWLtoKinesisRole",
"Path": "/",
"Arn": "arn:aws:iam::123456789:role/CWLtoKinesisRole"
}
}
- PutPolicy
% cat PermissionsForCWL.json
{
"Statement": [
{
"Effect": "Allow",
"Action": "kinesis:PutRecord",
"Resource": "arn:aws:kinesis:ap-northeast-1:123456789:stream/VPCFlowLogs"
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": "arn:aws:iam::123456789:role/CWLtoKinesisRole"
}
]
}
% aws iam put-role-policy --role-name CWLtoKinesisRole --policy-name Permissions-Policy-For-CWL --policy-document file://~/PermissionsForCWL.json
- サブスクリプションフィルタ作成
% aws logs put-subscription-filter \
--log-group-name "/aws/vpcflowlogs/" \
--filter-name "VPCFlowLogsAllFilter" \
--filter-pattern "[version, account_id, interface_id, srcaddr != "-", dstaddr != "-", srcport != "-", dstport != "-", protocol, packets, bytes, start, end, action, log_status]" \
--destination-arn "arn:aws:kinesis:ap-northeast-1:123456789:stream/VPCFlowLogs" \
--role-arn "arn:aws:iam::123456789:role/CWLtoKinesisRole"
让我们确认一下是否与Kinesis Stream进行了协作。
% aws kinesis get-records --limit 10 --shard-iterator $(aws kinesis get-shard-iterator --stream-name VPCFlowLogs --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON | jq -r ."ShardIterator") | jq -r '.Records[].Data' | base64 -D | zcat
2. 亚马逊Kinesis数据分析
成功建立CwLogs和Kinesis Stream的连接后,接下来我们将创建Data Analytics。
创建预处理的Lambda函数
由于Cwlogs的输入记录已经被压缩,所以需要解压缩。
让我们使用预处理设置创建AWS Lambda函数,并尝试解压缩。
以前有一份设计草图,但现在已经不见了,因此我将参考这篇文章去创建。
'use strict';
console.log('Loading function');
const zlib = require('zlib');
exports.handler = (event, context, callback) => {
let success = 0; // Number of valid entries found
let failure = 0; // Number of invalid entries found
/* Process the list of records */
const output = event.records.map((record) => {
/* Data is base64-encoded, so decode here */
const compressedData = Buffer.from(record.data, 'base64');
try {
const decompressedData = zlib.unzipSync(compressedData);
/* Encode decompressed JSON or CSV */
const result = (Buffer.from(decompressedData, 'utf8')).toString('base64');
success++;
return {
recordId: record.recordId,
result: 'Ok',
data: result,
};
} catch (err) {
failure++;
return {
recordId: record.recordId,
result: 'ProcessingFailed',
data: record.data,
};
}
});
console.log('Processing completed. Successful records ${success}, Failed records ${failure}.');
callback(null, {
records: output,
});
};
创建SQL
由于源设置已完成,我们将创建一个实际应用了过滤器的SQL查询。
选择实时分析。
我們將使用位元組數進行篩選。
-- ** Continuous Filter **
-- Performs a continuous filter based on a WHERE condition.
-- .----------. .----------. .----------.
-- | SOURCE | | INSERT | | DESTIN. |
-- Source-->| STREAM |-->| & SELECT |-->| STREAM |-->Destination
-- | | | (PUMP) | | |
-- '----------' '----------' '----------'
-- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE
-- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM, and INSERT SQL results into an output STREAM
-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
"srcaddr" varchar(16),
"dstaddr" varchar(16),
"bytes" DOUBLE
);
-- Create pump to insert into output
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "srcaddr", "dstaddr", "bytes"
FROM "SOURCE_SQL_STREAM_001"
WHERE "bytes" > 50;
后续的合作
总结
我已经检查了Kinesis分析的行为。使用SQL可以很好地进行过滤!我想继续深入了解一下。
在这个例子中,使用CloudWatch Logs Insights应该已经足够了。