快轉到主要內容
  1. 教學文章/

Python multiprocessing:突破 GIL 的平行運算完全指南

·9 分鐘· loading · loading · ·
Python Multiprocessing Parallel Concurrency Performance
每日拍拍
作者
每日拍拍
科學家 X 科技宅宅
目錄
Python 學習 - 本文屬於一個選集。
§ 25: 本文

一、前言
#

嗨,這裡是拍拍君!🐍

上一篇我們學了 subprocess 讓 Python 呼叫外部命令。今天來聊另一個常見需求——怎麼讓 Python 跑得更快?

你可能聽過 Python 有個東西叫 GIL(Global Interpreter Lock),它讓 threading 在 CPU 密集任務上基本沒用。很多人因此說「Python 好慢」——但拍拍君要告訴你,multiprocessing 模組可以完全突破 GIL 的限制

它的原理很簡單:不用多執行緒,用多程序。每個程序有自己的 Python 直譯器和記憶體空間,GIL 管不到。

今天拍拍君會從基礎開始,帶你走過 ProcessPoolQueue、共享記憶體,到實戰的效能對比。Let’s go!

二、先搞懂 GIL 到底是什麼
#

GIL 的本質
#

GIL 是 CPython 直譯器裡的一把全域鎖,確保同一時間只有一個執行緒在跑 Python bytecode:

import threading
import time

counter = 0

def count_up():
    global counter
    for _ in range(10_000_000):
        counter += 1  # 這行不是 thread-safe 的!

# 兩個執行緒同時跑
t1 = threading.Thread(target=count_up)
t2 = threading.Thread(target=count_up)

start = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
elapsed = time.time() - start

print(f"counter = {counter}")   # 不是 20,000,000!
print(f"耗時:{elapsed:.2f} 秒")  # 比單執行緒還慢 😱

什麼時候 GIL 是問題?
#

任務類型 threading multiprocessing
I/O 密集(網路、檔案) ✅ 有效 ✅ 有效(但殺雞用牛刀)
CPU 密集(運算、壓縮) ❌ 被 GIL 卡住 ✅ 真正平行

所以記住:

  • I/O 密集 → 用 threadingasyncio上次教過
  • CPU 密集 → 用 multiprocessing

三、Process:最基本的多程序
#

建立子程序
#

from multiprocessing import Process
import os

def worker(name):
    print(f"Worker {name}: PID={os.getpid()}, 父程序={os.getppid()}")

if __name__ == "__main__":
    print(f"主程序 PID={os.getpid()}")

    # 建立兩個子程序
    p1 = Process(target=worker, args=("A",))
    p2 = Process(target=worker, args=("B",))

    p1.start()  # 啟動程序
    p2.start()

    p1.join()   # 等待完成
    p2.join()

    print("全部完成!")

輸出(PID 每次不同):

主程序 PID=12345
Worker A: PID=12346, 父程序=12345
Worker B: PID=12347, 父程序=12345
全部完成!

⚠️ if __name__ == "__main__" 超級重要
#

在 macOS 和 Windows 上,multiprocessingspawn 方式建立子程序,這表示它會重新 import 你的模組。如果沒有 if __name__ == "__main__" 保護,子程序會無限遞迴地建立新程序。

# ❌ 沒有保護——macOS/Windows 上會爆炸
from multiprocessing import Process

def worker():
    print("hello")

p = Process(target=worker)
p.start()  # 💥 RuntimeError 或無限遞迴

# ✅ 正確
from multiprocessing import Process

def worker():
    print("hello")

if __name__ == "__main__":
    p = Process(target=worker)
    p.start()
    p.join()

帶回傳值的 Process
#

Process 本身沒有直接的回傳值機制,但可以用 QueueValue

from multiprocessing import Process, Queue

def compute_square(numbers, queue):
    """計算平方和,結果放進 Queue"""
    result = sum(n ** 2 for n in numbers)
    queue.put(result)

if __name__ == "__main__":
    data = list(range(1_000_000))
    mid = len(data) // 2
    q = Queue()

    p1 = Process(target=compute_square, args=(data[:mid], q))
    p2 = Process(target=compute_square, args=(data[mid:], q))

    p1.start()
    p2.start()
    p1.join()
    p2.join()

    total = q.get() + q.get()
    print(f"平方和 = {total}")

四、Pool:程序池(最常用!)
#

手動管理 Process 很麻煩。大多數情況下,你要的是 Pool — 一個程序池,自動分配工作:

Pool.map — 平行的 map
#

from multiprocessing import Pool
import time
import math

def is_prime(n):
    """判斷質數(故意用慢的方法來展示效果)"""
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

if __name__ == "__main__":
    numbers = list(range(100_000, 200_000))

    # 單程序
    start = time.time()
    results_single = list(map(is_prime, numbers))
    single_time = time.time() - start
    print(f"單程序:{single_time:.2f} 秒")

    # 多程序(Pool)
    start = time.time()
    with Pool(4) as pool:
        results_multi = pool.map(is_prime, numbers)
    multi_time = time.time() - start
    print(f"4 程序:{multi_time:.2f} 秒")

    print(f"加速比:{single_time / multi_time:.1f}x")
    print(f"找到 {sum(results_multi)} 個質數")

在 4 核 CPU 上的典型輸出:

單程序:3.21 秒
4 程序:0.92 秒
加速比:3.5x
找到 8713 個質數

Pool 的其他方法
#

from multiprocessing import Pool

def process_item(x):
    return x ** 2

if __name__ == "__main__":
    with Pool(4) as pool:
        # map — 等所有結果回來
        results = pool.map(process_item, range(100))

        # imap — 惰性迭代,省記憶體
        for result in pool.imap(process_item, range(100)):
            pass  # 一個一個拿

        # imap_unordered — 誰先算完誰先回
        for result in pool.imap_unordered(process_item, range(100)):
            pass  # 順序不保證,但更快

        # apply_async — 非同步提交單一任務
        future = pool.apply_async(process_item, (42,))
        print(future.get(timeout=5))  # 1764

        # starmap — 多個參數
        pairs = [(1, 2), (3, 4), (5, 6)]
        results = pool.starmap(pow, pairs)
        print(results)  # [1, 81, 15625]

chunksize 調校
#

Pool.map 預設會把資料切成很多小塊分給各程序。如果任務很輕量,通訊開銷會大於計算時間:

if __name__ == "__main__":
    data = list(range(1_000_000))

    with Pool(4) as pool:
        # 預設 chunksize — 可能很慢
        results = pool.map(process_item, data)

        # 手動調大 chunksize — 減少通訊次數
        results = pool.map(process_item, data, chunksize=10_000)

經驗法則:chunksize = len(data) // (pool_size * 4) 通常是不錯的起點。

五、程序間通訊
#

Queue — 先進先出隊列
#

from multiprocessing import Process, Queue
import time

def producer(queue, items):
    """生產者:把資料塞進 Queue"""
    for item in items:
        queue.put(item)
        time.sleep(0.1)  # 模擬生產時間
    queue.put(None)  # 哨兵值,表示生產結束

def consumer(queue, name):
    """消費者:從 Queue 拿資料"""
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"[{name}] 處理:{item}")

if __name__ == "__main__":
    q = Queue()

    prod = Process(target=producer, args=(q, ["任務A", "任務B", "任務C"]))
    cons = Process(target=consumer, args=(q, "Worker-1"))

    prod.start()
    cons.start()

    prod.join()
    cons.join()

Pipe — 雙向管道(兩個程序之間)
#

from multiprocessing import Process, Pipe

def ping(conn):
    conn.send("ping")
    response = conn.recv()
    print(f"收到:{response}")
    conn.close()

def pong(conn):
    msg = conn.recv()
    print(f"收到:{msg}")
    conn.send("pong")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()

    p1 = Process(target=ping, args=(parent_conn,))
    p2 = Process(target=pong, args=(child_conn,))

    p1.start()
    p2.start()
    p1.join()
    p2.join()

PipeQueue 快,但只能兩個程序之間用。多個程序要通訊就用 Queue

六、共享記憶體
#

多程序的每個程序有獨立的記憶體空間,所以不能直接共用變數。multiprocessing 提供了幾種方式:

Value 和 Array — 共享基本型別
#

from multiprocessing import Process, Value, Array

def increment(shared_counter, shared_array, lock):
    for i in range(10000):
        with lock:  # 必須加鎖!
            shared_counter.value += 1
    # 修改共享陣列
    for i in range(len(shared_array)):
        shared_array[i] += 1

if __name__ == "__main__":
    from multiprocessing import Lock

    counter = Value('i', 0)       # 'i' = int
    arr = Array('d', [0.0] * 5)   # 'd' = double
    lock = Lock()

    processes = [
        Process(target=increment, args=(counter, arr, lock))
        for _ in range(4)
    ]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(f"counter = {counter.value}")   # 40000
    print(f"array = {list(arr)}")          # [4.0, 4.0, 4.0, 4.0, 4.0]

shared_memory(Python 3.8+)— 高效能共享
#

from multiprocessing import shared_memory, Process
import numpy as np

def worker(shm_name, shape, dtype):
    """在子程序中存取共享記憶體"""
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)

    # 直接修改共享的 numpy array
    array *= 2

    existing_shm.close()

if __name__ == "__main__":
    # 建立 numpy array
    original = np.array([1.0, 2.0, 3.0, 4.0, 5.0])

    # 把它放進共享記憶體
    shm = shared_memory.SharedMemory(create=True, size=original.nbytes)
    shared_array = np.ndarray(original.shape, dtype=original.dtype, buffer=shm.buf)
    shared_array[:] = original[:]  # 複製資料進去

    print(f"修改前:{shared_array}")  # [1. 2. 3. 4. 5.]

    p = Process(target=worker, args=(shm.name, original.shape, original.dtype))
    p.start()
    p.join()

    print(f"修改後:{shared_array}")  # [2. 4. 6. 8. 10.]

    # 清理
    shm.close()
    shm.unlink()

shared_memoryValue/Array 更靈活,特別適合大型 numpy 陣列。

七、Manager — 共享複雜資料結構
#

如果你需要共享 dictlist 這種複雜型別,Manager 幫你搞定:

from multiprocessing import Process, Manager

def collect_results(shared_dict, shared_list, worker_id):
    """每個 worker 把結果寫入共享的 dict 和 list"""
    shared_dict[f"worker_{worker_id}"] = worker_id ** 2
    shared_list.append(f"done_{worker_id}")

if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()    # 共享字典
        l = manager.list()    # 共享列表

        processes = [
            Process(target=collect_results, args=(d, l, i))
            for i in range(4)
        ]

        for p in processes:
            p.start()
        for p in processes:
            p.join()

        print(f"dict: {dict(d)}")
        # {'worker_0': 0, 'worker_1': 1, 'worker_2': 4, 'worker_3': 9}
        print(f"list: {list(l)}")
        # ['done_0', 'done_1', 'done_2', 'done_3'](順序可能不同)

⚠️ Manager 背後用的是 proxy 物件和 socket 通訊,Value/Array 慢很多。只有在真的需要共享複雜結構時才用。

八、ProcessPoolExecutor:更現代的介面
#

Python 3.2 引入的 concurrent.futures 提供了更高階的介面:

from concurrent.futures import ProcessPoolExecutor, as_completed
import math
import time

def factorize(n):
    """分解質因數"""
    factors = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            factors.append(d)
            n //= d
        d += 1
    if n > 1:
        factors.append(n)
    return factors

if __name__ == "__main__":
    numbers = [
        112272535095293, 112582705942171, 112272535095293,
        115280095190773, 115797848077099, 117450548693743,
    ]

    # 方法一:map — 保持順序
    with ProcessPoolExecutor(max_workers=4) as executor:
        start = time.time()
        results = list(executor.map(factorize, numbers))
        print(f"map 耗時:{time.time() - start:.2f} 秒")
        for n, factors in zip(numbers, results):
            print(f"  {n} = {' × '.join(map(str, factors))}")

    # 方法二:submit + as_completed — 誰先完成先處理
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(factorize, n): n for n in numbers}
        for future in as_completed(futures):
            n = futures[future]
            factors = future.result()
            print(f"{n} = {' × '.join(map(str, factors))}")

Pool vs ProcessPoolExecutor 怎麼選?
#

特性 Pool ProcessPoolExecutor
介面風格 multiprocessing 原生 concurrent.futures 統一
imap_unordered ✅ 有 ❌ 要用 as_completed
starmap ✅ 有 ❌ 要自己 wrap
異常處理 較麻煩 future.exception()
threading 切換 不行 換成 ThreadPoolExecutor 就好

拍拍君的建議:新程式碼用 ProcessPoolExecutor,除非你需要 imap_unorderedstarmap

九、實戰:效能對比
#

讓我們跑個真實的 benchmark — 用蒙地卡羅法估計圓周率:

import random
import time
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor

def monte_carlo_pi(num_samples):
    """蒙地卡羅法估計 π"""
    inside = 0
    random.seed()  # 每個程序要重新 seed!
    for _ in range(num_samples):
        x = random.random()
        y = random.random()
        if x * x + y * y <= 1.0:
            inside += 1
    return inside

if __name__ == "__main__":
    total_samples = 50_000_000
    num_workers = 4
    samples_per_worker = total_samples // num_workers

    # 單程序
    start = time.time()
    inside = monte_carlo_pi(total_samples)
    pi_single = 4.0 * inside / total_samples
    single_time = time.time() - start
    print(f"單程序:π ≈ {pi_single:.6f}, 耗時 {single_time:.2f} 秒")

    # Pool
    start = time.time()
    with Pool(num_workers) as pool:
        results = pool.map(monte_carlo_pi, [samples_per_worker] * num_workers)
    pi_pool = 4.0 * sum(results) / total_samples
    pool_time = time.time() - start
    print(f"Pool:  π ≈ {pi_pool:.6f}, 耗時 {pool_time:.2f} 秒")

    # ProcessPoolExecutor
    start = time.time()
    with ProcessPoolExecutor(num_workers) as executor:
        results = list(executor.map(
            monte_carlo_pi,
            [samples_per_worker] * num_workers,
        ))
    pi_ppe = 4.0 * sum(results) / total_samples
    ppe_time = time.time() - start
    print(f"PPE:   π ≈ {pi_ppe:.6f}, 耗時 {ppe_time:.2f} 秒")

    print(f"\n加速比:Pool={single_time/pool_time:.1f}x, PPE={single_time/ppe_time:.1f}x")

在 M4 Mac mini 上的典型結果:

單程序:π ≈ 3.141587, 耗時 22.31 秒
Pool:  π ≈ 3.141640, 耗時 5.89 秒
PPE:   π ≈ 3.141553, 耗時 6.02 秒

加速比:Pool=3.8x, PPE=3.7x

接近 4 倍加速!因為蒙地卡羅法是 100% CPU 密集,完美適合 multiprocessing。

十、常見陷阱與最佳實踐
#

陷阱 1:忘記 if __name__ == "__main__"
#

前面講過了。在 macOS/Windows 上會炸。

陷阱 2:傳遞不可 pickle 的物件
#

multiprocessing 用 pickle 序列化資料傳給子程序。Lambda、開啟的檔案、socket 都不能 pickle:

# ❌ lambda 不能 pickle
with Pool(4) as pool:
    pool.map(lambda x: x ** 2, range(10))
# PicklingError!

# ✅ 用具名函式
def square(x):
    return x ** 2

with Pool(4) as pool:
    pool.map(square, range(10))

陷阱 3:程序啟動開銷
#

每個程序啟動都有開銷(fork/spawn + import 模組)。如果任務太小,平行反而更慢:

import time
from multiprocessing import Pool

def tiny_task(x):
    return x + 1

if __name__ == "__main__":
    # 太小的任務 — multiprocessing 反而慢
    start = time.time()
    with Pool(4) as pool:
        pool.map(tiny_task, range(100))
    print(f"Pool: {time.time() - start:.4f} 秒")

    start = time.time()
    list(map(tiny_task, range(100)))
    print(f"單程序: {time.time() - start:.4f} 秒")
    # Pool: 0.1500 秒 😱
    # 單程序: 0.0001 秒

陷阱 4:忘記 random seed
#

子程序如果沒有重新 seed,可能產生相同的隨機數:

import random
from multiprocessing import Pool

def generate_random(_):
    # ❌ 可能每個程序產生一樣的數
    return random.random()

def generate_random_safe(_):
    # ✅ 重新 seed
    random.seed()
    return random.random()

最佳實踐清單
#

# ✅ 1. 永遠用 if __name__ == "__main__"
# ✅ 2. 用 Pool/ProcessPoolExecutor 而非手動管 Process
# ✅ 3. 確保傳給子程序的資料可以 pickle
# ✅ 4. CPU 核心數用 os.cpu_count() 取得
# ✅ 5. 大資料用 shared_memory 而非 Queue 傳遞
# ✅ 6. 用 context manager (with) 確保資源清理
# ✅ 7. 子程序裡重新 seed 隨機數產生器

十一、multiprocessing vs 其他方案
#

方案 適用場景 GIL 影響 資料共享
threading I/O 密集 受限 直接共享(要鎖)
asyncio I/O 密集(大量併發) 受限 直接共享(單執行緒)
multiprocessing CPU 密集 不受影響 需要特殊機制
joblib ML/科學計算 不受影響 自動序列化
ray 分散式運算 不受影響 分散式物件
mpi4py HPC 叢集 不受影響 MPI 通訊

如果你做科學計算,joblibray 可能更方便。但理解 multiprocessing 是基礎——其他工具底層都用到它。

十二、Python 3.12+ 的 GIL 改革
#

Python 3.13 開始實驗性支援 free-threaded mode(PEP 703),可以關掉 GIL:

# 安裝 free-threaded 版本
# 注意:這還是實驗性功能
python3.13t -c "import sys; print(sys._is_gil_enabled())"
# False(如果成功關掉 GIL)

但在 free-threaded Python 完全成熟之前(預計 Python 3.15+),multiprocessing 仍然是 CPU 密集任務的最可靠方案。

十三、總結
#

今天我們學了:

  1. GIL 是什麼 — 為什麼 threading 在 CPU 密集任務上沒用
  2. Process — 最基本的多程序建立方式
  3. Pool — 程序池,map/imap/starmap 三兄弟
  4. Queue 和 Pipe — 程序間通訊
  5. 共享記憶體ValueArrayshared_memory
  6. Manager — 共享複雜資料結構
  7. ProcessPoolExecutor — 更現代的介面
  8. 常見陷阱 — pickle、seed、啟動開銷

拍拍君的經驗是:80% 的場景用 Pool.map 就搞定了。只有在需要程序間通訊或共享大型資料時,才需要去碰 Queueshared_memory 這些進階工具。

記住:不要為了平行而平行。如果你的任務在幾秒內就跑完了,multiprocessing 的啟動開銷可能比你省下的時間還多。先量測,再最佳化!

下次見啦,拍拍!🐍✨

Python 學習 - 本文屬於一個選集。
§ 25: 本文

相關文章

Python subprocess:外部命令執行與管道串接完全指南
·8 分鐘· loading · loading
Python Subprocess Shell Automation Cli
Python 裝飾器:讓你的函式穿上超能力外套
·7 分鐘· loading · loading
Python Decorator 裝飾器 進階語法 設計模式
Python itertools:迭代器的瑞士刀
·6 分鐘· loading · loading
Python Itertools Stdlib Functional-Programming 效能
MLX 入門教學:在 Apple Silicon 上跑機器學習
·4 分鐘· loading · loading
Python Mlx Apple-Silicon Machine-Learning Deep-Learning
FastAPI:Python 最潮的 Web API 框架
·5 分鐘· loading · loading
Python Fastapi Web Api Async
Docker for Python:讓你的程式在任何地方都能跑
·6 分鐘· loading · loading
Python Docker Container Devops 部署