使用ConsistentHashingGroup进行Akka分布式并行处理的备忘录

我想做的事情

    • keyにより分散並列処理をakkaを使って実現

 

    • 同じkeyに紐づくデータは同じnode, workerで処理させる

keyに対するon memory集計処理的なものをしている。

集計結果はdb(cassandraに各node, worker上でinsert処理をする。)
現在のところ、driver上でfile読み込みして、1 threadで整形して、各node, workerに送っているが、これをmap reduce typeの処理にする。

original版のgearpump. plaggerの流れをくむ?tiny map reduce system

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"

    deployment {
      /router {
        router = consistent-hashing-group
        //router = round-robin-pool
        routees.paths = [
          "akka.tcp://system@1号:2550/user/workers/w1",
          "akka.tcp://system@1号:2550/user/workers/w2",
          "akka.tcp://system@2号:2550/user/workers/w3",
          "akka.tcp://system@2号:2550/user/workers/w4",
          "akka.tcp://system@3号:2550/user/workers/w5",
          "akka.tcp://system@3号:2550/user/workers/w6",
        ]
        nr-of-instances = 1
      }
      target.nodes = [
        "akka.tcp://system@1号:2550",
        "akka.tcp://system@2号:2550",
        "akka.tcp://system@3号:2550",
      ]
    }
  }

class Workers extends Actor {
  context.actorOf(Props[Worker], "w1")
  context.actorOf(Props[Worker], "w2")
  context.actorOf(Props[Worker], "w3")
  context.actorOf(Props[Worker], "w4")
  context.actorOf(Props[Worker], "w5")
  context.actorOf(Props[Worker], "w6")

  def receive = {
    case message => {
      println(message)
    }
  }
}

object Node {

  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1)

    System.setProperty("akka.remote.netty.tcp.hostname", host)
    System.setProperty("akka.remote.netty.tcp.port", port)

    // akka system
    val system = ActorSystem("system")
    system.actorOf(Props[Workers], name="workers")
  }
}
// akka system
    val system = ActorSystem("system")
    val master = system.actorOf(Props[Master], name = "master")

    Cluster(system).subscribe(master, classOf[ClusterDomainEvent])

    // generate routees
    system.actorOf(Props[Workers], name="workers")

    val router = system.actorOf(FromConfig.props(Props[Aggregation]), "router")


.
.
.
.

router.tell(ConsistentHashableEnvelope(Request(data), key), master)

备忘录

    • pathに対応するactorを作るには、くらすをネストするしかない?

/user/mappers/m1を作るには、Mappers actorを作ってその中でMapper actorをname=m1で作る必要がある?こんな面倒なことする必要があるか? for group routerの場合
poolの場合は自動で作ってくれるが、この階層構造も上のような感じ

workerで落ちても、driverを再起動させれば再度全workerで処理を実施できる。

error handling等はかなり先

广告
将在 10 秒后关闭
bannerAds