一. 前言:有些程式不是現在跑,是「等一下」跑 #
你一定遇過這種需求:每天早上九點寄報表、每五分鐘同步資料、使用者按下按鈕後三十分鐘再提醒、半夜自動清暫存檔。
這些事情看起來都可以用 while True 加 time.sleep() 解決。
while True:
do_something()
time.sleep(300)
但拍拍君要先把你的手按住一下。
這種寫法可以當玩具,放進長期運作的程式就很快開始痛:程式重啟後狀態不清楚、任務跑太久會疊在一起、想改成工作日早上跑就變成時間判斷地獄。
今天的主角 APScheduler,就是用來處理「時間到了就執行 Python 函式」的工具。
你可以把它想成 Python 程式內建的小型 cron,但它比系統 cron 更容易跟你的 app 狀態、參數、管理介面接在一起。
它不是萬能背景工作系統,也不是 Celery 替代品;可是對內部工具、小型服務、Bot、自動化腳本、資料同步任務來說,常常剛剛好。
如果你看過 python-watchdog 那篇,可以這樣分:watchdog 是「檔案變了就做事」,APScheduler 是「時間到了就做事」。
二. 安裝:一個套件就能開始排程 #
先建立一個乾淨專案。
mkdir pypy-scheduler-demo
cd pypy-scheduler-demo
uv init
uv add apscheduler
不用 uv 的話,也可以用 pip。
python -m venv .venv
source .venv/bin/activate
pip install apscheduler
確認版本。
python -c "import apscheduler; print(apscheduler.__version__)"
APScheduler 有五個常見名詞:Scheduler 管理任務,Trigger 決定什麼時候跑,Job 是被排進去的工作,Executor 負責執行,Job store 負責保存任務資料。 一開始不用全部背起來,先把「Scheduler + Trigger + Job」這三個概念抓住就好。
三. 第一個排程:每三秒說一次 hi #
建立 hello_scheduler.py。
from datetime import datetime
import time
from apscheduler.schedulers.background import BackgroundScheduler
def say_hi() -> None:
now = datetime.now().strftime("%H:%M:%S")
print(f"[{now}] 拍拍君:hi,時間到了!")
scheduler = BackgroundScheduler()
scheduler.add_job(say_hi, "interval", seconds=3)
scheduler.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
scheduler.shutdown()
print("排程器已關閉")
執行它。
python hello_scheduler.py
你會看到它每三秒印一次訊息。
BackgroundScheduler 會在背景 thread 裡執行任務,所以主程式不能馬上結束;範例中的 while True 只是讓程式活著,不是拿來控制排程時間。
真正決定「每三秒」的是 add_job(say_hi, "interval", seconds=3)。
這已經比手寫 sleep() 好,因為 scheduler 知道每個 job 的下次執行時間,也能同時管理多個 job。
四. 三種 Trigger:date、interval、cron #
APScheduler 最常用的 trigger 有三種。
date:只在某個時間點跑一次。
interval:每隔固定時間跑一次。
cron:用類似 cron 的規則排程。
先看 date,適合「使用者做了某件事,稍後提醒」。
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
def remind(message: str) -> None:
print(f"提醒:{message}")
scheduler = BackgroundScheduler()
scheduler.start()
run_at = datetime.now() + timedelta(seconds=10)
scheduler.add_job(
remind,
"date",
run_date=run_at,
args=["喝水,不要變成乾掉的拍拍君"],
)
interval 適合固定頻率工作,例如每五分鐘同步一次資料。
scheduler.add_job(
sync_cache,
"interval",
minutes=5,
id="sync-cache",
)
cron 適合人類語意的行程,例如工作日早上九點半寄報表。
scheduler.add_job(
send_report,
"cron",
day_of_week="mon-fri",
hour=9,
minute=30,
id="weekday-report",
)
如果你原本想寫一堆 if now.hour == ...,請停手,交給 cron trigger,大家都會比較快樂。
五. 把 job 寫得可管理:id、args、kwargs #
專案裡通常不會只有一個任務,所以 job 一定要命名。
APScheduler 用 id 管理 job。
def fetch_feed(feed_name: str, limit: int = 20) -> None:
print(f"抓取 {feed_name},最多 {limit} 筆")
scheduler.add_job(
fetch_feed,
"interval",
minutes=10,
id="fetch-tech-feed",
args=["tech"],
kwargs={"limit": 50},
)
args 是位置參數,kwargs 是關鍵字參數。
如果你的程式啟動時會重新註冊固定任務,開發時常搭配 replace_existing=True。
scheduler.add_job(
fetch_feed,
"interval",
minutes=10,
id="fetch-tech-feed",
replace_existing=True,
args=["tech"],
kwargs={"limit": 50},
)
這表示如果同名 job 已經存在,就覆蓋掉。 也可以列出、暫停、恢復、移除 job。
for job in scheduler.get_jobs():
print(job.id, job.next_run_time)
scheduler.pause_job("fetch-tech-feed")
scheduler.resume_job("fetch-tech-feed")
scheduler.remove_job("fetch-tech-feed")
一旦你的工具需要管理頁面或 API,穩定的 job id 就會變得非常重要。
六. 實戰範例:定時產生小型報表 #
假設有一份 events.jsonl,每一行是一筆事件。
{"user": "pypy", "event": "open", "value": 1}
{"user": "pypy", "event": "click", "value": 3}
{"user": "chatPTT", "event": "open", "value": 1}
我們想每分鐘產生一次摘要報表。
from collections import Counter
from datetime import datetime
import json
from pathlib import Path
import time
from apscheduler.schedulers.background import BackgroundScheduler
DATA = Path("events.jsonl")
REPORT = Path("report.txt")
def generate_report() -> None:
counter: Counter[str] = Counter()
total_value = 0
if DATA.exists():
for line in DATA.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
row = json.loads(line)
counter[row["event"]] += 1
total_value += int(row.get("value", 0))
lines = [
f"更新時間:{datetime.now():%Y-%m-%d %H:%M:%S}",
f"總 value:{total_value}",
"事件統計:",
]
lines += [f"- {event}: {count}" for event, count in counter.most_common()]
REPORT.write_text("\n".join(lines) + "\n", encoding="utf-8")
print("報表已更新")
scheduler = BackgroundScheduler()
scheduler.add_job(generate_report, "interval", minutes=1, id="report", replace_existing=True)
scheduler.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
scheduler.shutdown()
這個範例很樸素,但很接近真實內部工具:資料慢慢累積,報表不用每次 request 都重算,背景任務定期更新結果。 如果任務比較耗時,這種設計會讓使用者體驗穩很多。
七. 避免任務疊在一起:max_instances、coalesce、misfire #
排程最可怕的狀況之一,是任務跑太久。 例如你設定每分鐘跑一次,但某次 API 很慢,跑了三分鐘;下一輪要不要同時再開一個? 如果不控制,很容易把系統打爆。
scheduler.add_job(
generate_report,
"interval",
minutes=1,
id="report",
max_instances=1,
coalesce=True,
misfire_grace_time=30,
)
max_instances=1 表示同一個 job 同時最多跑一份,通常是背景同步任務的好預設。
coalesce=True 表示如果錯過很多次執行,只補跑一次;程式卡住五分鐘,不會醒來後連跑五次。
misfire_grace_time=30 表示如果晚了超過 30 秒,就放棄那次執行。
這些設定不是裝飾品,它們是部署環境的安全帶。
你要先決定:任務可以跳過嗎?可以重疊嗎?晚到多久還有意義?
APScheduler 提供開關,工程師要提供判斷。
八. 錯誤處理:不要讓失敗默默消失 #
背景任務最討厭的地方,是失敗時沒人在看。 所以 job 裡請至少記錄錯誤。
import logging
logger = logging.getLogger(__name__)
def sync_remote_data() -> None:
try:
rows = download_rows()
save_rows(rows)
except Exception:
logger.exception("同步遠端資料失敗")
raise
如果你想監聽 job 成功或失敗,可以加 listener。
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
def job_listener(event) -> None:
if event.exception:
print(f"job {event.job_id} 失敗:{event.exception}")
else:
print(f"job {event.job_id} 完成")
scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
正式環境不要只 print(),至少接到 logging,再進一步可以送到通知或監控系統。
如果任務是呼叫外部 API,也可以搭配 python-tenacity 做短暫重試。
但請記得:排程器負責「什麼時候再跑」,重試工具負責「這次執行中短暫失敗怎麼處理」。
兩者混在一起會變成很難除錯的義大利麵。
九. AsyncIO 整合:AsyncIOScheduler #
如果你的程式本來就是 asyncio 架構,可以用 AsyncIOScheduler。
import asyncio
from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def ping_api() -> None:
await asyncio.sleep(0.5)
print(f"ping 完成:{datetime.now():%H:%M:%S}")
async def main() -> None:
scheduler = AsyncIOScheduler()
scheduler.add_job(ping_api, "interval", seconds=5, id="ping-api")
scheduler.start()
try:
while True:
await asyncio.sleep(1)
finally:
scheduler.shutdown()
if __name__ == "__main__":
asyncio.run(main())
這適合本來就大量使用 async HTTP client、Bot framework 或 event loop 的程式。
但不要只是因為 async 看起來比較潮就硬上;如果任務都是普通同步 I/O,BackgroundScheduler 通常已經夠用。
如果任務是 CPU 密集,async 也不會神奇變快,那時候該考慮 multiprocessing、外部 worker,或把重活丟給別的服務。
十. 持久化 Job:程式重啟後還記得任務 #
預設情況下,APScheduler 的 job 存在記憶體裡,程式重啟後就沒了。
如果固定任務都寫在程式碼裡,啟動時重新 add_job() 就好。
但如果使用者可以動態建立提醒,就需要 job store。
常見做法是用 SQLite。
uv add sqlalchemy
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
jobstores = {
"default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite"),
}
scheduler = BackgroundScheduler(jobstores=jobstores)
這樣 job 會被保存到 SQLite。 不過有一個超重要限制:可持久化的 job function 必須能被 Python 用 import 找到。 請不要用 lambda、巢狀函式、臨時 closure。
# tasks.py
def send_reminder(user_id: str, message: str) -> None:
print(user_id, message)
from tasks import send_reminder
scheduler.add_job(
send_reminder,
"date",
run_date=run_at,
kwargs={"user_id": "pypy", "message": "記得休息"},
)
持久化很方便,但也代表你要注意資料庫備份、callable 版本變更,以及不要讓多個 scheduler 同時搶同一份 job store。
十一. 放進 Web App 要小心:多 worker 會重複跑 #
很多人會想在 FastAPI 或 Flask 裡直接啟動 APScheduler,小型內部工具可以,但要非常小心部署方式。 如果你用 gunicorn 開四個 worker,每個 worker 都執行啟動程式,結果同一個排程任務會跑四份。 這種 bug 很討厭,因為它通常不是立刻爆炸,而是悄悄寄四封信、同步四次資料、扣四次額度。 更穩的做法是把 scheduler 獨立成一個 process。
web process: 負責 API / UI
scheduler process: 負責定時任務
worker process: 負責重型背景工作(可選)
對小專案來說,可以用一個 scheduler.py 單獨跑。
python scheduler.py
部署時再用 systemd、supervisord、Docker Compose 或平台提供的 process manager 管它。 Web App 和排程分開,出問題時也比較好查。
十二. APScheduler vs cron vs Celery:到底該選誰? #
選 APScheduler,如果任務跟 Python app 關係很近、需要動態新增/暫停/刪除 job、任務量不大,但想混用 date、interval、cron。 選系統 cron,如果任務很固定,只是每天或每小時跑一支腳本,不需要在 app 裡管理。 選 Celery、RQ、Dramatiq 這類 worker queue,如果任務很多、很重、要水平擴展,或需要可靠佇列、重試、結果保存。 簡單說:APScheduler 是程式內排程器,cron 是系統層排程器,Celery 類工具是分散式背景工作系統。 不要為了一個每天一次的小報表架 Celery,也不要用 APScheduler 硬撐百萬級任務佇列。 工具選對大小,頭髮會多留一點。
十三. 部署前檢查清單 #
真的要上線前,逐項看過。
- Scheduler 是否只會啟動一份?
- 每個 job 是否都有穩定 id?
- 長任務是否設定
max_instances? - 錯過執行時間時,是否設定
misfire_grace_time? - 需要補跑多次,還是
coalesce=True就好? - job 失敗是否會被 logging 或監控看到?
- 外部 API 失敗是否有合理 retry?
- 任務是否可以重複執行而不造成資料損壞?
- 時區是否明確?
- 程式關閉時是否有
scheduler.shutdown()? - 如果用持久化 job store,callable 是否可 import?
- 多 worker 部署是否會重複執行? 特別提醒時區。 如果你需要固定某個地區時間,請明確設定 timezone。
from zoneinfo import ZoneInfo
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler(timezone=ZoneInfo("Asia/Taipei"))
不要假設伺服器時區跟你腦中一樣;容器、雲端平台、CI 環境常常不是。 時間一錯,排程就會變成靈異事件。
十四. 一個完整的小骨架 #
最後把今天重點濃縮成一個 scheduler.py。
import logging, signal, time
from zoneinfo import ZoneInfo
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.schedulers.background import BackgroundScheduler
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pypy.scheduler")
def cleanup_temp_files(): logger.info("清理暫存檔:完成")
def send_daily_report(): logger.info("每日報表:完成")
def on_job_event(event):
logger.info("job ok: %s", event.job_id) if not event.exception else logger.exception("job failed: %s", event.job_id)
scheduler = BackgroundScheduler(timezone=ZoneInfo("Asia/Taipei"))
scheduler.add_listener(on_job_event, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.add_job(cleanup_temp_files, "interval", hours=1, id="cleanup", replace_existing=True, max_instances=1, coalesce=True, misfire_grace_time=300)
scheduler.add_job(send_daily_report, "cron", hour=9, minute=30, id="daily-report", replace_existing=True, max_instances=1, coalesce=True, misfire_grace_time=900)
stop = False
def handle_stop(signum, frame): # noqa: ARG001
global stop
stop = True
signal.signal(signal.SIGTERM, handle_stop)
signal.signal(signal.SIGINT, handle_stop)
scheduler.start()
while not stop:
time.sleep(1)
scheduler.shutdown(wait=True)
這個骨架重點不是功能多,而是有幾個好習慣:scheduler 建立邏輯集中、每個 job 都有 id、長任務有 max_instances、錯過執行有 grace time、程式收到 SIGTERM 會關閉 scheduler。
正式部署時,這些小細節會讓你少掉很多半夜除錯;排程 bug 很常不是馬上爆,而是默默重複跑、漏跑、晚跑。
結語:排程的重點不是「跑」,是「可預期地跑」 #
APScheduler 的入門門檻很低,一行 add_job() 就能讓函式定時執行。
但真正重要的是後面的工程習慣:你要知道任務什麼時候跑、失敗時誰會知道、跑太久時會不會疊在一起、程式重啟後任務還在不在。
如果你只是想讓 Python 腳本每天做點事,APScheduler 是很舒服的選擇。
如果需求變大,也可以把它當成通往 cron、worker queue、資料管線系統之前的中繼站。
先用小工具把流程跑順,再根據痛點升級架構,這才是不會被工具綁架的做法。
今天就先這樣,去把那個「我晚點再手動跑」的小任務交給 scheduler 吧。
拍拍君相信你可以少手動一次,就多一點人生自由。🕒