使用Lambda将CloudTrail日志投放到Elasticsearch

首先

我觉得把CloudTrail日志导入Elasticsearch后就可以用Kibana进行分析,会很方便,所以参考了相关资料,尝试创建了一个Lambda函数示例。

SnapCrab_NoName_2015-2-21_23-17-16_No-00.png

这里是创建的 Lambda 函数的代码。

var aws = require('aws-sdk');
var zlib = require('zlib');
var elasticsearch = require('elasticsearch');

var ES_INDEX = 'cloudtrail'; // Elasticsearch index name
var ES_TYPE = 'log'; // Elasticsearch index type name
// Shared client created once at module load and reused across warm
// invocations of this Lambda function.
var ES_CLIENT = new elasticsearch.Client({
    host: '<ELASTICSEARCH_URL:PORT_NUMBER>' //Elasticsearch URL:port
}); 

//start lambda function
exports.handler = function(event, context) {
    console.log('Received event:');
    var bucket = event.Records[0].s3.bucket.name;
    var key = event.Records[0].s3.object.key;
    var region = event.Records[0].awsRegion;
    var s3 = new aws.S3({
        apiVersion: '2006-03-01',
        region : region
    });

    s3.getObject({
        Bucket : bucket,
        Key : key
    }, function(err,data) {
        if(err){
            context.done('error','error getting file' + err);
        } else {
            var contentType = data.ContentType;
            var contentEncoding = data.ContentEncoding;
            if (contentType === "application/json"
                && contentEncoding === "gzip") {
                var logFileName = key.substr(key.lastIndexOf("/") + 1);
                var buf = data.Body;
                zlib.gunzip(buf, function(_, dezipped) {
                    var json = JSON.parse(dezipped.toString('utf-8'));
                    sendToES(context,region,logFileName,json);
                });
            }
        }
    });
};

//bulk send to Elasticsearch
// Bulk-indexes CloudTrail records into Elasticsearch.
//
// context     - Lambda context; completed via context.done() on every path.
// region      - AWS region from the triggering event (currently unused in
//               the document body, kept for interface compatibility).
// logFileName - basename of the S3 log object, stored with each document.
// json        - parsed CloudTrail log file; records are read from json.Records.
function sendToES(context, region, logFileName, json) {
    var records = json.Records || [];
    // A file without records (or without a Records array at all) previously
    // crashed on records.length / sent an empty bulk body; finish cleanly.
    if (records.length === 0) {
        context.done(null, 'success (no records)');
        return;
    }
    var searchRecords = [];
    for (var i = 0; i < records.length; i++) {
        var record = records[i];
        // userIdentity can be missing on some record types; guard so one
        // odd record does not abort the whole batch.
        var identity = record.userIdentity || {};
        // Bulk API action line: eventTime + requestID as _id makes the
        // indexing idempotent if the same log file is processed twice.
        var header = {
            "index":{
                "_index": ES_INDEX,
                "_type": ES_TYPE,
                "_id": record.eventTime + "-" + record.requestID
            }
        };

        var searchRecord = {
            "usertype" : identity.type,
            "arn" : identity.arn,
            "accesskeyid" : identity.accessKeyId,
            "username" : identity.userName,
            "eventtime" : record.eventTime,
            "eventsource" : record.eventSource,
            "eventname" : record.eventName,
            "awsregion" : record.awsRegion,
            "sourceipaddress" : record.sourceIPAddress,
            "useragent" : record.userAgent,
            "requestid" : record.requestID,
            "eventid" : record.eventID,
            "logfilename" : logFileName
        };
        searchRecords.push(header);
        searchRecords.push(searchRecord);
    }
    console.log(searchRecords);
    ES_CLIENT.bulk({
        "body": searchRecords
    }, function(err, resp) {
        if (err) {
            console.log(err);
            context.done("error", err);
        } else {
            console.log(resp);
            context.done(null, 'success');
        }
    });
}

用法

请提前启用CloudTrail。CloudTrail日志应放置在与Lambda函数相同的区域内。

首先,准备一台安装了Elasticsearch / Kibana的服务器。由于Kibana 4最近正式发布,所以这次选择Elasticsearch 1.4.4 / Kibana 4.0。

    • http://www.elasticsearch.org/download/

 

    http://www.elasticsearch.org/overview/kibana/installation/

一旦启动Elasticsearch,就可以创建索引。当向Elasticsearch放置文档时,它会自动创建映射,但如果在Kibana中使用时,最好明确指定为not_analyzed。

# Create the "cloudtrail" index up front with an explicit mapping: string
# fields are set to not_analyzed so Kibana aggregates/filters on the exact
# field value instead of analyzed tokens.
curl -XPUT http://localhost:9200/cloudtrail -d '
{
    mappings: {
        log: {
            properties: {
                accesskeyid: {
                    type: "string",
                    index: "not_analyzed"
                },
                arn: {
                    type: "string",
                    index: "not_analyzed"
                },
                awsregion: {
                    type: "string",
                    index: "not_analyzed"
                },
                eventid: {
                    type: "string",
                    index: "not_analyzed"
                },
                eventname: {
                    type: "string",
                    index: "not_analyzed"
                },
                eventsource: {
                    type: "string",
                    index: "not_analyzed"
                },
                eventtime: {
                    type: "date",
                    format: "dateOptionalTime"
                },
                logfilename: {
                    type: "string",
                    index: "not_analyzed"
                },
                requestid: {
                    type: "string",
                    index: "not_analyzed"
                },
                sourceipaddress: {
                    type: "string",
                    index: "not_analyzed"
                },
                useragent: {
                    type: "string",
                    index: "not_analyzed"
                },
                username: {
                    type: "string",
                    index: "not_analyzed"
                },
                usertype: {
                    type: "string",
                    index: "not_analyzed"
                }
            }
        }
    }
}'

一旦Elasticsearch环境准备就绪,我们将创建一个lambda function。

安装Node和npm,并设置lambda函数的环境以打包。这里参考设置Lambda开发环境。
由于函数中使用了Elasticsearch.js,所以使用npm进行安装。

npm install elasticsearch

将开头的Lambda函数保存为js文件,并与node_modules目录一起压缩为zip文件。zip文件准备好后,上传到Lambda中。
使用aws cli工具,上传后会直接创建Lambda函数。

$ zip -r function.zip lambda-function.js node_modules

$ aws lambda upload-function --function-name CloudtrailToElasticsearch --function-zip "./function.zip" --runtime nodejs --role  arn:aws:iam::<AWS_ACCOUNT_ID>:role/lambda_exec_role --mode event --handler lambda-function.handler 

创建Lambda函数后,需要进行事件源的配置。在Lambda管理控制台中,点击“配置事件源”,选择作为CloudTrail日志输出目标的S3存储桶以及读取时使用的IAM角色,并进行设置。

如果您按照这个设置操作且没有任何问题的话,CloudTrail日志将自动被推送到Elasticsearch。

之后只需使用Kibana 4创建仪表板即可。Kibana 4的操作非常方便。

SnapCrab_NoName_2015-2-22_12-41-29_No-00.png
SnapCrab_NoName_2015-2-22_12-37-35_No-00.png
广告
将在 10 秒后关闭
bannerAds