快轉到主要內容
  1. 教學文章/

Rust Async + Tokio 入門:非同步 Rust vs Python asyncio

·8 分鐘· loading · loading · ·
Rust Async Tokio Python Asyncio 非同步
每日拍拍
作者
每日拍拍
科學家 X 科技宅宅
目錄
Rust 入門 - 本文屬於一個選集。
§ 11: 本文

一、前言
#

如果你用過 Python 的 asyncio,應該對 async defawait 不陌生。Python 的非同步程式設計讓你在一個執行緒裡同時處理上千個 I/O 操作——爬蟲、API 呼叫、WebSocket,都靠它。

但你有沒有想過:Rust 的 async/await 長什麼樣? 語法看起來差不多,底層卻完全不同。Python 的 coroutine 跑在內建的 event loop 上,而 Rust 的 Future 需要一個外部 runtime——最常用的就是 Tokio

這篇文章會從 Python asyncio 出發,帶你搞懂 Rust async 的核心概念,然後用 Tokio 寫出真正的非同步程式。我們會比較兩者的差異,讓你用最熟悉的方式學會 Rust 的非同步世界。


二、安裝與設定
#

Python 端
#

Python 3.4+ 內建 asyncio,不用額外安裝:

import asyncio

Rust 端
#

Rust 標準庫有 async/await 語法,但沒有內建 runtime。你需要自己選一個,最主流的是 Tokio:

cargo new async-demo
cd async-demo

Cargo.toml 加入:

[dependencies]
tokio = { version = "1", features = ["full"] }

features = ["full"] 會啟用所有功能(runtime、net、time、sync 等)。正式專案可以按需只開需要的 feature。


三、基本語法對比:async/await
#

Python 的 coroutine
#

import asyncio

async def greet(name: str) -> str:
    await asyncio.sleep(1)  # 模擬非同步 I/O
    return f"嗨,{name}!"

async def main():
    result = await greet("拍拍君")
    print(result)

asyncio.run(main())

重點:

  • async def 定義 coroutine function
  • await 暫停執行,等結果回來
  • asyncio.run() 啟動 event loop

Rust 的 Future
#

use tokio::time::{sleep, Duration};

async fn greet(name: &str) -> String {
    sleep(Duration::from_secs(1)).await;  // 模擬非同步 I/O
    format!("嗨,{}!", name)
}

#[tokio::main]
async fn main() {
    let result = greet("拍拍君").await;
    println!("{}", result);
}

重點:

  • async fn 定義回傳 Future 的函式
  • .await 放在後面(不是前面!)
  • #[tokio::main] 巨集自動建立 Tokio runtime

語法差異整理
#

概念 Python Rust
定義非同步函式 async def f() async fn f()
等待結果 await expr expr.await
啟動 runtime asyncio.run(main()) #[tokio::main]
回傳型別 Coroutine impl Future<Output = T>
睡眠 asyncio.sleep(1) tokio::time::sleep(Duration::from_secs(1))

注意 .await 的位置!Python 放前面,Rust 放後面。這是因為 Rust 的 .await 是一個後綴運算子,可以跟方法鏈串在一起:

let body = reqwest::get("https://example.com")
    .await?
    .text()
    .await?;

四、並行執行:同時跑多個任務
#

非同步程式設計的精髓就是並行——同時發起多個 I/O 操作,而不是傻傻排隊等。

Python:gather
#

import asyncio
import time

async def fetch(name: str, delay: float) -> str:
    print(f"開始抓 {name}...")
    await asyncio.sleep(delay)
    print(f"{name} 完成!")
    return f"{name} 的資料"

async def main():
    start = time.time()

    # 同時執行三個任務
    results = await asyncio.gather(
        fetch("API-A", 2),
        fetch("API-B", 1),
        fetch("API-C", 3),
    )

    elapsed = time.time() - start
    print(f"全部完成,花了 {elapsed:.1f} 秒")  # ~3 秒,不是 6 秒
    print(results)

asyncio.run(main())

Rust:tokio::join!
#

use tokio::time::{sleep, Duration, Instant};

async fn fetch(name: &str, delay_secs: u64) -> String {
    println!("開始抓 {}...", name);
    sleep(Duration::from_secs(delay_secs)).await;
    println!("{} 完成!", name);
    format!("{} 的資料", name)
}

#[tokio::main]
async fn main() {
    let start = Instant::now();

    // 同時執行三個任務
    let (a, b, c) = tokio::join!(
        fetch("API-A", 2),
        fetch("API-B", 1),
        fetch("API-C", 3),
    );

    let elapsed = start.elapsed();
    println!("全部完成,花了 {:.1?}", elapsed);  // ~3 秒
    println!("{}, {}, {}", a, b, c);
}

tokio::join! 類似 asyncio.gather:所有 Future 會同時開始,等全部完成才繼續。差別在於 join! 回傳 tuple,而 gather 回傳 list。


五、spawn:把任務丟到背景執行
#

有時候你想 fire-and-forget,或者需要「真正的」並行任務(不是只是等 I/O)。

Python:create_task
#

import asyncio

async def background_job(name: str):
    for i in range(3):
        print(f"[{name}] 第 {i+1} 次心跳")
        await asyncio.sleep(1)

async def main():
    # 丟到背景
    task = asyncio.create_task(background_job("監控"))

    print("主程式繼續做其他事...")
    await asyncio.sleep(2)
    print("主程式做完了")

    await task  # 等背景任務結束

asyncio.run(main())

Rust:tokio::spawn
#

use tokio::time::{sleep, Duration};

async fn background_job(name: &'static str) {
    for i in 0..3 {
        println!("[{}] 第 {} 次心跳", name, i + 1);
        sleep(Duration::from_secs(1)).await;
    }
}

#[tokio::main]
async fn main() {
    // 丟到背景——spawn 回傳 JoinHandle
    let handle = tokio::spawn(background_job("監控"));

    println!("主程式繼續做其他事...");
    sleep(Duration::from_secs(2)).await;
    println!("主程式做完了");

    handle.await.unwrap();  // 等背景任務結束
}

關鍵差異tokio::spawn 要求 Future 是 'static 的——也就是說它不能引用短暫的變數。這是 Rust 所有權系統的約束。如果你需要傳遞擁有的資料:

let name = String::from("監控");
let handle = tokio::spawn(async move {
    // 用 `move` 把 name 的所有權移進來
    for i in 0..3 {
        println!("[{}] 第 {} 次心跳", name, i + 1);
        sleep(Duration::from_secs(1)).await;
    }
});

async move {} 讓 closure 取得變數的所有權,解決生命週期的問題。


六、select!:誰先完成就處理誰
#

有時候你不想等「所有」任務完成,而是想在「任一個」完成時立刻反應——像是超時機制、或是多個來源取最快的。

Python:wait + FIRST_COMPLETED
#

import asyncio

async def fast_api():
    await asyncio.sleep(1)
    return "快速 API 的結果"

async def slow_api():
    await asyncio.sleep(5)
    return "慢速 API 的結果"

async def main():
    tasks = [
        asyncio.create_task(fast_api()),
        asyncio.create_task(slow_api()),
    ]

    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED,
    )

    for task in done:
        print(f"先完成的:{task.result()}")

    # 取消還沒完成的
    for task in pending:
        task.cancel()

asyncio.run(main())

Rust:tokio::select!
#

use tokio::time::{sleep, Duration};

async fn fast_api() -> &'static str {
    sleep(Duration::from_secs(1)).await;
    "快速 API 的結果"
}

async fn slow_api() -> &'static str {
    sleep(Duration::from_secs(5)).await;
    "慢速 API 的結果"
}

#[tokio::main]
async fn main() {
    tokio::select! {
        result = fast_api() => {
            println!("先完成的:{}", result);
        }
        result = slow_api() => {
            println!("先完成的:{}", result);
        }
    }
    // 沒被選中的分支會自動被取消(drop)
}

select! 超好用!而且沒被選中的 Future 會自動 drop,不用手動取消。

實用範例:超時機制
#

use tokio::time::{sleep, Duration};

async fn call_external_api() -> String {
    sleep(Duration::from_secs(10)).await;  // 模擬很慢的 API
    "回應".to_string()
}

#[tokio::main]
async fn main() {
    tokio::select! {
        result = call_external_api() => {
            println!("API 回應:{}", result);
        }
        _ = sleep(Duration::from_secs(3)) => {
            println!("⏰ 超時了!3 秒內沒有回應");
        }
    }
}

其實 Tokio 也有內建的 tokio::time::timeout,更簡潔:

use tokio::time::{timeout, Duration};

#[tokio::main]
async fn main() {
    match timeout(Duration::from_secs(3), call_external_api()).await {
        Ok(result) => println!("API 回應:{}", result),
        Err(_) => println!("⏰ 超時了!"),
    }
}

七、Channel:任務之間的通訊
#

當多個非同步任務需要互相傳遞資料,你需要 channel。

Python:asyncio.Queue
#

import asyncio

async def producer(queue: asyncio.Queue):
    for i in range(5):
        await queue.put(f"訊息 {i}")
        print(f"生產:訊息 {i}")
        await asyncio.sleep(0.5)

async def consumer(queue: asyncio.Queue):
    while True:
        msg = await queue.get()
        print(f"消費:{msg}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))

    await producer_task
    await queue.join()  # 等所有訊息被處理
    consumer_task.cancel()

asyncio.run(main())

Rust:tokio::sync::mpsc
#

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // 建立 channel,buffer 大小 32
    let (tx, mut rx) = mpsc::channel::<String>(32);

    // 生產者
    let producer = tokio::spawn(async move {
        for i in 0..5 {
            let msg = format!("訊息 {}", i);
            println!("生產:{}", msg);
            tx.send(msg).await.unwrap();
            sleep(Duration::from_millis(500)).await;
        }
        // tx 被 drop,channel 關閉
    });

    // 消費者
    let consumer = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            println!("消費:{}", msg);
        }
        println!("Channel 已關閉,消費者結束");
    });

    producer.await.unwrap();
    consumer.await.unwrap();
}

mpsc = Multiple Producer, Single Consumer。tx(transmitter)可以 clone 給多個生產者,rx(receiver)只有一個消費者。

Tokio 還提供其他 channel:

Channel 類型 用途 Python 對應
mpsc 多生產者、單消費者 asyncio.Queue
oneshot 一次性回應(像 request-response) asyncio.Future
broadcast 一對多廣播 無直接對應
watch 最新值通知(狀態更新) 無直接對應

八、底層原理:Python vs Rust 的非同步模型
#

到這裡語法都學了,讓我們深入看看兩者底層的差異。

Python 的模型
#

asyncio.run()
    └── Event Loop(單執行緒)
        ├── coroutine A
        ├── coroutine B
        └── coroutine C
  • 單執行緒:所有 coroutine 跑在同一個 thread
  • Event loop 內建:Python 自帶,不用選
  • Coroutine 是 generator:底層用 yield 實作
  • GIL 限制:就算用 asyncio,CPU 密集任務還是會卡

Rust + Tokio 的模型
#

#[tokio::main]
    └── Tokio Runtime
        ├── Thread 1: task A, task D
        ├── Thread 2: task B, task E
        └── Thread 3: task C, task F
  • 多執行緒:Tokio 預設用 work-stealing 排程器,任務可在多個 thread 間切換
  • Runtime 可選:Tokio、async-std、smol… 你自己選
  • Future 是 state machine:編譯器把 async fn 轉成零成本的狀態機
  • 無 GIL:真正的平行執行

零成本抽象
#

Rust 的 async/await 號稱「零成本」——編譯後的程式碼跟你手寫狀態機一樣高效,沒有額外的 heap allocation(不像 Python coroutine 每個都是 heap 上的物件)。

// 這個 async fn...
async fn example() -> i32 {
    let a = step_one().await;
    let b = step_two(a).await;
    a + b
}

// 編譯器大致會轉成這樣的 enum:
enum ExampleFuture {
    Step1 { /* step_one 的 future */ },
    Step2 { a: i32, /* step_two 的 future */ },
    Done,
}

每個 .await 點變成 enum 的一個 variant——大小在編譯時就確定了。


九、實戰:非同步 HTTP 爬蟲
#

最後來個完整的實戰範例,把學到的東西串起來。

Python 版
#

import asyncio
import aiohttp
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as resp:
        return {
            "url": url,
            "status": resp.status,
            "length": len(await resp.text()),
        }

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
    ]

    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    elapsed = time.time() - start
    for r in results:
        print(f"  {r['url']} -> {r['status']} ({r['length']} bytes)")
    print(f"總共花了 {elapsed:.1f} 秒")

asyncio.run(main())

Rust 版
#

先加入依賴:

[dependencies]
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.12", features = ["json"] }
use reqwest;
use tokio::time::Instant;

async fn fetch_url(client: &reqwest::Client, url: &str)
    -> Result<(String, u16, usize), reqwest::Error>
{
    let resp = client.get(url).send().await?;
    let status = resp.status().as_u16();
    let body = resp.text().await?;
    Ok((url.to_string(), status, body.len()))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let urls = vec![
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
    ];

    let client = reqwest::Client::new();
    let start = Instant::now();

    // 用 join_all 同時發起所有請求
    let futures: Vec<_> = urls.iter()
        .map(|url| fetch_url(&client, url))
        .collect();

    let results = futures::future::join_all(futures).await;

    let elapsed = start.elapsed();
    for result in results {
        match result {
            Ok((url, status, len)) => {
                println!("  {} -> {} ({} bytes)", url, status, len);
            }
            Err(e) => println!("  錯誤:{}", e),
        }
    }
    println!("總共花了 {:.1?}", elapsed);

    Ok(())
}

💡 這裡用了 futures::future::join_all,需要額外加 futures = "0.3" 依賴。也可以用 tokio::join!,但它需要固定數量的 Future。


結語
#

恭喜你走完了 Rust async 的入門之旅!讓我們回顧一下:

你學會了 Python Rust
基本語法 async def / await async fn / .await
並行執行 asyncio.gather tokio::join!
背景任務 create_task tokio::spawn
競速/超時 asyncio.wait tokio::select!
任務通訊 asyncio.Queue tokio::sync::mpsc
Runtime 內建 event loop 外部 Tokio runtime

Rust 的 async 一開始會覺得比 Python 麻煩——要選 runtime、要處理生命週期、Send + 'static 的限制會讓你抓頭。但一旦掌握了,你會得到零成本抽象 + 真正多執行緒的非同步程式。

如果你想繼續深入,建議順序是:

  1. 用 Tokio 寫一個簡單的 TCP echo server
  2. 試試 axumactix-web(Rust 的 async web framework)
  3. PinUnpin(async 的進階概念)

下一篇我們會聊聊 DevOps 的東西——Helm 和 Kubernetes 套件管理!📦


延伸閱讀
#

Rust 入門 - 本文屬於一個選集。
§ 11: 本文

相關文章

Rust CLI 實戰:用 clap 打造命令列工具(Python Typer 對照版)
·5 分鐘· loading · loading
Rust Cli Clap Typer Python
Python asyncio 非同步程式設計入門:讓你的程式不再傻等
·8 分鐘· loading · loading
Python Asyncio 非同步 並行
Rust for Python 開發者:Ownership 與 Borrowing 入門
·7 分鐘· loading · loading
Rust Python Ownership Borrowing Memory
Rust Closures + Iterators:用 FP 風格寫出優雅的 Rust
·9 分鐘· loading · loading
Rust Python Closures Iterators Functional Programming
Rust Collections vs Python:Vec 和 HashMap 的生存指南
·7 分鐘· loading · loading
Rust Python Collections Vec Hashmap
Rust 生命週期(Lifetime)入門:編譯器教你管記憶體
·8 分鐘· loading · loading
Rust Lifetime Borrowing Memory-Safety Python