使用ConsistentHashingGroup进行Akka分布式并行处理的备忘录
我想做的事情
- 
- keyにより分散並列処理をakkaを使って実現
 
 
- 
- 同じkeyに紐づくデータは同じnode, workerで処理させる
 
 
keyに対するon memory集計処理的なものをしている。
集計結果はdb(cassandraに各node, worker上でinsert処理をする。)
現在のところ、driver上でfile読み込みして、1 threadで整形して、各node, workerに送っているが、これをmap reduce typeの処理にする。
original版のgearpump. plaggerの流れをくむ?tiny map reduce system
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
    deployment {
      /router {
        router = consistent-hashing-group
        //router = round-robin-pool
        routees.paths = [
          "akka.tcp://system@1号:2550/user/workers/w1",
          "akka.tcp://system@1号:2550/user/workers/w2",
          "akka.tcp://system@2号:2550/user/workers/w3",
          "akka.tcp://system@2号:2550/user/workers/w4",
          "akka.tcp://system@3号:2550/user/workers/w5",
          "akka.tcp://system@3号:2550/user/workers/w6",
        ]
        nr-of-instances = 1
      }
      target.nodes = [
        "akka.tcp://system@1号:2550",
        "akka.tcp://system@2号:2550",
        "akka.tcp://system@3号:2550",
      ]
    }
  }
class Workers extends Actor {
  context.actorOf(Props[Worker], "w1")
  context.actorOf(Props[Worker], "w2")
  context.actorOf(Props[Worker], "w3")
  context.actorOf(Props[Worker], "w4")
  context.actorOf(Props[Worker], "w5")
  context.actorOf(Props[Worker], "w6")
  def receive = {
    case message => {
      println(message)
    }
  }
}
object Node {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1)
    System.setProperty("akka.remote.netty.tcp.hostname", host)
    System.setProperty("akka.remote.netty.tcp.port", port)
    // akka system
    val system = ActorSystem("system")
    system.actorOf(Props[Workers], name="workers")
  }
}
// akka system
    val system = ActorSystem("system")
    val master = system.actorOf(Props[Master], name = "master")
    Cluster(system).subscribe(master, classOf[ClusterDomainEvent])
    // generate routees
    system.actorOf(Props[Workers], name="workers")
    val router = system.actorOf(FromConfig.props(Props[Aggregation]), "router")
.
.
.
.
router.tell(ConsistentHashableEnvelope(Request(data), key), master)
备忘录
- 
- pathに対応するactorを作るには、くらすをネストするしかない?
 
 
/user/mappers/m1を作るには、Mappers actorを作ってその中でMapper actorをname=m1で作る必要がある?こんな面倒なことする必要があるか? for group routerの場合
poolの場合は自動で作ってくれるが、この階層構造も上のような感じ
workerで落ちても、driverを再起動させれば再度全workerで処理を実施できる。
error handling等はかなり先