Rustで高性能なネットワークサーバーやCLIツールを構築する際、非同期処理は避けて通れません。async/awaitの構文自体はJavaScriptやPythonと似ていますが、Rustの非同期モデルには「ランタイムが言語に組み込まれていない」「Futureはポーリングされるまで何もしない」という独自の特徴があります。
Goのgoroutineはランタイムが暗黙的にスケジューリングし、Node.jsはシングルスレッドのイベントループで非同期処理を実行します。一方Rustは、どのランタイムを使うか、何スレッドで動かすかを開発者が明示的に選択します。この設計によりゼロコスト抽象化が実現されています。
本記事では、Futureトレイトとasync/await構文の基礎から、事実上の標準ランタイムであるtokioの実務パターンまでを段階的に解説します。
async/awaitの基本
Futureトレイトの仕組み
Rustの非同期処理は Future トレイトを基盤としています。Future は「まだ完了していないかもしれない計算」を表す型です。
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
poll が Ready(T) を返せば計算完了、Pending を返せばまだ完了していないことを意味します。ランタイムは Pending を受け取ると、Wakerで通知されるまでポーリングを中断します。
async fnと.await
async fn は Future を返す関数のシンタックスシュガーです。.await はFutureの完了を待つ演算子で、Ready になるまで現在のタスクの実行権を他のタスクに譲ります。
async fn fetch_data(url: &str) -> String {
let response = reqwest::get(url).await.unwrap();
response.text().await.unwrap()
}
他言語との比較
| 項目 | Rust (tokio) | Go (goroutine) | Node.js | Python (asyncio) |
|---|---|---|---|---|
| 非同期モデル | ポーリング型Future | CSPベースのグリーンスレッド | イベントループ | コルーチン |
| ランタイム | 外部クレート(tokio等) | 言語組み込み | V8組み込み | 標準ライブラリ |
| スレッドモデル | マルチスレッド対応 | M:Nスケジューリング | シングルスレッド | シングルスレッド |
| ゼロコスト抽象化 | あり | なし | なし | なし |
| メモリ管理 | 所有権システム | GC | GC | GC |
async fn はコンパイル時にステートマシンに変換され、ヒープアロケーションやランタイムの余分なオーバーヘッドが発生しません。これがゼロコストと呼ばれる理由です。
tokioランタイムの仕組み
tokioはRustの非同期ランタイムとして最も広く使われているクレートです。axum、tonic、sqlxなど主要なエコシステムがtokioを前提として構築されています。
#[tokio::main]マクロ
#[tokio::main] はmain関数をtokioランタイム上で動かすためのマクロです。
#[tokio::main]
async fn main() {
println!("Hello from tokio!");
}
// 上記は内部的に以下に展開される
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all().build().unwrap()
.block_on(async { println!("Hello from tokio!"); })
}
マルチスレッドとカレントスレッドの違い
| 項目 | multi_thread | current_thread |
|---|---|---|
| マクロ | #[tokio::main] | #[tokio::main(flavor = "current_thread")] |
| スレッド数 | CPUコア数(デフォルト) | 1 |
| タスクの要件 | Send + 'static | 'staticのみ |
| 適した用途 | サーバー、CPU並列処理 | CLIツール、テスト、WASM |
Runtime Builderを使えばワーカースレッド数やスレッド名の指定も可能です。
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.unwrap();
タスクのスポーンと並行実行
tokio::spawn
tokio::spawn はFutureを独立したタスクとしてランタイムに登録します。渡すFutureは Send + 'static を満たす必要があります。
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
sleep(Duration::from_secs(1)).await;
"タスク完了"
});
println!("タスクを待機中...");
let result = handle.await.unwrap();
println!("{}", result);
}
tokio::join!とtokio::select!
join! は複数の非同期処理を並行実行し全ての完了を待ちます。select! は最初に完了したものだけを処理します。
async fn fetch_user() -> String {
sleep(Duration::from_millis(200)).await;
"ユーザーデータ".to_string()
}
async fn fetch_orders() -> Vec<String> {
sleep(Duration::from_millis(300)).await;
vec!["注文A".to_string()]
}
#[tokio::main]
async fn main() {
// join!: 並行実行で合計300msで完了(200+300=500msではない)
let (user, orders) = tokio::join!(fetch_user(), fetch_orders());
// select!: 最初に完了したブランチのみ実行
tokio::select! {
_ = sleep(Duration::from_secs(1)) => println!("1秒経過"),
_ = sleep(Duration::from_secs(2)) => println!("実行されない"),
}
}
select! は最初に完了したブランチを実行し残りをドロップします。タイムアウトやシャットダウン監視に多用されます。
チャネルによるタスク間通信
mpsc(多対一チャネル)
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<String>(32);
for i in 0..3 {
let tx = tx.clone();
tokio::spawn(async move {
tx.send(format!("ワーカー{}の結果", i)).await.unwrap();
});
}
drop(tx); // 元のtxをドロップしないとrxが終了しない
while let Some(msg) = rx.recv().await {
println!("受信: {}", msg);
}
}
oneshotとbroadcast
oneshot はリクエスト/レスポンス向けの1回限りチャネル、broadcast は全受信者に同じメッセージを配信するチャネルです。
// oneshot: 1対1の一回限り
let (tx, rx) = oneshot::channel::<String>();
tx.send("結果".to_string()).unwrap();
// broadcast: 全受信者に配信
let (tx, _) = broadcast::channel::<String>(16);
let mut rx1 = tx.subscribe();
tx.send("設定更新".to_string()).unwrap();
チャネルの使い分け
| チャネル | 送信者 | 受信者 | 用途 |
|---|---|---|---|
| mpsc | 複数 | 1つ | ワーカープール、イベント集約 |
| oneshot | 1つ | 1つ | リクエスト/レスポンス |
| broadcast | 1つ | 複数 | 設定通知、イベントファンアウト |
| watch | 1つ | 複数 | 最新値の共有(古い値はスキップ) |
非同期I/O実践
tokio::fsとtokio::net
tokio::fs は標準ライブラリの非同期版ファイルI/O、tokio::net はTCP/UDPの非同期ネットワーキングを提供します。
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> std::io::Result<()> {
// 非同期ファイル読み込み
let content = tokio::fs::read_to_string("config.toml").await?;
// 非同期TCPエコーサーバー
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, addr) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = [0u8; 1024];
loop {
let n = match socket.read(&mut buf).await {
Ok(0) => return,
Ok(n) => n,
Err(_) => return,
};
if socket.write_all(&buf[..n]).await.is_err() { return; }
}
});
}
}
各接続を tokio::spawn で独立タスクとして処理するため、数万の同時接続を少ないスレッド数でさばけます。
タイムアウトとキャンセル
tokio::time::timeout
ネットワーク通信やデータベースクエリには必ずタイムアウトを設定すべきです。
use tokio::time::{timeout, sleep, Duration};
#[tokio::main]
async fn main() {
let result = timeout(Duration::from_secs(3), long_task()).await;
match result {
Ok(value) => println!("成功: {:?}", value),
Err(_) => println!("タイムアウト"),
}
}
async fn long_task() -> String {
sleep(Duration::from_secs(5)).await;
"完了".to_string()
}
非同期処理で待機するには tokio::time::sleep を使います。std::thread::sleep はスレッド全体をブロックするため、非同期コンテキストでは使ってはいけません。
キャンセル安全性
select! で選択されなかったFutureはドロップされます。途中まで進んだ処理が安全に中断されるかどうかが「キャンセル安全性(cancellation safety)」です。
// キャンセル安全でない例
tokio::select! {
data = async {
let chunk1 = stream.recv().await; // ここでキャンセルされると
let chunk2 = stream.recv().await; // chunk1は失われる
(chunk1, chunk2)
} => { /* ... */ }
_ = shutdown_signal() => { /* ... */ }
}
tokioのドキュメントには各メソッドのキャンセル安全性が明記されています。select! 内ではキャンセル安全なメソッドを選ぶか、tokio::pin! でFutureを固定して再利用してください。
Graceful Shutdown
本番サーバーでは、シグナル受信後に実行中リクエストを完了させてから終了するgraceful shutdownが必須です。
tokio::signalとCancellationToken
tokio_util::sync::CancellationToken を使うと、複数タスクへの一括キャンセル通知を簡潔に実装できます。
use tokio_util::sync::CancellationToken;
use tokio::time::{sleep, Duration};
use tokio::signal;
#[tokio::main]
async fn main() {
let token = CancellationToken::new();
for i in 0..3 {
let token = token.clone();
tokio::spawn(async move {
tokio::select! {
_ = token.cancelled() => {
println!("ワーカー{}: 停止", i);
}
_ = async {
loop {
sleep(Duration::from_millis(500)).await;
println!("ワーカー{}: 処理中", i);
}
} => {}
}
});
}
// Ctrl+Cでシャットダウン
signal::ctrl_c().await.unwrap();
println!("シャットダウン開始");
token.cancel();
sleep(Duration::from_millis(100)).await;
}
token.cancel() を呼ぶだけで全ワーカーに停止を通知できます。各ワーカーは select! で token.cancelled() を監視しており、現在の処理完了後に安全に停止します。
よくある質問(FAQ)
Q: asyncコンテキスト内でブロッキング処理を呼んでも良いですか?
いいえ。ブロッキング処理をasyncタスク内で直接呼ぶと、そのスレッド上の他のタスクがすべてブロックされます。tokio::task::spawn_blocking で隔離してください。
let result = tokio::task::spawn_blocking(|| {
std::fs::read_to_string("large_file.txt")
}).await.unwrap();
Q: Send + 'static 境界が要求される理由は?
tokio::spawn のタスクは任意のワーカースレッドに移動する可能性があります。Send はスレッド間の安全な移動を、'static はデータの生存期間を保証します。ローカル変数への参照は 'static を満たせないため、move クロージャで所有権を移動させます。
Q: マルチスレッドとカレントスレッドのどちらを使うべきですか?
サーバーや複数I/Oの同時処理にはマルチスレッド、CLIツールやテストにはカレントスレッドが適しています。WASMではカレントスレッドが必須です。
Q: tokio以外の非同期ランタイムはありますか?
async-std や smol がありますが、エコシステムの大部分がtokioに依存しているため、特別な理由がなければtokioを選ぶのが現実的です。
まとめ
Rustの非同期処理はFutureトレイトのポーリングモデルを基盤とし、async/awaitでゼロコストの非同期プログラミングを実現しています。tokioはその上にタスクスポーン・チャネル通信・タイムアウト制御・シグナルハンドリングを提供します。
- async/await: Futureは
.awaitされるまで実行されない。ステートマシン変換でオーバーヘッド最小 - tokio::spawn: 独立したタスクの並行実行。
Send + 'static境界に注意 - join! / select!: 複数タスクの並行待機と競合待機を使い分ける
- チャネル: mpsc・oneshot・broadcastを用途に応じて選択
- タイムアウト: ネットワーク通信には必ず
tokio::time::timeoutを設定 - Graceful Shutdown:
CancellationTokenで複数タスクの安全な停止を実現 - ブロッキング処理: 必ず
spawn_blockingで隔離する
非同期Rustは学習曲線がやや急ですが、所有権システムによりデータ競合がコンパイル時に防止されるため、一度正しく書けば並行性バグが起きにくいという利点があります。小さなCLIツールから始めて段階的にサーバーサイドへ進むのがおすすめです。