📋 このステップで学ぶこと
- cronライクなスケジュール設定
- start_dateとschedule_intervalの理解
- execution_date(処理対象日)の概念
- バックフィル(catchup)
- 手動トリガーの方法
⏱️ 学習時間の目安:3時間
📝 練習問題:10問(基礎4問・応用4問・発展2問)
🎯 1. スケジュール実行の基礎
1-1. なぜスケジュール実行が重要か?
ETLパイプラインは、定期的に自動実行することが多いです。
毎日、毎時間、毎週など、決まった時間に実行させる必要があります。
📚 例え話:目覚まし時計
Airflowのスケジュール機能は目覚まし時計に似ています。
・schedule_interval = 何時に鳴らすか(毎朝7時など)
・start_date = いつから使い始めるか
・catchup = 鳴らなかった分も後から鳴らすか
・手動トリガー = 今すぐ鳴らすボタン
一度設定すれば、毎日自動で起こしてくれますよね?
Airflowも同様に、設定すれば自動で実行してくれます。
📊 よくあるスケジュール例
- 毎日朝9時:日次レポート作成
- 毎時0分:リアルタイムデータ集計
- 毎週月曜朝:週次サマリー作成
- 毎月1日:月次売上集計
1-2. スケジュール実行の仕組み
実行タイミングの計算
【公式】
実行時刻 = start_date + schedule_interval
【例:毎日実行(@daily)】
start_date = 2024-01-01 00:00
schedule_interval = @daily(24時間)
タイムライン:
2024-01-01 00:00 ← start_date(この時点では実行されない)
│
▼ +24時間
2024-01-02 00:00 ← 初回実行 ✅
│
▼ +24時間
2024-01-03 00:00 ← 2回目実行 ✅
│
▼ +24時間
2024-01-04 00:00 ← 3回目実行 ✅
💡 重要な概念:execution_date
Airflowは「過去のデータ」を処理する設計です。
例えば、2024-01-02 00:00に実行されるDAGは、
2024-01-01のデータを処理します。
この「処理対象の日時」をexecution_dateと呼びます。
📅 2. schedule_interval の詳細
2-1. プリセット値
Airflowのプリセット値
| プリセット |
意味 |
cron形式 |
用途例 |
None |
手動実行のみ |
– |
テスト、不定期実行 |
@once |
一度だけ実行 |
– |
初期化処理 |
@hourly |
毎時0分 |
0 * * * * |
リアルタイム集計 |
@daily |
毎日0時0分 |
0 0 * * * |
日次レポート |
@weekly |
毎週日曜0時 |
0 0 * * 0 |
週次サマリー |
@monthly |
毎月1日0時 |
0 0 1 * * |
月次売上集計 |
@yearly |
毎年1月1日0時 |
0 0 1 1 * |
年次レポート |
2-2. cron形式の詳細
# ===== cron形式の構成 =====
# * * * * *
# ┬ ┬ ┬ ┬ ┬
# │ │ │ │ └─ 曜日(0-6、0=日曜、7=日曜)
# │ │ │ └─── 月(1-12)
# │ │ └───── 日(1-31)
# │ └─────── 時(0-23)
# └───────── 分(0-59)
# 【特殊文字】
# * : すべて
# , : 複数指定(例: 1,3,5)
# – : 範囲指定(例: 1-5)
# / : 間隔指定(例: */15 = 15分ごと)
2-3. よく使うcronパターン
実務でよく使うcronパターン
| cron形式 |
意味 |
用途 |
'0 9 * * *' |
毎日9時0分 |
朝のレポート |
'30 9 * * *' |
毎日9時30分 |
朝のレポート |
'*/15 * * * *' |
15分ごと |
高頻度監視 |
'0 */3 * * *' |
3時間ごと |
中頻度集計 |
'0 9 * * 1-5' |
平日9時 |
営業日レポート |
'0 10 * * 6,0' |
週末10時 |
週末バッチ |
'0 9 1,15 * *' |
毎月1日と15日の9時 |
半月レポート |
'0 0 1 * *' |
毎月1日0時 |
月次バッチ |
2-4. 実装例
# ===== スケジュール設定の実装例 =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def daily_report():
print(“日次レポートを作成中…”)
# 例1: 毎日朝9時に実行
with DAG(
dag_id=’daily_report_9am’,
start_date=datetime(2024, 1, 1),
schedule_interval=’0 9 * * *’, # 毎日9時0分
catchup=False
) as dag:
task = PythonOperator(
task_id=’create_report’,
python_callable=daily_report
)
# 例2: 15分ごとに実行
with DAG(
dag_id=’every_15_minutes’,
start_date=datetime(2024, 1, 1),
schedule_interval=’*/15 * * * *’, # 15分ごと
catchup=False
) as dag:
task = PythonOperator(
task_id=’check_data’,
python_callable=lambda: print(“データチェック中…”)
)
# 例3: 平日のみ8時に実行
with DAG(
dag_id=’weekday_morning’,
start_date=datetime(2024, 1, 1),
schedule_interval=’0 8 * * 1-5′, # 月〜金 8時0分
catchup=False
) as dag:
task = PythonOperator(
task_id=’morning_task’,
python_callable=lambda: print(“おはようございます”)
)
2-5. timedeltaを使ったスケジュール
# ===== timedeltaを使ったスケジュール =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# 30分ごとに実行
with DAG(
dag_id=’every_30_minutes’,
start_date=datetime(2024, 1, 1),
schedule_interval=timedelta(minutes=30),
catchup=False
) as dag:
task = PythonOperator(
task_id=’my_task’,
python_callable=lambda: print(“30分ごとの実行”)
)
# 6時間ごとに実行
with DAG(
dag_id=’every_6_hours’,
start_date=datetime(2024, 1, 1),
schedule_interval=timedelta(hours=6),
catchup=False
) as dag:
task = PythonOperator(
task_id=’my_task’,
python_callable=lambda: print(“6時間ごとの実行”)
)
【Airflow UIでの表示】
DAG: every_30_minutes
├── Schedule: 0:30:00 (timedelta)
├── 次回実行: 2024-01-15 10:00:00
└── 前回実行: 2024-01-15 09:30:00
🕐 3. start_date と execution_date
3-1. start_date の意味
start_dateは、DAGが実行を開始する日時です。
ただし、実際の初回実行はstart_date + schedule_intervalになります。
⚠️ よくある誤解
start_date = 2024-01-01 00:00
schedule_interval = @daily
→ 2024-01-01には実行されません!
→ 初回実行は2024-01-02 00:00です
3-2. execution_date の概念
execution_date と 実行時刻の関係
【execution_date = 処理対象のデータの日時】
タイムライン:
2024-01-01 00:00 ─────────────────────┐
│ │
│ ← この日のデータを処理 │
│ │
2024-01-02 00:00 ←───── 実行時刻 ────┘
execution_date = 2024-01-01
なぜこうなる?
→ データは「過去のもの」を処理するから
→ 2024-01-01が終わってから、そのデータを処理する
3-3. execution_dateを使う
# ===== execution_dateを使う =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def process_data(**context):
# execution_dateを取得
execution_date = context[‘execution_date’]
print(f”実行日時: {datetime.now()}”)
print(f”処理対象日: {execution_date}”)
# 日付をファイル名に使う
filename = f”data_{execution_date.strftime(‘%Y%m%d’)}.csv”
print(f”処理するファイル: {filename}”)
with DAG(
dag_id=’use_execution_date’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
task = PythonOperator(
task_id=’process’,
python_callable=process_data
)
【実行ログ(2024-01-02 00:00に実行された場合)】
実行日時: 2024-01-02 00:00:15
処理対象日: 2024-01-01 00:00:00+00:00
処理するファイル: data_20240101.csv
3-4. start_dateの設定方法
# ===== start_dateの設定方法 =====
# 方法1: 固定の日時を指定(推奨)
from datetime import datetime
start_date = datetime(2024, 1, 1)
# 方法2: 現在から相対的に指定(非推奨)
from datetime import datetime, timedelta
start_date = datetime.now() – timedelta(days=1) # ❌ 避ける
# 方法3: pendulumを使う(タイムゾーン対応、推奨)
import pendulum
start_date = pendulum.datetime(2024, 1, 1, tz=’Asia/Tokyo’)
🎯 ベストプラクティス
- start_dateは固定値を使う(datetime.now()は避ける)
- タイムゾーンを明示する(pendulum推奨)
- 過去の日付を指定してOK
🔄 4. catchup(バックフィル)
4-1. catchupとは?
catchupは、過去の未実行分を自動で実行するかを指定するパラメータです。
catchupの動作
【設定】
start_date = 2024-01-01
今日 = 2024-01-10
schedule_interval = @daily
【catchup=True の場合】
2024-01-02 ✅ 実行
2024-01-03 ✅ 実行
2024-01-04 ✅ 実行
2024-01-05 ✅ 実行
2024-01-06 ✅ 実行
2024-01-07 ✅ 実行
2024-01-08 ✅ 実行
2024-01-09 ✅ 実行
2024-01-10 ✅ 実行
→ 9日分すべて実行(バックフィル)
【catchup=False の場合】
2024-01-02 ⏭️ スキップ
2024-01-03 ⏭️ スキップ
...
2024-01-09 ⏭️ スキップ
2024-01-10 ✅ 実行
→ 最新の1回分だけ実行
4-2. catchupの使い分け
catchupの設定ガイド
| 設定 |
使い所 |
注意点 |
catchup=False (推奨) |
・最新データだけ処理 ・過去データは不要 ・新規DAG追加時 |
過去データは処理されない |
| catchup=True |
・過去データも遡って処理 ・履歴データの再計算 ・データ整合性が重要 |
リソース大量消費の可能性 |
4-3. 実装例
# ===== catchupの設定 =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def process(**context):
execution_date = context[‘execution_date’]
print(f”処理日: {execution_date}”)
# catchup=False(推奨)
with DAG(
dag_id=’no_catchup’,
start_date=datetime(2024, 1, 1), # 過去の日付でもOK
schedule_interval=’@daily’,
catchup=False # 過去分は実行しない
) as dag:
task = PythonOperator(
task_id=’process’,
python_callable=process
)
# catchup=True(過去分も実行)
with DAG(
dag_id=’with_catchup’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=True, # 過去分も実行する
max_active_runs=3 # 同時実行は最大3つまで
) as dag:
task = PythonOperator(
task_id=’process’,
python_callable=process
)
⚠️ catchup=True の注意点
- 大量の過去データがあると、一気に実行される
- リソースを大量消費する可能性
- 本番環境ではmax_active_runsで制限する
🎮 5. 手動トリガー
5-1. 手動実行の方法
DAGをすぐに実行したいときは、手動でトリガーします。
手動実行の方法
| 方法 |
手順 |
用途 |
| Web UI |
▶ボタンをクリック |
開発・テスト |
| CLI |
airflow dags trigger |
スクリプト連携 |
| REST API |
POSTリクエスト |
外部システム連携 |
5-2. CLIからトリガー
# DAGを手動実行
airflow dags trigger my_dag_id
# 特定の日付で実行
airflow dags trigger my_dag_id –exec-date 2024-01-01
# 設定を渡して実行
airflow dags trigger my_dag_id –conf ‘{“key”: “value”}’
【実行結果】
$ airflow dags trigger my_dag_id
Created <DagRun my_dag_id @ 2024-01-15T10:30:00+00:00: manual__2024-01-15T10:30:00+00:00, state:queued>
5-3. REST APIからトリガー
# ===== REST APIからトリガー =====
import requests
import json
# Airflow API エンドポイント
url = ‘http://localhost:8080/api/v1/dags/my_dag_id/dagRuns’
# 認証情報
auth = (‘airflow’, ‘airflow’)
# リクエストボディ
data = {
“conf”: {},
“dag_run_id”: “manual_run_001”
}
# POSTリクエスト
response = requests.post(
url,
auth=auth,
headers={‘Content-Type’: ‘application/json’},
data=json.dumps(data)
)
print(response.status_code)
print(response.json())
【実行結果】
200
{
“dag_id”: “my_dag_id”,
“dag_run_id”: “manual_run_001”,
“state”: “queued”,
“execution_date”: “2024-01-15T10:30:00+00:00”
}
5-4. 手動実行専用のDAG
# ===== 手動実行専用DAG =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# 手動実行専用(自動実行しない)
with DAG(
dag_id=’manual_only’,
start_date=datetime(2024, 1, 1),
schedule_interval=None, # 手動実行のみ
catchup=False
) as dag:
task = PythonOperator(
task_id=’manual_task’,
python_callable=lambda: print(“手動で実行されました”)
)
🎯 schedule_interval=None の用途
- テスト用DAG
- 不定期実行タスク
- 手動でのみ実行したいタスク
- 外部システムからのトリガー専用
💼 6. 実践演習:毎日朝9時実行のパイプライン
6-1. 要件
- 毎日朝9時に自動実行
- 前日のデータを処理
- 処理結果をファイルに保存
- 完了通知
6-2. 完全なDAG実装
# ===== 毎日朝9時に実行される日次レポートDAG =====
# ファイル名: daily_report_9am.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
import logging
logger = logging.getLogger(__name__)
# =====================================
# タスク関数の定義
# =====================================
def extract_yesterday_data(**context):
“””前日のデータを抽出”””
execution_date = context[‘execution_date’]
target_date = execution_date.strftime(‘%Y-%m-%d’)
logger.info(f”データ抽出開始: {target_date}”)
# ダミーデータ(実際はDBやAPIから取得)
data = {
‘日付’: [target_date] * 5,
‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’, ‘ぶどう’, ‘いちご’],
‘売上’: [15000, 25000, 10000, 30000, 20000]
}
df = pd.DataFrame(data)
temp_file = f’/tmp/extracted_{target_date}.csv’
df.to_csv(temp_file, index=False)
logger.info(f”データ抽出完了: {len(df)}件”)
return temp_file
def transform_data(**context):
“””データを変換”””
ti = context[‘ti’]
temp_file = ti.xcom_pull(task_ids=’extract’)
logger.info(f”データ変換開始: {temp_file}”)
df = pd.read_csv(temp_file)
df[‘税込売上’] = (df[‘売上’] * 1.1).astype(int)
df[‘手数料’] = (df[‘売上’] * 0.05).astype(int)
df[‘純利益’] = df[‘税込売上’] – df[‘手数料’]
execution_date = context[‘execution_date’]
target_date = execution_date.strftime(‘%Y-%m-%d’)
transformed_file = f’/tmp/transformed_{target_date}.csv’
df.to_csv(transformed_file, index=False)
logger.info(f”データ変換完了: {len(df)}件”)
return transformed_file
def create_report(**context):
“””レポートを作成”””
ti = context[‘ti’]
transformed_file = ti.xcom_pull(task_ids=’transform’)
df = pd.read_csv(transformed_file)
total_sales = df[‘売上’].sum()
total_profit = df[‘純利益’].sum()
execution_date = context[‘execution_date’]
target_date = execution_date.strftime(‘%Y-%m-%d’)
report_file = f’/tmp/daily_report_{target_date}.txt’
with open(report_file, ‘w’, encoding=’utf-8′) as f:
f.write(f”日次レポート – {target_date}\n”)
f.write(“=” * 50 + “\n\n”)
f.write(f”総売上: ¥{total_sales:,}\n”)
f.write(f”純利益: ¥{total_profit:,}\n”)
f.write(f”利益率: {(total_profit/total_sales*100):.1f}%\n”)
logger.info(f”レポート作成完了: {report_file}”)
return report_file
def notify_completion(**context):
“””完了通知”””
execution_date = context[‘execution_date’]
target_date = execution_date.strftime(‘%Y-%m-%d’)
logger.info(f”✅ {target_date}の日次レポートが完了しました”)
# 実際はここでメールやSlackに通知
# =====================================
# DAGの定義
# =====================================
default_args = {
‘owner’: ‘data_team’,
‘depends_on_past’: False,
‘email’: [‘admin@example.com’],
‘email_on_failure’: True,
‘retries’: 2,
‘retry_delay’: timedelta(minutes=5)
}
with DAG(
dag_id=’daily_report_9am’,
default_args=default_args,
description=’毎日朝9時に実行される日次レポート’,
start_date=datetime(2024, 1, 1),
schedule_interval=’0 9 * * *’, # 毎日9時0分
catchup=False,
tags=[‘daily’, ‘report’]
) as dag:
extract = PythonOperator(
task_id=’extract’,
python_callable=extract_yesterday_data
)
transform = PythonOperator(
task_id=’transform’,
python_callable=transform_data
)
report = PythonOperator(
task_id=’report’,
python_callable=create_report
)
notify = PythonOperator(
task_id=’notify’,
python_callable=notify_completion
)
extract >> transform >> report >> notify
【実行ログ(2024-01-02 09:00に実行)】
[2024-01-02 09:00:01] INFO – データ抽出開始: 2024-01-01
[2024-01-02 09:00:02] INFO – データ抽出完了: 5件
[2024-01-02 09:00:03] INFO – データ変換開始: /tmp/extracted_2024-01-01.csv
[2024-01-02 09:00:04] INFO – データ変換完了: 5件
[2024-01-02 09:00:05] INFO – レポート作成完了: /tmp/daily_report_2024-01-01.txt
[2024-01-02 09:00:06] INFO – ✅ 2024-01-01の日次レポートが完了しました
📝 STEP 21 のまとめ
✅ このステップで学んだこと
- schedule_interval:プリセット値とcron形式
- start_date:DAGの開始日時
- execution_date:処理対象のデータの日時
- catchup:過去の未実行分を実行するか
- 手動トリガー:Web UI、CLI、API
💡 重要ポイント
- 実行時刻 = start_date + schedule_interval
- catchup=False を推奨
- cron形式で柔軟なスケジュール設定
- execution_dateは処理対象日
🎯 次のステップの予告
次のSTEP 22では、「タスク間のデータ受け渡し」を学びます。
- XCom(クロスコミュニケーション)
- TaskFlow APIの使い方
- Variables と Connections
📝 練習問題
問題 1
基礎
毎日朝8時に実行されるDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def morning_task():
print(“朝8時のタスク”)
with DAG(
dag_id=’daily_8am’,
start_date=datetime(2024, 1, 1),
schedule_interval=’0 8 * * *’, # 毎日8時0分
catchup=False
) as dag:
task = PythonOperator(
task_id=’morning_task’,
python_callable=morning_task
)
問題 2
基礎
毎時30分に実行されるDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def hourly_task():
print(“毎時30分のタスク”)
with DAG(
dag_id=’hourly_30′,
start_date=datetime(2024, 1, 1),
schedule_interval=’30 * * * *’, # 毎時30分
catchup=False
) as dag:
task = PythonOperator(
task_id=’hourly_task’,
python_callable=hourly_task
)
問題 3
基礎
手動実行のみ(schedule_interval=None)のDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def manual_task():
print(“手動で実行されました”)
with DAG(
dag_id=’manual_only’,
start_date=datetime(2024, 1, 1),
schedule_interval=None, # 手動実行のみ
catchup=False
) as dag:
task = PythonOperator(
task_id=’manual_task’,
python_callable=manual_task
)
問題 4
基礎
@weeklyプリセットを使って、毎週実行されるDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def weekly_task():
print(“週次タスク実行”)
with DAG(
dag_id=’weekly_dag’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@weekly’, # 毎週日曜0時
catchup=False
) as dag:
task = PythonOperator(
task_id=’weekly_task’,
python_callable=weekly_task
)
問題 5
応用
平日(月〜金)の朝9時に実行されるDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def weekday_task():
print(“平日朝9時のタスク”)
with DAG(
dag_id=’weekday_9am’,
start_date=datetime(2024, 1, 1),
schedule_interval=’0 9 * * 1-5′, # 平日9時
catchup=False
) as dag:
task = PythonOperator(
task_id=’weekday_task’,
python_callable=weekday_task
)
問題 6
応用
15分ごとに実行されるDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def check_task():
print(“15分ごとのチェック”)
with DAG(
dag_id=’every_15_minutes’,
start_date=datetime(2024, 1, 1),
schedule_interval=’*/15 * * * *’, # 15分ごと
catchup=False
) as dag:
task = PythonOperator(
task_id=’check_task’,
python_callable=check_task
)
問題 7
応用
execution_dateを取得して、処理対象日をログに出力するDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
def process_data(**context):
execution_date = context[‘execution_date’]
target_date = execution_date.strftime(‘%Y-%m-%d’)
logger.info(f”処理対象日: {target_date}”)
with DAG(
dag_id=’use_execution_date’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
task = PythonOperator(
task_id=’process’,
python_callable=process_data
)
問題 8
応用
timedeltaを使って、2時間ごとに実行されるDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def bi_hourly_task():
print(“2時間ごとのタスク”)
with DAG(
dag_id=’every_2_hours’,
start_date=datetime(2024, 1, 1),
schedule_interval=timedelta(hours=2),
catchup=False
) as dag:
task = PythonOperator(
task_id=’bi_hourly_task’,
python_callable=bi_hourly_task
)
問題 9
発展
catchup=Trueで、max_active_runs=2を設定したDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
def backfill_task(**context):
execution_date = context[‘execution_date’]
logger.info(f”バックフィル処理: {execution_date}”)
default_args = {
‘owner’: ‘data_engineer’,
‘retries’: 1,
‘retry_delay’: timedelta(minutes=5)
}
with DAG(
dag_id=’controlled_catchup’,
default_args=default_args,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=True, # 過去分を実行
max_active_runs=2 # 同時実行は最大2つまで
) as dag:
task = PythonOperator(
task_id=’backfill_task’,
python_callable=backfill_task
)
問題 10
発展
毎月1日と15日の朝10時に実行される月2回のDAGを作成してください。処理対象月をログに出力してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
def semi_monthly_report(**context):
execution_date = context[‘execution_date’]
target_month = execution_date.strftime(‘%Y年%m月’)
day = execution_date.day
if day == 1:
period = “上旬”
else:
period = “下旬”
logger.info(f”処理対象: {target_month}{period}”)
logger.info(“半月レポートを作成中…”)
default_args = {
‘owner’: ‘data_team’,
‘retries’: 2,
‘retry_delay’: timedelta(minutes=5)
}
with DAG(
dag_id=’semi_monthly_report’,
default_args=default_args,
description=’毎月1日と15日に実行’,
start_date=datetime(2024, 1, 1),
schedule_interval=’0 10 1,15 * *’, # 毎月1日と15日の10時
catchup=False,
tags=[‘monthly’, ‘report’]
) as dag:
task = PythonOperator(
task_id=’semi_monthly_report’,
python_callable=semi_monthly_report
)