RustのFutureとasync/awaitは、「時間のかかる処理」をするときに、「処理が終わるまでOSスレッドをブロックする(同期Rust)」のではなく、「当該処理を中断して、そのOSスレッドを別のタスクの処理に使う(非同期Rust)」ことで、スレッド数よりも多くの処理を同時に行う仕組みです。
同期Rustと非同期Rustには以下のようなアナロジーが成立します。
fn(T) -> U
fn(T) -> U
……の作成fn() {}
fn() {}
……の呼び出しf()
f()
時間のかかる処理fn(T) -> U
fn(T) -> impl Future<Output = U>
……の作成fn() {}
async fn() {}
……の呼び出しf()
f().await
同期Rustから非同期Rustへの移行は、おおよそこのアナロジーにしたがって行えることになります。 (ライブラリ・フレームワークも同様に移行される必要があります。)
とはいえ、単純置換で移行できるわけではありません。そこで以下でasync/awaitをスムーズに使うためのライブラリやテクニックを紹介します。
futures/futures01/futures-previewクレート
futuresは非同期プログラミングのベースになっているライブラリです。
futuresには主に2つのバージョンがあります。
-
- 0.1系。async/awaitがなかった頃に作られた非同期処理ライブラリ。
- 0.3系。async/awaitを念頭に置いて整理されている。
今ある非同期エコシステムは0.1系ベースのものが多いので、互換性を保つために以下のように書くのがおすすめです。
# futures01はfuturesの0.1系を指し続けるクレート。
# 0.3と併存させるために使う
futures01 = "0.1.28"
# 0.3系列はまだpreview版のため、別の名前で提供されている。
# そのうちfuturesにリネームされる
futures-preview = { version = "0.3.0-alpha.17", features = ["io-compat"] }
use futures::prelude::*;
futuresクレートには、標準ライブラリの非同期版がいくつか同梱されています。
std::sync::Mutex
futures::lock::Mutex
チャネルstd::sync::mpsc
futures::channel::mpsc
futures::channel::oneshot
イテレータstd::iter::Iterator
futures::stream::Stream
futuresの互換性
前節のコード例にある features = [“io-compat”] を使うと、futures 0.1系を使っているライブラリと相互運用することができます。
use futures::prelude::*; // TryFutureが入っている
let fut01 = async {
// ...
}.boxed().compat(); // futures01::future::Futureに変換される
use futures::compat::*; // Future01CompatExtが入っている
let fut01 = ...;
let fut03 = fut01.compat(); // futures::future::Futureに変換される
ストリーム処理
ストリームを for で処理するための構文は将来的には入ることが期待されていますが、すぐには入らない予定です。 while let で処理するのがイディオマティックです。
let mut stream = ...;
// pin_utils::pin_mut!(stream); // 場合によっては必要
while let Some(item) = stream.next().await {
// ...
}
runtimeクレート
runtimeは、非同期ランタイムをまとまった形で提供することを目指しているクレートです。非同期ランタイムの立ち上げを簡略化するための属性マクロも提供しています。
runtime = "0.3.0-alpha.6"
#![feature(async_await)]
use runtime::prelude::*;
#[runtime::main]
async fn main() {
// ...
}
#[runtime::test]
async fn test_foo() {
// ...
}
タスクの起動や非同期ソケットの立ち上げ、タイマーなどのインターフェースが提供されています。これは裏側のランタイム実装 (tokioまたはromio/juliex) やタイマー実装 (futures-timer) に移譲されます。
// タスクの起動
let handle = runtime::spawn(async {
// ...
});
handle.await;
use runtime::net::TcpListener;
// 非同期ソケット
let listener = TcpListener::bind("127.0.0.1:8080")?;
// ...
use std::time::Duration;
use runtime::time::Delay;
// タイマー
Delay::new(Duration_from_secs(1)).await
fn main() {}
#[runtime::main] async fn main() {}
test#[test] fn test() {}
#[runtime::test] async fn test() {}
bench#[bench] fn bench() {}
#[runtime::bench] async fn bench() {}
スレッド起動std::thread::spawn
runtime::spawn
スレッドのjoinhandle.join()
handle.await
UDPstd::net::UdpSocket
runtime::net::UdpSocket
TCPリスナーstd::net::TcpListener
runtime::net::TcpListener
TCP接続std::net::TcpStream
runtime::net::TcpStream
スリープstd::thread::sleep();
runtime::time::Delay::new().await;
hyperとの相互運用
hyperはデフォルトで自前のtokioランタイムを起動します。runtimeクレートを使う場合にはデフォルトの rt featureを無効化しておいたほうが無難です。
hyper = { version = "0.12.33", default-features = false }
use futures::compat::Compat;
use hyper::server::Server;
// runtimeのspawnerとlistenerを使う
let listener = ...;
let incoming = listener.incoming().map_ok(Compat::new).compat();
let server = Server::builder(incoming)
.executor(Compat::new(runtime::task::Spawner::new()))
.serve(...)
.compat();
server.await;
async-trait
async fnはトレイトやその実装内では使えません。トレイト内async fnの実現にはジェネリック関連型 (Generic Associated Type; GAT) と存在型 (existential type)が必要で、どちらも実装途上です。
また、トレイト内async fnが実装されたとしても、async fnを含むトレイトは関連型を含むためトレイトオブジェクト化が不可能になります。
では非同期関数を含むトレイトを作るにはどうすればいいかというと、 BoxFuture を返すようにすればよいです。
pub trait Foo {
fn foo(&self) -> BoxFuture<'_, i32> {
async { 42 }.boxed()
}
}
impl Foo for () {
fn foo(&self) -> BoxFuture<'_, i32> {
async { 84 }.boxed()
}
}
これはやや面倒です。複数の参照を取るメソッドになると話はさらにややこしくなります。
async-traitクレートにある async_trait 属性マクロを使うと、これをより自然に書くことができます。
#[async_trait]
pub trait Foo {
async fn foo(&self) -> i32 {
42
}
}
#[async_trait]
impl Foo for () {
async fn foo(&self) -> i32 {
84
}
}
まとめ
async/awaitへの移行を容易にする以下のテクニックを紹介しました。
-
- futuresクレートの互換性
-
- streamに対するfor構文の代替
-
- runtimeクレート
-
- hyperとの相互運用
async_trait マクロ