一、前言 #
上週拍拍君跟大家聊了 multiprocessing,學會了怎麼用多「進程」(process) 突破 GIL 做真正的平行運算。但你有沒有想過——不是每個任務都需要那麼重的武器?
想像一下這些場景:
- 🌐 同時發 50 個 HTTP 請求抓資料
- 📁 一邊讀檔一邊更新進度條
- 🖥️ GUI 程式裡跑背景任務不卡 UI
這些都是 I/O 密集型 任務,用 multiprocessing 開一堆進程太浪費資源了。這時候,threading(多執行緒) 才是正確的工具。
今天就來完整學會 Python 的 threading 模組,從基礎的 Thread 到進階的 Lock、Event、ThreadPoolExecutor,一次搞定!
🔑 一句話記住:CPU 密集 → multiprocessing,I/O 密集 → threading
二、threading vs multiprocessing:先搞清楚差異 #
在動手之前,讓拍拍君幫你把這兩兄弟的差異整理清楚:
multiprocessing threading
┌─────────────────┐ ┌─────────────────┐
│ Process A │ │ Thread 1 │
│ ┌───────────┐ │ │ ┌───────────┐ │
│ │ 獨立記憶體 │ │ │ │ 共享記憶體 │ │
│ │ 獨立 GIL │ │ │ │ 同一 GIL │ │
│ └───────────┘ │ │ └───────────┘ │
├─────────────────┤ │ Thread 2 │
│ Process B │ │ ┌───────────┐ │
│ ┌───────────┐ │ │ │ 共享記憶體 │ │
│ │ 獨立記憶體 │ │ │ │ 同一 GIL │ │
│ │ 獨立 GIL │ │ │ └───────────┘ │
│ └───────────┘ │ └─────────────────┘
└─────────────────┘ 一個 Process
多個 Process
| 項目 | threading | multiprocessing |
|---|---|---|
| 記憶體 | 共享(同一 process) | 獨立(各自 process) |
| GIL 限制 | 受限(同時只有一個 thread 跑 Python bytecode) | 不受限(各 process 有自己的 GIL) |
| 適合場景 | I/O 密集(網路、檔案) | CPU 密集(計算、壓縮) |
| 啟動成本 | 輕量、快速 | 較重(需 fork/spawn) |
| 資料共享 | 直接共用變數(但需同步) | 需用 Queue、Pipe、shared memory |
| Debug 難度 | 中(race condition) | 中高(跨進程除錯) |
三、Thread 基礎:你的第一個多執行緒程式 #
3.1 最簡單的用法 #
import threading
import time
def download_file(filename: str) -> None:
"""模擬下載檔案"""
print(f"⬇️ 開始下載 {filename}...")
time.sleep(2) # 模擬 I/O 等待
print(f"✅ {filename} 下載完成!")
# 建立執行緒
t1 = threading.Thread(target=download_file, args=("data_01.csv",))
t2 = threading.Thread(target=download_file, args=("data_02.csv",))
# 啟動
t1.start()
t2.start()
# 等待兩個都完成
t1.join()
t2.join()
print("🎉 全部下載完成!")
執行結果(兩個幾乎同時開始):
⬇️ 開始下載 data_01.csv...
⬇️ 開始下載 data_02.csv...
✅ data_01.csv 下載完成!
✅ data_02.csv 下載完成!
🎉 全部下載完成!
💡 如果用單執行緒,兩個
sleep(2)要等 4 秒。用 threading 只要 ~2 秒!
3.2 用 class 繼承 Thread #
如果你的執行緒邏輯比較複雜,可以繼承 Thread:
import threading
import time
class DataProcessor(threading.Thread):
def __init__(self, name: str, data: list):
super().__init__()
self.name = name
self.data = data
self.result = None
def run(self):
"""Thread 啟動時自動呼叫"""
print(f"🔧 {self.name} 開始處理 {len(self.data)} 筆資料...")
time.sleep(1) # 模擬處理
self.result = [x * 2 for x in self.data]
print(f"✅ {self.name} 處理完成!")
# 使用
p1 = DataProcessor("Worker-A", [1, 2, 3])
p2 = DataProcessor("Worker-B", [4, 5, 6])
p1.start()
p2.start()
p1.join()
p2.join()
print(f"結果 A: {p1.result}") # [2, 4, 6]
print(f"結果 B: {p2.result}") # [8, 10, 12]
四、Daemon Thread:背景執行緒 #
有些執行緒是「附屬的」——主程式結束時,它們也應該自動結束。這就是 daemon thread:
import threading
import time
def heartbeat():
"""每秒發一次心跳"""
while True:
print("💓 heartbeat...")
time.sleep(1)
# 設為 daemon thread
monitor = threading.Thread(target=heartbeat, daemon=True)
monitor.start()
# 主程式做自己的事
print("🚀 主程式開始工作...")
time.sleep(3)
print("👋 主程式結束,daemon thread 會自動終止")
🚀 主程式開始工作...
💓 heartbeat...
💓 heartbeat...
💓 heartbeat...
👋 主程式結束,daemon thread 會自動終止
⚠️ daemon thread 不會等
finally或atexit,所以不要讓它做需要善後的事(像是寫檔到一半)。
五、同步機制:Lock、Event、Semaphore #
多執行緒共享記憶體是方便,但也是萬惡之源。來看看怎麼避免 race condition。
5.1 Lock:互斥鎖 #
import threading
counter = 0
lock = threading.Lock()
def increment(n: int):
global counter
for _ in range(n):
with lock: # 同時只有一個 thread 能進入
counter += 1
threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"最終計數: {counter}") # 保證 500000
🚨 不用 Lock 的話,
counter += 1不是原子操作,五個 thread 同時改,最終結果可能小於 500000!
5.2 RLock:可重入鎖 #
如果同一個 thread 需要多次取得鎖(例如遞迴呼叫),用 RLock:
import threading
rlock = threading.RLock()
def recursive_task(n: int):
with rlock:
if n > 0:
print(f" 層級 {n}")
recursive_task(n - 1) # 同一 thread 可以再次取得鎖
t = threading.Thread(target=recursive_task, args=(3,))
t.start()
t.join()
5.3 Event:執行緒間的信號 #
Event 讓一個 thread 通知其他 thread「某件事發生了」:
import threading
import time
data_ready = threading.Event()
def producer():
print("📦 生產者:準備資料中...")
time.sleep(2)
print("📦 生產者:資料準備好了!")
data_ready.set() # 發送信號
def consumer(name: str):
print(f"🛒 {name}:等待資料...")
data_ready.wait() # 阻塞直到 event 被 set
print(f"🛒 {name}:收到資料,開始處理!")
threads = [
threading.Thread(target=producer),
threading.Thread(target=consumer, args=("消費者 A",)),
threading.Thread(target=consumer, args=("消費者 B",)),
]
for t in threads:
t.start()
for t in threads:
t.join()
5.4 Semaphore:限制同時存取數量 #
import threading
import time
# 最多同時 3 個連線
connection_pool = threading.Semaphore(3)
def access_database(worker_id: int):
with connection_pool:
print(f"🔗 Worker {worker_id} 取得連線")
time.sleep(1) # 模擬 DB 操作
print(f"🔓 Worker {worker_id} 釋放連線")
threads = [threading.Thread(target=access_database, args=(i,)) for i in range(8)]
for t in threads:
t.start()
for t in threads:
t.join()
六、ThreadPoolExecutor:現代寫法(推薦) #
手動管理 Thread 太囉唆?Python 3.2+ 的 concurrent.futures 提供了更優雅的介面:
6.1 基本用法 #
from concurrent.futures import ThreadPoolExecutor
import time
def fetch_url(url: str) -> str:
"""模擬抓取網頁"""
time.sleep(1)
return f"📄 {url} 的內容 ({len(url)} bytes)"
urls = [
"https://example.com/api/users",
"https://example.com/api/posts",
"https://example.com/api/comments",
"https://example.com/api/tags",
"https://example.com/api/categories",
]
# 最多 3 個 worker 同時跑
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(fetch_url, urls)
for result in results:
print(result)
6.2 submit + as_completed:誰先完成先處理 #
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
def download(file_id: int) -> dict:
delay = random.uniform(0.5, 3.0)
time.sleep(delay)
return {"id": file_id, "time": round(delay, 2)}
with ThreadPoolExecutor(max_workers=4) as executor:
# submit 回傳 Future 物件
futures = {
executor.submit(download, i): i
for i in range(6)
}
for future in as_completed(futures):
file_id = futures[future]
try:
result = future.result()
print(f"✅ 檔案 {result['id']} 完成(耗時 {result['time']}s)")
except Exception as e:
print(f"❌ 檔案 {file_id} 失敗:{e}")
6.3 搭配 tqdm 顯示進度 #
搭配之前學過的 tqdm,讓多執行緒下載也有進度條:
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time
import random
def process_item(item_id: int) -> int:
time.sleep(random.uniform(0.1, 0.5))
return item_id * 2
items = list(range(100))
with ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(process_item, i) for i in items]
results = []
with tqdm(total=len(items), desc="處理中") as pbar:
for future in as_completed(futures):
results.append(future.result())
pbar.update(1)
print(f"\n✅ 處理完成,共 {len(results)} 筆")
七、實戰範例:多執行緒網頁爬蟲 #
來個實際一點的例子——用 threading 加速爬蟲:
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import time
import random
@dataclass
class PageResult:
url: str
status: int
content_length: int
elapsed: float
def crawl_page(url: str) -> PageResult:
"""模擬爬取一個網頁"""
start = time.time()
time.sleep(random.uniform(0.2, 1.5)) # 模擬網路延遲
# 模擬偶爾失敗
if random.random() < 0.1:
raise ConnectionError(f"無法連線到 {url}")
return PageResult(
url=url,
status=200,
content_length=random.randint(1000, 50000),
elapsed=round(time.time() - start, 2),
)
def run_crawler(urls: list[str], max_workers: int = 5):
"""多執行緒爬蟲"""
results: list[PageResult] = []
errors: list[tuple[str, str]] = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_url = {
executor.submit(crawl_page, url): url
for url in urls
}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
results.append(result)
print(f" ✅ {result.url} ({result.elapsed}s)")
except Exception as e:
errors.append((url, str(e)))
print(f" ❌ {url}: {e}")
return results, errors
# 模擬 20 個 URL
urls = [f"https://chatptt.com/page/{i}" for i in range(1, 21)]
print("🕷️ 拍拍爬蟲啟動!\n")
start = time.time()
results, errors = run_crawler(urls, max_workers=5)
total_time = round(time.time() - start, 2)
print(f"\n📊 爬取結果:")
print(f" 成功: {len(results)} 頁")
print(f" 失敗: {len(errors)} 頁")
print(f" 耗時: {total_time}s(循序預估: ~{len(urls) * 0.85:.0f}s)")
八、threading 的陷阱與最佳實踐 #
🚫 常見陷阱 #
1. GIL 讓 CPU 密集任務沒加速效果:
# ❌ 這樣用 threading 不會更快!
def cpu_heavy(n):
return sum(i * i for i in range(n))
# CPU 密集請改用 multiprocessing
from multiprocessing import Pool
with Pool(4) as p:
results = p.map(cpu_heavy, [10**7] * 4)
2. 忘記用 Lock 保護共享狀態:
# ❌ 危險!多個 thread 同時讀寫 list
shared_list = []
def bad_append(item):
shared_list.append(item) # append 本身是 thread-safe
# 但如果是 read-modify-write 就不行:
# shared_list[0] = shared_list[0] + 1 ← race condition!
3. Deadlock(死結):
# ❌ 兩個 lock 交叉取用 → deadlock
lock_a = threading.Lock()
lock_b = threading.Lock()
def task_1():
with lock_a:
time.sleep(0.1)
with lock_b: # 等 task_2 釋放 lock_b
print("task_1")
def task_2():
with lock_b:
time.sleep(0.1)
with lock_a: # 等 task_1 釋放 lock_a
print("task_2")
# ✅ 解法:永遠用固定順序取 lock(先 a 再 b)
✅ 最佳實踐 #
- I/O 密集 → threading,CPU 密集 → multiprocessing
- 優先用
ThreadPoolExecutor,別手動管理 Thread - 共享狀態盡量少,用 Queue 傳遞資料更安全
- Lock 範圍越小越好(只包住真正需要同步的程式碼)
- 設定 timeout,避免永遠等下去
- 考慮 asyncio——如果是純 I/O 且量大,asyncio 可能更適合
九、threading vs asyncio:怎麼選? #
既然提到了,拍拍君也來簡單比較一下:
| 項目 | threading | asyncio |
|---|---|---|
| 模型 | 搶佔式(OS 排程) | 協作式(event loop) |
| 語法 | 一般 function | async/await |
| 適合 | 阻塞型 I/O(舊 API、DB driver) | 原生 async 的 I/O(aiohttp 等) |
| 複雜度 | 中(需處理 race condition) | 中(需處理 event loop) |
| Thread 數 | 受 OS 限制(通常數百) | 可輕鬆萬級 coroutine |
簡單說:如果你用的 library 支援 async → 用 asyncio。如果是傳統阻塞式 API → 用 threading。
結語 #
今天學了一整套 Python threading 的用法:
- Thread 基礎建立與
join()等待 - Daemon thread 的背景執行
- Lock / RLock / Event / Semaphore 同步機制
- ThreadPoolExecutor 現代高階介面
- 陷阱與最佳實踐
記住那個黃金法則:I/O 密集用 threading,CPU 密集用 multiprocessing。搞混了不會出錯,但會浪費效能。
多執行緒程式設計的難度不在「開 thread」,而在「thread 之間怎麼安全地共享資料」。善用 Lock 和 Queue,你就能寫出既快又穩的並行程式!
下次拍拍君會再帶更多 Python 實戰技巧,我們下篇見! 🐍✨
延伸閱讀 #
- 📖 Python 官方文件:threading
- 📖 Python 官方文件:concurrent.futures
- 📝 Python multiprocessing:突破 GIL 的平行運算 — 拍拍君的 multiprocessing 教學
- 📝 Python asyncio:非同步程式設計入門 — 拍拍君的 asyncio 教學