やっはろー。Sleep sort の実装を通して Rust における並列な処理を雑に理解しよう、という話をします。

Sleep sort とはソートアルゴリズムの一つで、値の大きさに応じてスリープさせて明けた順に出力すると値がソートされている、というものです。

#!/bin/bash
function f() {
    sleep "$1"
    echo "$1"
}
while [ -n "$1" ]
do
    f "$1" &
    shift
done
wait

Genius sorting algorithm: Sleep sort – 4chan BBS

常識を覆すソートアルゴリズム!その名も”sleep sort”! – Islands in the byte stream

スレッドをスリープする

基本のパターンをやります。thread::spawn でスレッドをつくり、thread::sleep_ms でスリープです。

use std::{env, thread};

fn main() {
  let threads = env::args().skip(1).map(|x| {
    thread::spawn(move || {
      let n = x.parse::<u32>().unwrap_or(0);
      thread::sleep_ms(n * 1000);

      println!("{}", n);
    })
  })
  .collect::<Vec<_>>();

  for t in threads {
    let _ = t.join();
  }
}
$ cargo run -- 5 3 6 3 6 3 1 4 7
     Running `target/debug/sandbox`
1
3
3
3
4
5
6
6
7

上では捨てていますが、spawn の返す JoinHandle を join すると、スレッドの処理が終わるまで待ってくれる他、スレッドの返す値を受け取ることができます。

let t = thread::spawn(|| "きん!");
let r = t.join();

println!("{:?}", r);
// Ok("\u{304d}\u{3093}\u{ff01}")
println!("{}", r.unwrap());
// きん!
let t = thread::spawn(|| panic!("ぱつ!"));
let r = t.join();

println!("{:?}", r);
// Err(Any)
println!("{}", r.unwrap_err().downcast_ref::<&str>().unwrap());
// ぱつ!

std::thread – Rust

std::thread::JoinHandle – Rust

値を共有する

変数に結果を書き込むパターンをやります。Arc で所有権の共有、Mutex で排他の制御をすることで、複数スレッドからの安全なアクセスが保証されるっぽいです。

use std::{env, thread};
use std::sync::{Arc, Mutex};

fn main() {
  let result = Arc::new(Mutex::new(Vec::new()));

  let threads = env::args().skip(1).map(|x| {
    let result = result.clone();

    thread::spawn(move || {
      let n = x.parse::<u32>().unwrap_or(0);
      thread::sleep_ms(n * 1000);

      let mut result = result.lock().unwrap();
      result.push(n);
    })
  })
  .collect::<Vec<_>>();

  for t in threads {
    let _ = t.join();
  }

  for x in result.lock().unwrap().iter() {
    println!("{}", x);
  }
}

Arc な値の参照カウンタを増やすための clone と、Mutex な値を参照するときの lock が重要っぽいです。

std::sync::Arc – Rust

std::sync::Mutex – Rust

チャンネルで通信する

チャンネルを通して結果を受け取るパターンをやります。Clojure や Go のそれと似ていますが、Sender と Receiver とで分かれているところが違います。

use std::{env, sync, thread};

fn main() {
  let (tx, rx) = sync::mpsc::channel();

  let threads = env::args().skip(1).map(|x| {
    let tx = tx.clone();

    thread::spawn(move || {
      let n = x.parse::<u32>().unwrap_or(0);
      thread::sleep_ms(n * 1000);

      tx.send(n).unwrap();
    })
  })
  .collect::<Vec<_>>();

  for x in rx.iter().take(threads.len()) {
    println!("{}", x);
  }
}

チャンネルのイテレータを取るところが個人的にポイント高いんですが、普通にやるなら recv があります。

let (tx, rx) = sync::mpsc::channel();

let _ = thread::spawn(move || {
  thread::sleep_ms(3000);

  tx.send("きん!").unwrap();
});

println!("ぱつ!");
println!("{}", rx.recv().unwrap());

std::sync::mpsc – Rust

std::sync::mpsc::Sender – Rust

std::sync::mpsc::Receiver – Rust

終わりに

つまりは Book を読みなさいということなんだと思います。

Genius sorting algorithm: Sleep sort – 4chan BBS

Concurrency – Rust Book

广告
将在 10 秒后关闭
bannerAds