📋 このステップで学ぶこと
リトライ設定の方法
タイムアウト設定
エラー通知の実装(メール・Slack)
DAGの監視とログ確認
アラートの設定
堅牢なパイプライン設計のベストプラクティス
⏱️ 学習時間の目安: 3時間
📝 練習問題: 10問(基礎4問・応用4問・発展2問)
🎯 1. エラーハンドリングの重要性
1-1. なぜエラーハンドリングが必要?
本番環境では、エラーは必ず発生します 。重要なのは、エラーが起きた時に適切に対処できること です。
📚 例え話:自動車の安全装置
エラーハンドリングは自動車の安全装置 に似ています。
・リトライ = エンジン再始動(一時的な不調を解消)
・タイムアウト = 燃料切れ警告(無限に走り続けない)
・エラー通知 = 故障ランプ(問題をドライバーに知らせる)
・ログ = ドライブレコーダー(後で原因を調査)
どんなに優秀な車でも、安全装置なしで高速道路は走れません。
同様に、エラーハンドリングなしで本番運用はできません!
1-2. よくあるエラーの種類
エラーの種類と対処法
エラー種類
原因
リトライ効果
対処法
ネットワークエラー
DB・API接続失敗
◎ 効果大
リトライ + 指数バックオフ
タイムアウト
処理が時間内に終わらない
○ 効果あり
タイムアウト延長 + 処理分割
リソース不足
メモリ・ディスク不足
△ 効果薄い
リソース増強 + 処理最適化
データエラー
不正なデータ形式
✕ 効果なし
データ検証 + アラート
外部サービス障害
APIが応答しない
○ 効果あり
リトライ + フォールバック
エラーハンドリングの流れ
┌─────────────────────────────────────────────────────────────────┐
│ タスク実行 │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌──────────┐
│ 成功? │
└──────────┘
│ │
Yes │ │ No
▼ ▼
┌──────┐ ┌──────────────┐
│ 完了 │ │ リトライ可能? │
└──────┘ └──────────────┘
│ │
Yes │ │ No
▼ ▼
┌──────────┐ ┌──────────────┐
│リトライ │ │ 最終失敗 │
│(N回まで) │ │ ・エラー通知 │
└──────────┘ │ ・ログ記録 │
│ └──────────────┘
│
┌──────────┐
│成功/失敗│→ 失敗ならエラー通知
└──────────┘
📝 エラーハンドリングの3原則
予測する :起こりうるエラーを事前に想定
対処する :リトライ、代替処理などで対応
通知する :エラーを適切な人に即座に伝える
🔄 2. リトライ設定
2-1. 基本的なリトライ設定
一時的なエラー(ネットワーク障害など)は、リトライすることで解決 できることがあります。
# ===== 基本的なリトライ設定 =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# デフォルト設定
default_args = {
‘owner’: ‘data_team’,
‘retries’: 3, # 失敗したら3回リトライ
‘retry_delay’: timedelta(minutes=5), # リトライ間隔は5分
‘retry_exponential_backoff’: True, # 指数バックオフ
‘max_retry_delay’: timedelta(minutes=30) # 最大待機時間
}
def my_task():
print(“タスク実行中…”)
return “完了”
with DAG(
dag_id=’retry_example’,
default_args=default_args,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
task = PythonOperator(
task_id=’task_with_retry’,
python_callable=my_task
)
💡 指数バックオフとは?
リトライする間隔をだんだん長く していく方法です。
例:retry_exponential_backoff=Trueの場合
・1回目のリトライ:5分後
・2回目のリトライ:10分後
・3回目のリトライ:20分後
一時的な障害が回復する時間を稼げます!
2-2. タスク別のリトライ設定
# ===== タスクごとに個別設定 =====
# 重要なタスクは多めにリトライ
critical_task = PythonOperator(
task_id=’critical_task’,
python_callable=critical_function,
retries=5,
retry_delay=timedelta(minutes=2)
)
# 重要度が低いタスクは少なめ
non_critical_task = PythonOperator(
task_id=’non_critical_task’,
python_callable=non_critical_function,
retries=1,
retry_delay=timedelta(minutes=1)
)
2-3. 条件付きリトライ
# ===== エラーの種類によってリトライを決める =====
from airflow.exceptions import AirflowException
def smart_retry_function():
“””エラーの種類によってリトライを決める”””
try:
result = call_external_api()
return result
except ConnectionError as e:
# 接続エラーはリトライする
print(f”接続エラー: {e}”)
raise # リトライさせる
except ValueError as e:
# データエラーはリトライしても無駄
print(f”データエラー: {e}”)
raise AirflowException(f”リトライ不可能なエラー: {e}”)
task = PythonOperator(
task_id=’smart_retry_task’,
python_callable=smart_retry_function,
retries=3
)
【リトライ時のログ例】
[2024-01-15 09:00:00] 試行 1/4: 接続エラー発生
[2024-01-15 09:05:00] 試行 2/4: 接続エラー発生
[2024-01-15 09:15:00] 試行 3/4: 成功!
⏰ 3. タイムアウト設定
3-1. タスクレベルのタイムアウト
処理がいつまでも終わらない のを防ぐため、タイムアウトを設定します。
# ===== タスクのタイムアウト =====
from datetime import timedelta
task = PythonOperator(
task_id=’task_with_timeout’,
python_callable=long_running_function,
execution_timeout=timedelta(minutes=30) # 30分でタイムアウト
)
3-2. センサーのタイムアウト
# ===== センサーのタイムアウト =====
from airflow.sensors.filesystem import FileSensor
wait_for_file = FileSensor(
task_id=’wait_for_file’,
filepath=’/data/input.csv’,
poke_interval=60, # 60秒ごとにチェック
timeout=3600, # 1時間でタイムアウト
mode=’poke’
)
タイムアウト設定の目安
処理タイプ
通常処理時間
推奨タイムアウト
倍率
軽量な処理
〜1分
5分
5倍
通常のETL
5〜10分
30分
3倍
大規模処理
30分〜1時間
2時間
2倍
ファイル待機
不定
1〜2時間
–
📧 4. エラー通知の実装
4-1. メールによるエラー通知
# ===== メールによるエラー通知 =====
from airflow.utils.email import send_email
def failure_callback(context):
“””タスク失敗時のコールバック”””
task_instance = context[‘task_instance’]
exception = context.get(‘exception’)
subject = f”[AIRFLOW ERROR] {task_instance.dag_id}.{task_instance.task_id}”
html_content = f”””
タスクが失敗しました
項目 内容
DAG {task_instance.dag_id}
タスク {task_instance.task_id}
実行時刻 {context[‘execution_date’]}
試行回数 {task_instance.try_number}
エラー内容 {exception}
“””
send_email(
to=’admin@example.com’,
subject=subject,
html_content=html_content
)
# DAGに設定
default_args = {
‘on_failure_callback’: failure_callback
}
4-2. Slackによるエラー通知
# ===== Slackによるエラー通知 =====
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
def slack_failure_callback(context):
“””Slackにエラー通知”””
task_instance = context[‘task_instance’]
exception = context.get(‘exception’)
slack_msg = f”””
:x: *タスクが失敗しました*
*DAG:* `{task_instance.dag_id}`
*タスク:* `{task_instance.task_id}`
*実行時刻:* {context[‘execution_date’]}
*試行回数:* {task_instance.try_number}
*エラー:* {exception}
Airflow UIからログを確認してください。
“””
hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’)
hook.send_text(slack_msg)
task = PythonOperator(
task_id=’task’,
python_callable=some_function,
on_failure_callback=slack_failure_callback
)
4-3. 成功時の通知
# ===== 成功時の通知 =====
def success_callback(context):
“””タスク成功時の通知”””
task_instance = context[‘task_instance’]
hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’)
hook.send_text(f”””
:white_check_mark: *タスクが正常に完了しました*
*DAG:* `{task_instance.dag_id}`
*タスク:* `{task_instance.task_id}`
*実行時刻:* {context[‘execution_date’]}
“””)
task = PythonOperator(
task_id=’task’,
python_callable=some_function,
on_success_callback=success_callback,
on_failure_callback=slack_failure_callback
)
🎯 コールバック関数の種類
on_failure_callback :すべてのリトライが失敗した後に実行
on_success_callback :タスク成功時に実行
on_retry_callback :リトライするたびに実行
sla_miss_callback :SLA違反時に実行
📊 5. DAGの監視とログ確認
5-1. Airflow UIでの監視
📈 主要な監視画面
DAGs画面 :全DAGの一覧と実行状況
Grid View :タスクの実行履歴を時系列で表示
Graph View :DAGの構造と各タスクの状態
Gantt Chart :タスクの実行時間を視覚化
Task Duration :タスクごとの処理時間推移
5-2. ログの出力方法
# ===== ログの出力方法 =====
import logging
def my_task(**context):
logger = logging.getLogger(__name__)
logger.info(“処理を開始します”)
logger.debug(“デバッグ情報: データ件数 = 1000”)
logger.warning(“警告: NULL値が含まれています”)
logger.error(“エラー: データベース接続に失敗”)
# 標準出力もログに記録される
print(“標準出力もログに記録されます”)
return “完了”
【Airflow UIでのログ表示】
[2024-01-15 09:00:00,123] {my_dag.py:10} INFO – 処理を開始します
[2024-01-15 09:00:00,124] {my_dag.py:11} DEBUG – デバッグ情報: データ件数 = 1000
[2024-01-15 09:00:00,125] {my_dag.py:12} WARNING – 警告: NULL値が含まれています
[2024-01-15 09:00:00,126] {my_dag.py:14} INFO – 標準出力もログに記録されます
5-3. メトリクスの記録
# ===== メトリクスの記録 =====
def process_data(**context):
“””処理結果をメトリクスとして記録”””
import pandas as pd
df = pd.read_csv(‘/data/input.csv’)
# 処理
processed_count = len(df)
error_count = df[‘status’].value_counts().get(‘error’, 0)
success_rate = (processed_count – error_count) / processed_count
# メトリクスをXComに保存
context[‘ti’].xcom_push(key=’processed_count’, value=processed_count)
context[‘ti’].xcom_push(key=’error_count’, value=error_count)
context[‘ti’].xcom_push(key=’success_rate’, value=success_rate)
print(f”処理件数: {processed_count}”)
print(f”エラー件数: {error_count}”)
print(f”成功率: {success_rate:.2%}”)
return processed_count
🏗️ 6. 堅牢なパイプライン設計
6-1. べき等性の確保
べき等性 とは、何度実行しても同じ結果になる 性質のことです。
# ===== べき等な処理の例 =====
def idempotent_load(**context):
“””べき等な処理の例”””
from sqlalchemy import create_engine
import pandas as pd
execution_date = context[‘ds’]
engine = create_engine(‘postgresql://user:pass@localhost/db’)
# 既存データを削除してから挿入(UPSERT)
delete_query = f”DELETE FROM daily_summary WHERE date = ‘{execution_date}'”
engine.execute(delete_query)
# 新しいデータを挿入
df = pd.read_csv(f’/data/processed_{execution_date}.csv’)
df.to_sql(‘daily_summary’, engine, if_exists=’append’, index=False)
# 何度実行しても同じ結果になる!
print(f”✅ {execution_date}のデータをロード完了”)
⚠️ べき等性がない場合の問題
べき等性がないと、再実行時に以下の問題が発生します:
・データが重複 する
・集計結果が不正確 になる
・データ不整合 が発生する
6-2. 段階的な処理とチェックポイント
# ===== 段階的な処理とチェックポイント =====
with DAG(
dag_id=’robust_pipeline’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
# ステップ1: データ抽出
extract = PythonOperator(
task_id=’extract_data’,
python_callable=extract_data,
retries=3
)
# チェックポイント1: 抽出データ検証
validate_extract = PythonOperator(
task_id=’validate_extracted_data’,
python_callable=validate_extracted_data
)
# ステップ2: データ変換
transform = PythonOperator(
task_id=’transform_data’,
python_callable=transform_data,
retries=2
)
# チェックポイント2: 変換データ検証
validate_transform = PythonOperator(
task_id=’validate_transformed_data’,
python_callable=validate_transformed_data
)
# ステップ3: データロード
load = PythonOperator(
task_id=’load_data’,
python_callable=load_data,
retries=3
)
# 成功通知
notify = SlackWebhookOperator(
task_id=’notify_success’,
slack_webhook_conn_id=’slack_webhook’,
message=”✅ パイプラインが正常に完了しました”
)
# 依存関係
extract >> validate_extract >> transform >> validate_transform >> load >> notify
堅牢なパイプラインの構造
┌─────────┐ ┌──────────┐ ┌─────────┐ ┌──────────┐ ┌──────┐ ┌────────┐
│ Extract │ ──→ │ Validate │ ──→ │Transform│ ──→ │ Validate │ ──→ │ Load │ ──→ │ Notify │
│(retries │ │ Extract │ │(retries │ │Transform │ │(retry│ │ │
│ =3) │ │ │ │ =2) │ │ │ │ =3) │ │ │
└─────────┘ └──────────┘ └─────────┘ └──────────┘ └──────┘ └────────┘
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
失敗時 失敗時 失敗時 失敗時 失敗時
アラート アラート アラート アラート アラート
🎯 堅牢なパイプライン設計の6つのポイント
べき等性 :何度実行しても同じ結果
検証 :各ステップでデータ品質チェック
リトライ :一時的なエラーに対応
タイムアウト :無限ループを防ぐ
通知 :エラーを即座に検知
ログ :トラブルシューティングに備える
💼 7. 実践演習:堅牢なETLパイプライン
7-1. 要件
リトライ設定(3回、指数バックオフ)
タイムアウト設定(30分)
エラー通知(Slack)
成功通知(Slack)
データ検証
7-2. 完全な実装
# ===== 堅牢なETLパイプライン =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from datetime import datetime, timedelta
import pandas as pd
import logging
logger = logging.getLogger(__name__)
def slack_alert(context, message_type=’failure’):
“””Slack通知を送信”””
ti = context[‘task_instance’]
if message_type == ‘failure’:
emoji = ‘:x:’
title = ‘タスクが失敗しました’
exception = context.get(‘exception’, ‘Unknown’)
extra = f”*エラー:* {exception}”
else:
emoji = ‘:white_check_mark:’
title = ‘パイプラインが完了しました’
extra = “”
msg = f”””
{emoji} *{title}*
*DAG:* `{ti.dag_id}`
*タスク:* `{ti.task_id}`
*実行日:* {context[‘ds’]}
{extra}
“””
hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’)
hook.send_text(msg)
def failure_callback(context):
slack_alert(context, ‘failure’)
def success_callback(context):
slack_alert(context, ‘success’)
def extract_data(**context):
“””データ抽出”””
logger.info(“データ抽出開始”)
df = pd.DataFrame({
‘id’: range(1, 101),
‘value’: [i * 10 for i in range(1, 101)]
})
file_path = f”/tmp/extracted_{context[‘ds’]}.csv”
df.to_csv(file_path, index=False)
logger.info(f”✅ 抽出完了: {len(df)}件 → {file_path}”)
return file_path
def validate_data(**context):
“””データ検証”””
ti = context[‘ti’]
file_path = ti.xcom_pull(task_ids=’extract’)
logger.info(f”データ検証開始: {file_path}”)
df = pd.read_csv(file_path)
# 検証1: レコード数
if len(df) < 10:
raise ValueError(f"レコード数が少なすぎます: {len(df)}")
# 検証2: NULL値
null_count = df.isnull().sum().sum()
if null_count > 0:
logger.warning(f”NULL値が{null_count}件あります”)
logger.info(f”✅ 検証完了: {len(df)}件”)
return file_path
def transform_data(**context):
“””データ変換”””
ti = context[‘ti’]
file_path = ti.xcom_pull(task_ids=’validate’)
logger.info(f”データ変換開始: {file_path}”)
df = pd.read_csv(file_path)
df[‘doubled’] = df[‘value’] * 2
output_path = f”/tmp/transformed_{context[‘ds’]}.csv”
df.to_csv(output_path, index=False)
logger.info(f”✅ 変換完了: {len(df)}件 → {output_path}”)
return output_path
def load_data(**context):
“””データロード”””
ti = context[‘ti’]
file_path = ti.xcom_pull(task_ids=’transform’)
logger.info(f”データロード開始: {file_path}”)
df = pd.read_csv(file_path)
# べき等性: 既存データを削除してから挿入
final_path = f”/tmp/final_{context[‘ds’]}.csv”
df.to_csv(final_path, index=False)
logger.info(f”✅ ロード完了: {len(df)}件 → {final_path}”)
# メトリクスを記録
ti.xcom_push(key=’record_count’, value=len(df))
ti.xcom_push(key=’total_value’, value=int(df[‘doubled’].sum()))
return final_path
default_args = {
‘owner’: ‘data_team’,
‘retries’: 3,
‘retry_delay’: timedelta(minutes=5),
‘retry_exponential_backoff’: True,
‘on_failure_callback’: failure_callback
}
with DAG(
dag_id=’robust_etl_pipeline’,
default_args=default_args,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False,
tags=[‘etl’, ‘robust’]
) as dag:
extract = PythonOperator(
task_id=’extract’,
python_callable=extract_data,
execution_timeout=timedelta(minutes=30)
)
validate = PythonOperator(
task_id=’validate’,
python_callable=validate_data
)
transform = PythonOperator(
task_id=’transform’,
python_callable=transform_data
)
load = PythonOperator(
task_id=’load’,
python_callable=load_data,
on_success_callback=success_callback
)
extract >> validate >> transform >> load
【実行ログ】
[extract] データ抽出開始
[extract] ✅ 抽出完了: 100件 → /tmp/extracted_2024-01-15.csv
[validate] データ検証開始: /tmp/extracted_2024-01-15.csv
[validate] ✅ 検証完了: 100件
[transform] データ変換開始: /tmp/extracted_2024-01-15.csv
[transform] ✅ 変換完了: 100件 → /tmp/transformed_2024-01-15.csv
[load] データロード開始: /tmp/transformed_2024-01-15.csv
[load] ✅ ロード完了: 100件 → /tmp/final_2024-01-15.csv
[Slack] ✅ パイプラインが完了しました
📝 STEP 24 のまとめ
✅ このステップで学んだこと
リトライ設定 :回数、間隔、指数バックオフ
タイムアウト設定 :タスクとセンサーのタイムアウト
エラー通知 :メール、Slack、コールバック関数
監視 :Airflow UIでの監視、ログ確認
堅牢な設計 :べき等性、検証、段階的処理
💡 本番運用のチェックリスト
✅ すべてのタスクにリトライ設定をしているか?
✅ タイムアウトは適切に設定されているか?
✅ エラー通知は確実に届くか?
✅ ログは十分に出力されているか?
✅ データ品質チェックは実装されているか?
✅ べき等性は保証されているか?
🎯 次のステップの予告
次のSTEP 25では、「実践ETLパイプライン構築」 を行います。
売上データの日次集計パイプライン
複数データソースからの抽出
DWHへのデータ投入
レポート生成と通知
📝 練習問題
問題 1
基礎
リトライ3回、間隔5分の基本的なDAGを作成してください。
解答を見る
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def my_task():
print(“タスク実行中…”)
return “完了”
default_args = {
‘owner’: ‘data_team’,
‘retries’: 3,
‘retry_delay’: timedelta(minutes=5)
}
with DAG(
dag_id=’basic_retry_dag’,
default_args=default_args,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
task = PythonOperator(
task_id=’my_task’,
python_callable=my_task
)
問題 2
基礎
タイムアウト30分を設定したタスクを作成してください。
解答を見る
【解答】
from datetime import timedelta
task = PythonOperator(
task_id=’task_with_timeout’,
python_callable=my_function,
execution_timeout=timedelta(minutes=30)
)
問題 3
基礎
on_failure_callbackを使って、失敗時にprint文でエラーメッセージを表示するDAGを作成してください。
解答を見る
【解答】
def failure_alert(context):
ti = context[‘task_instance’]
exception = context.get(‘exception’)
print(f”エラー発生! タスク: {ti.task_id}, エラー: {exception}”)
def risky_task():
raise Exception(“テストエラー”)
with DAG(
dag_id=’failure_callback_dag’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
task = PythonOperator(
task_id=’risky_task’,
python_callable=risky_task,
on_failure_callback=failure_alert
)
問題 4
基礎
指数バックオフを有効にしたリトライ設定を作成してください。
解答を見る
【解答】
default_args = {
‘retries’: 3,
‘retry_delay’: timedelta(minutes=5),
‘retry_exponential_backoff’: True,
‘max_retry_delay’: timedelta(minutes=30)
}
問題 5
応用
タスクが失敗したときにSlackに通知するDAGを作成してください。
解答を見る
【解答】
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
def slack_failure_alert(context):
ti = context[‘task_instance’]
exception = context.get(‘exception’)
hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’)
hook.send_text(f”””
:x: *タスク失敗*
*DAG:* `{ti.dag_id}`
*タスク:* `{ti.task_id}`
*エラー:* {exception}
“””)
def risky_task():
import random
if random.random() < 0.5:
raise Exception("ランダムエラー")
return "成功"
with DAG(
dag_id='slack_failure_dag',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily',
catchup=False
) as dag:
task = PythonOperator(
task_id='risky_task',
python_callable=risky_task,
on_failure_callback=slack_failure_alert
)
問題 6
応用
成功時と失敗時の両方で通知を送るDAGを作成してください。
解答を見る
【解答】
def success_callback(context):
ti = context[‘task_instance’]
print(f”✅ 成功: {ti.task_id}”)
def failure_callback(context):
ti = context[‘task_instance’]
print(f”❌ 失敗: {ti.task_id}”)
task = PythonOperator(
task_id=’task’,
python_callable=my_function,
on_success_callback=success_callback,
on_failure_callback=failure_callback
)
問題 7
応用
べき等な処理として、既存データを削除してから挿入するロード関数を作成してください。
解答を見る
【解答】
def idempotent_load(**context):
“””べき等なロード処理”””
from sqlalchemy import create_engine
import pandas as pd
execution_date = context[‘ds’]
engine = create_engine(‘postgresql://user:pass@localhost/db’)
# 既存データを削除
delete_query = f”DELETE FROM daily_data WHERE date = ‘{execution_date}'”
engine.execute(delete_query)
# 新しいデータを挿入
df = pd.read_csv(f’/tmp/data_{execution_date}.csv’)
df.to_sql(‘daily_data’, engine, if_exists=’append’, index=False)
print(f”✅ {execution_date}のデータをロード完了”)
問題 8
応用
ログを出力しながらデータを処理するタスクを作成してください。INFO、WARNING、ERRORの3種類のログを含めてください。
解答を見る
【解答】
import logging
def process_with_logging(**context):
logger = logging.getLogger(__name__)
logger.info(“処理を開始します”)
# データ処理
data = [1, 2, None, 4, 5]
if None in data:
logger.warning(“NULL値が含まれています”)
try:
result = sum([x for x in data if x is not None])
logger.info(f”処理完了: 合計 = {result}”)
return result
except Exception as e:
logger.error(f”処理中にエラー発生: {e}”)
raise
問題 9
発展
データ品質チェックを行い、問題があればアラートを送り、問題なければ処理を続行するDAGを作成してください。
解答を見る
【解答】
from airflow.operators.python import BranchPythonOperator
import pandas as pd
def check_data_quality(**context):
“””データ品質チェック”””
df = pd.read_csv(‘/data/input.csv’)
issues = []
if len(df) < 10:
issues.append(f"レコード数が少ない: {len(df)}件")
null_ratio = df.isnull().sum().sum() / (len(df) * len(df.columns))
if null_ratio > 0.1:
issues.append(f”NULL値が多い: {null_ratio:.2%}”)
if issues:
context[‘ti’].xcom_push(key=’issues’, value=’\n’.join(issues))
return ‘send_alert’
return ‘process_data’
def process_data():
print(“データ処理を実行中…”)
def send_alert(**context):
issues = context[‘ti’].xcom_pull(task_ids=’check_quality’, key=’issues’)
print(f”⚠️ データ品質アラート:\n{issues}”)
with DAG(
dag_id=’quality_check_dag’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
check = BranchPythonOperator(
task_id=’check_quality’,
python_callable=check_data_quality
)
alert = PythonOperator(
task_id=’send_alert’,
python_callable=send_alert
)
process = PythonOperator(
task_id=’process_data’,
python_callable=process_data
)
check >> [alert, process]
問題 10
発展
リトライ設定、タイムアウト設定、エラー通知、成功通知、べき等性を備えた完全なETLパイプラインを作成してください。
解答を見る
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
import logging
logger = logging.getLogger(__name__)
def failure_callback(context):
ti = context[‘task_instance’]
logger.error(f”❌ 失敗: {ti.dag_id}.{ti.task_id}”)
def success_callback(context):
ti = context[‘task_instance’]
logger.info(f”✅ 成功: {ti.dag_id}.{ti.task_id}”)
def extract(**context):
logger.info(“Extract開始”)
df = pd.DataFrame({‘id’: range(100), ‘value’: range(100)})
path = f”/tmp/extracted_{context[‘ds’]}.csv”
df.to_csv(path, index=False)
logger.info(f”Extract完了: {len(df)}件”)
return path
def transform(**context):
ti = context[‘ti’]
path = ti.xcom_pull(task_ids=’extract’)
logger.info(f”Transform開始: {path}”)
df = pd.read_csv(path)
df[‘doubled’] = df[‘value’] * 2
output = f”/tmp/transformed_{context[‘ds’]}.csv”
df.to_csv(output, index=False)
logger.info(f”Transform完了: {len(df)}件”)
return output
def load(**context):
ti = context[‘ti’]
path = ti.xcom_pull(task_ids=’transform’)
logger.info(f”Load開始: {path}”)
df = pd.read_csv(path)
# べき等性: 既存ファイルを上書き
final = f”/tmp/final_{context[‘ds’]}.csv”
df.to_csv(final, index=False)
logger.info(f”Load完了: {len(df)}件”)
default_args = {
‘retries’: 3,
‘retry_delay’: timedelta(minutes=5),
‘retry_exponential_backoff’: True,
‘on_failure_callback’: failure_callback
}
with DAG(
dag_id=’complete_robust_etl’,
default_args=default_args,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
extract_task = PythonOperator(
task_id=’extract’,
python_callable=extract,
execution_timeout=timedelta(minutes=30)
)
transform_task = PythonOperator(
task_id=’transform’,
python_callable=transform,
execution_timeout=timedelta(minutes=30)
)
load_task = PythonOperator(
task_id=’load’,
python_callable=load,
execution_timeout=timedelta(minutes=30),
on_success_callback=success_callback
)
extract_task >> transform_task >> load_task