快轉到主要內容
  1. 教學文章/

Python threading:多執行緒並行的正確打開方式

·8 分鐘· loading · loading · ·
Python Threading Concurrency 並行 GIL
每日拍拍
作者
每日拍拍
科學家 X 科技宅宅
目錄
Python 學習 - 本文屬於一個選集。
§ 34: 本文

一、前言
#

上週拍拍君跟大家聊了 multiprocessing,學會了怎麼用多「進程」(process) 突破 GIL 做真正的平行運算。但你有沒有想過——不是每個任務都需要那麼重的武器?

想像一下這些場景:

  • 🌐 同時發 50 個 HTTP 請求抓資料
  • 📁 一邊讀檔一邊更新進度條
  • 🖥️ GUI 程式裡跑背景任務不卡 UI

這些都是 I/O 密集型 任務,用 multiprocessing 開一堆進程太浪費資源了。這時候,threading(多執行緒) 才是正確的工具。

今天就來完整學會 Python 的 threading 模組,從基礎的 Thread 到進階的 LockEventThreadPoolExecutor,一次搞定!

🔑 一句話記住:CPU 密集 → multiprocessing,I/O 密集 → threading


二、threading vs multiprocessing:先搞清楚差異
#

在動手之前,讓拍拍君幫你把這兩兄弟的差異整理清楚:

multiprocessing                    threading
┌─────────────────┐              ┌─────────────────┐
│  Process A      │              │    Thread 1      │
│  ┌───────────┐  │              │  ┌───────────┐   │
│  │ 獨立記憶體 │  │              │  │ 共享記憶體 │   │
│  │ 獨立 GIL  │  │              │  │ 同一 GIL  │   │
│  └───────────┘  │              │  └───────────┘   │
├─────────────────┤              │    Thread 2      │
│  Process B      │              │  ┌───────────┐   │
│  ┌───────────┐  │              │  │ 共享記憶體 │   │
│  │ 獨立記憶體 │  │              │  │ 同一 GIL  │   │
│  │ 獨立 GIL  │  │              │  └───────────┘   │
│  └───────────┘  │              └─────────────────┘
└─────────────────┘                 一個 Process
   多個 Process
項目 threading multiprocessing
記憶體 共享(同一 process) 獨立(各自 process)
GIL 限制 受限(同時只有一個 thread 跑 Python bytecode) 不受限(各 process 有自己的 GIL)
適合場景 I/O 密集(網路、檔案) CPU 密集(計算、壓縮)
啟動成本 輕量、快速 較重(需 fork/spawn)
資料共享 直接共用變數(但需同步) 需用 Queue、Pipe、shared memory
Debug 難度 中(race condition) 中高(跨進程除錯)

三、Thread 基礎:你的第一個多執行緒程式
#

3.1 最簡單的用法
#

import threading
import time

def download_file(filename: str) -> None:
    """模擬下載檔案"""
    print(f"⬇️  開始下載 {filename}...")
    time.sleep(2)  # 模擬 I/O 等待
    print(f"✅ {filename} 下載完成!")

# 建立執行緒
t1 = threading.Thread(target=download_file, args=("data_01.csv",))
t2 = threading.Thread(target=download_file, args=("data_02.csv",))

# 啟動
t1.start()
t2.start()

# 等待兩個都完成
t1.join()
t2.join()

print("🎉 全部下載完成!")

執行結果(兩個幾乎同時開始):

⬇️  開始下載 data_01.csv...
⬇️  開始下載 data_02.csv...
✅ data_01.csv 下載完成!
✅ data_02.csv 下載完成!
🎉 全部下載完成!

💡 如果用單執行緒,兩個 sleep(2) 要等 4 秒。用 threading 只要 ~2 秒!

3.2 用 class 繼承 Thread
#

如果你的執行緒邏輯比較複雜,可以繼承 Thread

import threading
import time

class DataProcessor(threading.Thread):
    def __init__(self, name: str, data: list):
        super().__init__()
        self.name = name
        self.data = data
        self.result = None

    def run(self):
        """Thread 啟動時自動呼叫"""
        print(f"🔧 {self.name} 開始處理 {len(self.data)} 筆資料...")
        time.sleep(1)  # 模擬處理
        self.result = [x * 2 for x in self.data]
        print(f"✅ {self.name} 處理完成!")

# 使用
p1 = DataProcessor("Worker-A", [1, 2, 3])
p2 = DataProcessor("Worker-B", [4, 5, 6])

p1.start()
p2.start()

p1.join()
p2.join()

print(f"結果 A: {p1.result}")  # [2, 4, 6]
print(f"結果 B: {p2.result}")  # [8, 10, 12]

四、Daemon Thread:背景執行緒
#

有些執行緒是「附屬的」——主程式結束時,它們也應該自動結束。這就是 daemon thread

import threading
import time

def heartbeat():
    """每秒發一次心跳"""
    while True:
        print("💓 heartbeat...")
        time.sleep(1)

# 設為 daemon thread
monitor = threading.Thread(target=heartbeat, daemon=True)
monitor.start()

# 主程式做自己的事
print("🚀 主程式開始工作...")
time.sleep(3)
print("👋 主程式結束,daemon thread 會自動終止")
🚀 主程式開始工作...
💓 heartbeat...
💓 heartbeat...
💓 heartbeat...
👋 主程式結束,daemon thread 會自動終止

⚠️ daemon thread 不會等 finallyatexit,所以不要讓它做需要善後的事(像是寫檔到一半)。


五、同步機制:Lock、Event、Semaphore
#

多執行緒共享記憶體是方便,但也是萬惡之源。來看看怎麼避免 race condition

5.1 Lock:互斥鎖
#

import threading

counter = 0
lock = threading.Lock()

def increment(n: int):
    global counter
    for _ in range(n):
        with lock:  # 同時只有一個 thread 能進入
            counter += 1

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(5)]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"最終計數: {counter}")  # 保證 500000

🚨 不用 Lock 的話counter += 1 不是原子操作,五個 thread 同時改,最終結果可能小於 500000!

5.2 RLock:可重入鎖
#

如果同一個 thread 需要多次取得鎖(例如遞迴呼叫),用 RLock

import threading

rlock = threading.RLock()

def recursive_task(n: int):
    with rlock:
        if n > 0:
            print(f"  層級 {n}")
            recursive_task(n - 1)  # 同一 thread 可以再次取得鎖

t = threading.Thread(target=recursive_task, args=(3,))
t.start()
t.join()

5.3 Event:執行緒間的信號
#

Event 讓一個 thread 通知其他 thread「某件事發生了」:

import threading
import time

data_ready = threading.Event()

def producer():
    print("📦 生產者:準備資料中...")
    time.sleep(2)
    print("📦 生產者:資料準備好了!")
    data_ready.set()  # 發送信號

def consumer(name: str):
    print(f"🛒 {name}:等待資料...")
    data_ready.wait()  # 阻塞直到 event 被 set
    print(f"🛒 {name}:收到資料,開始處理!")

threads = [
    threading.Thread(target=producer),
    threading.Thread(target=consumer, args=("消費者 A",)),
    threading.Thread(target=consumer, args=("消費者 B",)),
]

for t in threads:
    t.start()
for t in threads:
    t.join()

5.4 Semaphore:限制同時存取數量
#

import threading
import time

# 最多同時 3 個連線
connection_pool = threading.Semaphore(3)

def access_database(worker_id: int):
    with connection_pool:
        print(f"🔗 Worker {worker_id} 取得連線")
        time.sleep(1)  # 模擬 DB 操作
        print(f"🔓 Worker {worker_id} 釋放連線")

threads = [threading.Thread(target=access_database, args=(i,)) for i in range(8)]

for t in threads:
    t.start()
for t in threads:
    t.join()

六、ThreadPoolExecutor:現代寫法(推薦)
#

手動管理 Thread 太囉唆?Python 3.2+ 的 concurrent.futures 提供了更優雅的介面:

6.1 基本用法
#

from concurrent.futures import ThreadPoolExecutor
import time

def fetch_url(url: str) -> str:
    """模擬抓取網頁"""
    time.sleep(1)
    return f"📄 {url} 的內容 ({len(url)} bytes)"

urls = [
    "https://example.com/api/users",
    "https://example.com/api/posts",
    "https://example.com/api/comments",
    "https://example.com/api/tags",
    "https://example.com/api/categories",
]

# 最多 3 個 worker 同時跑
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(fetch_url, urls)

for result in results:
    print(result)

6.2 submit + as_completed:誰先完成先處理
#

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def download(file_id: int) -> dict:
    delay = random.uniform(0.5, 3.0)
    time.sleep(delay)
    return {"id": file_id, "time": round(delay, 2)}

with ThreadPoolExecutor(max_workers=4) as executor:
    # submit 回傳 Future 物件
    futures = {
        executor.submit(download, i): i
        for i in range(6)
    }

    for future in as_completed(futures):
        file_id = futures[future]
        try:
            result = future.result()
            print(f"✅ 檔案 {result['id']} 完成(耗時 {result['time']}s)")
        except Exception as e:
            print(f"❌ 檔案 {file_id} 失敗:{e}")

6.3 搭配 tqdm 顯示進度
#

搭配之前學過的 tqdm,讓多執行緒下載也有進度條:

from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time
import random

def process_item(item_id: int) -> int:
    time.sleep(random.uniform(0.1, 0.5))
    return item_id * 2

items = list(range(100))

with ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(process_item, i) for i in items]

    results = []
    with tqdm(total=len(items), desc="處理中") as pbar:
        for future in as_completed(futures):
            results.append(future.result())
            pbar.update(1)

print(f"\n✅ 處理完成,共 {len(results)} 筆")

七、實戰範例:多執行緒網頁爬蟲
#

來個實際一點的例子——用 threading 加速爬蟲:

from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import time
import random

@dataclass
class PageResult:
    url: str
    status: int
    content_length: int
    elapsed: float

def crawl_page(url: str) -> PageResult:
    """模擬爬取一個網頁"""
    start = time.time()
    time.sleep(random.uniform(0.2, 1.5))  # 模擬網路延遲

    # 模擬偶爾失敗
    if random.random() < 0.1:
        raise ConnectionError(f"無法連線到 {url}")

    return PageResult(
        url=url,
        status=200,
        content_length=random.randint(1000, 50000),
        elapsed=round(time.time() - start, 2),
    )

def run_crawler(urls: list[str], max_workers: int = 5):
    """多執行緒爬蟲"""
    results: list[PageResult] = []
    errors: list[tuple[str, str]] = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {
            executor.submit(crawl_page, url): url
            for url in urls
        }

        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
                results.append(result)
                print(f"  ✅ {result.url} ({result.elapsed}s)")
            except Exception as e:
                errors.append((url, str(e)))
                print(f"  ❌ {url}: {e}")

    return results, errors

# 模擬 20 個 URL
urls = [f"https://chatptt.com/page/{i}" for i in range(1, 21)]

print("🕷️ 拍拍爬蟲啟動!\n")
start = time.time()
results, errors = run_crawler(urls, max_workers=5)
total_time = round(time.time() - start, 2)

print(f"\n📊 爬取結果:")
print(f"   成功: {len(results)} 頁")
print(f"   失敗: {len(errors)} 頁")
print(f"   耗時: {total_time}s(循序預估: ~{len(urls) * 0.85:.0f}s)")

八、threading 的陷阱與最佳實踐
#

🚫 常見陷阱
#

1. GIL 讓 CPU 密集任務沒加速效果:

# ❌ 這樣用 threading 不會更快!
def cpu_heavy(n):
    return sum(i * i for i in range(n))

# CPU 密集請改用 multiprocessing
from multiprocessing import Pool
with Pool(4) as p:
    results = p.map(cpu_heavy, [10**7] * 4)

2. 忘記用 Lock 保護共享狀態:

# ❌ 危險!多個 thread 同時讀寫 list
shared_list = []

def bad_append(item):
    shared_list.append(item)  # append 本身是 thread-safe
    # 但如果是 read-modify-write 就不行:
    # shared_list[0] = shared_list[0] + 1  ← race condition!

3. Deadlock(死結):

# ❌ 兩個 lock 交叉取用 → deadlock
lock_a = threading.Lock()
lock_b = threading.Lock()

def task_1():
    with lock_a:
        time.sleep(0.1)
        with lock_b:  # 等 task_2 釋放 lock_b
            print("task_1")

def task_2():
    with lock_b:
        time.sleep(0.1)
        with lock_a:  # 等 task_1 釋放 lock_a
            print("task_2")

# ✅ 解法:永遠用固定順序取 lock(先 a 再 b)

✅ 最佳實踐
#

  1. I/O 密集 → threading,CPU 密集 → multiprocessing
  2. 優先用 ThreadPoolExecutor,別手動管理 Thread
  3. 共享狀態盡量少,用 Queue 傳遞資料更安全
  4. Lock 範圍越小越好(只包住真正需要同步的程式碼)
  5. 設定 timeout,避免永遠等下去
  6. 考慮 asyncio——如果是純 I/O 且量大,asyncio 可能更適合

九、threading vs asyncio:怎麼選?
#

既然提到了,拍拍君也來簡單比較一下:

項目 threading asyncio
模型 搶佔式(OS 排程) 協作式(event loop)
語法 一般 function async/await
適合 阻塞型 I/O(舊 API、DB driver) 原生 async 的 I/O(aiohttp 等)
複雜度 中(需處理 race condition) 中(需處理 event loop)
Thread 數 受 OS 限制(通常數百) 可輕鬆萬級 coroutine

簡單說:如果你用的 library 支援 async → 用 asyncio。如果是傳統阻塞式 API → 用 threading。


結語
#

今天學了一整套 Python threading 的用法:

  • Thread 基礎建立與 join() 等待
  • Daemon thread 的背景執行
  • Lock / RLock / Event / Semaphore 同步機制
  • ThreadPoolExecutor 現代高階介面
  • 陷阱與最佳實踐

記住那個黃金法則:I/O 密集用 threading,CPU 密集用 multiprocessing。搞混了不會出錯,但會浪費效能。

多執行緒程式設計的難度不在「開 thread」,而在「thread 之間怎麼安全地共享資料」。善用 Lock 和 Queue,你就能寫出既快又穩的並行程式!

下次拍拍君會再帶更多 Python 實戰技巧,我們下篇見! 🐍✨


延伸閱讀
#

Python 學習 - 本文屬於一個選集。
§ 34: 本文

相關文章

Python multiprocessing:突破 GIL 的平行運算完全指南
·9 分鐘· loading · loading
Python Multiprocessing Parallel Concurrency Performance
Python asyncio 非同步程式設計入門:讓你的程式不再傻等
·8 分鐘· loading · loading
Python Asyncio 非同步 並行
Python Profiling:cProfile + line_profiler 效能分析完全指南
·8 分鐘· loading · loading
Python Profiling CProfile Line_profiler Performance Optimization
Python subprocess:外部命令執行與管道串接完全指南
·8 分鐘· loading · loading
Python Subprocess Shell Automation Cli
Python 裝飾器:讓你的函式穿上超能力外套
·7 分鐘· loading · loading
Python Decorator 裝飾器 進階語法 設計模式
MLX 入門教學:在 Apple Silicon 上跑機器學習
·4 分鐘· loading · loading
Python Mlx Apple-Silicon Machine-Learning Deep-Learning