我尝试了一下Alpakka AWS SQS连接器
由于 Alpakka 0.3 版本增加了 AWS SQS Connector,因此我尝试了一下。
发布了#Alpakka 0.3版本,新增了AWS SQS、Cassandra、FTP功能。太棒了,感谢所有贡献者。https://t.co/ObyGvhamWu — Akka团队 (@akkateam) 2016年12月2日
羊驼
GitHub 上的 akka/alpakka
该项目为Akka Streams提供了与各种技术、协议或库的连接器。
有提供 Akka Streams 连接器。
有关 Akka Streams 的相关信息,请参考这里和这里。
事前准备
-
- AWSのアカウントを作成しAccess key IDとSecret access keyを取得
- SQSでキューを作成し、URLを取得
实施
我将根据这个参考代码编写代码(几乎一样)。请根据需要适当调整配置。当消息被发送时,会输出正文并将其删除。
name := "alpakka-sqs-sample"
version := "1.0"
scalaVersion := "2.12.0"
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "0.3"
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.sqs.scaladsl.SqsSource
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.model.DeleteMessageRequest
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration.{ FiniteDuration, _ }
object AlpakkaSQSSample {
def main(args: Array[String]): Unit = {
val config = ConfigFactory.load()
val credentials = new BasicAWSCredentials(
config.getString("aws.accessKey"), config.getString("aws.secretKey")
)
implicit val sqsClient: AmazonSQSAsyncClient =
new AmazonSQSAsyncClient(credentials).withEndpoint(config.getString("aws.sqs.endpoint"))
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
val queue = config.getString("aws.sqs.url")
SqsSource(queue)
.runForeach((message) => {
println(message.getBody)
sqsClient.deleteMessage(
new DeleteMessageRequest(queue, message.getReceiptHandle)
)
})
}
}
长轮询超时时间、缓冲区大小等可通过SqsSourceSettings(longPollingDuration: FiniteDuration, maxBufferSize: Int, maxBatchSize: Int)进行配置(作为SqsSource.apply的第二个参数)。
以下是默认设置。
执行
$ sbt
> run
当您发送消息时,将会显示如下所示的正文内容。
[info] Running ***.AlpakkaSQSSample
message-1
message-2
最后
Akka Streams 现在变得更加亲近。如果按照参考使用 ElasticMQ,您可以立即尝试,而无需使用 SQS。顺便提一下,由于 FIFO 队列的行为有点令人担忧,我在 Akka 用户列表上提问了一下。
请参考以上内容。
-
- Github
- Reference