尝试将Kinesis Data Firehose中的数据流向MongoDB Cloud

根据听说,Amazon Kinesis Data Firehose开始支持将数据传送到MongoDB Cloud,因此我尝试了一下,但是遇到了一些困难点,现在我把它们记录下来。

在MongoDB Atlas上创建集群。

首先,我们使用MongoDB Atlas创建Cluster(集群)、Database(数据库)和Collection(集合)。
MongoDB Atlas是MongoDB作为数据库即服务(DBaaS)的本身,而MongoDB Realm则是用于操作Atlas的服务接口。

    Get Started with Atlas

实现MongoDB Realm Functions

我以为只需在GUI上轻松完成设定,但在Firehose中需要自己实现服务器无状态函数作为Webhook终结点。
那么使用Kinesis Data Streams + Lambda怎么样呢?

    MongoDB Realm Functions

发布说明链接中的示例代码开头是这样的。

exports = function(payload, response) {
    /* Using Buffer in Realm causes a severe performance hit
    this function is ~6 times faster
    */
    const decodeBase64 = (s) => {
        var e={},i,b=0,c,x,l=0,a,r='',w=String.fromCharCode,L=s.length
        var A="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
        for(i=0;i<64;i++){e[A.charAt(i)]=i}
        for(x=0;x<L;x++){
            c=e[s.charAt(x)];b=(b<<6)+c;l+=6
            while(l>=8){((a=(b>>>(l-=8))&0xff)||(x<(L-2)))&&(r+=w(a))}
        }
        return r
    }
...

我对自身实现Base64解码感到吃惊,因为它无法处理多字节字符,所以必须想办法解决这个问题。
幸运的是,我们可以使用“导入外部依赖项”来使用外部的npm包。
在函数编辑器界面上选择“添加依赖项”,这次我们添加了js-base64的3.7.2版本。
似乎不支持使用import,只能使用require。

const decoded = require("js-base64").decode(encoded)

最后,我使用以下代码创建了一个函数。
由于我想要在端点的身份验证中使用 API 密钥,所以在创建函数时选择将身份验证设为系统。

exports = (payload, response) => {
  const body = JSON.parse(payload.body.text())
  const requestId = body.requestId
  const timestamp = new Date(body.timestamp)
  const firehoseAccessKey = payload.headers["X-Amz-Firehose-Access-Key"]

  response.addHeader(
    "Content-Type",
    "application/json"
  )

  if (firehoseAccessKey != context.values.get("FIREHOSE_ACCESS_KEY")) {
    console.log("Invalid X-Amz-Firehose-Access-Key: " + firehoseAccessKey)
    response.setStatusCode(401)
    response.setBody(JSON.stringify({
      requestId: requestId,
      timestamp: timestamp.getTime(),
      errorMessage: "Invalid X-Amz-Firehose-Access-Key"
    }))
    return
  }

  const bulkOp = context.services.get("mongodb-atlas").db("test-db").collection("test-collection").initializeOrderedBulkOp()
  body.records.forEach((record, index) => {
    data = JSON.parse(require("js-base64").decode(record.data))
    bulkOp.insert(data) // _idをセットしたければここに実装する
  })

  bulkOp.execute().then(() => {
    response.setStatusCode(200)
    response.setBody(JSON.stringify({
      requestId: requestId,
      timestamp: timestamp.getTime()
    }))
    return
  }).catch((error) => {
    response.setStatusCode(500)
    response.setBody(JSON.stringify({
      requestId: requestId,
      timestamp: timestamp.getTime(),
      errorMessage: error
    }))
    return
  })
}

创建Realm HTTPS终端点

自从发布说明书写出来后,仅过去半年时间,我们的Realm WebHook已经被标记为弃用。

请参考公式文档中的“配置 HTTPS 端点”,然后选择“添加端点”选项。
选择先前创建的函数,并选择 HTTP 方法为 POST,请求验证为“无附加授权”。

配置Kinesis数据firehose。

创建一个 MongoDB Realm 的服务器 API 密钥。

选择将MongoDB Cloud作为您的目标

秘密的设置

在之前实施的Realm Function中,通过Firehose发出的请求头中包含的X-Amz-Firehose-Access-Key的值进行验证,因此将该值设置为环境变量的FIREHOSE_ACCESS_KEY。

    Define a Value

创建 Secret 后,继续创建 Value 并将其链接到 Secret,这是一系列步骤。

undefined

考试

从之前创建的Kinesis Delivery Streams中选择Test with demo data,在AWS管理控制台上尝试流动测试数据。
可以从MongoDB Atlas仪表板上确认Kinesis的测试数据已经被添加到Collections中。
如果不成功,可以从MongoDB Realm的仪表板上查看MANAGE → Logs进行确认。


有几个卡住的点,但最终成功确认了操作。
如前所述,“既然要写代码,为什么不用Kinesis Data Streams + Lambda?”这样的想法确实存在,但使用Firehose不仅可以根据缓冲区大小或时间触发,还有许多优点,不仅关乎记录数量。

我期待着未来,因为无疑地存在着对于消息队列和数据库/数据仓库之间简便的数据管道的需求。

广告
将在 10 秒后关闭
bannerAds