STEP 24:エラーハンドリングとモニタリング

🛡️ STEP 24: エラーハンドリングとモニタリング

本番運用に耐える堅牢なパイプラインを構築しよう

📋 このステップで学ぶこと

  • リトライ設定の方法
  • タイムアウト設定
  • エラー通知の実装(メール・Slack)
  • DAGの監視とログ確認
  • アラートの設定
  • 堅牢なパイプライン設計のベストプラクティス

⏱️ 学習時間の目安:3時間

📝 練習問題:10問(基礎4問・応用4問・発展2問)

🎯 1. エラーハンドリングの重要性

1-1. なぜエラーハンドリングが必要?

本番環境では、エラーは必ず発生します。重要なのは、エラーが起きた時に適切に対処できることです。

📚 例え話:自動車の安全装置

エラーハンドリングは自動車の安全装置に似ています。

リトライ = エンジン再始動(一時的な不調を解消)
タイムアウト = 燃料切れ警告(無限に走り続けない)
エラー通知 = 故障ランプ(問題をドライバーに知らせる)
ログ = ドライブレコーダー(後で原因を調査)

どんなに優秀な車でも、安全装置なしで高速道路は走れません。
同様に、エラーハンドリングなしで本番運用はできません!

1-2. よくあるエラーの種類

エラーの種類と対処法
エラー種類 原因 リトライ効果 対処法
ネットワークエラー DB・API接続失敗 ◎ 効果大 リトライ + 指数バックオフ
タイムアウト 処理が時間内に終わらない ○ 効果あり タイムアウト延長 + 処理分割
リソース不足 メモリ・ディスク不足 △ 効果薄い リソース増強 + 処理最適化
データエラー 不正なデータ形式 ✕ 効果なし データ検証 + アラート
外部サービス障害 APIが応答しない ○ 効果あり リトライ + フォールバック
エラーハンドリングの流れ
┌─────────────────────────────────────────────────────────────────┐
│                      タスク実行                                   │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
                        ┌──────────┐
                        │ 成功?   │
                        └──────────┘
                         │       │
                    Yes  │       │  No
                         ▼       ▼
                    ┌──────┐  ┌──────────────┐
                    │ 完了 │  │ リトライ可能? │
                    └──────┘  └──────────────┘
                                  │       │
                             Yes  │       │  No
                                  ▼       ▼
                           ┌──────────┐  ┌──────────────┐
                           │リトライ  │  │ 最終失敗     │
                           │(N回まで) │  │ ・エラー通知 │
                           └──────────┘  │ ・ログ記録   │
                                  │      └──────────────┘
                                  │
                           ┌──────────┐
                           │成功/失敗│→ 失敗ならエラー通知
                           └──────────┘
        
📝 エラーハンドリングの3原則
  1. 予測する:起こりうるエラーを事前に想定
  2. 対処する:リトライ、代替処理などで対応
  3. 通知する:エラーを適切な人に即座に伝える

🔄 2. リトライ設定

2-1. 基本的なリトライ設定

一時的なエラー(ネットワーク障害など)は、リトライすることで解決できることがあります。

# ===== 基本的なリトライ設定 ===== from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta # デフォルト設定 default_args = { ‘owner’: ‘data_team’, ‘retries’: 3, # 失敗したら3回リトライ ‘retry_delay’: timedelta(minutes=5), # リトライ間隔は5分 ‘retry_exponential_backoff’: True, # 指数バックオフ ‘max_retry_delay’: timedelta(minutes=30) # 最大待機時間 } def my_task(): print(“タスク実行中…”) return “完了” with DAG( dag_id=’retry_example’, default_args=default_args, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: task = PythonOperator( task_id=’task_with_retry’, python_callable=my_task )
💡 指数バックオフとは?

リトライする間隔をだんだん長くしていく方法です。

例:retry_exponential_backoff=Trueの場合
・1回目のリトライ:5分後
・2回目のリトライ:10分後
・3回目のリトライ:20分後

一時的な障害が回復する時間を稼げます!

2-2. タスク別のリトライ設定

# ===== タスクごとに個別設定 ===== # 重要なタスクは多めにリトライ critical_task = PythonOperator( task_id=’critical_task’, python_callable=critical_function, retries=5, retry_delay=timedelta(minutes=2) ) # 重要度が低いタスクは少なめ non_critical_task = PythonOperator( task_id=’non_critical_task’, python_callable=non_critical_function, retries=1, retry_delay=timedelta(minutes=1) )

2-3. 条件付きリトライ

# ===== エラーの種類によってリトライを決める ===== from airflow.exceptions import AirflowException def smart_retry_function(): “””エラーの種類によってリトライを決める””” try: result = call_external_api() return result except ConnectionError as e: # 接続エラーはリトライする print(f”接続エラー: {e}”) raise # リトライさせる except ValueError as e: # データエラーはリトライしても無駄 print(f”データエラー: {e}”) raise AirflowException(f”リトライ不可能なエラー: {e}”) task = PythonOperator( task_id=’smart_retry_task’, python_callable=smart_retry_function, retries=3 )
【リトライ時のログ例】 [2024-01-15 09:00:00] 試行 1/4: 接続エラー発生 [2024-01-15 09:05:00] 試行 2/4: 接続エラー発生 [2024-01-15 09:15:00] 試行 3/4: 成功!

⏰ 3. タイムアウト設定

3-1. タスクレベルのタイムアウト

処理がいつまでも終わらないのを防ぐため、タイムアウトを設定します。

# ===== タスクのタイムアウト ===== from datetime import timedelta task = PythonOperator( task_id=’task_with_timeout’, python_callable=long_running_function, execution_timeout=timedelta(minutes=30) # 30分でタイムアウト )

3-2. センサーのタイムアウト

# ===== センサーのタイムアウト ===== from airflow.sensors.filesystem import FileSensor wait_for_file = FileSensor( task_id=’wait_for_file’, filepath=’/data/input.csv’, poke_interval=60, # 60秒ごとにチェック timeout=3600, # 1時間でタイムアウト mode=’poke’ )
タイムアウト設定の目安
処理タイプ 通常処理時間 推奨タイムアウト 倍率
軽量な処理 〜1分 5分 5倍
通常のETL 5〜10分 30分 3倍
大規模処理 30分〜1時間 2時間 2倍
ファイル待機 不定 1〜2時間

📧 4. エラー通知の実装

4-1. メールによるエラー通知

# ===== メールによるエラー通知 ===== from airflow.utils.email import send_email def failure_callback(context): “””タスク失敗時のコールバック””” task_instance = context[‘task_instance’] exception = context.get(‘exception’) subject = f”[AIRFLOW ERROR] {task_instance.dag_id}.{task_instance.task_id}” html_content = f”””

タスクが失敗しました

項目内容
DAG{task_instance.dag_id}
タスク{task_instance.task_id}
実行時刻{context[‘execution_date’]}
試行回数{task_instance.try_number}
エラー内容{exception}
“”” send_email( to=’admin@example.com’, subject=subject, html_content=html_content ) # DAGに設定 default_args = { ‘on_failure_callback’: failure_callback }

4-2. Slackによるエラー通知

# ===== Slackによるエラー通知 ===== from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook def slack_failure_callback(context): “””Slackにエラー通知””” task_instance = context[‘task_instance’] exception = context.get(‘exception’) slack_msg = f””” :x: *タスクが失敗しました* *DAG:* `{task_instance.dag_id}` *タスク:* `{task_instance.task_id}` *実行時刻:* {context[‘execution_date’]} *試行回数:* {task_instance.try_number} *エラー:* {exception} Airflow UIからログを確認してください。 “”” hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’) hook.send_text(slack_msg) task = PythonOperator( task_id=’task’, python_callable=some_function, on_failure_callback=slack_failure_callback )

4-3. 成功時の通知

# ===== 成功時の通知 ===== def success_callback(context): “””タスク成功時の通知””” task_instance = context[‘task_instance’] hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’) hook.send_text(f””” :white_check_mark: *タスクが正常に完了しました* *DAG:* `{task_instance.dag_id}` *タスク:* `{task_instance.task_id}` *実行時刻:* {context[‘execution_date’]} “””) task = PythonOperator( task_id=’task’, python_callable=some_function, on_success_callback=success_callback, on_failure_callback=slack_failure_callback )
🎯 コールバック関数の種類
  • on_failure_callback:すべてのリトライが失敗した後に実行
  • on_success_callback:タスク成功時に実行
  • on_retry_callback:リトライするたびに実行
  • sla_miss_callback:SLA違反時に実行

📊 5. DAGの監視とログ確認

5-1. Airflow UIでの監視

📈 主要な監視画面
  • DAGs画面:全DAGの一覧と実行状況
  • Grid View:タスクの実行履歴を時系列で表示
  • Graph View:DAGの構造と各タスクの状態
  • Gantt Chart:タスクの実行時間を視覚化
  • Task Duration:タスクごとの処理時間推移

5-2. ログの出力方法

# ===== ログの出力方法 ===== import logging def my_task(**context): logger = logging.getLogger(__name__) logger.info(“処理を開始します”) logger.debug(“デバッグ情報: データ件数 = 1000”) logger.warning(“警告: NULL値が含まれています”) logger.error(“エラー: データベース接続に失敗”) # 標準出力もログに記録される print(“標準出力もログに記録されます”) return “完了”
【Airflow UIでのログ表示】 [2024-01-15 09:00:00,123] {my_dag.py:10} INFO – 処理を開始します [2024-01-15 09:00:00,124] {my_dag.py:11} DEBUG – デバッグ情報: データ件数 = 1000 [2024-01-15 09:00:00,125] {my_dag.py:12} WARNING – 警告: NULL値が含まれています [2024-01-15 09:00:00,126] {my_dag.py:14} INFO – 標準出力もログに記録されます

5-3. メトリクスの記録

# ===== メトリクスの記録 ===== def process_data(**context): “””処理結果をメトリクスとして記録””” import pandas as pd df = pd.read_csv(‘/data/input.csv’) # 処理 processed_count = len(df) error_count = df[‘status’].value_counts().get(‘error’, 0) success_rate = (processed_count – error_count) / processed_count # メトリクスをXComに保存 context[‘ti’].xcom_push(key=’processed_count’, value=processed_count) context[‘ti’].xcom_push(key=’error_count’, value=error_count) context[‘ti’].xcom_push(key=’success_rate’, value=success_rate) print(f”処理件数: {processed_count}”) print(f”エラー件数: {error_count}”) print(f”成功率: {success_rate:.2%}”) return processed_count

🏗️ 6. 堅牢なパイプライン設計

6-1. べき等性の確保

べき等性とは、何度実行しても同じ結果になる性質のことです。

# ===== べき等な処理の例 ===== def idempotent_load(**context): “””べき等な処理の例””” from sqlalchemy import create_engine import pandas as pd execution_date = context[‘ds’] engine = create_engine(‘postgresql://user:pass@localhost/db’) # 既存データを削除してから挿入(UPSERT) delete_query = f”DELETE FROM daily_summary WHERE date = ‘{execution_date}'” engine.execute(delete_query) # 新しいデータを挿入 df = pd.read_csv(f’/data/processed_{execution_date}.csv’) df.to_sql(‘daily_summary’, engine, if_exists=’append’, index=False) # 何度実行しても同じ結果になる! print(f”✅ {execution_date}のデータをロード完了”)
⚠️ べき等性がない場合の問題

べき等性がないと、再実行時に以下の問題が発生します:
・データが重複する
・集計結果が不正確になる
データ不整合が発生する

6-2. 段階的な処理とチェックポイント

# ===== 段階的な処理とチェックポイント ===== with DAG( dag_id=’robust_pipeline’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: # ステップ1: データ抽出 extract = PythonOperator( task_id=’extract_data’, python_callable=extract_data, retries=3 ) # チェックポイント1: 抽出データ検証 validate_extract = PythonOperator( task_id=’validate_extracted_data’, python_callable=validate_extracted_data ) # ステップ2: データ変換 transform = PythonOperator( task_id=’transform_data’, python_callable=transform_data, retries=2 ) # チェックポイント2: 変換データ検証 validate_transform = PythonOperator( task_id=’validate_transformed_data’, python_callable=validate_transformed_data ) # ステップ3: データロード load = PythonOperator( task_id=’load_data’, python_callable=load_data, retries=3 ) # 成功通知 notify = SlackWebhookOperator( task_id=’notify_success’, slack_webhook_conn_id=’slack_webhook’, message=”✅ パイプラインが正常に完了しました” ) # 依存関係 extract >> validate_extract >> transform >> validate_transform >> load >> notify
堅牢なパイプラインの構造
┌─────────┐     ┌──────────┐     ┌─────────┐     ┌──────────┐     ┌──────┐     ┌────────┐
│ Extract │ ──→ │ Validate │ ──→ │Transform│ ──→ │ Validate │ ──→ │ Load │ ──→ │ Notify │
│(retries │     │ Extract  │     │(retries │     │Transform │     │(retry│     │        │
│   =3)   │     │          │     │   =2)   │     │          │     │ =3)  │     │        │
└─────────┘     └──────────┘     └─────────┘     └──────────┘     └──────┘     └────────┘
     │               │                │               │              │
     ▼               ▼                ▼               ▼              ▼
   失敗時           失敗時            失敗時          失敗時         失敗時
   アラート         アラート          アラート        アラート       アラート
        
🎯 堅牢なパイプライン設計の6つのポイント
  1. べき等性:何度実行しても同じ結果
  2. 検証:各ステップでデータ品質チェック
  3. リトライ:一時的なエラーに対応
  4. タイムアウト:無限ループを防ぐ
  5. 通知:エラーを即座に検知
  6. ログ:トラブルシューティングに備える

💼 7. 実践演習:堅牢なETLパイプライン

7-1. 要件

  • リトライ設定(3回、指数バックオフ)
  • タイムアウト設定(30分)
  • エラー通知(Slack)
  • 成功通知(Slack)
  • データ検証

7-2. 完全な実装

# ===== 堅牢なETLパイプライン ===== from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook from datetime import datetime, timedelta import pandas as pd import logging logger = logging.getLogger(__name__) def slack_alert(context, message_type=’failure’): “””Slack通知を送信””” ti = context[‘task_instance’] if message_type == ‘failure’: emoji = ‘:x:’ title = ‘タスクが失敗しました’ exception = context.get(‘exception’, ‘Unknown’) extra = f”*エラー:* {exception}” else: emoji = ‘:white_check_mark:’ title = ‘パイプラインが完了しました’ extra = “” msg = f””” {emoji} *{title}* *DAG:* `{ti.dag_id}` *タスク:* `{ti.task_id}` *実行日:* {context[‘ds’]} {extra} “”” hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’) hook.send_text(msg) def failure_callback(context): slack_alert(context, ‘failure’) def success_callback(context): slack_alert(context, ‘success’) def extract_data(**context): “””データ抽出””” logger.info(“データ抽出開始”) df = pd.DataFrame({ ‘id’: range(1, 101), ‘value’: [i * 10 for i in range(1, 101)] }) file_path = f”/tmp/extracted_{context[‘ds’]}.csv” df.to_csv(file_path, index=False) logger.info(f”✅ 抽出完了: {len(df)}件 → {file_path}”) return file_path def validate_data(**context): “””データ検証””” ti = context[‘ti’] file_path = ti.xcom_pull(task_ids=’extract’) logger.info(f”データ検証開始: {file_path}”) df = pd.read_csv(file_path) # 検証1: レコード数 if len(df) < 10: raise ValueError(f"レコード数が少なすぎます: {len(df)}") # 検証2: NULL値 null_count = df.isnull().sum().sum() if null_count > 0: logger.warning(f”NULL値が{null_count}件あります”) logger.info(f”✅ 検証完了: {len(df)}件”) return file_path def transform_data(**context): “””データ変換””” ti = context[‘ti’] file_path = ti.xcom_pull(task_ids=’validate’) logger.info(f”データ変換開始: {file_path}”) df = pd.read_csv(file_path) df[‘doubled’] = df[‘value’] * 2 output_path = f”/tmp/transformed_{context[‘ds’]}.csv” df.to_csv(output_path, index=False) logger.info(f”✅ 変換完了: {len(df)}件 → {output_path}”) return output_path def load_data(**context): “””データロード””” ti = context[‘ti’] file_path = ti.xcom_pull(task_ids=’transform’) logger.info(f”データロード開始: {file_path}”) df = pd.read_csv(file_path) # べき等性: 既存データを削除してから挿入 final_path = f”/tmp/final_{context[‘ds’]}.csv” df.to_csv(final_path, index=False) logger.info(f”✅ ロード完了: {len(df)}件 → {final_path}”) # メトリクスを記録 ti.xcom_push(key=’record_count’, value=len(df)) ti.xcom_push(key=’total_value’, value=int(df[‘doubled’].sum())) return final_path default_args = { ‘owner’: ‘data_team’, ‘retries’: 3, ‘retry_delay’: timedelta(minutes=5), ‘retry_exponential_backoff’: True, ‘on_failure_callback’: failure_callback } with DAG( dag_id=’robust_etl_pipeline’, default_args=default_args, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False, tags=[‘etl’, ‘robust’] ) as dag: extract = PythonOperator( task_id=’extract’, python_callable=extract_data, execution_timeout=timedelta(minutes=30) ) validate = PythonOperator( task_id=’validate’, python_callable=validate_data ) transform = PythonOperator( task_id=’transform’, python_callable=transform_data ) load = PythonOperator( task_id=’load’, python_callable=load_data, on_success_callback=success_callback ) extract >> validate >> transform >> load
【実行ログ】 [extract] データ抽出開始 [extract] ✅ 抽出完了: 100件 → /tmp/extracted_2024-01-15.csv [validate] データ検証開始: /tmp/extracted_2024-01-15.csv [validate] ✅ 検証完了: 100件 [transform] データ変換開始: /tmp/extracted_2024-01-15.csv [transform] ✅ 変換完了: 100件 → /tmp/transformed_2024-01-15.csv [load] データロード開始: /tmp/transformed_2024-01-15.csv [load] ✅ ロード完了: 100件 → /tmp/final_2024-01-15.csv [Slack] ✅ パイプラインが完了しました

📝 STEP 24 のまとめ

✅ このステップで学んだこと
  • リトライ設定:回数、間隔、指数バックオフ
  • タイムアウト設定:タスクとセンサーのタイムアウト
  • エラー通知:メール、Slack、コールバック関数
  • 監視:Airflow UIでの監視、ログ確認
  • 堅牢な設計:べき等性、検証、段階的処理
💡 本番運用のチェックリスト
  • ✅ すべてのタスクにリトライ設定をしているか?
  • ✅ タイムアウトは適切に設定されているか?
  • ✅ エラー通知は確実に届くか?
  • ✅ ログは十分に出力されているか?
  • ✅ データ品質チェックは実装されているか?
  • ✅ べき等性は保証されているか?
🎯 次のステップの予告

次のSTEP 25では、「実践ETLパイプライン構築」を行います。

  • 売上データの日次集計パイプライン
  • 複数データソースからの抽出
  • DWHへのデータ投入
  • レポート生成と通知

📝 練習問題

問題 1 基礎

リトライ3回、間隔5分の基本的なDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta def my_task(): print(“タスク実行中…”) return “完了” default_args = { ‘owner’: ‘data_team’, ‘retries’: 3, ‘retry_delay’: timedelta(minutes=5) } with DAG( dag_id=’basic_retry_dag’, default_args=default_args, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: task = PythonOperator( task_id=’my_task’, python_callable=my_task )
問題 2 基礎

タイムアウト30分を設定したタスクを作成してください。

【解答】
from datetime import timedelta task = PythonOperator( task_id=’task_with_timeout’, python_callable=my_function, execution_timeout=timedelta(minutes=30) )
問題 3 基礎

on_failure_callbackを使って、失敗時にprint文でエラーメッセージを表示するDAGを作成してください。

【解答】
def failure_alert(context): ti = context[‘task_instance’] exception = context.get(‘exception’) print(f”エラー発生! タスク: {ti.task_id}, エラー: {exception}”) def risky_task(): raise Exception(“テストエラー”) with DAG( dag_id=’failure_callback_dag’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: task = PythonOperator( task_id=’risky_task’, python_callable=risky_task, on_failure_callback=failure_alert )
問題 4 基礎

指数バックオフを有効にしたリトライ設定を作成してください。

【解答】
default_args = { ‘retries’: 3, ‘retry_delay’: timedelta(minutes=5), ‘retry_exponential_backoff’: True, ‘max_retry_delay’: timedelta(minutes=30) }
問題 5 応用

タスクが失敗したときにSlackに通知するDAGを作成してください。

【解答】
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook def slack_failure_alert(context): ti = context[‘task_instance’] exception = context.get(‘exception’) hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’) hook.send_text(f””” :x: *タスク失敗* *DAG:* `{ti.dag_id}` *タスク:* `{ti.task_id}` *エラー:* {exception} “””) def risky_task(): import random if random.random() < 0.5: raise Exception("ランダムエラー") return "成功" with DAG( dag_id='slack_failure_dag', start_date=datetime(2024, 1, 1), schedule_interval='@daily', catchup=False ) as dag: task = PythonOperator( task_id='risky_task', python_callable=risky_task, on_failure_callback=slack_failure_alert )
問題 6 応用

成功時と失敗時の両方で通知を送るDAGを作成してください。

【解答】
def success_callback(context): ti = context[‘task_instance’] print(f”✅ 成功: {ti.task_id}”) def failure_callback(context): ti = context[‘task_instance’] print(f”❌ 失敗: {ti.task_id}”) task = PythonOperator( task_id=’task’, python_callable=my_function, on_success_callback=success_callback, on_failure_callback=failure_callback )
問題 7 応用

べき等な処理として、既存データを削除してから挿入するロード関数を作成してください。

【解答】
def idempotent_load(**context): “””べき等なロード処理””” from sqlalchemy import create_engine import pandas as pd execution_date = context[‘ds’] engine = create_engine(‘postgresql://user:pass@localhost/db’) # 既存データを削除 delete_query = f”DELETE FROM daily_data WHERE date = ‘{execution_date}'” engine.execute(delete_query) # 新しいデータを挿入 df = pd.read_csv(f’/tmp/data_{execution_date}.csv’) df.to_sql(‘daily_data’, engine, if_exists=’append’, index=False) print(f”✅ {execution_date}のデータをロード完了”)
問題 8 応用

ログを出力しながらデータを処理するタスクを作成してください。INFO、WARNING、ERRORの3種類のログを含めてください。

【解答】
import logging def process_with_logging(**context): logger = logging.getLogger(__name__) logger.info(“処理を開始します”) # データ処理 data = [1, 2, None, 4, 5] if None in data: logger.warning(“NULL値が含まれています”) try: result = sum([x for x in data if x is not None]) logger.info(f”処理完了: 合計 = {result}”) return result except Exception as e: logger.error(f”処理中にエラー発生: {e}”) raise
問題 9 発展

データ品質チェックを行い、問題があればアラートを送り、問題なければ処理を続行するDAGを作成してください。

【解答】
from airflow.operators.python import BranchPythonOperator import pandas as pd def check_data_quality(**context): “””データ品質チェック””” df = pd.read_csv(‘/data/input.csv’) issues = [] if len(df) < 10: issues.append(f"レコード数が少ない: {len(df)}件") null_ratio = df.isnull().sum().sum() / (len(df) * len(df.columns)) if null_ratio > 0.1: issues.append(f”NULL値が多い: {null_ratio:.2%}”) if issues: context[‘ti’].xcom_push(key=’issues’, value=’\n’.join(issues)) return ‘send_alert’ return ‘process_data’ def process_data(): print(“データ処理を実行中…”) def send_alert(**context): issues = context[‘ti’].xcom_pull(task_ids=’check_quality’, key=’issues’) print(f”⚠️ データ品質アラート:\n{issues}”) with DAG( dag_id=’quality_check_dag’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: check = BranchPythonOperator( task_id=’check_quality’, python_callable=check_data_quality ) alert = PythonOperator( task_id=’send_alert’, python_callable=send_alert ) process = PythonOperator( task_id=’process_data’, python_callable=process_data ) check >> [alert, process]
問題 10 発展

リトライ設定、タイムアウト設定、エラー通知、成功通知、べき等性を備えた完全なETLパイプラインを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta import pandas as pd import logging logger = logging.getLogger(__name__) def failure_callback(context): ti = context[‘task_instance’] logger.error(f”❌ 失敗: {ti.dag_id}.{ti.task_id}”) def success_callback(context): ti = context[‘task_instance’] logger.info(f”✅ 成功: {ti.dag_id}.{ti.task_id}”) def extract(**context): logger.info(“Extract開始”) df = pd.DataFrame({‘id’: range(100), ‘value’: range(100)}) path = f”/tmp/extracted_{context[‘ds’]}.csv” df.to_csv(path, index=False) logger.info(f”Extract完了: {len(df)}件”) return path def transform(**context): ti = context[‘ti’] path = ti.xcom_pull(task_ids=’extract’) logger.info(f”Transform開始: {path}”) df = pd.read_csv(path) df[‘doubled’] = df[‘value’] * 2 output = f”/tmp/transformed_{context[‘ds’]}.csv” df.to_csv(output, index=False) logger.info(f”Transform完了: {len(df)}件”) return output def load(**context): ti = context[‘ti’] path = ti.xcom_pull(task_ids=’transform’) logger.info(f”Load開始: {path}”) df = pd.read_csv(path) # べき等性: 既存ファイルを上書き final = f”/tmp/final_{context[‘ds’]}.csv” df.to_csv(final, index=False) logger.info(f”Load完了: {len(df)}件”) default_args = { ‘retries’: 3, ‘retry_delay’: timedelta(minutes=5), ‘retry_exponential_backoff’: True, ‘on_failure_callback’: failure_callback } with DAG( dag_id=’complete_robust_etl’, default_args=default_args, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: extract_task = PythonOperator( task_id=’extract’, python_callable=extract, execution_timeout=timedelta(minutes=30) ) transform_task = PythonOperator( task_id=’transform’, python_callable=transform, execution_timeout=timedelta(minutes=30) ) load_task = PythonOperator( task_id=’load’, python_callable=load, execution_timeout=timedelta(minutes=30), on_success_callback=success_callback ) extract_task >> transform_task >> load_task
📝

学習メモ

ETL・データパイプライン構築 - Step 24

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE