尝试将Kinesis Data Firehose中的数据流向MongoDB Cloud
根据听说,Amazon Kinesis Data Firehose开始支持将数据传送到MongoDB Cloud,因此我尝试了一下,但是遇到了一些困难点,现在我把它们记录下来。
在MongoDB Atlas上创建集群。
首先,我们使用MongoDB Atlas创建Cluster(集群)、Database(数据库)和Collection(集合)。
MongoDB Atlas是MongoDB作为数据库即服务(DBaaS)的本身,而MongoDB Realm则是用于操作Atlas的服务接口。
- Get Started with Atlas
实现MongoDB Realm Functions
我以为只需在GUI上轻松完成设定,但在Firehose中需要自己实现服务器无状态函数作为Webhook终结点。
那么使用Kinesis Data Streams + Lambda怎么样呢?
- MongoDB Realm Functions
发布说明链接中的示例代码开头是这样的。
exports = function(payload, response) {
/* Using Buffer in Realm causes a severe performance hit
this function is ~6 times faster
*/
const decodeBase64 = (s) => {
var e={},i,b=0,c,x,l=0,a,r='',w=String.fromCharCode,L=s.length
var A="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
for(i=0;i<64;i++){e[A.charAt(i)]=i}
for(x=0;x<L;x++){
c=e[s.charAt(x)];b=(b<<6)+c;l+=6
while(l>=8){((a=(b>>>(l-=8))&0xff)||(x<(L-2)))&&(r+=w(a))}
}
return r
}
...
我对自身实现Base64解码感到吃惊,因为它无法处理多字节字符,所以必须想办法解决这个问题。
幸运的是,我们可以使用“导入外部依赖项”来使用外部的npm包。
在函数编辑器界面上选择“添加依赖项”,这次我们添加了js-base64的3.7.2版本。
似乎不支持使用import,只能使用require。
const decoded = require("js-base64").decode(encoded)
最后,我使用以下代码创建了一个函数。
由于我想要在端点的身份验证中使用 API 密钥,所以在创建函数时选择将身份验证设为系统。
exports = (payload, response) => {
const body = JSON.parse(payload.body.text())
const requestId = body.requestId
const timestamp = new Date(body.timestamp)
const firehoseAccessKey = payload.headers["X-Amz-Firehose-Access-Key"]
response.addHeader(
"Content-Type",
"application/json"
)
if (firehoseAccessKey != context.values.get("FIREHOSE_ACCESS_KEY")) {
console.log("Invalid X-Amz-Firehose-Access-Key: " + firehoseAccessKey)
response.setStatusCode(401)
response.setBody(JSON.stringify({
requestId: requestId,
timestamp: timestamp.getTime(),
errorMessage: "Invalid X-Amz-Firehose-Access-Key"
}))
return
}
const bulkOp = context.services.get("mongodb-atlas").db("test-db").collection("test-collection").initializeOrderedBulkOp()
body.records.forEach((record, index) => {
data = JSON.parse(require("js-base64").decode(record.data))
bulkOp.insert(data) // _idをセットしたければここに実装する
})
bulkOp.execute().then(() => {
response.setStatusCode(200)
response.setBody(JSON.stringify({
requestId: requestId,
timestamp: timestamp.getTime()
}))
return
}).catch((error) => {
response.setStatusCode(500)
response.setBody(JSON.stringify({
requestId: requestId,
timestamp: timestamp.getTime(),
errorMessage: error
}))
return
})
}
创建Realm HTTPS终端点
自从发布说明书写出来后,仅过去半年时间,我们的Realm WebHook已经被标记为弃用。
请参考公式文档中的“配置 HTTPS 端点”,然后选择“添加端点”选项。
选择先前创建的函数,并选择 HTTP 方法为 POST,请求验证为“无附加授权”。
配置Kinesis数据firehose。
创建一个 MongoDB Realm 的服务器 API 密钥。
选择将MongoDB Cloud作为您的目标
秘密的设置
在之前实施的Realm Function中,通过Firehose发出的请求头中包含的X-Amz-Firehose-Access-Key的值进行验证,因此将该值设置为环境变量的FIREHOSE_ACCESS_KEY。
- Define a Value
创建 Secret 后,继续创建 Value 并将其链接到 Secret,这是一系列步骤。
考试
从之前创建的Kinesis Delivery Streams中选择Test with demo data,在AWS管理控制台上尝试流动测试数据。
可以从MongoDB Atlas仪表板上确认Kinesis的测试数据已经被添加到Collections中。
如果不成功,可以从MongoDB Realm的仪表板上查看MANAGE → Logs进行确认。
有几个卡住的点,但最终成功确认了操作。
如前所述,“既然要写代码,为什么不用Kinesis Data Streams + Lambda?”这样的想法确实存在,但使用Firehose不仅可以根据缓冲区大小或时间触发,还有许多优点,不仅关乎记录数量。
我期待着未来,因为无疑地存在着对于消息队列和数据库/数据仓库之间简便的数据管道的需求。