一. 前言 #
你有沒有遇過這種情況:Python 腳本要同時下載 10 個網頁、呼叫 5 個 API,結果一個一個慢慢來,明明大部分時間都在「等回應」,CPU 卻閒在那邊發呆?😴
這就是同步程式的痛點——每次 I/O 操作(網路請求、讀寫檔案、資料庫查詢)都會「卡住」整個程式,直到操作完成才繼續。
asyncio 就是 Python 內建的非同步程式設計框架,讓你的程式在等待 I/O 的時候去做別的事,大幅提升效率!今天拍拍君就來帶大家從零開始學 asyncio ⚡
二. 同步 vs 非同步:一個生活化的比喻 #
想像你去一間咖啡廳:
同步模式 ☕ #
- 點第一杯咖啡 → 站在櫃台等 3 分鐘 → 拿到咖啡
- 點第二杯咖啡 → 再等 3 分鐘 → 拿到咖啡
- 點第三杯咖啡 → 再等 3 分鐘 → 拿到咖啡
- 總共 9 分鐘 😰
非同步模式 ⚡ #
- 點第一杯咖啡 → 拿到號碼牌
- 馬上點第二杯 → 拿到號碼牌
- 馬上點第三杯 → 拿到號碼牌
- 三杯同時做,叫號就去拿
- 總共約 3 分鐘 🎉
asyncio 就是幫你管理「號碼牌」的系統!
三. 安裝與基本環境 #
asyncio 是 Python 標準庫,不用額外安裝!只要 Python 3.7 以上就能用(建議 3.10+,語法更簡潔)。
import asyncio
# 就這樣,直接用!
💡 如果要搭配非同步 HTTP 請求,可以安裝
aiohttp或httpx:pip install httpx
四. async/await 基礎語法 #
4.1 定義一個 coroutine(協程) #
import asyncio
async def say_hello(name: str):
"""這是一個 coroutine 函式"""
print(f"你好,{name}!")
await asyncio.sleep(1) # 模擬一個非同步等待(例如網路請求)
print(f"再見,{name}!")
重點:
async def定義一個 coroutine 函式await用來「等待」一個非同步操作,等待期間可以去做其他事asyncio.sleep()是非同步版的time.sleep()
4.2 執行 coroutine #
import asyncio
async def say_hello(name: str):
print(f"你好,{name}!")
await asyncio.sleep(1)
print(f"再見,{name}!")
# 用 asyncio.run() 啟動非同步程式
asyncio.run(say_hello("拍拍君"))
你好,拍拍君!
(等待 1 秒)
再見,拍拍君!
⚠️
asyncio.run()是程式的入口點,整個程式通常只呼叫一次。
4.3 直接呼叫 coroutine 不會執行! #
# ❌ 這樣不會執行!只會建立一個 coroutine 物件
result = say_hello("拍拍君")
print(result) # <coroutine object say_hello at 0x...>
# ✅ 必須用 await 或 asyncio.run()
await say_hello("拍拍君") # 在 async 函式內
asyncio.run(say_hello("拍拍君")) # 在同步程式碼中
五. 並行執行多個任務:asyncio.gather() #
這才是 asyncio 的精髓!讓多個任務同時執行:
5.1 同步版本(慢) #
import time
def download(url: str):
print(f"開始下載 {url}")
time.sleep(2) # 模擬下載
print(f"完成下載 {url}")
start = time.time()
download("https://site-a.com")
download("https://site-b.com")
download("https://site-c.com")
print(f"總共花了 {time.time() - start:.1f} 秒")
# 總共花了 6.0 秒 😰
5.2 非同步版本(快) #
import asyncio
import time
async def download(url: str):
print(f"開始下載 {url}")
await asyncio.sleep(2) # 模擬非同步下載
print(f"完成下載 {url}")
return f"{url} 的內容"
async def main():
start = time.time()
# 用 gather 同時執行三個下載任務
results = await asyncio.gather(
download("https://site-a.com"),
download("https://site-b.com"),
download("https://site-c.com"),
)
print(f"總共花了 {time.time() - start:.1f} 秒")
print(f"結果:{results}")
asyncio.run(main())
開始下載 https://site-a.com
開始下載 https://site-b.com
開始下載 https://site-c.com
完成下載 https://site-a.com
完成下載 https://site-b.com
完成下載 https://site-c.com
總共花了 2.0 秒 🎉
結果:['https://site-a.com 的內容', 'https://site-b.com 的內容', 'https://site-c.com 的內容']
三個下載同時進行,總時間只要 2 秒!asyncio.gather() 會等所有任務完成後,回傳一個結果列表。
六. Task:更靈活的任務管理 #
asyncio.gather() 適合「全部一起跑、全部等完」的情境。如果你需要更細緻的控制,可以用 asyncio.create_task():
import asyncio
async def cook(dish: str, seconds: int):
print(f"🍳 開始煮 {dish}...")
await asyncio.sleep(seconds)
print(f"✅ {dish} 好了!")
return dish
async def main():
# 建立任務(立即開始排程)
task1 = asyncio.create_task(cook("白飯", 3))
task2 = asyncio.create_task(cook("味噌湯", 2))
task3 = asyncio.create_task(cook("煎蛋", 1))
print("所有任務已建立,開始等待...")
# 可以個別 await
egg = await task3
print(f"先吃 {egg}!🥚")
soup = await task2
print(f"喝口 {soup} 🍜")
rice = await task1
print(f"配上 {rice} 🍚")
asyncio.run(main())
🍳 開始煮 白飯...
🍳 開始煮 味噌湯...
🍳 開始煮 煎蛋...
所有任務已建立,開始等待...
✅ 煎蛋 好了!
先吃 煎蛋!🥚
✅ 味噌湯 好了!
喝口 味噌湯 🍜
✅ 白飯 好了!
配上 白飯 🍚
create_task() 會立即把 coroutine 排進事件迴圈,不用等 await 才開始。
七. 實戰:非同步 HTTP 爬蟲 #
來寫一個真正的非同步爬蟲!用 httpx 同時抓取多個網頁:
import asyncio
import httpx
import time
async def fetch_url(client: httpx.AsyncClient, url: str) -> dict:
"""非同步抓取一個 URL"""
try:
response = await client.get(url, timeout=10.0)
return {
"url": url,
"status": response.status_code,
"length": len(response.text),
}
except Exception as e:
return {"url": url, "status": "error", "error": str(e)}
async def main():
urls = [
"https://httpbin.org/get",
"https://httpbin.org/delay/1",
"https://httpbin.org/ip",
"https://httpbin.org/user-agent",
"https://httpbin.org/headers",
]
start = time.time()
# 用 async with 管理 HTTP client 的生命週期
async with httpx.AsyncClient() as client:
# 建立所有請求任務
tasks = [fetch_url(client, url) for url in urls]
# 同時執行所有請求
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
for r in results:
if r["status"] == "error":
print(f" ❌ {r['url']} → {r['error']}")
else:
print(f" ✅ {r['url']} → {r['status']} ({r['length']} bytes)")
print(f"\n⏱️ 抓了 {len(urls)} 個網址,花了 {elapsed:.1f} 秒")
asyncio.run(main())
✅ https://httpbin.org/get → 200 (420 bytes)
✅ https://httpbin.org/delay/1 → 200 (385 bytes)
✅ https://httpbin.org/ip → 200 (32 bytes)
✅ https://httpbin.org/user-agent → 200 (46 bytes)
✅ https://httpbin.org/headers → 200 (218 bytes)
⏱️ 抓了 5 個網址,花了 1.2 秒
同步版本要花約 3-4 秒,非同步版本只要 1 秒多!差異在請求越多的時候越明顯。
八. Semaphore:控制並行數量 #
如果你一次丟 1000 個請求出去,目標伺服器可能會不開心 😅。用 asyncio.Semaphore 來限制同時進行的任務數量:
import asyncio
import httpx
async def fetch_with_limit(
sem: asyncio.Semaphore,
client: httpx.AsyncClient,
url: str,
) -> str:
async with sem: # 同時最多 N 個任務能進入這個區塊
print(f"⬇️ 下載 {url}")
response = await client.get(url)
print(f"✅ 完成 {url}")
return response.text
async def main():
sem = asyncio.Semaphore(3) # 同時最多 3 個請求
urls = [f"https://httpbin.org/delay/1?id={i}" for i in range(10)]
async with httpx.AsyncClient() as client:
tasks = [fetch_with_limit(sem, client, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"共完成 {len(results)} 個請求")
asyncio.run(main())
這樣即使有 10 個任務,同時也只會有 3 個在跑,其他的排隊等。
九. asyncio.wait_for():超時控制 #
有時候某個任務太久了,不想一直等:
import asyncio
async def slow_task():
print("開始一個很慢的任務...")
await asyncio.sleep(10)
return "完成!"
async def main():
try:
# 最多等 3 秒
result = await asyncio.wait_for(slow_task(), timeout=3.0)
print(result)
except asyncio.TimeoutError:
print("⏰ 超時了!任務被取消")
asyncio.run(main())
開始一個很慢的任務...
⏰ 超時了!任務被取消
wait_for() 會在超時時自動取消任務並拋出 TimeoutError。
十. async for 與 async with #
10.1 async with(非同步上下文管理器) #
import httpx
async def main():
# async with 確保資源正確釋放
async with httpx.AsyncClient() as client:
response = await client.get("https://httpbin.org/get")
print(response.status_code)
# client 在這裡自動關閉
10.2 async for(非同步迭代器) #
import asyncio
async def countdown(n: int):
"""非同步倒數產生器"""
while n > 0:
yield n
await asyncio.sleep(0.5)
n -= 1
async def main():
async for number in countdown(5):
print(f"⏳ {number}...")
print("🚀 發射!")
asyncio.run(main())
⏳ 5...
⏳ 4...
⏳ 3...
⏳ 2...
⏳ 1...
🚀 發射!
async for 用於遍歷非同步產生器或非同步迭代器,常見於串流處理。
十一. 常見陷阱與注意事項 #
⚠️ 1. 不要在 async 函式裡用 time.sleep() #
# ❌ 會阻塞整個事件迴圈!
async def bad():
time.sleep(5) # 所有其他任務都被卡住
# ✅ 用 asyncio.sleep()
async def good():
await asyncio.sleep(5) # 其他任務可以繼續跑
⚠️ 2. CPU 密集任務不適合 asyncio #
asyncio 適合 I/O 密集 的任務(網路、檔案、資料庫)。如果你的任務是大量計算(影像處理、數學運算),應該用 multiprocessing 或 concurrent.futures:
import asyncio
from concurrent.futures import ProcessPoolExecutor
def heavy_computation(n: int) -> int:
"""CPU 密集任務"""
return sum(i * i for i in range(n))
async def main():
loop = asyncio.get_event_loop()
# 把 CPU 任務丟給 process pool
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, heavy_computation, 10_000_000)
print(f"結果:{result}")
asyncio.run(main())
⚠️ 3. 忘了 await #
# ❌ 忘了 await,coroutine 不會執行
async def main():
asyncio.sleep(1) # 沒有 await!什麼都不會發生
# RuntimeWarning: coroutine 'sleep' was never awaited
# ✅
async def main():
await asyncio.sleep(1)
⚠️ 4. 不要在非 async 函式中呼叫 await #
# ❌ SyntaxError
def main():
await asyncio.sleep(1)
# ✅ 必須是 async def
async def main():
await asyncio.sleep(1)
十二. asyncio vs threading vs multiprocessing #
| asyncio | threading | multiprocessing | |
|---|---|---|---|
| 適用場景 | I/O 密集 | I/O 密集 | CPU 密集 |
| 並行方式 | 協程(單執行緒) | 多執行緒 | 多程序 |
| GIL 影響 | 無(單執行緒) | 有 | 無 |
| 記憶體用量 | 🟢 低 | 🟡 中 | 🔴 高 |
| 切換成本 | 🟢 極低 | 🟡 中 | 🔴 高 |
| 程式複雜度 | 🟡 async/await | 🔴 鎖、競爭 | 🟡 序列化 |
| 適合任務數 | 🟢 數千到數萬 | 🟡 數十到數百 | 🟡 CPU 核心數 |
拍拍君的建議:
- 網路爬蟲、API 呼叫、WebSocket → asyncio
- 檔案 I/O + 簡單並行 → threading
- 影像處理、科學計算 → multiprocessing
十三. 總結 #
今天學了 Python asyncio 非同步程式設計:
- ✅ async/await 基礎語法
- ✅ asyncio.gather() 並行執行多個任務
- ✅ create_task() 靈活的任務管理
- ✅ Semaphore 控制並行數量
- ✅ wait_for() 超時控制
- ✅ async for / async with 非同步迭代與上下文管理
- ✅ 常見陷阱與 asyncio vs threading 比較
非同步程式設計一開始可能會覺得有點繞,但一旦理解了「等待的時候去做別的事」的概念,就會發現 asyncio 超好用的!特別是寫爬蟲、API 整合的時候,效能提升非常有感 🚀
下次遇到大量 I/O 操作的時候,記得用 asyncio 讓你的程式不再傻等!拍拍君下次見 👋