Akka的開始之旅(2)

我将介绍一些在使用Akka时值得了解的库和组件。虽然有很多选择,但我将解释Typesafe Config库、路由、流处理、部署工具等。

类型安全的配置

一个用于加载Scala中常用的配置设置的库。它会自动读取包含在类路径(例如src/resources文件夹)中的配置文件。官方网站在这里。

为了进行导入,需要在build.sbt中输入以下内容。然而,据说akka-actor包中已经包含了这个设置,如果是这样的话,下面的设置似乎不是必需的。

libraryDependencies += "com.typesafe" % "config" % "1.3.2"

配置文件的名称应该如下所示。虽然可以通过启动参数进行更改,但实际上没有这个必要。根据官方网站,库和框架的设置应该写在reference.conf中,而应用程序特定的设置应该写在application.conf中。

    • application.conf (HOCON形式の設定プロパティ)

 

    • application.json (JSON形式の設定プロパティ)

 

    • application.properties (Javaのプロパティファイル形式の設定プロパティ)

 

    reference.conf (HOCON形式の設定プロパティ)

HOCON是typesafe公司提出和实现的一种文件格式。虽然JSON也可以使用,但由于可读性较高,建议使用HOCON。

比如,我们可以按照以下的方式进行编写。虽然是随意的例子,但请把它视为MQTT的配置文件。顺便一提,通过写入url=${?HOSTNAME}等等,我们也能获取相应的环境变量。这种写法的好处在于,只有当该环境变量存在时才会被覆盖。

mqtt {
  url = "localhost"
  port = 1883
  clientId = "test"
  useTLS = false
}

我们可以通过应用程序获取以下信息。

  val config = ConfigFactory.load()
  val url = config.getString("mqtt.url")
  val port = config.getInt("mqtt.port")
  val tls = config.getBoolean("mqtt.useTLS")

在Akka中的Typesafe Config

如果使用Akka,将自动使用ConfigFactory.load()来加载配置并生成Actor系统。您也可以将自定义配置作为Actor系统的参数进行设置。建议使用Typesafe Config来设置Logger配置和路由器配置等。下面的代码与上述代码是等价的。

  val system = ActorSystem("testSystem")
  val config = system.settings.config
  val url = config.getString("mqtt.url")
  val port = config.getInt("mqtt.port")
  val tls = config.getBoolean("mqtt.useTLS")

路由 (lù

在Akka中,可以轻松地对Actor进行扩展。当存在多个执行相同任务的Actor时,路由机制会将适当的消息分配给每个Actor的邮箱。担任路由功能的是路由器Actor,它决定消息分配的目标称为路由。而确定如何分配的是路由逻辑。尽管Akka提供了许多可用的路由器,但一般常用的有以下几种。顺便提一下,Pool指的是路由器管理路由的一种类型。还有一种类型是由系统管理路由的Group。

    • RoundRobinPool : 生成したルーティーに順番にタスクを振り分ける

 

    • BalancingPool : アイドル状態のルーティーに順次配信する

 

    ConsistentHashingPool : コンシステントハッシュを使用してルーティーを選択

生成一台简单的池路由器的方法如下。

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.routing.BalancingPool
import scala.io.StdIn

class PrintMyActorRefActor extends Actor with ActorLogging {
  override def receive: Receive = {
    case "hello" 
      log.info("Hello world")
  }
}


object RouterExample extends App {

  val system = ActorSystem("testSystem")
  val router = system.actorOf(BalancingPool(5).props(Props[PrintMyActorRefActor]), "poolRouter")

  for (i <- 1 to 100) {
    router ! "hello"
  }

  println(">>> Press ENTER to exit <<<")
  try StdIn.readLine()
  finally system.terminate()
}

在BalancingPool中,我们决定了要创建的路由器数量。上述示例是硬编码的,但建议使用Typesafe Config进行配置。请注意,如果路由器名称”poolRouter”与配置不匹配,将会失败。顺便提一下,您也可以使用Resizer动态地更改要创建的实例数量。更多详情请参考官方网站。

akka.actor.deployment {
  /poolRouter {
    router = balancing-pool
    nr-of-instances = 5
  }
}
 val system = ActorSystem("testSystem")
 val router = system.actorOf(FromConfig.props(Props[PrintMyActorRefActor]), "poolRouter")

阿卡流

为了使用Akka在Actor中处理连续的数据源,可以使用Akka Streams。输入称为Source,输出称为Sink,而将它们连接起来并进行格式转换等操作的是Flow。连接它们的东西被称为RunnableGraph。RunnableGraph在执行时会被转换为Akka的Actor,并通过称为ActorMaterializer的流调整机制进行执行。由于有点复杂,最好先在官方网站或者这里进行学习。

老实说,由于理解难度相当高,我打算另外写一篇文章来说明这个问题。

加拉馬基亞 (Alpaca)

使用基于Akka Streams构建的库集,可以轻松地连接MQTT、AMQP和kafka等。相比直接使用Akka Streams,我认为首先在这里查找连接器可能更好一些。

使用sbt-native-packager进行部署。

用Scala和Akka开发的应用程序应该如何部署呢?通常情况下,人们会在已安装了Scala的环境中使用GitHub克隆源代码,并运行sbt命令来启动应用程序。但是,现在有一些很好的工具可以简化这些步骤。使用这些工具,可以自动将应用程序转换为适应不同环境的部署形式,包括Docker等。具体而言,它可以转换为以下几种形式:

    • Universal zip,tar.gz, xz archives

 

    • deb and rpm packages for Debian/RHEL based systems

 

    • dmg for OSX

 

    • msi for Windows

 

    docker images

sbt-native-packager 官方网站

为了将sbt定义为追加命令,可以按照以下方式定义插件。

addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.6")
enablePlugins(JavaAppPackaging)

创建应用后,输入以下命令,它会在target文件夹中以指定的分发形式输出。Docker映像等非常方便(先安装Docker是必需的)。

# universal zip
sbt universal:packageBin

# debian package
sbt debian:packageBin

# rpm package
sbt rpm:packageBin

# docker image
sbt docker:publishLocal

哎呀,这个真是有深度啊。。

广告
将在 10 秒后关闭
bannerAds