快轉到主要內容
  1. 教學文章/

Python APScheduler 實戰:讓程式定時執行背景工作

·9 分鐘· loading · loading · ·
Python Apscheduler Scheduler Cron Automation Background-Jobs
每日拍拍
作者
每日拍拍
科學家 X 科技宅宅
目錄
Python 學習 - 本文屬於一個選集。
§ 57: 本文

一. 前言:有些程式不是現在跑,是「等一下」跑
#

你一定遇過這種需求:每天早上九點寄報表、每五分鐘同步資料、使用者按下按鈕後三十分鐘再提醒、半夜自動清暫存檔。 這些事情看起來都可以用 while Truetime.sleep() 解決。

while True:
    do_something()
    time.sleep(300)

但拍拍君要先把你的手按住一下。 這種寫法可以當玩具,放進長期運作的程式就很快開始痛:程式重啟後狀態不清楚、任務跑太久會疊在一起、想改成工作日早上跑就變成時間判斷地獄。 今天的主角 APScheduler,就是用來處理「時間到了就執行 Python 函式」的工具。 你可以把它想成 Python 程式內建的小型 cron,但它比系統 cron 更容易跟你的 app 狀態、參數、管理介面接在一起。 它不是萬能背景工作系統,也不是 Celery 替代品;可是對內部工具、小型服務、Bot、自動化腳本、資料同步任務來說,常常剛剛好。 如果你看過 python-watchdog 那篇,可以這樣分:watchdog 是「檔案變了就做事」,APScheduler 是「時間到了就做事」。

二. 安裝:一個套件就能開始排程
#

先建立一個乾淨專案。

mkdir pypy-scheduler-demo
cd pypy-scheduler-demo
uv init
uv add apscheduler

不用 uv 的話,也可以用 pip。

python -m venv .venv
source .venv/bin/activate
pip install apscheduler

確認版本。

python -c "import apscheduler; print(apscheduler.__version__)"

APScheduler 有五個常見名詞:Scheduler 管理任務,Trigger 決定什麼時候跑,Job 是被排進去的工作,Executor 負責執行,Job store 負責保存任務資料。 一開始不用全部背起來,先把「Scheduler + Trigger + Job」這三個概念抓住就好。

三. 第一個排程:每三秒說一次 hi
#

建立 hello_scheduler.py

from datetime import datetime
import time
from apscheduler.schedulers.background import BackgroundScheduler
def say_hi() -> None:
    now = datetime.now().strftime("%H:%M:%S")
    print(f"[{now}] 拍拍君:hi,時間到了!")
scheduler = BackgroundScheduler()
scheduler.add_job(say_hi, "interval", seconds=3)
scheduler.start()
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()
    print("排程器已關閉")

執行它。

python hello_scheduler.py

你會看到它每三秒印一次訊息。 BackgroundScheduler 會在背景 thread 裡執行任務,所以主程式不能馬上結束;範例中的 while True 只是讓程式活著,不是拿來控制排程時間。 真正決定「每三秒」的是 add_job(say_hi, "interval", seconds=3)。 這已經比手寫 sleep() 好,因為 scheduler 知道每個 job 的下次執行時間,也能同時管理多個 job。

四. 三種 Trigger:date、interval、cron
#

APScheduler 最常用的 trigger 有三種。 date:只在某個時間點跑一次。 interval:每隔固定時間跑一次。 cron:用類似 cron 的規則排程。 先看 date,適合「使用者做了某件事,稍後提醒」。

from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
def remind(message: str) -> None:
    print(f"提醒:{message}")
scheduler = BackgroundScheduler()
scheduler.start()
run_at = datetime.now() + timedelta(seconds=10)
scheduler.add_job(
    remind,
    "date",
    run_date=run_at,
    args=["喝水,不要變成乾掉的拍拍君"],
)

interval 適合固定頻率工作,例如每五分鐘同步一次資料。

scheduler.add_job(
    sync_cache,
    "interval",
    minutes=5,
    id="sync-cache",
)

cron 適合人類語意的行程,例如工作日早上九點半寄報表。

scheduler.add_job(
    send_report,
    "cron",
    day_of_week="mon-fri",
    hour=9,
    minute=30,
    id="weekday-report",
)

如果你原本想寫一堆 if now.hour == ...,請停手,交給 cron trigger,大家都會比較快樂。

五. 把 job 寫得可管理:id、args、kwargs
#

專案裡通常不會只有一個任務,所以 job 一定要命名。 APScheduler 用 id 管理 job。

def fetch_feed(feed_name: str, limit: int = 20) -> None:
    print(f"抓取 {feed_name},最多 {limit} 筆")
scheduler.add_job(
    fetch_feed,
    "interval",
    minutes=10,
    id="fetch-tech-feed",
    args=["tech"],
    kwargs={"limit": 50},
)

args 是位置參數,kwargs 是關鍵字參數。 如果你的程式啟動時會重新註冊固定任務,開發時常搭配 replace_existing=True

scheduler.add_job(
    fetch_feed,
    "interval",
    minutes=10,
    id="fetch-tech-feed",
    replace_existing=True,
    args=["tech"],
    kwargs={"limit": 50},
)

這表示如果同名 job 已經存在,就覆蓋掉。 也可以列出、暫停、恢復、移除 job。

for job in scheduler.get_jobs():
    print(job.id, job.next_run_time)
scheduler.pause_job("fetch-tech-feed")
scheduler.resume_job("fetch-tech-feed")
scheduler.remove_job("fetch-tech-feed")

一旦你的工具需要管理頁面或 API,穩定的 job id 就會變得非常重要。

六. 實戰範例:定時產生小型報表
#

假設有一份 events.jsonl,每一行是一筆事件。

{"user": "pypy", "event": "open", "value": 1}
{"user": "pypy", "event": "click", "value": 3}
{"user": "chatPTT", "event": "open", "value": 1}

我們想每分鐘產生一次摘要報表。

from collections import Counter
from datetime import datetime
import json
from pathlib import Path
import time
from apscheduler.schedulers.background import BackgroundScheduler
DATA = Path("events.jsonl")
REPORT = Path("report.txt")
def generate_report() -> None:
    counter: Counter[str] = Counter()
    total_value = 0
    if DATA.exists():
        for line in DATA.read_text(encoding="utf-8").splitlines():
            if not line.strip():
                continue
            row = json.loads(line)
            counter[row["event"]] += 1
            total_value += int(row.get("value", 0))
    lines = [
        f"更新時間:{datetime.now():%Y-%m-%d %H:%M:%S}",
        f"總 value:{total_value}",
        "事件統計:",
    ]
    lines += [f"- {event}: {count}" for event, count in counter.most_common()]
    REPORT.write_text("\n".join(lines) + "\n", encoding="utf-8")
    print("報表已更新")
scheduler = BackgroundScheduler()
scheduler.add_job(generate_report, "interval", minutes=1, id="report", replace_existing=True)
scheduler.start()
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()

這個範例很樸素,但很接近真實內部工具:資料慢慢累積,報表不用每次 request 都重算,背景任務定期更新結果。 如果任務比較耗時,這種設計會讓使用者體驗穩很多。

七. 避免任務疊在一起:max_instances、coalesce、misfire
#

排程最可怕的狀況之一,是任務跑太久。 例如你設定每分鐘跑一次,但某次 API 很慢,跑了三分鐘;下一輪要不要同時再開一個? 如果不控制,很容易把系統打爆。

scheduler.add_job(
    generate_report,
    "interval",
    minutes=1,
    id="report",
    max_instances=1,
    coalesce=True,
    misfire_grace_time=30,
)

max_instances=1 表示同一個 job 同時最多跑一份,通常是背景同步任務的好預設。 coalesce=True 表示如果錯過很多次執行,只補跑一次;程式卡住五分鐘,不會醒來後連跑五次。 misfire_grace_time=30 表示如果晚了超過 30 秒,就放棄那次執行。 這些設定不是裝飾品,它們是部署環境的安全帶。 你要先決定:任務可以跳過嗎?可以重疊嗎?晚到多久還有意義? APScheduler 提供開關,工程師要提供判斷。

八. 錯誤處理:不要讓失敗默默消失
#

背景任務最討厭的地方,是失敗時沒人在看。 所以 job 裡請至少記錄錯誤。

import logging
logger = logging.getLogger(__name__)
def sync_remote_data() -> None:
    try:
        rows = download_rows()
        save_rows(rows)
    except Exception:
        logger.exception("同步遠端資料失敗")
        raise

如果你想監聽 job 成功或失敗,可以加 listener。

from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
def job_listener(event) -> None:
    if event.exception:
        print(f"job {event.job_id} 失敗:{event.exception}")
    else:
        print(f"job {event.job_id} 完成")
scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

正式環境不要只 print(),至少接到 logging,再進一步可以送到通知或監控系統。 如果任務是呼叫外部 API,也可以搭配 python-tenacity 做短暫重試。 但請記得:排程器負責「什麼時候再跑」,重試工具負責「這次執行中短暫失敗怎麼處理」。 兩者混在一起會變成很難除錯的義大利麵。

九. AsyncIO 整合:AsyncIOScheduler
#

如果你的程式本來就是 asyncio 架構,可以用 AsyncIOScheduler

import asyncio
from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def ping_api() -> None:
    await asyncio.sleep(0.5)
    print(f"ping 完成:{datetime.now():%H:%M:%S}")
async def main() -> None:
    scheduler = AsyncIOScheduler()
    scheduler.add_job(ping_api, "interval", seconds=5, id="ping-api")
    scheduler.start()
    try:
        while True:
            await asyncio.sleep(1)
    finally:
        scheduler.shutdown()
if __name__ == "__main__":
    asyncio.run(main())

這適合本來就大量使用 async HTTP client、Bot framework 或 event loop 的程式。 但不要只是因為 async 看起來比較潮就硬上;如果任務都是普通同步 I/O,BackgroundScheduler 通常已經夠用。 如果任務是 CPU 密集,async 也不會神奇變快,那時候該考慮 multiprocessing、外部 worker,或把重活丟給別的服務。

十. 持久化 Job:程式重啟後還記得任務
#

預設情況下,APScheduler 的 job 存在記憶體裡,程式重啟後就沒了。 如果固定任務都寫在程式碼裡,啟動時重新 add_job() 就好。 但如果使用者可以動態建立提醒,就需要 job store。 常見做法是用 SQLite。

uv add sqlalchemy
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
jobstores = {
    "default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite"),
}
scheduler = BackgroundScheduler(jobstores=jobstores)

這樣 job 會被保存到 SQLite。 不過有一個超重要限制:可持久化的 job function 必須能被 Python 用 import 找到。 請不要用 lambda、巢狀函式、臨時 closure。

# tasks.py
def send_reminder(user_id: str, message: str) -> None:
    print(user_id, message)
from tasks import send_reminder
scheduler.add_job(
    send_reminder,
    "date",
    run_date=run_at,
    kwargs={"user_id": "pypy", "message": "記得休息"},
)

持久化很方便,但也代表你要注意資料庫備份、callable 版本變更,以及不要讓多個 scheduler 同時搶同一份 job store。

十一. 放進 Web App 要小心:多 worker 會重複跑
#

很多人會想在 FastAPI 或 Flask 裡直接啟動 APScheduler,小型內部工具可以,但要非常小心部署方式。 如果你用 gunicorn 開四個 worker,每個 worker 都執行啟動程式,結果同一個排程任務會跑四份。 這種 bug 很討厭,因為它通常不是立刻爆炸,而是悄悄寄四封信、同步四次資料、扣四次額度。 更穩的做法是把 scheduler 獨立成一個 process。

web process:       負責 API / UI
scheduler process: 負責定時任務
worker process:    負責重型背景工作(可選)

對小專案來說,可以用一個 scheduler.py 單獨跑。

python scheduler.py

部署時再用 systemd、supervisord、Docker Compose 或平台提供的 process manager 管它。 Web App 和排程分開,出問題時也比較好查。

十二. APScheduler vs cron vs Celery:到底該選誰?
#

選 APScheduler,如果任務跟 Python app 關係很近、需要動態新增/暫停/刪除 job、任務量不大,但想混用 date、interval、cron。 選系統 cron,如果任務很固定,只是每天或每小時跑一支腳本,不需要在 app 裡管理。 選 Celery、RQ、Dramatiq 這類 worker queue,如果任務很多、很重、要水平擴展,或需要可靠佇列、重試、結果保存。 簡單說:APScheduler 是程式內排程器,cron 是系統層排程器,Celery 類工具是分散式背景工作系統。 不要為了一個每天一次的小報表架 Celery,也不要用 APScheduler 硬撐百萬級任務佇列。 工具選對大小,頭髮會多留一點。

十三. 部署前檢查清單
#

真的要上線前,逐項看過。

  • Scheduler 是否只會啟動一份?
  • 每個 job 是否都有穩定 id?
  • 長任務是否設定 max_instances
  • 錯過執行時間時,是否設定 misfire_grace_time
  • 需要補跑多次,還是 coalesce=True 就好?
  • job 失敗是否會被 logging 或監控看到?
  • 外部 API 失敗是否有合理 retry?
  • 任務是否可以重複執行而不造成資料損壞?
  • 時區是否明確?
  • 程式關閉時是否有 scheduler.shutdown()
  • 如果用持久化 job store,callable 是否可 import?
  • 多 worker 部署是否會重複執行? 特別提醒時區。 如果你需要固定某個地區時間,請明確設定 timezone。
from zoneinfo import ZoneInfo
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler(timezone=ZoneInfo("Asia/Taipei"))

不要假設伺服器時區跟你腦中一樣;容器、雲端平台、CI 環境常常不是。 時間一錯,排程就會變成靈異事件。

十四. 一個完整的小骨架
#

最後把今天重點濃縮成一個 scheduler.py

import logging, signal, time
from zoneinfo import ZoneInfo
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.schedulers.background import BackgroundScheduler
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pypy.scheduler")
def cleanup_temp_files(): logger.info("清理暫存檔:完成")
def send_daily_report(): logger.info("每日報表:完成")
def on_job_event(event):
    logger.info("job ok: %s", event.job_id) if not event.exception else logger.exception("job failed: %s", event.job_id)
scheduler = BackgroundScheduler(timezone=ZoneInfo("Asia/Taipei"))
scheduler.add_listener(on_job_event, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.add_job(cleanup_temp_files, "interval", hours=1, id="cleanup", replace_existing=True, max_instances=1, coalesce=True, misfire_grace_time=300)
scheduler.add_job(send_daily_report, "cron", hour=9, minute=30, id="daily-report", replace_existing=True, max_instances=1, coalesce=True, misfire_grace_time=900)
stop = False
def handle_stop(signum, frame):  # noqa: ARG001
    global stop
    stop = True
signal.signal(signal.SIGTERM, handle_stop)
signal.signal(signal.SIGINT, handle_stop)
scheduler.start()
while not stop:
    time.sleep(1)
scheduler.shutdown(wait=True)

這個骨架重點不是功能多,而是有幾個好習慣:scheduler 建立邏輯集中、每個 job 都有 id、長任務有 max_instances、錯過執行有 grace time、程式收到 SIGTERM 會關閉 scheduler。 正式部署時,這些小細節會讓你少掉很多半夜除錯;排程 bug 很常不是馬上爆,而是默默重複跑、漏跑、晚跑。

結語:排程的重點不是「跑」,是「可預期地跑」
#

APScheduler 的入門門檻很低,一行 add_job() 就能讓函式定時執行。 但真正重要的是後面的工程習慣:你要知道任務什麼時候跑、失敗時誰會知道、跑太久時會不會疊在一起、程式重啟後任務還在不在。 如果你只是想讓 Python 腳本每天做點事,APScheduler 是很舒服的選擇。 如果需求變大,也可以把它當成通往 cron、worker queue、資料管線系統之前的中繼站。 先用小工具把流程跑順,再根據痛點升級架構,這才是不會被工具綁架的做法。 今天就先這樣,去把那個「我晚點再手動跑」的小任務交給 scheduler 吧。 拍拍君相信你可以少手動一次,就多一點人生自由。🕒

延伸閱讀
#

Python 學習 - 本文屬於一個選集。
§ 57: 本文

相關文章

Python argparse 實戰:CLI 參數解析、旗標設計與 subcommands 完全攻略
·9 分鐘· loading · loading
Python Argparse Cli Command-Line Automation Developer-Tools
Python subprocess:外部命令執行與管道串接完全指南
·8 分鐘· loading · loading
Python Subprocess Shell Automation Cli
Python DuckDB 實戰:用 SQL 快速分析 CSV 與 Parquet
·6 分鐘· loading · loading
Python Duckdb SQL Parquet Data-Analysis Developer-Tools
Rich + Typer:打造漂亮又好用的 Python CLI 體驗
·10 分鐘· loading · loading
Python Rich Typer Cli Command-Line Developer-Tools
Streamlit 部署實戰:Secrets、設定檔與雲端上線完整攻略
·8 分鐘· loading · loading
Python Streamlit Deployment Secrets Config Cloud
Python Typer 進階:巢狀 subcommands、callback 與 CLI 架構
·9 分鐘· loading · loading
Python Typer Cli Command-Line Developer-Tools Testing