Akka的開始之旅(2)
我将介绍一些在使用Akka时值得了解的库和组件。虽然有很多选择,但我将解释Typesafe Config库、路由、流处理、部署工具等。
类型安全的配置
一个用于加载Scala中常用的配置设置的库。它会自动读取包含在类路径(例如src/resources文件夹)中的配置文件。官方网站在这里。
为了进行导入,需要在build.sbt中输入以下内容。然而,据说akka-actor包中已经包含了这个设置,如果是这样的话,下面的设置似乎不是必需的。
libraryDependencies += "com.typesafe" % "config" % "1.3.2"
配置文件的名称应该如下所示。虽然可以通过启动参数进行更改,但实际上没有这个必要。根据官方网站,库和框架的设置应该写在reference.conf中,而应用程序特定的设置应该写在application.conf中。
-
- application.conf (HOCON形式の設定プロパティ)
-
- application.json (JSON形式の設定プロパティ)
-
- application.properties (Javaのプロパティファイル形式の設定プロパティ)
- reference.conf (HOCON形式の設定プロパティ)
HOCON是typesafe公司提出和实现的一种文件格式。虽然JSON也可以使用,但由于可读性较高,建议使用HOCON。
比如,我们可以按照以下的方式进行编写。虽然是随意的例子,但请把它视为MQTT的配置文件。顺便一提,通过写入url=${?HOSTNAME}等等,我们也能获取相应的环境变量。这种写法的好处在于,只有当该环境变量存在时才会被覆盖。
mqtt {
url = "localhost"
port = 1883
clientId = "test"
useTLS = false
}
我们可以通过应用程序获取以下信息。
val config = ConfigFactory.load()
val url = config.getString("mqtt.url")
val port = config.getInt("mqtt.port")
val tls = config.getBoolean("mqtt.useTLS")
在Akka中的Typesafe Config
如果使用Akka,将自动使用ConfigFactory.load()来加载配置并生成Actor系统。您也可以将自定义配置作为Actor系统的参数进行设置。建议使用Typesafe Config来设置Logger配置和路由器配置等。下面的代码与上述代码是等价的。
val system = ActorSystem("testSystem")
val config = system.settings.config
val url = config.getString("mqtt.url")
val port = config.getInt("mqtt.port")
val tls = config.getBoolean("mqtt.useTLS")
路由 (lù
在Akka中,可以轻松地对Actor进行扩展。当存在多个执行相同任务的Actor时,路由机制会将适当的消息分配给每个Actor的邮箱。担任路由功能的是路由器Actor,它决定消息分配的目标称为路由。而确定如何分配的是路由逻辑。尽管Akka提供了许多可用的路由器,但一般常用的有以下几种。顺便提一下,Pool指的是路由器管理路由的一种类型。还有一种类型是由系统管理路由的Group。
-
- RoundRobinPool : 生成したルーティーに順番にタスクを振り分ける
-
- BalancingPool : アイドル状態のルーティーに順次配信する
- ConsistentHashingPool : コンシステントハッシュを使用してルーティーを選択
生成一台简单的池路由器的方法如下。
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.routing.BalancingPool
import scala.io.StdIn
class PrintMyActorRefActor extends Actor with ActorLogging {
override def receive: Receive = {
case "hello" ⇒
log.info("Hello world")
}
}
object RouterExample extends App {
val system = ActorSystem("testSystem")
val router = system.actorOf(BalancingPool(5).props(Props[PrintMyActorRefActor]), "poolRouter")
for (i <- 1 to 100) {
router ! "hello"
}
println(">>> Press ENTER to exit <<<")
try StdIn.readLine()
finally system.terminate()
}
在BalancingPool中,我们决定了要创建的路由器数量。上述示例是硬编码的,但建议使用Typesafe Config进行配置。请注意,如果路由器名称”poolRouter”与配置不匹配,将会失败。顺便提一下,您也可以使用Resizer动态地更改要创建的实例数量。更多详情请参考官方网站。
akka.actor.deployment {
/poolRouter {
router = balancing-pool
nr-of-instances = 5
}
}
val system = ActorSystem("testSystem")
val router = system.actorOf(FromConfig.props(Props[PrintMyActorRefActor]), "poolRouter")
阿卡流
为了使用Akka在Actor中处理连续的数据源,可以使用Akka Streams。输入称为Source,输出称为Sink,而将它们连接起来并进行格式转换等操作的是Flow。连接它们的东西被称为RunnableGraph。RunnableGraph在执行时会被转换为Akka的Actor,并通过称为ActorMaterializer的流调整机制进行执行。由于有点复杂,最好先在官方网站或者这里进行学习。
老实说,由于理解难度相当高,我打算另外写一篇文章来说明这个问题。
加拉馬基亞 (Alpaca)
使用基于Akka Streams构建的库集,可以轻松地连接MQTT、AMQP和kafka等。相比直接使用Akka Streams,我认为首先在这里查找连接器可能更好一些。
使用sbt-native-packager进行部署。
用Scala和Akka开发的应用程序应该如何部署呢?通常情况下,人们会在已安装了Scala的环境中使用GitHub克隆源代码,并运行sbt命令来启动应用程序。但是,现在有一些很好的工具可以简化这些步骤。使用这些工具,可以自动将应用程序转换为适应不同环境的部署形式,包括Docker等。具体而言,它可以转换为以下几种形式:
-
- Universal zip,tar.gz, xz archives
-
- deb and rpm packages for Debian/RHEL based systems
-
- dmg for OSX
-
- msi for Windows
- docker images
sbt-native-packager 官方网站
为了将sbt定义为追加命令,可以按照以下方式定义插件。
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.6")
enablePlugins(JavaAppPackaging)
创建应用后,输入以下命令,它会在target文件夹中以指定的分发形式输出。Docker映像等非常方便(先安装Docker是必需的)。
# universal zip
sbt universal:packageBin
# debian package
sbt debian:packageBin
# rpm package
sbt rpm:packageBin
# docker image
sbt docker:publishLocal
哎呀,这个真是有深度啊。。