快轉到主要內容
  1. 教學文章/

Python asyncio 非同步程式設計入門:讓你的程式不再傻等

·8 分鐘· loading · loading · ·
Python Asyncio 非同步 並行
每日拍拍
作者
每日拍拍
科學家 X 科技宅宅
目錄
Python 學習 - 本文屬於一個選集。
§ 19: 本文

一. 前言
#

你有沒有遇過這種情況:Python 腳本要同時下載 10 個網頁、呼叫 5 個 API,結果一個一個慢慢來,明明大部分時間都在「等回應」,CPU 卻閒在那邊發呆?😴

這就是同步程式的痛點——每次 I/O 操作(網路請求、讀寫檔案、資料庫查詢)都會「卡住」整個程式,直到操作完成才繼續。

asyncio 就是 Python 內建的非同步程式設計框架,讓你的程式在等待 I/O 的時候去做別的事,大幅提升效率!今天拍拍君就來帶大家從零開始學 asyncio ⚡

二. 同步 vs 非同步:一個生活化的比喻
#

想像你去一間咖啡廳:

同步模式 ☕
#

  1. 點第一杯咖啡 → 站在櫃台等 3 分鐘 → 拿到咖啡
  2. 點第二杯咖啡 → 再等 3 分鐘 → 拿到咖啡
  3. 點第三杯咖啡 → 再等 3 分鐘 → 拿到咖啡
  4. 總共 9 分鐘 😰

非同步模式 ⚡
#

  1. 點第一杯咖啡 → 拿到號碼牌
  2. 馬上點第二杯 → 拿到號碼牌
  3. 馬上點第三杯 → 拿到號碼牌
  4. 三杯同時做,叫號就去拿
  5. 總共約 3 分鐘 🎉

asyncio 就是幫你管理「號碼牌」的系統!

三. 安裝與基本環境
#

asyncio 是 Python 標準庫,不用額外安裝!只要 Python 3.7 以上就能用(建議 3.10+,語法更簡潔)。

import asyncio

# 就這樣,直接用!

💡 如果要搭配非同步 HTTP 請求,可以安裝 aiohttphttpx

pip install httpx

四. async/await 基礎語法
#

4.1 定義一個 coroutine(協程)
#

import asyncio

async def say_hello(name: str):
    """這是一個 coroutine 函式"""
    print(f"你好,{name}!")
    await asyncio.sleep(1)  # 模擬一個非同步等待(例如網路請求)
    print(f"再見,{name}!")

重點:

  • async def 定義一個 coroutine 函式
  • await 用來「等待」一個非同步操作,等待期間可以去做其他事
  • asyncio.sleep() 是非同步版的 time.sleep()

4.2 執行 coroutine
#

import asyncio

async def say_hello(name: str):
    print(f"你好,{name}!")
    await asyncio.sleep(1)
    print(f"再見,{name}!")

# 用 asyncio.run() 啟動非同步程式
asyncio.run(say_hello("拍拍君"))
你好,拍拍君!
(等待 1 秒)
再見,拍拍君!

⚠️ asyncio.run() 是程式的入口點,整個程式通常只呼叫一次。

4.3 直接呼叫 coroutine 不會執行!
#

# ❌ 這樣不會執行!只會建立一個 coroutine 物件
result = say_hello("拍拍君")
print(result)  # <coroutine object say_hello at 0x...>

# ✅ 必須用 await 或 asyncio.run()
await say_hello("拍拍君")      # 在 async 函式內
asyncio.run(say_hello("拍拍君"))  # 在同步程式碼中

五. 並行執行多個任務:asyncio.gather()
#

這才是 asyncio 的精髓!讓多個任務同時執行

5.1 同步版本(慢)
#

import time

def download(url: str):
    print(f"開始下載 {url}")
    time.sleep(2)  # 模擬下載
    print(f"完成下載 {url}")

start = time.time()
download("https://site-a.com")
download("https://site-b.com")
download("https://site-c.com")
print(f"總共花了 {time.time() - start:.1f} 秒")
# 總共花了 6.0 秒 😰

5.2 非同步版本(快)
#

import asyncio
import time

async def download(url: str):
    print(f"開始下載 {url}")
    await asyncio.sleep(2)  # 模擬非同步下載
    print(f"完成下載 {url}")
    return f"{url} 的內容"

async def main():
    start = time.time()

    # 用 gather 同時執行三個下載任務
    results = await asyncio.gather(
        download("https://site-a.com"),
        download("https://site-b.com"),
        download("https://site-c.com"),
    )

    print(f"總共花了 {time.time() - start:.1f} 秒")
    print(f"結果:{results}")

asyncio.run(main())
開始下載 https://site-a.com
開始下載 https://site-b.com
開始下載 https://site-c.com
完成下載 https://site-a.com
完成下載 https://site-b.com
完成下載 https://site-c.com
總共花了 2.0 秒 🎉
結果:['https://site-a.com 的內容', 'https://site-b.com 的內容', 'https://site-c.com 的內容']

三個下載同時進行,總時間只要 2 秒!asyncio.gather() 會等所有任務完成後,回傳一個結果列表。

六. Task:更靈活的任務管理
#

asyncio.gather() 適合「全部一起跑、全部等完」的情境。如果你需要更細緻的控制,可以用 asyncio.create_task()

import asyncio

async def cook(dish: str, seconds: int):
    print(f"🍳 開始煮 {dish}...")
    await asyncio.sleep(seconds)
    print(f"✅ {dish} 好了!")
    return dish

async def main():
    # 建立任務(立即開始排程)
    task1 = asyncio.create_task(cook("白飯", 3))
    task2 = asyncio.create_task(cook("味噌湯", 2))
    task3 = asyncio.create_task(cook("煎蛋", 1))

    print("所有任務已建立,開始等待...")

    # 可以個別 await
    egg = await task3
    print(f"先吃 {egg}!🥚")

    soup = await task2
    print(f"喝口 {soup} 🍜")

    rice = await task1
    print(f"配上 {rice} 🍚")

asyncio.run(main())
🍳 開始煮 白飯...
🍳 開始煮 味噌湯...
🍳 開始煮 煎蛋...
所有任務已建立,開始等待...
✅ 煎蛋 好了!
先吃 煎蛋!🥚
✅ 味噌湯 好了!
喝口 味噌湯 🍜
✅ 白飯 好了!
配上 白飯 🍚

create_task() 會立即把 coroutine 排進事件迴圈,不用等 await 才開始。

七. 實戰:非同步 HTTP 爬蟲
#

來寫一個真正的非同步爬蟲!用 httpx 同時抓取多個網頁:

import asyncio
import httpx
import time

async def fetch_url(client: httpx.AsyncClient, url: str) -> dict:
    """非同步抓取一個 URL"""
    try:
        response = await client.get(url, timeout=10.0)
        return {
            "url": url,
            "status": response.status_code,
            "length": len(response.text),
        }
    except Exception as e:
        return {"url": url, "status": "error", "error": str(e)}

async def main():
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
        "https://httpbin.org/headers",
    ]

    start = time.time()

    # 用 async with 管理 HTTP client 的生命週期
    async with httpx.AsyncClient() as client:
        # 建立所有請求任務
        tasks = [fetch_url(client, url) for url in urls]

        # 同時執行所有請求
        results = await asyncio.gather(*tasks)

    elapsed = time.time() - start

    for r in results:
        if r["status"] == "error":
            print(f"  ❌ {r['url']}{r['error']}")
        else:
            print(f"  ✅ {r['url']}{r['status']} ({r['length']} bytes)")

    print(f"\n⏱️ 抓了 {len(urls)} 個網址,花了 {elapsed:.1f} 秒")

asyncio.run(main())
  ✅ https://httpbin.org/get → 200 (420 bytes)
  ✅ https://httpbin.org/delay/1 → 200 (385 bytes)
  ✅ https://httpbin.org/ip → 200 (32 bytes)
  ✅ https://httpbin.org/user-agent → 200 (46 bytes)
  ✅ https://httpbin.org/headers → 200 (218 bytes)

⏱️ 抓了 5 個網址,花了 1.2 秒

同步版本要花約 3-4 秒,非同步版本只要 1 秒多!差異在請求越多的時候越明顯。

八. Semaphore:控制並行數量
#

如果你一次丟 1000 個請求出去,目標伺服器可能會不開心 😅。用 asyncio.Semaphore 來限制同時進行的任務數量:

import asyncio
import httpx

async def fetch_with_limit(
    sem: asyncio.Semaphore,
    client: httpx.AsyncClient,
    url: str,
) -> str:
    async with sem:  # 同時最多 N 個任務能進入這個區塊
        print(f"⬇️ 下載 {url}")
        response = await client.get(url)
        print(f"✅ 完成 {url}")
        return response.text

async def main():
    sem = asyncio.Semaphore(3)  # 同時最多 3 個請求

    urls = [f"https://httpbin.org/delay/1?id={i}" for i in range(10)]

    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_limit(sem, client, url) for url in urls]
        results = await asyncio.gather(*tasks)

    print(f"共完成 {len(results)} 個請求")

asyncio.run(main())

這樣即使有 10 個任務,同時也只會有 3 個在跑,其他的排隊等。

九. asyncio.wait_for():超時控制
#

有時候某個任務太久了,不想一直等:

import asyncio

async def slow_task():
    print("開始一個很慢的任務...")
    await asyncio.sleep(10)
    return "完成!"

async def main():
    try:
        # 最多等 3 秒
        result = await asyncio.wait_for(slow_task(), timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print("⏰ 超時了!任務被取消")

asyncio.run(main())
開始一個很慢的任務...
⏰ 超時了!任務被取消

wait_for() 會在超時時自動取消任務並拋出 TimeoutError

十. async for 與 async with
#

10.1 async with(非同步上下文管理器)
#

import httpx

async def main():
    # async with 確保資源正確釋放
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/get")
        print(response.status_code)
    # client 在這裡自動關閉

10.2 async for(非同步迭代器)
#

import asyncio

async def countdown(n: int):
    """非同步倒數產生器"""
    while n > 0:
        yield n
        await asyncio.sleep(0.5)
        n -= 1

async def main():
    async for number in countdown(5):
        print(f"⏳ {number}...")
    print("🚀 發射!")

asyncio.run(main())
⏳ 5...
⏳ 4...
⏳ 3...
⏳ 2...
⏳ 1...
🚀 發射!

async for 用於遍歷非同步產生器或非同步迭代器,常見於串流處理。

十一. 常見陷阱與注意事項
#

⚠️ 1. 不要在 async 函式裡用 time.sleep()
#

# ❌ 會阻塞整個事件迴圈!
async def bad():
    time.sleep(5)  # 所有其他任務都被卡住

# ✅ 用 asyncio.sleep()
async def good():
    await asyncio.sleep(5)  # 其他任務可以繼續跑

⚠️ 2. CPU 密集任務不適合 asyncio
#

asyncio 適合 I/O 密集 的任務(網路、檔案、資料庫)。如果你的任務是大量計算(影像處理、數學運算),應該用 multiprocessingconcurrent.futures

import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_computation(n: int) -> int:
    """CPU 密集任務"""
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_event_loop()
    # 把 CPU 任務丟給 process pool
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, heavy_computation, 10_000_000)
    print(f"結果:{result}")

asyncio.run(main())

⚠️ 3. 忘了 await
#

# ❌ 忘了 await,coroutine 不會執行
async def main():
    asyncio.sleep(1)  # 沒有 await!什麼都不會發生
    # RuntimeWarning: coroutine 'sleep' was never awaited

# ✅
async def main():
    await asyncio.sleep(1)

⚠️ 4. 不要在非 async 函式中呼叫 await
#

# ❌ SyntaxError
def main():
    await asyncio.sleep(1)

# ✅ 必須是 async def
async def main():
    await asyncio.sleep(1)

十二. asyncio vs threading vs multiprocessing
#

asyncio threading multiprocessing
適用場景 I/O 密集 I/O 密集 CPU 密集
並行方式 協程(單執行緒) 多執行緒 多程序
GIL 影響 無(單執行緒)
記憶體用量 🟢 低 🟡 中 🔴 高
切換成本 🟢 極低 🟡 中 🔴 高
程式複雜度 🟡 async/await 🔴 鎖、競爭 🟡 序列化
適合任務數 🟢 數千到數萬 🟡 數十到數百 🟡 CPU 核心數

拍拍君的建議

  • 網路爬蟲、API 呼叫、WebSocket → asyncio
  • 檔案 I/O + 簡單並行 → threading
  • 影像處理、科學計算 → multiprocessing

十三. 總結
#

今天學了 Python asyncio 非同步程式設計:

  • async/await 基礎語法
  • asyncio.gather() 並行執行多個任務
  • create_task() 靈活的任務管理
  • Semaphore 控制並行數量
  • wait_for() 超時控制
  • async for / async with 非同步迭代與上下文管理
  • ✅ 常見陷阱與 asyncio vs threading 比較

非同步程式設計一開始可能會覺得有點繞,但一旦理解了「等待的時候去做別的事」的概念,就會發現 asyncio 超好用的!特別是寫爬蟲、API 整合的時候,效能提升非常有感 🚀

下次遇到大量 I/O 操作的時候,記得用 asyncio 讓你的程式不再傻等!拍拍君下次見 👋

參考資料
#

Python 學習 - 本文屬於一個選集。
§ 19: 本文

相關文章

用 Typer 打造專業 CLI 工具:Python 命令列框架教學
·10 分鐘· loading · loading
Python Typer Cli 開發工具
少寫一半程式碼:dataclasses 讓你的 Python 類別煥然一新
·6 分鐘· loading · loading
Python Dataclasses Oop 標準庫
Ruff:用 Rust 寫的 Python Linter,快到你會懷疑人生
·4 分鐘· loading · loading
Python Ruff Linter Formatter Code-Quality
超快速 Python 套件管理:uv 完全教學
·6 分鐘· loading · loading
Python Uv Package Manager Rust
讓你的終端機華麗變身:Rich 套件教學
·2 分鐘· loading · loading
Python Rich Cli
Python Typing:讓你的程式碼更安全、更好維護
·4 分鐘· loading · loading
Python Typing Type Hints