我尝试了一下Alpakka AWS SQS连接器

由于 Alpakka 0.3 版本增加了 AWS SQS Connector,因此我尝试了一下。

发布了#Alpakka 0.3版本,新增了AWS SQS、Cassandra、FTP功能。太棒了,感谢所有贡献者。https://t.co/ObyGvhamWu — Akka团队 (@akkateam) 2016年12月2日

羊驼

GitHub 上的 akka/alpakka

该项目为Akka Streams提供了与各种技术、协议或库的连接器。

有提供 Akka Streams 连接器。
有关 Akka Streams 的相关信息,请参考这里和这里。

事前准备

    • AWSのアカウントを作成しAccess key IDとSecret access keyを取得

 

    SQSでキューを作成し、URLを取得

实施

我将根据这个参考代码编写代码(几乎一样)。请根据需要适当调整配置。当消息被发送时,会输出正文并将其删除。

name := "alpakka-sqs-sample"

version := "1.0"

scalaVersion := "2.12.0"

libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "0.3"
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.sqs.scaladsl.SqsSource
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.model.DeleteMessageRequest
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration.{ FiniteDuration, _ }

object AlpakkaSQSSample {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load()

    val credentials = new BasicAWSCredentials(
      config.getString("aws.accessKey"), config.getString("aws.secretKey")
    )
    implicit val sqsClient: AmazonSQSAsyncClient =
      new AmazonSQSAsyncClient(credentials).withEndpoint(config.getString("aws.sqs.endpoint"))

    implicit val system = ActorSystem()
    implicit val mat = ActorMaterializer()

    val queue = config.getString("aws.sqs.url")

    SqsSource(queue)
      .runForeach((message) => {
        println(message.getBody)
        sqsClient.deleteMessage(
          new DeleteMessageRequest(queue, message.getReceiptHandle)
        )
      })
  }
}

长轮询超时时间、缓冲区大小等可通过SqsSourceSettings(longPollingDuration: FiniteDuration, maxBufferSize: Int, maxBatchSize: Int)进行配置(作为SqsSource.apply的第二个参数)。
以下是默认设置。

执行

$ sbt
> run

当您发送消息时,将会显示如下所示的正文内容。

[info] Running ***.AlpakkaSQSSample 
message-1
message-2

最后

Akka Streams 现在变得更加亲近。如果按照参考使用 ElasticMQ,您可以立即尝试,而无需使用 SQS。顺便提一下,由于 FIFO 队列的行为有点令人担忧,我在 Akka 用户列表上提问了一下。

请参考以上内容。

    • Github

 

    Reference
广告
将在 10 秒后关闭
bannerAds