快轉到主要內容
  1. 教學文章/

Python AnyIO 實戰:TaskGroup、取消管理與跨框架非同步工具

·6 分鐘· loading · loading · ·
Python AnyIO Async Asyncio Trio Developer-Tools
每日拍拍
作者
每日拍拍
科學家 X 科技宅宅
目錄
Python 學習 - 本文屬於一個選集。
§ 74: 本文

featured

一. 前言:非同步程式最難的不是 async
#

很多人第一次學 Python 非同步,會從 asyncio 開始。

你會寫 async def,也會用 await

接著你很快會遇到更麻煩的問題:

  • 有一個任務失敗時,其他任務要不要停?
  • timeout 發生時,背景工作有沒有真的被取消?
  • worker 卡住時,程式要怎麼優雅收尾?
  • 測試 async code 時,要怎麼避免 event loop 亂掉?

這些問題通常不是語法問題,而是任務生命週期問題。

今天拍拍君要介紹的 AnyIO,就是專門整理這些混亂的工具。

它不是要取代你已經會的 asyncio

比較好的理解方式是:

asyncio 是 Python 內建非同步基礎;AnyIO 是把 task group、取消、timeout 與跨 backend 這些麻煩事包得更乾淨的抽象層。

如果你還沒有碰過非同步,可以先看:

這篇會假設你已經知道 coroutine、await、event loop 的基本概念,然後直接進實戰。

二. AnyIO 適合什麼場景?
#

AnyIO 常出現在三種地方。

第一種是你在寫需要同時處理多個 I/O 任務的工具。

例如批次呼叫 API、同步多個資料來源、寫 CLI crawler,或寫 background worker。

第二種是你在用 FastAPI、Starlette、HTTPX 這類 async 生態工具。

這些工具的設計觀念,很多都和 AnyIO 的 task group、cancel scope 很接近。

第三種是你在寫函式庫,希望 async 程式不要只綁死在 asyncio

AnyIO 可以跑在不同 backend 上,最常見的是:

  • asyncio
  • trio

一般應用程式不一定需要跨 backend。

但就算最後只跑 asyncio,AnyIO 的寫法也會逼你把任務範圍寫清楚。

這一點很值得。

三. 安裝與第一個範例
#

先建立專案。

uv init anyio-demo
cd anyio-demo
uv add anyio httpx
uv add --dev pytest pytest-anyio

不用 uv 的話:

python -m pip install anyio httpx pytest pytest-anyio

第一個程式:

# hello_anyio.py
import anyio


async def say(name: str) -> None:
    await anyio.sleep(0.2)
    print(f"hello, {name}")


async def main() -> None:
    await say("拍拍君")


if __name__ == "__main__":
    anyio.run(main)

執行:

uv run python hello_anyio.py

輸出:

hello, 拍拍君

注意 anyio.run() 接的是 async function 本身:

anyio.run(main)

不是 coroutine object:

anyio.run(main())

第一次寫很容易手滑。非同步語法就是這樣,有時候比它看起來還挑剔。

四. TaskGroup:讓一批任務有清楚邊界
#

AnyIO 最重要的概念之一是 TaskGroup

它的意思是:

在同一個 task group 裡啟動的任務,都屬於這個範圍;離開範圍時,任務也必須被處理乾淨。

看例子。

# task_group_demo.py
import anyio


async def worker(name: str, delay: float) -> None:
    await anyio.sleep(delay)
    print(f"{name} done")


async def main() -> None:
    async with anyio.create_task_group() as tg:
        tg.start_soon(worker, "download", 0.4)
        tg.start_soon(worker, "parse", 0.2)
        tg.start_soon(worker, "save", 0.3)

    print("all done")


if __name__ == "__main__":
    anyio.run(main)

可能輸出:

parse done
save done
download done
all done

tg.start_soon() 會啟動背景任務。

async with 區塊不會在任務還沒結束時直接離開。

所有任務都完成後,才會印出 all done

這比手動存 task list 再 await asyncio.gather(...) 更有結構。

async with anyio.create_task_group() as tg:
    tg.start_soon(...)
    tg.start_soon(...)

你一眼就看得出來:這批任務由這個區塊負責。

五. 任務失敗時,其他任務會怎麼辦?
#

非同步程式真正麻煩的地方,不是全部成功。

全部成功誰不會,連 print() 都會。

麻煩的是其中一個任務失敗。

# failure_demo.py
import anyio


async def slow_worker() -> None:
    try:
        print("slow worker started")
        await anyio.sleep(5)
        print("slow worker finished")
    finally:
        print("slow worker cleanup")


async def broken_worker() -> None:
    await anyio.sleep(0.3)
    raise RuntimeError("API token expired")


async def main() -> None:
    async with anyio.create_task_group() as tg:
        tg.start_soon(slow_worker)
        tg.start_soon(broken_worker)


if __name__ == "__main__":
    anyio.run(main)

broken_worker() 失敗時,task group 會取消同一組裡還沒完成的任務。

所以你會看到 slow worker cleanup

這個行為很重要。

沒有清楚的任務範圍時,常見結果是:

  • 主流程已經報錯
  • 背景任務還在跑
  • socket 沒關
  • 測試偶爾過,偶爾卡住

AnyIO 的 task group 讓錯誤傳播更有秩序。

你可以把它想成非同步版本的 with open(...)

進入範圍,開始工作。

離開範圍,清理乾淨。

六. fail_after:timeout 要有明確範圍
#

很多人寫 timeout 會想到 asyncio.wait_for()

AnyIO 也有自己的方式:fail_after()

# timeout_demo.py
import anyio


async def call_slow_api() -> str:
    await anyio.sleep(2)
    return "ok"


async def main() -> None:
    try:
        with anyio.fail_after(0.5):
            result = await call_slow_api()
            print(result)
    except TimeoutError:
        print("timeout: API took too long")


if __name__ == "__main__":
    anyio.run(main)

fail_after(0.5) 的意思是:

在這個 scope 裡,最多只能跑 0.5 秒。

時間到了,就取消裡面的工作,然後丟出 TimeoutError

這種寫法的好處是範圍很清楚。

with anyio.fail_after(0.5):
    ...

這幾行裡面就是 timeout 的管轄範圍。

七. move_on_after:超時就跳過
#

有時候 timeout 不代表錯誤。

例如快取慢了,這輪先用 fallback。

這時可以用 move_on_after()

# soft_timeout_demo.py
import anyio


async def load_optional_cache() -> dict[str, str]:
    await anyio.sleep(2)
    return {"source": "cache"}


async def main() -> None:
    cache: dict[str, str] | None = None

    with anyio.move_on_after(0.3) as scope:
        cache = await load_optional_cache()

    if scope.cancelled_caught:
        print("cache too slow, use fallback")
        cache = {"source": "fallback"}

    print(cache)


if __name__ == "__main__":
    anyio.run(main)

move_on_after() 不會在 timeout 時丟出例外。

它會取消 scope 裡的工作,然後繼續往下走。

所以你可以明確判斷:

if scope.cancelled_caught:
    ...

不是每個慢服務都值得讓整個程式爆炸。

八. 實戰:批次呼叫 API,限制並發數
#

假設我們要呼叫多個 API endpoint,但不想一次開太多連線。

可以用 anyio.Semaphore 控制並發數。

# fetch_many.py
from dataclasses import dataclass

import anyio
import httpx


@dataclass
class FetchResult:
    url: str
    status_code: int
    size: int


async def fetch_one(
    client: httpx.AsyncClient,
    limiter: anyio.Semaphore,
    url: str,
    results: list[FetchResult],
) -> None:
    async with limiter:
        with anyio.fail_after(5):
            response = await client.get(url)
            response.raise_for_status()

        results.append(
            FetchResult(
                url=url,
                status_code=response.status_code,
                size=len(response.content),
            )
        )


async def fetch_many(urls: list[str], limit: int = 3) -> list[FetchResult]:
    results: list[FetchResult] = []
    limiter = anyio.Semaphore(limit)

    async with httpx.AsyncClient() as client:
        async with anyio.create_task_group() as tg:
            for url in urls:
                tg.start_soon(fetch_one, client, limiter, url, results)

    return results

這段有幾個實務重點。

httpx.AsyncClient()async with 管理連線池。

TaskGroup 管理這批 request 的生命週期。

Semaphore 控制同時最多幾個 request。

fail_after(5) 保護單一 request 不要無限等待。

如果其中一個 request 失敗,整個 task group 會把其他還沒完成的任務取消。

這種行為很適合「所有資料都要成功,結果才算成功」的任務。

九. 部分失敗也要收集結果
#

有些批次任務不能因為一筆失敗就全停。

例如同步 100 個使用者資料,其中 3 個 API 回 404,其他 97 個仍然應該保存。

這時可以把錯誤轉成資料。

# collect_errors.py
from dataclasses import dataclass

import anyio
import httpx


@dataclass
class JobResult:
    url: str
    ok: bool
    message: str


async def safe_fetch_one(
    client: httpx.AsyncClient,
    limiter: anyio.Semaphore,
    url: str,
    results: list[JobResult],
) -> None:
    async with limiter:
        try:
            with anyio.fail_after(5):
                response = await client.get(url)
                response.raise_for_status()
        except TimeoutError:
            results.append(JobResult(url, False, "timeout"))
        except httpx.HTTPError as exc:
            results.append(JobResult(url, False, f"http error: {exc!s}"))
        else:
            results.append(JobResult(url, True, f"{len(response.content)} bytes"))

這裡的設計差異在於:

safe_fetch_one() 不讓單一失敗逃出 task group。

它把錯誤轉成資料。

所以整批任務會繼續跑完。

拍拍君會這樣判斷:

  • 全部成功才有意義:讓錯誤逃出 task group
  • 部分成功也有意義:在 worker 裡捕捉錯誤,回傳結構化結果
  • 背景任務只是輔助:用 move_on_after() 或明確 fallback

不要把所有錯誤都吞掉。

也不要讓一個可接受的小失敗拖死整批任務。

十. TaskGroup.start:等服務真的準備好
#

start_soon() 適合啟動一般背景任務。

但有些任務需要「啟動完成」訊號。

例如啟動本機測試 server、background listener,或 queue consumer。

這時可以用 tg.start() 搭配 task_status.started()

# start_demo.py
import anyio
from anyio.abc import TaskStatus


async def background_indexer(
    task_status: TaskStatus[str] = anyio.TASK_STATUS_IGNORED,
) -> None:
    print("indexer booting")
    await anyio.sleep(0.2)
    task_status.started("ready")

    while True:
        await anyio.sleep(1)
        print("indexer tick")


async def main() -> None:
    async with anyio.create_task_group() as tg:
        status = await tg.start(background_indexer)
        print(f"indexer status: {status}")

        await anyio.sleep(2.5)
        tg.cancel_scope.cancel()


if __name__ == "__main__":
    anyio.run(main)

tg.start() 會等到 worker 呼叫 task_status.started(...) 才繼續。

如果 worker 初始化失敗,主流程也會知道。

這比「sleep 0.5 秒,應該好了吧」可靠太多。

那種寫法是測試不穩的老朋友。看起來沒事,其實只是在賭運氣。

十一. 測試 AnyIO 程式
#

AnyIO 的測試也很舒服。

前面已經安裝:

uv add --dev pytest pytest-anyio

接著寫測試。

# test_timeout.py
import anyio
import pytest


async def slow_add(a: int, b: int) -> int:
    await anyio.sleep(1)
    return a + b


@pytest.mark.anyio
async def test_timeout() -> None:
    with pytest.raises(TimeoutError):
        with anyio.fail_after(0.1):
            await slow_add(1, 2)

如果只想跑 asyncio backend,可以在 conftest.py 指定:

# conftest.py
import pytest


@pytest.fixture
def anyio_backend() -> str:
    return "asyncio"

如果你在寫函式庫,也可以同時跑 asynciotrio

# conftest.py
import pytest


@pytest.fixture(params=["asyncio", "trio"])
def anyio_backend(request: pytest.FixtureRequest) -> str:
    return request.param

一般應用程式不用急著追求跨 backend。

但就算只跑 asynciopytest.mark.anyio 仍然是一個乾淨的 async test 入口。

結語
#

AnyIO 最值得學的地方,不是它多了一組新的 API。

而是它把非同步程式裡最容易混亂的部分整理成幾個清楚概念:

  • TaskGroup:一批任務的生命週期
  • fail_after():超時就取消並報錯
  • move_on_after():超時就跳過並繼續
  • cancel_scope:明確表達取消範圍
  • pytest.mark.anyio:乾淨測試 async code

如果你已經會基本 asyncio,下一步不要急著背更多 event loop 細節。

先把任務怎麼開始、怎麼失敗、怎麼取消、怎麼收尾想清楚。

程式會立刻安靜很多。

拍拍君覺得這就是 AnyIO 最可愛的地方:它不吵,但會把爛攤子收好。

延伸閱讀
#

Python 學習 - 本文屬於一個選集。
§ 74: 本文

相關文章

Rich Logging Dashboard 實戰:進度、表格與 Log Console 整合
·7 分鐘· loading · loading
Python Rich Logging Cli Dashboard Developer-Tools
Python uv scripts 實戰:PEP 723、inline dependencies 與單檔工具
·6 分鐘· loading · loading
Python Uv PEP 723 Script Developer-Tools Automation
Python pytest fixtures 進階:conftest、factory 與測試資料管理
·8 分鐘· loading · loading
Python Pytest Fixtures Testing Conftest Monkeypatch Developer-Tools
Python SQLAlchemy 2.0 實戰:Typed ORM、Session 與查詢模式
·9 分鐘· loading · loading
Python SQLAlchemy ORM Database SQLite Developer-Tools
FastAPI + Streamlit 實戰:API 後端與互動前端分工
·9 分鐘· loading · loading
Python Fastapi Streamlit Api Frontend Developer-Tools
Python pydantic-settings 實戰:型別安全管理 .env 與設定檔
·6 分鐘· loading · loading
Python Pydantic Pydantic-Settings Dotenv Configuration Developer-Tools