やっはろー。Sleep sort の実装を通して Rust における並列な処理を雑に理解しよう、という話をします。
Sleep sort とはソートアルゴリズムの一つで、値の大きさに応じてスリープさせて明けた順に出力すると値がソートされている、というものです。
#!/bin/bash
function f() {
sleep "$1"
echo "$1"
}
while [ -n "$1" ]
do
f "$1" &
shift
done
wait
Genius sorting algorithm: Sleep sort – 4chan BBS
常識を覆すソートアルゴリズム!その名も”sleep sort”! – Islands in the byte stream
スレッドをスリープする
基本のパターンをやります。thread::spawn でスレッドをつくり、thread::sleep_ms でスリープです。
use std::{env, thread};
fn main() {
let threads = env::args().skip(1).map(|x| {
thread::spawn(move || {
let n = x.parse::<u32>().unwrap_or(0);
thread::sleep_ms(n * 1000);
println!("{}", n);
})
})
.collect::<Vec<_>>();
for t in threads {
let _ = t.join();
}
}
$ cargo run -- 5 3 6 3 6 3 1 4 7
Running `target/debug/sandbox`
1
3
3
3
4
5
6
6
7
上では捨てていますが、spawn の返す JoinHandle を join すると、スレッドの処理が終わるまで待ってくれる他、スレッドの返す値を受け取ることができます。
let t = thread::spawn(|| "きん!");
let r = t.join();
println!("{:?}", r);
// Ok("\u{304d}\u{3093}\u{ff01}")
println!("{}", r.unwrap());
// きん!
let t = thread::spawn(|| panic!("ぱつ!"));
let r = t.join();
println!("{:?}", r);
// Err(Any)
println!("{}", r.unwrap_err().downcast_ref::<&str>().unwrap());
// ぱつ!
std::thread – Rust
std::thread::JoinHandle – Rust
値を共有する
変数に結果を書き込むパターンをやります。Arc で所有権の共有、Mutex で排他の制御をすることで、複数スレッドからの安全なアクセスが保証されるっぽいです。
use std::{env, thread};
use std::sync::{Arc, Mutex};
fn main() {
let result = Arc::new(Mutex::new(Vec::new()));
let threads = env::args().skip(1).map(|x| {
let result = result.clone();
thread::spawn(move || {
let n = x.parse::<u32>().unwrap_or(0);
thread::sleep_ms(n * 1000);
let mut result = result.lock().unwrap();
result.push(n);
})
})
.collect::<Vec<_>>();
for t in threads {
let _ = t.join();
}
for x in result.lock().unwrap().iter() {
println!("{}", x);
}
}
Arc な値の参照カウンタを増やすための clone と、Mutex な値を参照するときの lock が重要っぽいです。
std::sync::Arc – Rust
std::sync::Mutex – Rust
チャンネルで通信する
チャンネルを通して結果を受け取るパターンをやります。Clojure や Go のそれと似ていますが、Sender と Receiver とで分かれているところが違います。
use std::{env, sync, thread};
fn main() {
let (tx, rx) = sync::mpsc::channel();
let threads = env::args().skip(1).map(|x| {
let tx = tx.clone();
thread::spawn(move || {
let n = x.parse::<u32>().unwrap_or(0);
thread::sleep_ms(n * 1000);
tx.send(n).unwrap();
})
})
.collect::<Vec<_>>();
for x in rx.iter().take(threads.len()) {
println!("{}", x);
}
}
チャンネルのイテレータを取るところが個人的にポイント高いんですが、普通にやるなら recv があります。
let (tx, rx) = sync::mpsc::channel();
let _ = thread::spawn(move || {
thread::sleep_ms(3000);
tx.send("きん!").unwrap();
});
println!("ぱつ!");
println!("{}", rx.recv().unwrap());
std::sync::mpsc – Rust
std::sync::mpsc::Sender – Rust
std::sync::mpsc::Receiver – Rust
終わりに
つまりは Book を読みなさいということなんだと思います。
Genius sorting algorithm: Sleep sort – 4chan BBS
Concurrency – Rust Book