一、前言 #
如果你用過 Python 的 asyncio,應該對 async def 和 await 不陌生。Python 的非同步程式設計讓你在一個執行緒裡同時處理上千個 I/O 操作——爬蟲、API 呼叫、WebSocket,都靠它。
但你有沒有想過:Rust 的 async/await 長什麼樣? 語法看起來差不多,底層卻完全不同。Python 的 coroutine 跑在內建的 event loop 上,而 Rust 的 Future 需要一個外部 runtime——最常用的就是 Tokio。
這篇文章會從 Python asyncio 出發,帶你搞懂 Rust async 的核心概念,然後用 Tokio 寫出真正的非同步程式。我們會比較兩者的差異,讓你用最熟悉的方式學會 Rust 的非同步世界。
二、安裝與設定 #
Python 端 #
Python 3.4+ 內建 asyncio,不用額外安裝:
import asyncio
Rust 端 #
Rust 標準庫有 async/await 語法,但沒有內建 runtime。你需要自己選一個,最主流的是 Tokio:
cargo new async-demo
cd async-demo
在 Cargo.toml 加入:
[dependencies]
tokio = { version = "1", features = ["full"] }
features = ["full"] 會啟用所有功能(runtime、net、time、sync 等)。正式專案可以按需只開需要的 feature。
三、基本語法對比:async/await #
Python 的 coroutine #
import asyncio
async def greet(name: str) -> str:
await asyncio.sleep(1) # 模擬非同步 I/O
return f"嗨,{name}!"
async def main():
result = await greet("拍拍君")
print(result)
asyncio.run(main())
重點:
async def定義 coroutine functionawait暫停執行,等結果回來asyncio.run()啟動 event loop
Rust 的 Future #
use tokio::time::{sleep, Duration};
async fn greet(name: &str) -> String {
sleep(Duration::from_secs(1)).await; // 模擬非同步 I/O
format!("嗨,{}!", name)
}
#[tokio::main]
async fn main() {
let result = greet("拍拍君").await;
println!("{}", result);
}
重點:
async fn定義回傳Future的函式.await放在後面(不是前面!)#[tokio::main]巨集自動建立 Tokio runtime
語法差異整理 #
| 概念 | Python | Rust |
|---|---|---|
| 定義非同步函式 | async def f() |
async fn f() |
| 等待結果 | await expr |
expr.await |
| 啟動 runtime | asyncio.run(main()) |
#[tokio::main] |
| 回傳型別 | Coroutine |
impl Future<Output = T> |
| 睡眠 | asyncio.sleep(1) |
tokio::time::sleep(Duration::from_secs(1)) |
注意 .await 的位置!Python 放前面,Rust 放後面。這是因為 Rust 的 .await 是一個後綴運算子,可以跟方法鏈串在一起:
let body = reqwest::get("https://example.com")
.await?
.text()
.await?;
四、並行執行:同時跑多個任務 #
非同步程式設計的精髓就是並行——同時發起多個 I/O 操作,而不是傻傻排隊等。
Python:gather #
import asyncio
import time
async def fetch(name: str, delay: float) -> str:
print(f"開始抓 {name}...")
await asyncio.sleep(delay)
print(f"{name} 完成!")
return f"{name} 的資料"
async def main():
start = time.time()
# 同時執行三個任務
results = await asyncio.gather(
fetch("API-A", 2),
fetch("API-B", 1),
fetch("API-C", 3),
)
elapsed = time.time() - start
print(f"全部完成,花了 {elapsed:.1f} 秒") # ~3 秒,不是 6 秒
print(results)
asyncio.run(main())
Rust:tokio::join! #
use tokio::time::{sleep, Duration, Instant};
async fn fetch(name: &str, delay_secs: u64) -> String {
println!("開始抓 {}...", name);
sleep(Duration::from_secs(delay_secs)).await;
println!("{} 完成!", name);
format!("{} 的資料", name)
}
#[tokio::main]
async fn main() {
let start = Instant::now();
// 同時執行三個任務
let (a, b, c) = tokio::join!(
fetch("API-A", 2),
fetch("API-B", 1),
fetch("API-C", 3),
);
let elapsed = start.elapsed();
println!("全部完成,花了 {:.1?}", elapsed); // ~3 秒
println!("{}, {}, {}", a, b, c);
}
tokio::join! 類似 asyncio.gather:所有 Future 會同時開始,等全部完成才繼續。差別在於 join! 回傳 tuple,而 gather 回傳 list。
五、spawn:把任務丟到背景執行 #
有時候你想 fire-and-forget,或者需要「真正的」並行任務(不是只是等 I/O)。
Python:create_task #
import asyncio
async def background_job(name: str):
for i in range(3):
print(f"[{name}] 第 {i+1} 次心跳")
await asyncio.sleep(1)
async def main():
# 丟到背景
task = asyncio.create_task(background_job("監控"))
print("主程式繼續做其他事...")
await asyncio.sleep(2)
print("主程式做完了")
await task # 等背景任務結束
asyncio.run(main())
Rust:tokio::spawn #
use tokio::time::{sleep, Duration};
async fn background_job(name: &'static str) {
for i in 0..3 {
println!("[{}] 第 {} 次心跳", name, i + 1);
sleep(Duration::from_secs(1)).await;
}
}
#[tokio::main]
async fn main() {
// 丟到背景——spawn 回傳 JoinHandle
let handle = tokio::spawn(background_job("監控"));
println!("主程式繼續做其他事...");
sleep(Duration::from_secs(2)).await;
println!("主程式做完了");
handle.await.unwrap(); // 等背景任務結束
}
關鍵差異:tokio::spawn 要求 Future 是 'static 的——也就是說它不能引用短暫的變數。這是 Rust 所有權系統的約束。如果你需要傳遞擁有的資料:
let name = String::from("監控");
let handle = tokio::spawn(async move {
// 用 `move` 把 name 的所有權移進來
for i in 0..3 {
println!("[{}] 第 {} 次心跳", name, i + 1);
sleep(Duration::from_secs(1)).await;
}
});
async move {} 讓 closure 取得變數的所有權,解決生命週期的問題。
六、select!:誰先完成就處理誰 #
有時候你不想等「所有」任務完成,而是想在「任一個」完成時立刻反應——像是超時機制、或是多個來源取最快的。
Python:wait + FIRST_COMPLETED #
import asyncio
async def fast_api():
await asyncio.sleep(1)
return "快速 API 的結果"
async def slow_api():
await asyncio.sleep(5)
return "慢速 API 的結果"
async def main():
tasks = [
asyncio.create_task(fast_api()),
asyncio.create_task(slow_api()),
]
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED,
)
for task in done:
print(f"先完成的:{task.result()}")
# 取消還沒完成的
for task in pending:
task.cancel()
asyncio.run(main())
Rust:tokio::select! #
use tokio::time::{sleep, Duration};
async fn fast_api() -> &'static str {
sleep(Duration::from_secs(1)).await;
"快速 API 的結果"
}
async fn slow_api() -> &'static str {
sleep(Duration::from_secs(5)).await;
"慢速 API 的結果"
}
#[tokio::main]
async fn main() {
tokio::select! {
result = fast_api() => {
println!("先完成的:{}", result);
}
result = slow_api() => {
println!("先完成的:{}", result);
}
}
// 沒被選中的分支會自動被取消(drop)
}
select! 超好用!而且沒被選中的 Future 會自動 drop,不用手動取消。
實用範例:超時機制 #
use tokio::time::{sleep, Duration};
async fn call_external_api() -> String {
sleep(Duration::from_secs(10)).await; // 模擬很慢的 API
"回應".to_string()
}
#[tokio::main]
async fn main() {
tokio::select! {
result = call_external_api() => {
println!("API 回應:{}", result);
}
_ = sleep(Duration::from_secs(3)) => {
println!("⏰ 超時了!3 秒內沒有回應");
}
}
}
其實 Tokio 也有內建的 tokio::time::timeout,更簡潔:
use tokio::time::{timeout, Duration};
#[tokio::main]
async fn main() {
match timeout(Duration::from_secs(3), call_external_api()).await {
Ok(result) => println!("API 回應:{}", result),
Err(_) => println!("⏰ 超時了!"),
}
}
七、Channel:任務之間的通訊 #
當多個非同步任務需要互相傳遞資料,你需要 channel。
Python:asyncio.Queue #
import asyncio
async def producer(queue: asyncio.Queue):
for i in range(5):
await queue.put(f"訊息 {i}")
print(f"生產:訊息 {i}")
await asyncio.sleep(0.5)
async def consumer(queue: asyncio.Queue):
while True:
msg = await queue.get()
print(f"消費:{msg}")
queue.task_done()
async def main():
queue = asyncio.Queue()
producer_task = asyncio.create_task(producer(queue))
consumer_task = asyncio.create_task(consumer(queue))
await producer_task
await queue.join() # 等所有訊息被處理
consumer_task.cancel()
asyncio.run(main())
Rust:tokio::sync::mpsc #
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// 建立 channel,buffer 大小 32
let (tx, mut rx) = mpsc::channel::<String>(32);
// 生產者
let producer = tokio::spawn(async move {
for i in 0..5 {
let msg = format!("訊息 {}", i);
println!("生產:{}", msg);
tx.send(msg).await.unwrap();
sleep(Duration::from_millis(500)).await;
}
// tx 被 drop,channel 關閉
});
// 消費者
let consumer = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
println!("消費:{}", msg);
}
println!("Channel 已關閉,消費者結束");
});
producer.await.unwrap();
consumer.await.unwrap();
}
mpsc = Multiple Producer, Single Consumer。tx(transmitter)可以 clone 給多個生產者,rx(receiver)只有一個消費者。
Tokio 還提供其他 channel:
| Channel 類型 | 用途 | Python 對應 |
|---|---|---|
mpsc |
多生產者、單消費者 | asyncio.Queue |
oneshot |
一次性回應(像 request-response) | asyncio.Future |
broadcast |
一對多廣播 | 無直接對應 |
watch |
最新值通知(狀態更新) | 無直接對應 |
八、底層原理:Python vs Rust 的非同步模型 #
到這裡語法都學了,讓我們深入看看兩者底層的差異。
Python 的模型 #
asyncio.run()
└── Event Loop(單執行緒)
├── coroutine A
├── coroutine B
└── coroutine C
- 單執行緒:所有 coroutine 跑在同一個 thread
- Event loop 內建:Python 自帶,不用選
- Coroutine 是 generator:底層用 yield 實作
- GIL 限制:就算用
asyncio,CPU 密集任務還是會卡
Rust + Tokio 的模型 #
#[tokio::main]
└── Tokio Runtime
├── Thread 1: task A, task D
├── Thread 2: task B, task E
└── Thread 3: task C, task F
- 多執行緒:Tokio 預設用 work-stealing 排程器,任務可在多個 thread 間切換
- Runtime 可選:Tokio、async-std、smol… 你自己選
- Future 是 state machine:編譯器把 async fn 轉成零成本的狀態機
- 無 GIL:真正的平行執行
零成本抽象 #
Rust 的 async/await 號稱「零成本」——編譯後的程式碼跟你手寫狀態機一樣高效,沒有額外的 heap allocation(不像 Python coroutine 每個都是 heap 上的物件)。
// 這個 async fn...
async fn example() -> i32 {
let a = step_one().await;
let b = step_two(a).await;
a + b
}
// 編譯器大致會轉成這樣的 enum:
enum ExampleFuture {
Step1 { /* step_one 的 future */ },
Step2 { a: i32, /* step_two 的 future */ },
Done,
}
每個 .await 點變成 enum 的一個 variant——大小在編譯時就確定了。
九、實戰:非同步 HTTP 爬蟲 #
最後來個完整的實戰範例,把學到的東西串起來。
Python 版 #
import asyncio
import aiohttp
import time
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
async with session.get(url) as resp:
return {
"url": url,
"status": resp.status,
"length": len(await resp.text()),
}
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
"https://httpbin.org/get",
"https://httpbin.org/ip",
]
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
for r in results:
print(f" {r['url']} -> {r['status']} ({r['length']} bytes)")
print(f"總共花了 {elapsed:.1f} 秒")
asyncio.run(main())
Rust 版 #
先加入依賴:
[dependencies]
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.12", features = ["json"] }
use reqwest;
use tokio::time::Instant;
async fn fetch_url(client: &reqwest::Client, url: &str)
-> Result<(String, u16, usize), reqwest::Error>
{
let resp = client.get(url).send().await?;
let status = resp.status().as_u16();
let body = resp.text().await?;
Ok((url.to_string(), status, body.len()))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let urls = vec![
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
"https://httpbin.org/get",
"https://httpbin.org/ip",
];
let client = reqwest::Client::new();
let start = Instant::now();
// 用 join_all 同時發起所有請求
let futures: Vec<_> = urls.iter()
.map(|url| fetch_url(&client, url))
.collect();
let results = futures::future::join_all(futures).await;
let elapsed = start.elapsed();
for result in results {
match result {
Ok((url, status, len)) => {
println!(" {} -> {} ({} bytes)", url, status, len);
}
Err(e) => println!(" 錯誤:{}", e),
}
}
println!("總共花了 {:.1?}", elapsed);
Ok(())
}
💡 這裡用了
futures::future::join_all,需要額外加futures = "0.3"依賴。也可以用tokio::join!,但它需要固定數量的 Future。
結語 #
恭喜你走完了 Rust async 的入門之旅!讓我們回顧一下:
| 你學會了 | Python | Rust |
|---|---|---|
| 基本語法 | async def / await |
async fn / .await |
| 並行執行 | asyncio.gather |
tokio::join! |
| 背景任務 | create_task |
tokio::spawn |
| 競速/超時 | asyncio.wait |
tokio::select! |
| 任務通訊 | asyncio.Queue |
tokio::sync::mpsc |
| Runtime | 內建 event loop | 外部 Tokio runtime |
Rust 的 async 一開始會覺得比 Python 麻煩——要選 runtime、要處理生命週期、Send + 'static 的限制會讓你抓頭。但一旦掌握了,你會得到零成本抽象 + 真正多執行緒的非同步程式。
如果你想繼續深入,建議順序是:
- 用 Tokio 寫一個簡單的 TCP echo server
- 試試
axum或actix-web(Rust 的 async web framework) - 學
Pin和Unpin(async 的進階概念)
下一篇我們會聊聊 DevOps 的東西——Helm 和 Kubernetes 套件管理!📦