使用ConsistentHashingGroup进行Akka分布式并行处理的备忘录
我想做的事情
-
- keyにより分散並列処理をakkaを使って実現
-
- 同じkeyに紐づくデータは同じnode, workerで処理させる
keyに対するon memory集計処理的なものをしている。
集計結果はdb(cassandraに各node, worker上でinsert処理をする。)
現在のところ、driver上でfile読み込みして、1 threadで整形して、各node, workerに送っているが、これをmap reduce typeの処理にする。
original版のgearpump. plaggerの流れをくむ?tiny map reduce system
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
deployment {
/router {
router = consistent-hashing-group
//router = round-robin-pool
routees.paths = [
"akka.tcp://system@1号:2550/user/workers/w1",
"akka.tcp://system@1号:2550/user/workers/w2",
"akka.tcp://system@2号:2550/user/workers/w3",
"akka.tcp://system@2号:2550/user/workers/w4",
"akka.tcp://system@3号:2550/user/workers/w5",
"akka.tcp://system@3号:2550/user/workers/w6",
]
nr-of-instances = 1
}
target.nodes = [
"akka.tcp://system@1号:2550",
"akka.tcp://system@2号:2550",
"akka.tcp://system@3号:2550",
]
}
}
class Workers extends Actor {
context.actorOf(Props[Worker], "w1")
context.actorOf(Props[Worker], "w2")
context.actorOf(Props[Worker], "w3")
context.actorOf(Props[Worker], "w4")
context.actorOf(Props[Worker], "w5")
context.actorOf(Props[Worker], "w6")
def receive = {
case message => {
println(message)
}
}
}
object Node {
def main(args: Array[String]): Unit = {
val host = args(0)
val port = args(1)
System.setProperty("akka.remote.netty.tcp.hostname", host)
System.setProperty("akka.remote.netty.tcp.port", port)
// akka system
val system = ActorSystem("system")
system.actorOf(Props[Workers], name="workers")
}
}
// akka system
val system = ActorSystem("system")
val master = system.actorOf(Props[Master], name = "master")
Cluster(system).subscribe(master, classOf[ClusterDomainEvent])
// generate routees
system.actorOf(Props[Workers], name="workers")
val router = system.actorOf(FromConfig.props(Props[Aggregation]), "router")
.
.
.
.
router.tell(ConsistentHashableEnvelope(Request(data), key), master)
备忘录
-
- pathに対応するactorを作るには、くらすをネストするしかない?
/user/mappers/m1を作るには、Mappers actorを作ってその中でMapper actorをname=m1で作る必要がある?こんな面倒なことする必要があるか? for group routerの場合
poolの場合は自動で作ってくれるが、この階層構造も上のような感じ
workerで落ちても、driverを再起動させれば再度全workerで処理を実施できる。
error handling等はかなり先