使用Lambda将CloudTrail日志投放到Elasticsearch
首先
我觉得把Cloudtrail日志放入可以使用Kibana的Elasticsearch中进行分析会很方便,所以参考了这个并尝试创建了Lambda函数的示例。
这里是创建的 Lambda 函数的代码。
var aws = require('aws-sdk');
var zlib = require('zlib');
var elasticsearch = require('elasticsearch');
var ES_INDEX = 'cloudtrail'; // Elasticsearch index name
var ES_TYPE = 'log'; // Elsticsearch index type name
var ES_CLIENT = new elasticsearch.Client({
host: '<ELASTICSEARCH_URL:PORT_NUMBER>' //Elasticsearch URL:port
});
//start lambda function
exports.handler = function(event, context) {
console.log('Received event:');
var bucket = event.Records[0].s3.bucket.name;
var key = event.Records[0].s3.object.key;
var region = event.Records[0].awsRegion;
var s3 = new aws.S3({
apiVersion: '2006-03-01',
region : region
});
s3.getObject({
Bucket : bucket,
Key : key
}, function(err,data) {
if(err){
context.done('error','error getting file' + err);
} else {
var contentType = data.ContentType;
var contentEncoding = data.ContentEncoding;
if (contentType === "application/json"
&& contentEncoding === "gzip") {
var logFileName = key.substr(key.lastIndexOf("/") + 1);
var buf = data.Body;
zlib.gunzip(buf, function(_, dezipped) {
var json = JSON.parse(dezipped.toString('utf-8'));
sendToES(context,region,logFileName,json);
});
}
}
});
};
//bulk send to Elasticsearch
function sendToES(context,region,logFileName,json){
var records = json.Records;
var searchRecords = [];
for(var i = 0; i < records.length; i++){
var record = records[i];
var header = {
"index":{
"_index": ES_INDEX,
"_type": ES_TYPE,
"_id": record.eventTime + "-" + record.requestID
}
};
var searchRecord = {
"usertype" : record.userIdentity.type,
"arn" : record.userIdentity.arn,
"accesskeyid" : record.userIdentity.accessKeyId,
"username" : record.userIdentity.userName,
"eventtime" : record.eventTime,
"eventsource" : record.eventSource,
"eventname" : record.eventName,
"awsregion" : record.awsRegion,
"sourceipaddress" : record.sourceIPAddress,
"useragent" : record.userAgent,
"requestid" : record.requestID,
"eventid" : record.eventID,
"logfilename" : logFileName
};
searchRecords.push(header);
searchRecords.push(searchRecord);
};
console.log(searchRecords);
ES_CLIENT.bulk({
"body": searchRecords
}, function(err, resp){
if(err){
console.log(err);
context.done("error",err);
}else{
console.log(resp);
context.done(null,'success');
};
});
};
用法
请提前启用CloudTrail。CloudTrail日志应放置在与Lambda函数相同的区域内。
首先,准备一台安装了Elasticsearch / kibana的服务器。由于kibana4最近官方发布,所以这次选择Elasticsearch 1.4.4/kibana 4.0。
-
- http://www.elasticsearch.org/download/
- http://www.elasticsearch.org/overview/kibana/installation/
一旦启动Elasticsearch,就可以创建索引。当向Elasticsearch放置文档时,它会自动创建映射,但如果在Kibana中使用时,最好明确指定为not_analyzed。
curl -XPUT http://localhost:9200/cloudtrail -d '
{
mappings: {
log: {
properties: {
accesskeyid: {
type: "string",
index: "not_analyzed"
},
arn: {
type: "string",
index: "not_analyzed"
},
awsregion: {
type: "string",
index: "not_analyzed"
},
eventid: {
type: "string",
index: "not_analyzed"
},
eventname: {
type: "string",
index: "not_analyzed"
},
eventsource: {
type: "string",
index: "not_analyzed"
},
eventtime: {
type: "date",
format: "dateOptionalTime"
},
logfilename: {
type: "string",
index: "not_analyzed"
},
requestid: {
type: "string",
index: "not_analyzed"
},
sourceipaddress: {
type: "string",
index: "not_analyzed"
},
useragent: {
type: "string",
index: "not_analyzed"
},
username: {
type: "string",
index: "not_analyzed"
},
usertype: {
type: "string",
index: "not_analyzed"
}
}
}
}
}'
一旦Elasticsearch环境准备就绪,我们将创建一个lambda function。
安装Node和npm,并设置lambda函数的环境以打包。这里参考设置Lambda开发环境。
由于函数中使用了Elasticsearch.js,所以使用npm进行安装。
npm install elasticsearch
将开头的Lambda函数保存为js格式,并与node_module一起压缩为zip文件。当 zip文件准备好后,上传到Lambda中。
使用aws cli工具,上传后会直接创建Lambda函数。
$ zip -r function.zip lambda-function.js node_modules
$ aws lambda upload-function --function-name CloudtrailToElasticsearch --function-zip "./function.zip" --runtime nodejs --role arn:aws:iam::<AWS_ACCOUNT_ID>:role/lambda_exec_role --mode event --handler lambda-function.handler
创建Lambda函数后,需要进行事件源的配置。在Lambda管理控制台中,点击“配置事件源”,选择作为Cloudtrail日志输出的S3存储桶以及在读取时使用的IAM角色,并进行设置。
如果您按照這個設定,並且沒有任何問題的話,CloudTrail日誌將自動被推送到Elasticsearch。
只需使用Kibana4创建仪表板即可。 Kibana4的操作非常有用。