一. 前言:非同步程式最難的不是 async #
很多人第一次學 Python 非同步,會從 asyncio 開始。
你會寫 async def,也會用 await。
接著你很快會遇到更麻煩的問題:
- 有一個任務失敗時,其他任務要不要停?
- timeout 發生時,背景工作有沒有真的被取消?
- worker 卡住時,程式要怎麼優雅收尾?
- 測試 async code 時,要怎麼避免 event loop 亂掉?
這些問題通常不是語法問題,而是任務生命週期問題。
今天拍拍君要介紹的 AnyIO,就是專門整理這些混亂的工具。
它不是要取代你已經會的 asyncio。
比較好的理解方式是:
asyncio是 Python 內建非同步基礎;AnyIO是把 task group、取消、timeout 與跨 backend 這些麻煩事包得更乾淨的抽象層。
如果你還沒有碰過非同步,可以先看:
這篇會假設你已經知道 coroutine、await、event loop 的基本概念,然後直接進實戰。
二. AnyIO 適合什麼場景? #
AnyIO 常出現在三種地方。
第一種是你在寫需要同時處理多個 I/O 任務的工具。
例如批次呼叫 API、同步多個資料來源、寫 CLI crawler,或寫 background worker。
第二種是你在用 FastAPI、Starlette、HTTPX 這類 async 生態工具。
這些工具的設計觀念,很多都和 AnyIO 的 task group、cancel scope 很接近。
第三種是你在寫函式庫,希望 async 程式不要只綁死在 asyncio。
AnyIO 可以跑在不同 backend 上,最常見的是:
asynciotrio
一般應用程式不一定需要跨 backend。
但就算最後只跑 asyncio,AnyIO 的寫法也會逼你把任務範圍寫清楚。
這一點很值得。
三. 安裝與第一個範例 #
先建立專案。
uv init anyio-demo
cd anyio-demo
uv add anyio httpx
uv add --dev pytest pytest-anyio
不用 uv 的話:
python -m pip install anyio httpx pytest pytest-anyio
第一個程式:
# hello_anyio.py
import anyio
async def say(name: str) -> None:
await anyio.sleep(0.2)
print(f"hello, {name}")
async def main() -> None:
await say("拍拍君")
if __name__ == "__main__":
anyio.run(main)
執行:
uv run python hello_anyio.py
輸出:
hello, 拍拍君
注意 anyio.run() 接的是 async function 本身:
anyio.run(main)
不是 coroutine object:
anyio.run(main())
第一次寫很容易手滑。非同步語法就是這樣,有時候比它看起來還挑剔。
四. TaskGroup:讓一批任務有清楚邊界 #
AnyIO 最重要的概念之一是 TaskGroup。
它的意思是:
在同一個 task group 裡啟動的任務,都屬於這個範圍;離開範圍時,任務也必須被處理乾淨。
看例子。
# task_group_demo.py
import anyio
async def worker(name: str, delay: float) -> None:
await anyio.sleep(delay)
print(f"{name} done")
async def main() -> None:
async with anyio.create_task_group() as tg:
tg.start_soon(worker, "download", 0.4)
tg.start_soon(worker, "parse", 0.2)
tg.start_soon(worker, "save", 0.3)
print("all done")
if __name__ == "__main__":
anyio.run(main)
可能輸出:
parse done
save done
download done
all done
tg.start_soon() 會啟動背景任務。
async with 區塊不會在任務還沒結束時直接離開。
所有任務都完成後,才會印出 all done。
這比手動存 task list 再 await asyncio.gather(...) 更有結構。
async with anyio.create_task_group() as tg:
tg.start_soon(...)
tg.start_soon(...)
你一眼就看得出來:這批任務由這個區塊負責。
五. 任務失敗時,其他任務會怎麼辦? #
非同步程式真正麻煩的地方,不是全部成功。
全部成功誰不會,連 print() 都會。
麻煩的是其中一個任務失敗。
# failure_demo.py
import anyio
async def slow_worker() -> None:
try:
print("slow worker started")
await anyio.sleep(5)
print("slow worker finished")
finally:
print("slow worker cleanup")
async def broken_worker() -> None:
await anyio.sleep(0.3)
raise RuntimeError("API token expired")
async def main() -> None:
async with anyio.create_task_group() as tg:
tg.start_soon(slow_worker)
tg.start_soon(broken_worker)
if __name__ == "__main__":
anyio.run(main)
broken_worker() 失敗時,task group 會取消同一組裡還沒完成的任務。
所以你會看到 slow worker cleanup。
這個行為很重要。
沒有清楚的任務範圍時,常見結果是:
- 主流程已經報錯
- 背景任務還在跑
- socket 沒關
- 測試偶爾過,偶爾卡住
AnyIO 的 task group 讓錯誤傳播更有秩序。
你可以把它想成非同步版本的 with open(...)。
進入範圍,開始工作。
離開範圍,清理乾淨。
六. fail_after:timeout 要有明確範圍 #
很多人寫 timeout 會想到 asyncio.wait_for()。
AnyIO 也有自己的方式:fail_after()。
# timeout_demo.py
import anyio
async def call_slow_api() -> str:
await anyio.sleep(2)
return "ok"
async def main() -> None:
try:
with anyio.fail_after(0.5):
result = await call_slow_api()
print(result)
except TimeoutError:
print("timeout: API took too long")
if __name__ == "__main__":
anyio.run(main)
fail_after(0.5) 的意思是:
在這個 scope 裡,最多只能跑 0.5 秒。
時間到了,就取消裡面的工作,然後丟出 TimeoutError。
這種寫法的好處是範圍很清楚。
with anyio.fail_after(0.5):
...
這幾行裡面就是 timeout 的管轄範圍。
七. move_on_after:超時就跳過 #
有時候 timeout 不代表錯誤。
例如快取慢了,這輪先用 fallback。
這時可以用 move_on_after()。
# soft_timeout_demo.py
import anyio
async def load_optional_cache() -> dict[str, str]:
await anyio.sleep(2)
return {"source": "cache"}
async def main() -> None:
cache: dict[str, str] | None = None
with anyio.move_on_after(0.3) as scope:
cache = await load_optional_cache()
if scope.cancelled_caught:
print("cache too slow, use fallback")
cache = {"source": "fallback"}
print(cache)
if __name__ == "__main__":
anyio.run(main)
move_on_after() 不會在 timeout 時丟出例外。
它會取消 scope 裡的工作,然後繼續往下走。
所以你可以明確判斷:
if scope.cancelled_caught:
...
不是每個慢服務都值得讓整個程式爆炸。
八. 實戰:批次呼叫 API,限制並發數 #
假設我們要呼叫多個 API endpoint,但不想一次開太多連線。
可以用 anyio.Semaphore 控制並發數。
# fetch_many.py
from dataclasses import dataclass
import anyio
import httpx
@dataclass
class FetchResult:
url: str
status_code: int
size: int
async def fetch_one(
client: httpx.AsyncClient,
limiter: anyio.Semaphore,
url: str,
results: list[FetchResult],
) -> None:
async with limiter:
with anyio.fail_after(5):
response = await client.get(url)
response.raise_for_status()
results.append(
FetchResult(
url=url,
status_code=response.status_code,
size=len(response.content),
)
)
async def fetch_many(urls: list[str], limit: int = 3) -> list[FetchResult]:
results: list[FetchResult] = []
limiter = anyio.Semaphore(limit)
async with httpx.AsyncClient() as client:
async with anyio.create_task_group() as tg:
for url in urls:
tg.start_soon(fetch_one, client, limiter, url, results)
return results
這段有幾個實務重點。
httpx.AsyncClient() 用 async with 管理連線池。
TaskGroup 管理這批 request 的生命週期。
Semaphore 控制同時最多幾個 request。
fail_after(5) 保護單一 request 不要無限等待。
如果其中一個 request 失敗,整個 task group 會把其他還沒完成的任務取消。
這種行為很適合「所有資料都要成功,結果才算成功」的任務。
九. 部分失敗也要收集結果 #
有些批次任務不能因為一筆失敗就全停。
例如同步 100 個使用者資料,其中 3 個 API 回 404,其他 97 個仍然應該保存。
這時可以把錯誤轉成資料。
# collect_errors.py
from dataclasses import dataclass
import anyio
import httpx
@dataclass
class JobResult:
url: str
ok: bool
message: str
async def safe_fetch_one(
client: httpx.AsyncClient,
limiter: anyio.Semaphore,
url: str,
results: list[JobResult],
) -> None:
async with limiter:
try:
with anyio.fail_after(5):
response = await client.get(url)
response.raise_for_status()
except TimeoutError:
results.append(JobResult(url, False, "timeout"))
except httpx.HTTPError as exc:
results.append(JobResult(url, False, f"http error: {exc!s}"))
else:
results.append(JobResult(url, True, f"{len(response.content)} bytes"))
這裡的設計差異在於:
safe_fetch_one() 不讓單一失敗逃出 task group。
它把錯誤轉成資料。
所以整批任務會繼續跑完。
拍拍君會這樣判斷:
- 全部成功才有意義:讓錯誤逃出 task group
- 部分成功也有意義:在 worker 裡捕捉錯誤,回傳結構化結果
- 背景任務只是輔助:用
move_on_after()或明確 fallback
不要把所有錯誤都吞掉。
也不要讓一個可接受的小失敗拖死整批任務。
十. TaskGroup.start:等服務真的準備好 #
start_soon() 適合啟動一般背景任務。
但有些任務需要「啟動完成」訊號。
例如啟動本機測試 server、background listener,或 queue consumer。
這時可以用 tg.start() 搭配 task_status.started()。
# start_demo.py
import anyio
from anyio.abc import TaskStatus
async def background_indexer(
task_status: TaskStatus[str] = anyio.TASK_STATUS_IGNORED,
) -> None:
print("indexer booting")
await anyio.sleep(0.2)
task_status.started("ready")
while True:
await anyio.sleep(1)
print("indexer tick")
async def main() -> None:
async with anyio.create_task_group() as tg:
status = await tg.start(background_indexer)
print(f"indexer status: {status}")
await anyio.sleep(2.5)
tg.cancel_scope.cancel()
if __name__ == "__main__":
anyio.run(main)
tg.start() 會等到 worker 呼叫 task_status.started(...) 才繼續。
如果 worker 初始化失敗,主流程也會知道。
這比「sleep 0.5 秒,應該好了吧」可靠太多。
那種寫法是測試不穩的老朋友。看起來沒事,其實只是在賭運氣。
十一. 測試 AnyIO 程式 #
AnyIO 的測試也很舒服。
前面已經安裝:
uv add --dev pytest pytest-anyio
接著寫測試。
# test_timeout.py
import anyio
import pytest
async def slow_add(a: int, b: int) -> int:
await anyio.sleep(1)
return a + b
@pytest.mark.anyio
async def test_timeout() -> None:
with pytest.raises(TimeoutError):
with anyio.fail_after(0.1):
await slow_add(1, 2)
如果只想跑 asyncio backend,可以在 conftest.py 指定:
# conftest.py
import pytest
@pytest.fixture
def anyio_backend() -> str:
return "asyncio"
如果你在寫函式庫,也可以同時跑 asyncio 和 trio。
# conftest.py
import pytest
@pytest.fixture(params=["asyncio", "trio"])
def anyio_backend(request: pytest.FixtureRequest) -> str:
return request.param
一般應用程式不用急著追求跨 backend。
但就算只跑 asyncio,pytest.mark.anyio 仍然是一個乾淨的 async test 入口。
結語 #
AnyIO 最值得學的地方,不是它多了一組新的 API。
而是它把非同步程式裡最容易混亂的部分整理成幾個清楚概念:
TaskGroup:一批任務的生命週期fail_after():超時就取消並報錯move_on_after():超時就跳過並繼續cancel_scope:明確表達取消範圍pytest.mark.anyio:乾淨測試 async code
如果你已經會基本 asyncio,下一步不要急著背更多 event loop 細節。
先把任務怎麼開始、怎麼失敗、怎麼取消、怎麼收尾想清楚。
程式會立刻安靜很多。
拍拍君覺得這就是 AnyIO 最可愛的地方:它不吵,但會把爛攤子收好。