一、前言 #
嗨,這裡是拍拍君!🐍
上一篇我們學了 subprocess 讓 Python 呼叫外部命令。今天來聊另一個常見需求——怎麼讓 Python 跑得更快?
你可能聽過 Python 有個東西叫 GIL(Global Interpreter Lock),它讓 threading 在 CPU 密集任務上基本沒用。很多人因此說「Python 好慢」——但拍拍君要告訴你,multiprocessing 模組可以完全突破 GIL 的限制。
它的原理很簡單:不用多執行緒,用多程序。每個程序有自己的 Python 直譯器和記憶體空間,GIL 管不到。
今天拍拍君會從基礎開始,帶你走過 Process、Pool、Queue、共享記憶體,到實戰的效能對比。Let’s go!
二、先搞懂 GIL 到底是什麼 #
GIL 的本質 #
GIL 是 CPython 直譯器裡的一把全域鎖,確保同一時間只有一個執行緒在跑 Python bytecode:
import threading
import time
counter = 0
def count_up():
global counter
for _ in range(10_000_000):
counter += 1 # 這行不是 thread-safe 的!
# 兩個執行緒同時跑
t1 = threading.Thread(target=count_up)
t2 = threading.Thread(target=count_up)
start = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
elapsed = time.time() - start
print(f"counter = {counter}") # 不是 20,000,000!
print(f"耗時:{elapsed:.2f} 秒") # 比單執行緒還慢 😱
什麼時候 GIL 是問題? #
| 任務類型 | threading | multiprocessing |
|---|---|---|
| I/O 密集(網路、檔案) | ✅ 有效 | ✅ 有效(但殺雞用牛刀) |
| CPU 密集(運算、壓縮) | ❌ 被 GIL 卡住 | ✅ 真正平行 |
所以記住:
- I/O 密集 → 用
threading或asyncio(上次教過) - CPU 密集 → 用
multiprocessing
三、Process:最基本的多程序 #
建立子程序 #
from multiprocessing import Process
import os
def worker(name):
print(f"Worker {name}: PID={os.getpid()}, 父程序={os.getppid()}")
if __name__ == "__main__":
print(f"主程序 PID={os.getpid()}")
# 建立兩個子程序
p1 = Process(target=worker, args=("A",))
p2 = Process(target=worker, args=("B",))
p1.start() # 啟動程序
p2.start()
p1.join() # 等待完成
p2.join()
print("全部完成!")
輸出(PID 每次不同):
主程序 PID=12345
Worker A: PID=12346, 父程序=12345
Worker B: PID=12347, 父程序=12345
全部完成!
⚠️ if __name__ == "__main__" 超級重要
#
在 macOS 和 Windows 上,multiprocessing 用 spawn 方式建立子程序,這表示它會重新 import 你的模組。如果沒有 if __name__ == "__main__" 保護,子程序會無限遞迴地建立新程序。
# ❌ 沒有保護——macOS/Windows 上會爆炸
from multiprocessing import Process
def worker():
print("hello")
p = Process(target=worker)
p.start() # 💥 RuntimeError 或無限遞迴
# ✅ 正確
from multiprocessing import Process
def worker():
print("hello")
if __name__ == "__main__":
p = Process(target=worker)
p.start()
p.join()
帶回傳值的 Process #
Process 本身沒有直接的回傳值機制,但可以用 Queue 或 Value:
from multiprocessing import Process, Queue
def compute_square(numbers, queue):
"""計算平方和,結果放進 Queue"""
result = sum(n ** 2 for n in numbers)
queue.put(result)
if __name__ == "__main__":
data = list(range(1_000_000))
mid = len(data) // 2
q = Queue()
p1 = Process(target=compute_square, args=(data[:mid], q))
p2 = Process(target=compute_square, args=(data[mid:], q))
p1.start()
p2.start()
p1.join()
p2.join()
total = q.get() + q.get()
print(f"平方和 = {total}")
四、Pool:程序池(最常用!) #
手動管理 Process 很麻煩。大多數情況下,你要的是 Pool — 一個程序池,自動分配工作:
Pool.map — 平行的 map #
from multiprocessing import Pool
import time
import math
def is_prime(n):
"""判斷質數(故意用慢的方法來展示效果)"""
if n < 2:
return False
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
return False
return True
if __name__ == "__main__":
numbers = list(range(100_000, 200_000))
# 單程序
start = time.time()
results_single = list(map(is_prime, numbers))
single_time = time.time() - start
print(f"單程序:{single_time:.2f} 秒")
# 多程序(Pool)
start = time.time()
with Pool(4) as pool:
results_multi = pool.map(is_prime, numbers)
multi_time = time.time() - start
print(f"4 程序:{multi_time:.2f} 秒")
print(f"加速比:{single_time / multi_time:.1f}x")
print(f"找到 {sum(results_multi)} 個質數")
在 4 核 CPU 上的典型輸出:
單程序:3.21 秒
4 程序:0.92 秒
加速比:3.5x
找到 8713 個質數
Pool 的其他方法 #
from multiprocessing import Pool
def process_item(x):
return x ** 2
if __name__ == "__main__":
with Pool(4) as pool:
# map — 等所有結果回來
results = pool.map(process_item, range(100))
# imap — 惰性迭代,省記憶體
for result in pool.imap(process_item, range(100)):
pass # 一個一個拿
# imap_unordered — 誰先算完誰先回
for result in pool.imap_unordered(process_item, range(100)):
pass # 順序不保證,但更快
# apply_async — 非同步提交單一任務
future = pool.apply_async(process_item, (42,))
print(future.get(timeout=5)) # 1764
# starmap — 多個參數
pairs = [(1, 2), (3, 4), (5, 6)]
results = pool.starmap(pow, pairs)
print(results) # [1, 81, 15625]
chunksize 調校 #
Pool.map 預設會把資料切成很多小塊分給各程序。如果任務很輕量,通訊開銷會大於計算時間:
if __name__ == "__main__":
data = list(range(1_000_000))
with Pool(4) as pool:
# 預設 chunksize — 可能很慢
results = pool.map(process_item, data)
# 手動調大 chunksize — 減少通訊次數
results = pool.map(process_item, data, chunksize=10_000)
經驗法則:chunksize = len(data) // (pool_size * 4) 通常是不錯的起點。
五、程序間通訊 #
Queue — 先進先出隊列 #
from multiprocessing import Process, Queue
import time
def producer(queue, items):
"""生產者:把資料塞進 Queue"""
for item in items:
queue.put(item)
time.sleep(0.1) # 模擬生產時間
queue.put(None) # 哨兵值,表示生產結束
def consumer(queue, name):
"""消費者:從 Queue 拿資料"""
while True:
item = queue.get()
if item is None:
break
print(f"[{name}] 處理:{item}")
if __name__ == "__main__":
q = Queue()
prod = Process(target=producer, args=(q, ["任務A", "任務B", "任務C"]))
cons = Process(target=consumer, args=(q, "Worker-1"))
prod.start()
cons.start()
prod.join()
cons.join()
Pipe — 雙向管道(兩個程序之間) #
from multiprocessing import Process, Pipe
def ping(conn):
conn.send("ping")
response = conn.recv()
print(f"收到:{response}")
conn.close()
def pong(conn):
msg = conn.recv()
print(f"收到:{msg}")
conn.send("pong")
conn.close()
if __name__ == "__main__":
parent_conn, child_conn = Pipe()
p1 = Process(target=ping, args=(parent_conn,))
p2 = Process(target=pong, args=(child_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
Pipe 比 Queue 快,但只能兩個程序之間用。多個程序要通訊就用 Queue。
六、共享記憶體 #
多程序的每個程序有獨立的記憶體空間,所以不能直接共用變數。multiprocessing 提供了幾種方式:
Value 和 Array — 共享基本型別 #
from multiprocessing import Process, Value, Array
def increment(shared_counter, shared_array, lock):
for i in range(10000):
with lock: # 必須加鎖!
shared_counter.value += 1
# 修改共享陣列
for i in range(len(shared_array)):
shared_array[i] += 1
if __name__ == "__main__":
from multiprocessing import Lock
counter = Value('i', 0) # 'i' = int
arr = Array('d', [0.0] * 5) # 'd' = double
lock = Lock()
processes = [
Process(target=increment, args=(counter, arr, lock))
for _ in range(4)
]
for p in processes:
p.start()
for p in processes:
p.join()
print(f"counter = {counter.value}") # 40000
print(f"array = {list(arr)}") # [4.0, 4.0, 4.0, 4.0, 4.0]
shared_memory(Python 3.8+)— 高效能共享 #
from multiprocessing import shared_memory, Process
import numpy as np
def worker(shm_name, shape, dtype):
"""在子程序中存取共享記憶體"""
existing_shm = shared_memory.SharedMemory(name=shm_name)
array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
# 直接修改共享的 numpy array
array *= 2
existing_shm.close()
if __name__ == "__main__":
# 建立 numpy array
original = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
# 把它放進共享記憶體
shm = shared_memory.SharedMemory(create=True, size=original.nbytes)
shared_array = np.ndarray(original.shape, dtype=original.dtype, buffer=shm.buf)
shared_array[:] = original[:] # 複製資料進去
print(f"修改前:{shared_array}") # [1. 2. 3. 4. 5.]
p = Process(target=worker, args=(shm.name, original.shape, original.dtype))
p.start()
p.join()
print(f"修改後:{shared_array}") # [2. 4. 6. 8. 10.]
# 清理
shm.close()
shm.unlink()
shared_memory 比 Value/Array 更靈活,特別適合大型 numpy 陣列。
七、Manager — 共享複雜資料結構 #
如果你需要共享 dict、list 這種複雜型別,Manager 幫你搞定:
from multiprocessing import Process, Manager
def collect_results(shared_dict, shared_list, worker_id):
"""每個 worker 把結果寫入共享的 dict 和 list"""
shared_dict[f"worker_{worker_id}"] = worker_id ** 2
shared_list.append(f"done_{worker_id}")
if __name__ == "__main__":
with Manager() as manager:
d = manager.dict() # 共享字典
l = manager.list() # 共享列表
processes = [
Process(target=collect_results, args=(d, l, i))
for i in range(4)
]
for p in processes:
p.start()
for p in processes:
p.join()
print(f"dict: {dict(d)}")
# {'worker_0': 0, 'worker_1': 1, 'worker_2': 4, 'worker_3': 9}
print(f"list: {list(l)}")
# ['done_0', 'done_1', 'done_2', 'done_3'](順序可能不同)
⚠️ Manager 背後用的是 proxy 物件和 socket 通訊,比 Value/Array 慢很多。只有在真的需要共享複雜結構時才用。
八、ProcessPoolExecutor:更現代的介面 #
Python 3.2 引入的 concurrent.futures 提供了更高階的介面:
from concurrent.futures import ProcessPoolExecutor, as_completed
import math
import time
def factorize(n):
"""分解質因數"""
factors = []
d = 2
while d * d <= n:
while n % d == 0:
factors.append(d)
n //= d
d += 1
if n > 1:
factors.append(n)
return factors
if __name__ == "__main__":
numbers = [
112272535095293, 112582705942171, 112272535095293,
115280095190773, 115797848077099, 117450548693743,
]
# 方法一:map — 保持順序
with ProcessPoolExecutor(max_workers=4) as executor:
start = time.time()
results = list(executor.map(factorize, numbers))
print(f"map 耗時:{time.time() - start:.2f} 秒")
for n, factors in zip(numbers, results):
print(f" {n} = {' × '.join(map(str, factors))}")
# 方法二:submit + as_completed — 誰先完成先處理
with ProcessPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(factorize, n): n for n in numbers}
for future in as_completed(futures):
n = futures[future]
factors = future.result()
print(f"{n} = {' × '.join(map(str, factors))}")
Pool vs ProcessPoolExecutor 怎麼選? #
| 特性 | Pool |
ProcessPoolExecutor |
|---|---|---|
| 介面風格 | multiprocessing 原生 | concurrent.futures 統一 |
imap_unordered |
✅ 有 | ❌ 要用 as_completed |
starmap |
✅ 有 | ❌ 要自己 wrap |
| 異常處理 | 較麻煩 | future.exception() |
與 threading 切換 |
不行 | 換成 ThreadPoolExecutor 就好 |
拍拍君的建議:新程式碼用 ProcessPoolExecutor,除非你需要 imap_unordered 或 starmap。
九、實戰:效能對比 #
讓我們跑個真實的 benchmark — 用蒙地卡羅法估計圓周率:
import random
import time
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
def monte_carlo_pi(num_samples):
"""蒙地卡羅法估計 π"""
inside = 0
random.seed() # 每個程序要重新 seed!
for _ in range(num_samples):
x = random.random()
y = random.random()
if x * x + y * y <= 1.0:
inside += 1
return inside
if __name__ == "__main__":
total_samples = 50_000_000
num_workers = 4
samples_per_worker = total_samples // num_workers
# 單程序
start = time.time()
inside = monte_carlo_pi(total_samples)
pi_single = 4.0 * inside / total_samples
single_time = time.time() - start
print(f"單程序:π ≈ {pi_single:.6f}, 耗時 {single_time:.2f} 秒")
# Pool
start = time.time()
with Pool(num_workers) as pool:
results = pool.map(monte_carlo_pi, [samples_per_worker] * num_workers)
pi_pool = 4.0 * sum(results) / total_samples
pool_time = time.time() - start
print(f"Pool: π ≈ {pi_pool:.6f}, 耗時 {pool_time:.2f} 秒")
# ProcessPoolExecutor
start = time.time()
with ProcessPoolExecutor(num_workers) as executor:
results = list(executor.map(
monte_carlo_pi,
[samples_per_worker] * num_workers,
))
pi_ppe = 4.0 * sum(results) / total_samples
ppe_time = time.time() - start
print(f"PPE: π ≈ {pi_ppe:.6f}, 耗時 {ppe_time:.2f} 秒")
print(f"\n加速比:Pool={single_time/pool_time:.1f}x, PPE={single_time/ppe_time:.1f}x")
在 M4 Mac mini 上的典型結果:
單程序:π ≈ 3.141587, 耗時 22.31 秒
Pool: π ≈ 3.141640, 耗時 5.89 秒
PPE: π ≈ 3.141553, 耗時 6.02 秒
加速比:Pool=3.8x, PPE=3.7x
接近 4 倍加速!因為蒙地卡羅法是 100% CPU 密集,完美適合 multiprocessing。
十、常見陷阱與最佳實踐 #
陷阱 1:忘記 if __name__ == "__main__"
#
前面講過了。在 macOS/Windows 上會炸。
陷阱 2:傳遞不可 pickle 的物件 #
multiprocessing 用 pickle 序列化資料傳給子程序。Lambda、開啟的檔案、socket 都不能 pickle:
# ❌ lambda 不能 pickle
with Pool(4) as pool:
pool.map(lambda x: x ** 2, range(10))
# PicklingError!
# ✅ 用具名函式
def square(x):
return x ** 2
with Pool(4) as pool:
pool.map(square, range(10))
陷阱 3:程序啟動開銷 #
每個程序啟動都有開銷(fork/spawn + import 模組)。如果任務太小,平行反而更慢:
import time
from multiprocessing import Pool
def tiny_task(x):
return x + 1
if __name__ == "__main__":
# 太小的任務 — multiprocessing 反而慢
start = time.time()
with Pool(4) as pool:
pool.map(tiny_task, range(100))
print(f"Pool: {time.time() - start:.4f} 秒")
start = time.time()
list(map(tiny_task, range(100)))
print(f"單程序: {time.time() - start:.4f} 秒")
# Pool: 0.1500 秒 😱
# 單程序: 0.0001 秒
陷阱 4:忘記 random seed #
子程序如果沒有重新 seed,可能產生相同的隨機數:
import random
from multiprocessing import Pool
def generate_random(_):
# ❌ 可能每個程序產生一樣的數
return random.random()
def generate_random_safe(_):
# ✅ 重新 seed
random.seed()
return random.random()
最佳實踐清單 #
# ✅ 1. 永遠用 if __name__ == "__main__"
# ✅ 2. 用 Pool/ProcessPoolExecutor 而非手動管 Process
# ✅ 3. 確保傳給子程序的資料可以 pickle
# ✅ 4. CPU 核心數用 os.cpu_count() 取得
# ✅ 5. 大資料用 shared_memory 而非 Queue 傳遞
# ✅ 6. 用 context manager (with) 確保資源清理
# ✅ 7. 子程序裡重新 seed 隨機數產生器
十一、multiprocessing vs 其他方案 #
| 方案 | 適用場景 | GIL 影響 | 資料共享 |
|---|---|---|---|
threading |
I/O 密集 | 受限 | 直接共享(要鎖) |
asyncio |
I/O 密集(大量併發) | 受限 | 直接共享(單執行緒) |
multiprocessing |
CPU 密集 | 不受影響 | 需要特殊機制 |
joblib |
ML/科學計算 | 不受影響 | 自動序列化 |
ray |
分散式運算 | 不受影響 | 分散式物件 |
mpi4py |
HPC 叢集 | 不受影響 | MPI 通訊 |
如果你做科學計算,joblib 或 ray 可能更方便。但理解 multiprocessing 是基礎——其他工具底層都用到它。
十二、Python 3.12+ 的 GIL 改革 #
Python 3.13 開始實驗性支援 free-threaded mode(PEP 703),可以關掉 GIL:
# 安裝 free-threaded 版本
# 注意:這還是實驗性功能
python3.13t -c "import sys; print(sys._is_gil_enabled())"
# False(如果成功關掉 GIL)
但在 free-threaded Python 完全成熟之前(預計 Python 3.15+),multiprocessing 仍然是 CPU 密集任務的最可靠方案。
十三、總結 #
今天我們學了:
- GIL 是什麼 — 為什麼
threading在 CPU 密集任務上沒用 - Process — 最基本的多程序建立方式
- Pool — 程序池,
map/imap/starmap三兄弟 - Queue 和 Pipe — 程序間通訊
- 共享記憶體 —
Value、Array、shared_memory - Manager — 共享複雜資料結構
- ProcessPoolExecutor — 更現代的介面
- 常見陷阱 — pickle、seed、啟動開銷
拍拍君的經驗是:80% 的場景用 Pool.map 就搞定了。只有在需要程序間通訊或共享大型資料時,才需要去碰 Queue、shared_memory 這些進階工具。
記住:不要為了平行而平行。如果你的任務在幾秒內就跑完了,multiprocessing 的啟動開銷可能比你省下的時間還多。先量測,再最佳化!
下次見啦,拍拍!🐍✨