STEP 21:スケジュール実行とトリガー

⏰ STEP 21: スケジュール実行とトリガー

DAGを自動実行する設定とトリガー方法を学ぼう

📋 このステップで学ぶこと

  • cronライクなスケジュール設定
  • start_dateとschedule_intervalの理解
  • execution_date(処理対象日)の概念
  • バックフィル(catchup)
  • 手動トリガーの方法

⏱️ 学習時間の目安:3時間

📝 練習問題:10問(基礎4問・応用4問・発展2問)

🎯 1. スケジュール実行の基礎

1-1. なぜスケジュール実行が重要か?

ETLパイプラインは、定期的に自動実行することが多いです。
毎日、毎時間、毎週など、決まった時間に実行させる必要があります。

📚 例え話:目覚まし時計

Airflowのスケジュール機能は目覚まし時計に似ています。

schedule_interval = 何時に鳴らすか(毎朝7時など)
start_date = いつから使い始めるか
catchup = 鳴らなかった分も後から鳴らすか
手動トリガー = 今すぐ鳴らすボタン

一度設定すれば、毎日自動で起こしてくれますよね?
Airflowも同様に、設定すれば自動で実行してくれます。

📊 よくあるスケジュール例
  • 毎日朝9時:日次レポート作成
  • 毎時0分:リアルタイムデータ集計
  • 毎週月曜朝:週次サマリー作成
  • 毎月1日:月次売上集計

1-2. スケジュール実行の仕組み

実行タイミングの計算
【公式】
実行時刻 = start_date + schedule_interval

【例:毎日実行(@daily)】
start_date = 2024-01-01 00:00
schedule_interval = @daily(24時間)

タイムライン:
2024-01-01 00:00  ← start_date(この時点では実行されない)
       │
       ▼ +24時間
2024-01-02 00:00  ← 初回実行 ✅
       │
       ▼ +24時間
2024-01-03 00:00  ← 2回目実行 ✅
       │
       ▼ +24時間
2024-01-04 00:00  ← 3回目実行 ✅
        
💡 重要な概念:execution_date

Airflowは「過去のデータ」を処理する設計です。

例えば、2024-01-02 00:00に実行されるDAGは、
2024-01-01のデータを処理します。

この「処理対象の日時」をexecution_dateと呼びます。

📅 2. schedule_interval の詳細

2-1. プリセット値

Airflowのプリセット値
プリセット 意味 cron形式 用途例
None 手動実行のみ テスト、不定期実行
@once 一度だけ実行 初期化処理
@hourly 毎時0分 0 * * * * リアルタイム集計
@daily 毎日0時0分 0 0 * * * 日次レポート
@weekly 毎週日曜0時 0 0 * * 0 週次サマリー
@monthly 毎月1日0時 0 0 1 * * 月次売上集計
@yearly 毎年1月1日0時 0 0 1 1 * 年次レポート

2-2. cron形式の詳細

# ===== cron形式の構成 ===== # * * * * * # ┬ ┬ ┬ ┬ ┬ # │ │ │ │ └─ 曜日(0-6、0=日曜、7=日曜) # │ │ │ └─── 月(1-12) # │ │ └───── 日(1-31) # │ └─────── 時(0-23) # └───────── 分(0-59) # 【特殊文字】 # * : すべて # , : 複数指定(例: 1,3,5) # – : 範囲指定(例: 1-5) # / : 間隔指定(例: */15 = 15分ごと)

2-3. よく使うcronパターン

実務でよく使うcronパターン
cron形式 意味 用途
'0 9 * * *' 毎日9時0分 朝のレポート
'30 9 * * *' 毎日9時30分 朝のレポート
'*/15 * * * *' 15分ごと 高頻度監視
'0 */3 * * *' 3時間ごと 中頻度集計
'0 9 * * 1-5' 平日9時 営業日レポート
'0 10 * * 6,0' 週末10時 週末バッチ
'0 9 1,15 * *' 毎月1日と15日の9時 半月レポート
'0 0 1 * *' 毎月1日0時 月次バッチ

2-4. 実装例

# ===== スケジュール設定の実装例 ===== from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def daily_report(): print(“日次レポートを作成中…”) # 例1: 毎日朝9時に実行 with DAG( dag_id=’daily_report_9am’, start_date=datetime(2024, 1, 1), schedule_interval=’0 9 * * *’, # 毎日9時0分 catchup=False ) as dag: task = PythonOperator( task_id=’create_report’, python_callable=daily_report ) # 例2: 15分ごとに実行 with DAG( dag_id=’every_15_minutes’, start_date=datetime(2024, 1, 1), schedule_interval=’*/15 * * * *’, # 15分ごと catchup=False ) as dag: task = PythonOperator( task_id=’check_data’, python_callable=lambda: print(“データチェック中…”) ) # 例3: 平日のみ8時に実行 with DAG( dag_id=’weekday_morning’, start_date=datetime(2024, 1, 1), schedule_interval=’0 8 * * 1-5′, # 月〜金 8時0分 catchup=False ) as dag: task = PythonOperator( task_id=’morning_task’, python_callable=lambda: print(“おはようございます”) )

2-5. timedeltaを使ったスケジュール

# ===== timedeltaを使ったスケジュール ===== from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta # 30分ごとに実行 with DAG( dag_id=’every_30_minutes’, start_date=datetime(2024, 1, 1), schedule_interval=timedelta(minutes=30), catchup=False ) as dag: task = PythonOperator( task_id=’my_task’, python_callable=lambda: print(“30分ごとの実行”) ) # 6時間ごとに実行 with DAG( dag_id=’every_6_hours’, start_date=datetime(2024, 1, 1), schedule_interval=timedelta(hours=6), catchup=False ) as dag: task = PythonOperator( task_id=’my_task’, python_callable=lambda: print(“6時間ごとの実行”) )
【Airflow UIでの表示】 DAG: every_30_minutes ├── Schedule: 0:30:00 (timedelta) ├── 次回実行: 2024-01-15 10:00:00 └── 前回実行: 2024-01-15 09:30:00

🕐 3. start_date と execution_date

3-1. start_date の意味

start_dateは、DAGが実行を開始する日時です。
ただし、実際の初回実行はstart_date + schedule_intervalになります。

⚠️ よくある誤解

start_date = 2024-01-01 00:00
schedule_interval = @daily

2024-01-01には実行されません!
→ 初回実行は2024-01-02 00:00です

3-2. execution_date の概念

execution_date と 実行時刻の関係
【execution_date = 処理対象のデータの日時】

タイムライン:
2024-01-01 00:00 ─────────────────────┐
     │                               │
     │  ← この日のデータを処理        │
     │                               │
2024-01-02 00:00 ←───── 実行時刻 ────┘
                        execution_date = 2024-01-01

なぜこうなる?
→ データは「過去のもの」を処理するから
→ 2024-01-01が終わってから、そのデータを処理する
        

3-3. execution_dateを使う

# ===== execution_dateを使う ===== from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def process_data(**context): # execution_dateを取得 execution_date = context[‘execution_date’] print(f”実行日時: {datetime.now()}”) print(f”処理対象日: {execution_date}”) # 日付をファイル名に使う filename = f”data_{execution_date.strftime(‘%Y%m%d’)}.csv” print(f”処理するファイル: {filename}”) with DAG( dag_id=’use_execution_date’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: task = PythonOperator( task_id=’process’, python_callable=process_data )
【実行ログ(2024-01-02 00:00に実行された場合)】 実行日時: 2024-01-02 00:00:15 処理対象日: 2024-01-01 00:00:00+00:00 処理するファイル: data_20240101.csv

3-4. start_dateの設定方法

# ===== start_dateの設定方法 ===== # 方法1: 固定の日時を指定(推奨) from datetime import datetime start_date = datetime(2024, 1, 1) # 方法2: 現在から相対的に指定(非推奨) from datetime import datetime, timedelta start_date = datetime.now() – timedelta(days=1) # ❌ 避ける # 方法3: pendulumを使う(タイムゾーン対応、推奨) import pendulum start_date = pendulum.datetime(2024, 1, 1, tz=’Asia/Tokyo’)
🎯 ベストプラクティス
  • start_dateは固定値を使う(datetime.now()は避ける)
  • タイムゾーンを明示する(pendulum推奨
  • 過去の日付を指定してOK

🔄 4. catchup(バックフィル)

4-1. catchupとは?

catchupは、過去の未実行分を自動で実行するかを指定するパラメータです。

catchupの動作
【設定】
start_date = 2024-01-01
今日 = 2024-01-10
schedule_interval = @daily

【catchup=True の場合】
2024-01-02 ✅ 実行
2024-01-03 ✅ 実行
2024-01-04 ✅ 実行
2024-01-05 ✅ 実行
2024-01-06 ✅ 実行
2024-01-07 ✅ 実行
2024-01-08 ✅ 実行
2024-01-09 ✅ 実行
2024-01-10 ✅ 実行
→ 9日分すべて実行(バックフィル)

【catchup=False の場合】
2024-01-02 ⏭️ スキップ
2024-01-03 ⏭️ スキップ
    ...
2024-01-09 ⏭️ スキップ
2024-01-10 ✅ 実行
→ 最新の1回分だけ実行
        

4-2. catchupの使い分け

catchupの設定ガイド
設定 使い所 注意点
catchup=False
(推奨)
・最新データだけ処理
・過去データは不要
・新規DAG追加時
過去データは処理されない
catchup=True ・過去データも遡って処理
・履歴データの再計算
・データ整合性が重要
リソース大量消費の可能性

4-3. 実装例

# ===== catchupの設定 ===== from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def process(**context): execution_date = context[‘execution_date’] print(f”処理日: {execution_date}”) # catchup=False(推奨) with DAG( dag_id=’no_catchup’, start_date=datetime(2024, 1, 1), # 過去の日付でもOK schedule_interval=’@daily’, catchup=False # 過去分は実行しない ) as dag: task = PythonOperator( task_id=’process’, python_callable=process ) # catchup=True(過去分も実行) with DAG( dag_id=’with_catchup’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=True, # 過去分も実行する max_active_runs=3 # 同時実行は最大3つまで ) as dag: task = PythonOperator( task_id=’process’, python_callable=process )
⚠️ catchup=True の注意点
  • 大量の過去データがあると、一気に実行される
  • リソースを大量消費する可能性
  • 本番環境ではmax_active_runsで制限する

🎮 5. 手動トリガー

5-1. 手動実行の方法

DAGをすぐに実行したいときは、手動でトリガーします。

手動実行の方法
方法 手順 用途
Web UI ▶ボタンをクリック 開発・テスト
CLI airflow dags trigger スクリプト連携
REST API POSTリクエスト 外部システム連携

5-2. CLIからトリガー

# DAGを手動実行 airflow dags trigger my_dag_id # 特定の日付で実行 airflow dags trigger my_dag_id –exec-date 2024-01-01 # 設定を渡して実行 airflow dags trigger my_dag_id –conf ‘{“key”: “value”}’
【実行結果】 $ airflow dags trigger my_dag_id Created <DagRun my_dag_id @ 2024-01-15T10:30:00+00:00: manual__2024-01-15T10:30:00+00:00, state:queued>

5-3. REST APIからトリガー

# ===== REST APIからトリガー ===== import requests import json # Airflow API エンドポイント url = ‘http://localhost:8080/api/v1/dags/my_dag_id/dagRuns’ # 認証情報 auth = (‘airflow’, ‘airflow’) # リクエストボディ data = { “conf”: {}, “dag_run_id”: “manual_run_001” } # POSTリクエスト response = requests.post( url, auth=auth, headers={‘Content-Type’: ‘application/json’}, data=json.dumps(data) ) print(response.status_code) print(response.json())
【実行結果】 200 { “dag_id”: “my_dag_id”, “dag_run_id”: “manual_run_001”, “state”: “queued”, “execution_date”: “2024-01-15T10:30:00+00:00” }

5-4. 手動実行専用のDAG

# ===== 手動実行専用DAG ===== from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime # 手動実行専用(自動実行しない) with DAG( dag_id=’manual_only’, start_date=datetime(2024, 1, 1), schedule_interval=None, # 手動実行のみ catchup=False ) as dag: task = PythonOperator( task_id=’manual_task’, python_callable=lambda: print(“手動で実行されました”) )
🎯 schedule_interval=None の用途
  • テスト用DAG
  • 不定期実行タスク
  • 手動でのみ実行したいタスク
  • 外部システムからのトリガー専用

💼 6. 実践演習:毎日朝9時実行のパイプライン

6-1. 要件

  • 毎日朝9時に自動実行
  • 前日のデータを処理
  • 処理結果をファイルに保存
  • 完了通知

6-2. 完全なDAG実装

# ===== 毎日朝9時に実行される日次レポートDAG ===== # ファイル名: daily_report_9am.py from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta import pandas as pd import logging logger = logging.getLogger(__name__) # ===================================== # タスク関数の定義 # ===================================== def extract_yesterday_data(**context): “””前日のデータを抽出””” execution_date = context[‘execution_date’] target_date = execution_date.strftime(‘%Y-%m-%d’) logger.info(f”データ抽出開始: {target_date}”) # ダミーデータ(実際はDBやAPIから取得) data = { ‘日付’: [target_date] * 5, ‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’, ‘ぶどう’, ‘いちご’], ‘売上’: [15000, 25000, 10000, 30000, 20000] } df = pd.DataFrame(data) temp_file = f’/tmp/extracted_{target_date}.csv’ df.to_csv(temp_file, index=False) logger.info(f”データ抽出完了: {len(df)}件”) return temp_file def transform_data(**context): “””データを変換””” ti = context[‘ti’] temp_file = ti.xcom_pull(task_ids=’extract’) logger.info(f”データ変換開始: {temp_file}”) df = pd.read_csv(temp_file) df[‘税込売上’] = (df[‘売上’] * 1.1).astype(int) df[‘手数料’] = (df[‘売上’] * 0.05).astype(int) df[‘純利益’] = df[‘税込売上’] – df[‘手数料’] execution_date = context[‘execution_date’] target_date = execution_date.strftime(‘%Y-%m-%d’) transformed_file = f’/tmp/transformed_{target_date}.csv’ df.to_csv(transformed_file, index=False) logger.info(f”データ変換完了: {len(df)}件”) return transformed_file def create_report(**context): “””レポートを作成””” ti = context[‘ti’] transformed_file = ti.xcom_pull(task_ids=’transform’) df = pd.read_csv(transformed_file) total_sales = df[‘売上’].sum() total_profit = df[‘純利益’].sum() execution_date = context[‘execution_date’] target_date = execution_date.strftime(‘%Y-%m-%d’) report_file = f’/tmp/daily_report_{target_date}.txt’ with open(report_file, ‘w’, encoding=’utf-8′) as f: f.write(f”日次レポート – {target_date}\n”) f.write(“=” * 50 + “\n\n”) f.write(f”総売上: ¥{total_sales:,}\n”) f.write(f”純利益: ¥{total_profit:,}\n”) f.write(f”利益率: {(total_profit/total_sales*100):.1f}%\n”) logger.info(f”レポート作成完了: {report_file}”) return report_file def notify_completion(**context): “””完了通知””” execution_date = context[‘execution_date’] target_date = execution_date.strftime(‘%Y-%m-%d’) logger.info(f”✅ {target_date}の日次レポートが完了しました”) # 実際はここでメールやSlackに通知 # ===================================== # DAGの定義 # ===================================== default_args = { ‘owner’: ‘data_team’, ‘depends_on_past’: False, ‘email’: [‘admin@example.com’], ‘email_on_failure’: True, ‘retries’: 2, ‘retry_delay’: timedelta(minutes=5) } with DAG( dag_id=’daily_report_9am’, default_args=default_args, description=’毎日朝9時に実行される日次レポート’, start_date=datetime(2024, 1, 1), schedule_interval=’0 9 * * *’, # 毎日9時0分 catchup=False, tags=[‘daily’, ‘report’] ) as dag: extract = PythonOperator( task_id=’extract’, python_callable=extract_yesterday_data ) transform = PythonOperator( task_id=’transform’, python_callable=transform_data ) report = PythonOperator( task_id=’report’, python_callable=create_report ) notify = PythonOperator( task_id=’notify’, python_callable=notify_completion ) extract >> transform >> report >> notify
【実行ログ(2024-01-02 09:00に実行)】 [2024-01-02 09:00:01] INFO – データ抽出開始: 2024-01-01 [2024-01-02 09:00:02] INFO – データ抽出完了: 5件 [2024-01-02 09:00:03] INFO – データ変換開始: /tmp/extracted_2024-01-01.csv [2024-01-02 09:00:04] INFO – データ変換完了: 5件 [2024-01-02 09:00:05] INFO – レポート作成完了: /tmp/daily_report_2024-01-01.txt [2024-01-02 09:00:06] INFO – ✅ 2024-01-01の日次レポートが完了しました

📝 STEP 21 のまとめ

✅ このステップで学んだこと
  • schedule_interval:プリセット値とcron形式
  • start_date:DAGの開始日時
  • execution_date:処理対象のデータの日時
  • catchup:過去の未実行分を実行するか
  • 手動トリガー:Web UI、CLI、API
💡 重要ポイント
  • 実行時刻 = start_date + schedule_interval
  • catchup=False を推奨
  • cron形式で柔軟なスケジュール設定
  • execution_dateは処理対象日
🎯 次のステップの予告

次のSTEP 22では、「タスク間のデータ受け渡し」を学びます。

  • XCom(クロスコミュニケーション)
  • TaskFlow APIの使い方
  • Variables と Connections

📝 練習問題

問題 1 基礎

毎日朝8時に実行されるDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def morning_task(): print(“朝8時のタスク”) with DAG( dag_id=’daily_8am’, start_date=datetime(2024, 1, 1), schedule_interval=’0 8 * * *’, # 毎日8時0分 catchup=False ) as dag: task = PythonOperator( task_id=’morning_task’, python_callable=morning_task )
問題 2 基礎

毎時30分に実行されるDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def hourly_task(): print(“毎時30分のタスク”) with DAG( dag_id=’hourly_30′, start_date=datetime(2024, 1, 1), schedule_interval=’30 * * * *’, # 毎時30分 catchup=False ) as dag: task = PythonOperator( task_id=’hourly_task’, python_callable=hourly_task )
問題 3 基礎

手動実行のみ(schedule_interval=None)のDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def manual_task(): print(“手動で実行されました”) with DAG( dag_id=’manual_only’, start_date=datetime(2024, 1, 1), schedule_interval=None, # 手動実行のみ catchup=False ) as dag: task = PythonOperator( task_id=’manual_task’, python_callable=manual_task )
問題 4 基礎

@weeklyプリセットを使って、毎週実行されるDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def weekly_task(): print(“週次タスク実行”) with DAG( dag_id=’weekly_dag’, start_date=datetime(2024, 1, 1), schedule_interval=’@weekly’, # 毎週日曜0時 catchup=False ) as dag: task = PythonOperator( task_id=’weekly_task’, python_callable=weekly_task )
問題 5 応用

平日(月〜金)の朝9時に実行されるDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def weekday_task(): print(“平日朝9時のタスク”) with DAG( dag_id=’weekday_9am’, start_date=datetime(2024, 1, 1), schedule_interval=’0 9 * * 1-5′, # 平日9時 catchup=False ) as dag: task = PythonOperator( task_id=’weekday_task’, python_callable=weekday_task )
問題 6 応用

15分ごとに実行されるDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def check_task(): print(“15分ごとのチェック”) with DAG( dag_id=’every_15_minutes’, start_date=datetime(2024, 1, 1), schedule_interval=’*/15 * * * *’, # 15分ごと catchup=False ) as dag: task = PythonOperator( task_id=’check_task’, python_callable=check_task )
問題 7 応用

execution_dateを取得して、処理対象日をログに出力するDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime import logging logger = logging.getLogger(__name__) def process_data(**context): execution_date = context[‘execution_date’] target_date = execution_date.strftime(‘%Y-%m-%d’) logger.info(f”処理対象日: {target_date}”) with DAG( dag_id=’use_execution_date’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: task = PythonOperator( task_id=’process’, python_callable=process_data )
問題 8 応用

timedeltaを使って、2時間ごとに実行されるDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta def bi_hourly_task(): print(“2時間ごとのタスク”) with DAG( dag_id=’every_2_hours’, start_date=datetime(2024, 1, 1), schedule_interval=timedelta(hours=2), catchup=False ) as dag: task = PythonOperator( task_id=’bi_hourly_task’, python_callable=bi_hourly_task )
問題 9 発展

catchup=Trueで、max_active_runs=2を設定したDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta import logging logger = logging.getLogger(__name__) def backfill_task(**context): execution_date = context[‘execution_date’] logger.info(f”バックフィル処理: {execution_date}”) default_args = { ‘owner’: ‘data_engineer’, ‘retries’: 1, ‘retry_delay’: timedelta(minutes=5) } with DAG( dag_id=’controlled_catchup’, default_args=default_args, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=True, # 過去分を実行 max_active_runs=2 # 同時実行は最大2つまで ) as dag: task = PythonOperator( task_id=’backfill_task’, python_callable=backfill_task )
問題 10 発展

毎月1日と15日の朝10時に実行される月2回のDAGを作成してください。処理対象月をログに出力してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta import logging logger = logging.getLogger(__name__) def semi_monthly_report(**context): execution_date = context[‘execution_date’] target_month = execution_date.strftime(‘%Y年%m月’) day = execution_date.day if day == 1: period = “上旬” else: period = “下旬” logger.info(f”処理対象: {target_month}{period}”) logger.info(“半月レポートを作成中…”) default_args = { ‘owner’: ‘data_team’, ‘retries’: 2, ‘retry_delay’: timedelta(minutes=5) } with DAG( dag_id=’semi_monthly_report’, default_args=default_args, description=’毎月1日と15日に実行’, start_date=datetime(2024, 1, 1), schedule_interval=’0 10 1,15 * *’, # 毎月1日と15日の10時 catchup=False, tags=[‘monthly’, ‘report’] ) as dag: task = PythonOperator( task_id=’semi_monthly_report’, python_callable=semi_monthly_report )
📝

学習メモ

ETL・データパイプライン構築 - Step 21

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE