🎯 実践プロジェクト
📋 このステップで学ぶこと
- 複数データソースからのデータ抽出
- Pandasによるデータ変換と集計
- データウェアハウスへのロード
- レポート生成とファイル出力
- エラーハンドリングと通知
- 完全なETLパイプラインの構築
⏱️ 学習時間の目安:3.5時間
📝 前提知識:STEP 1〜24の全内容
🎯 1. プロジェクト概要
📚 例え話:建築プロジェクト
ETLパイプラインの構築は建築プロジェクトに似ています。
・設計図(DAG):全体の構成と各部品の関係を定義
・資材調達(Extract):様々な業者から材料を集める
・加工・組立(Transform):材料を加工して部品を作る
・建設(Load):加工した部品を組み立てて建物を完成
・検査(Validation):建物の品質をチェック
・引き渡し(Report):完成報告と鍵の引き渡し
今回は、ECサイトの売上データを毎日集計する「データの家」を建てます!
1-1. プロジェクトの目的
ECサイトの売上データを日次で集計し、経営陣が見るダッシュボード用のデータを作成します。
📊 ビジネス要件
- 毎日朝6時に自動実行
- 複数のデータベースから売上、商品、顧客情報を取得
- データを統合・変換して日次集計レポートを作成
- 集計結果をデータウェアハウス(DWH)に格納
- レポートをCSV形式でS3に保存
- 処理完了をSlackで通知
- エラー発生時は即座にアラート
1-2. データソース
データソース一覧
| データソース |
DB種類 |
テーブル |
主なカラム |
| 注文データ |
PostgreSQL |
orders |
order_id, customer_id, product_id, quantity, amount, order_date |
| 商品データ |
MySQL |
products |
product_id, product_name, category, price |
| 顧客データ |
PostgreSQL |
customers |
customer_id, customer_name, region, registration_date |
1-3. アーキテクチャ図
パイプライン構成図
┌─────────────────────────────────────────────────────────────────────────────────┐
│ Airflow ETL Pipeline │
└─────────────────────────────────────────────────────────────────────────────────┘
┌──────────────┐ ┌──────────────┐
│ PostgreSQL │──┐ │ Slack │
│ (注文DB) │ │ │ 通知 │
└──────────────┘ │ └──────────────┘
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ↑
┌──────────────┐ ├──→│ Extract │──→│Transform │──→│ Load │─────────┤
│ MySQL │──┤ │ (並列) │ │ (統合) │ │ (DWH) │ │
│ (商品DB) │ │ └──────────┘ └──────────┘ └──────────┘ │
└──────────────┘ │ │ │
│ ↓ │
┌──────────────┐ │ ┌──────────────┐ │
│ PostgreSQL │──┘ │ PostgreSQL │ │
│ (顧客DB) │ │ (DWH) │ │
└──────────────┘ └──────────────┘ │
│ │
↓ │
┌──────────────┐ │
│ AWS S3 │───────┘
│ (レポート) │
└──────────────┘
💡 このプロジェクトで使う技術
- Apache Airflow:ワークフローオーケストレーション
- Pandas:データ変換・集計
- SQLAlchemy:データベース接続
- boto3:S3へのファイルアップロード
- Slack API:通知
🔧 2. 環境準備
2-1. 必要なライブラリのインストール
# ターミナルで実行
pip install apache-airflow
pip install pandas
pip install sqlalchemy
pip install psycopg2-binary
pip install pymysql
pip install boto3
pip install apache-airflow-providers-slack
pip install apache-airflow-providers-postgres
pip install apache-airflow-providers-mysql
2-2. データベース接続設定
Airflow UIのAdmin > Connectionsから以下を追加します:
接続設定一覧
| Conn Id |
Conn Type |
Host |
Schema |
Port |
用途 |
| orders_db |
Postgres |
localhost |
orders_database |
5432 |
注文データ |
| products_db |
MySQL |
localhost |
products_database |
3306 |
商品データ |
| customers_db |
Postgres |
localhost |
customers_database |
5432 |
顧客データ |
| dwh_db |
Postgres |
localhost |
data_warehouse |
5432 |
DWH |
| slack_webhook |
HTTP |
hooks.slack.com |
– |
– |
Slack通知 |
📥 3. データ抽出タスク(Extract)
3-1. タスク1:注文データの抽出
# ===== 注文データ抽出タスク =====
from airflow.providers.postgres.hooks.postgres import PostgresHook
import pandas as pd
def extract_orders(**context):
“””注文データを抽出”””
execution_date = context[‘ds’] # YYYY-MM-DD形式
# PostgreSQLに接続
hook = PostgresHook(postgres_conn_id=’orders_db’)
engine = hook.get_sqlalchemy_engine()
# 前日の注文データを取得
query = f”””
SELECT
order_id,
customer_id,
product_id,
quantity,
amount,
order_date
FROM orders
WHERE DATE(order_date) = ‘{execution_date}’
“””
df = pd.read_sql(query, engine)
print(f”抽出件数: {len(df)}件”)
# 検証
if len(df) == 0:
raise ValueError(“注文データが0件です。データソースを確認してください。”)
# 一時ファイルに保存
output_path = f’/tmp/orders_{execution_date}.csv’
df.to_csv(output_path, index=False)
# 次のタスクに情報を渡す
context[‘ti’].xcom_push(key=’orders_file’, value=output_path)
context[‘ti’].xcom_push(key=’orders_count’, value=len(df))
return output_path
【実行ログ】
[2024-01-15 06:00:05] PostgreSQLに接続中…
[2024-01-15 06:00:06] クエリ実行中: SELECT order_id, customer_id, …
[2024-01-15 06:00:08] 抽出件数: 1,523件
[2024-01-15 06:00:09] ファイル保存: /tmp/orders_2024-01-15.csv
3-2. タスク2:商品データの抽出
# ===== 商品データ抽出タスク =====
from airflow.providers.mysql.hooks.mysql import MySqlHook
def extract_products(**context):
“””商品データを抽出”””
execution_date = context[‘ds’]
# MySQLに接続
hook = MySqlHook(mysql_conn_id=’products_db’)
engine = hook.get_sqlalchemy_engine()
# 全商品データを取得
query = “””
SELECT
product_id,
product_name,
category,
price
FROM products
WHERE is_active = 1
“””
df = pd.read_sql(query, engine)
print(f”商品データ抽出: {len(df)}件”)
# 一時ファイルに保存
output_path = f’/tmp/products_{execution_date}.csv’
df.to_csv(output_path, index=False)
context[‘ti’].xcom_push(key=’products_file’, value=output_path)
context[‘ti’].xcom_push(key=’products_count’, value=len(df))
return output_path
3-3. タスク3:顧客データの抽出
# ===== 顧客データ抽出タスク =====
def extract_customers(**context):
“””顧客データを抽出”””
execution_date = context[‘ds’]
# PostgreSQLに接続
hook = PostgresHook(postgres_conn_id=’customers_db’)
engine = hook.get_sqlalchemy_engine()
# 顧客データを取得
query = “””
SELECT
customer_id,
customer_name,
region,
registration_date
FROM customers
“””
df = pd.read_sql(query, engine)
print(f”顧客データ抽出: {len(df)}件”)
# 一時ファイルに保存
output_path = f’/tmp/customers_{execution_date}.csv’
df.to_csv(output_path, index=False)
context[‘ti’].xcom_push(key=’customers_file’, value=output_path)
context[‘ti’].xcom_push(key=’customers_count’, value=len(df))
return output_path
🎯 Extract タスクのポイント
- 並列実行:3つのExtractタスクは並列で実行可能
- XCom:ファイルパスと件数を次のタスクに渡す
- 検証:データが0件の場合はエラーを発生させる
- 一時ファイル:大きなデータはXComではなくファイルで渡す
🔄 4. データ変換タスク(Transform)
4-1. タスク4:データ統合と変換
# ===== データ統合・変換タスク =====
def transform_data(**context):
“””3つのデータソースを統合して変換”””
execution_date = context[‘ds’]
ti = context[‘ti’]
# 抽出したファイルのパスを取得
orders_file = ti.xcom_pull(task_ids=’extract_orders’, key=’orders_file’)
products_file = ti.xcom_pull(task_ids=’extract_products’, key=’products_file’)
customers_file = ti.xcom_pull(task_ids=’extract_customers’, key=’customers_file’)
# CSVファイルを読み込み
orders_df = pd.read_csv(orders_file)
products_df = pd.read_csv(products_file)
customers_df = pd.read_csv(customers_file)
print(f”注文: {len(orders_df)}件, 商品: {len(products_df)}件, 顧客: {len(customers_df)}件”)
# データ結合
# 1. 注文 + 商品情報
merged_df = orders_df.merge(
products_df,
on=’product_id’,
how=’left’
)
# 2. 顧客情報を追加
merged_df = merged_df.merge(
customers_df,
on=’customer_id’,
how=’left’
)
# データ変換
# 日付型に変換
merged_df[‘order_date’] = pd.to_datetime(merged_df[‘order_date’])
# 売上金額を計算(数量 × 単価)
merged_df[‘total_amount’] = merged_df[‘quantity’] * merged_df[‘price’]
# カラムを整理
final_df = merged_df[[
‘order_id’, ‘order_date’, ‘customer_id’, ‘customer_name’,
‘region’, ‘product_id’, ‘product_name’, ‘category’,
‘quantity’, ‘price’, ‘total_amount’
]]
# データ品質チェック
null_count = final_df.isnull().sum().sum()
if null_count > 0:
print(f”警告: NULL値が{null_count}件含まれています”)
# 変換後のデータを保存
output_path = f’/tmp/transformed_{execution_date}.csv’
final_df.to_csv(output_path, index=False)
print(f”変換完了: {len(final_df)}件”)
context[‘ti’].xcom_push(key=’transformed_file’, value=output_path)
context[‘ti’].xcom_push(key=’transformed_count’, value=len(final_df))
return output_path
【実行ログ】
[2024-01-15 06:00:15] 注文: 1,523件, 商品: 850件, 顧客: 12,450件
[2024-01-15 06:00:16] データ結合中…
[2024-01-15 06:00:17] 変換完了: 1,523件
[2024-01-15 06:00:18] ファイル保存: /tmp/transformed_2024-01-15.csv
4-2. タスク5:集計レポートの作成
# ===== 集計レポート作成タスク =====
def create_summary(**context):
“””日次集計レポートを作成”””
execution_date = context[‘ds’]
ti = context[‘ti’]
# 変換済みデータを読み込み
transformed_file = ti.xcom_pull(task_ids=’transform_data’, key=’transformed_file’)
df = pd.read_csv(transformed_file)
# 集計1: カテゴリ別売上
category_summary = df.groupby(‘category’).agg({
‘total_amount’: ‘sum’,
‘order_id’: ‘count’,
‘quantity’: ‘sum’
}).reset_index()
category_summary.columns = [‘category’, ‘total_sales’, ‘order_count’, ‘total_quantity’]
category_summary = category_summary.sort_values(‘total_sales’, ascending=False)
# 集計2: 地域別売上
region_summary = df.groupby(‘region’).agg({
‘total_amount’: ‘sum’,
‘order_id’: ‘count’,
‘customer_id’: ‘nunique’
}).reset_index()
region_summary.columns = [‘region’, ‘total_sales’, ‘order_count’, ‘unique_customers’]
region_summary = region_summary.sort_values(‘total_sales’, ascending=False)
# 集計3: 売れ筋商品TOP10
product_summary = df.groupby([‘product_id’, ‘product_name’]).agg({
‘total_amount’: ‘sum’,
‘quantity’: ‘sum’
}).reset_index()
product_summary = product_summary.sort_values(‘total_amount’, ascending=False).head(10)
# 全体サマリー
overall_summary = {
‘date’: execution_date,
‘total_sales’: df[‘total_amount’].sum(),
‘total_orders’: len(df),
‘total_quantity’: df[‘quantity’].sum(),
‘avg_order_value’: df[‘total_amount’].mean(),
‘unique_customers’: df[‘customer_id’].nunique()
}
# 結果を保存
category_summary.to_csv(f’/tmp/category_summary_{execution_date}.csv’, index=False)
region_summary.to_csv(f’/tmp/region_summary_{execution_date}.csv’, index=False)
product_summary.to_csv(f’/tmp/product_summary_{execution_date}.csv’, index=False)
# XComに保存
context[‘ti’].xcom_push(key=’overall_summary’, value=overall_summary)
context[‘ti’].xcom_push(key=’category_summary_file’, value=f’/tmp/category_summary_{execution_date}.csv’)
context[‘ti’].xcom_push(key=’region_summary_file’, value=f’/tmp/region_summary_{execution_date}.csv’)
context[‘ti’].xcom_push(key=’product_summary_file’, value=f’/tmp/product_summary_{execution_date}.csv’)
print(“集計完了:”)
print(f” 総売上: ¥{overall_summary[‘total_sales’]:,.0f}”)
print(f” 注文数: {overall_summary[‘total_orders’]:,}件”)
print(f” 顧客数: {overall_summary[‘unique_customers’]:,}人”)
return overall_summary
【集計結果】
集計完了:
総売上: ¥15,234,500
注文数: 1,523件
顧客数: 1,245人
💾 5. データロードタスク(Load)
5-1. タスク6:DWHへのロード
# ===== DWHロードタスク =====
def load_to_dwh(**context):
“””データウェアハウスにデータをロード”””
execution_date = context[‘ds’]
ti = context[‘ti’]
# 変換済みデータを取得
transformed_file = ti.xcom_pull(task_ids=’transform_data’, key=’transformed_file’)
df = pd.read_csv(transformed_file)
# DWHに接続
hook = PostgresHook(postgres_conn_id=’dwh_db’)
engine = hook.get_sqlalchemy_engine()
# べき等性を確保するため、既存データを削除
with engine.connect() as conn:
delete_query = f”DELETE FROM daily_sales WHERE DATE(order_date) = ‘{execution_date}'”
conn.execute(delete_query)
conn.commit()
# 新しいデータを挿入
df.to_sql(
‘daily_sales’,
engine,
if_exists=’append’,
index=False,
method=’multi’,
chunksize=1000
)
print(f”DWHにロード完了: {len(df)}件”)
# 検証: 挿入されたデータ件数を確認
verify_query = f”SELECT COUNT(*) FROM daily_sales WHERE DATE(order_date) = ‘{execution_date}'”
with engine.connect() as conn:
result = conn.execute(verify_query).fetchone()
loaded_count = result[0]
if loaded_count != len(df):
raise ValueError(f”ロード件数が一致しません: 期待{len(df)}件 vs 実際{loaded_count}件”)
print(f”検証OK: {loaded_count}件”)
return loaded_count
5-2. タスク7:サマリーテーブルへのロード
# ===== サマリーテーブルロードタスク =====
def load_summary_to_dwh(**context):
“””集計結果をDWHのサマリーテーブルにロード”””
execution_date = context[‘ds’]
ti = context[‘ti’]
# 集計データを取得
overall_summary = ti.xcom_pull(task_ids=’create_summary’, key=’overall_summary’)
# DWHに接続
hook = PostgresHook(postgres_conn_id=’dwh_db’)
engine = hook.get_sqlalchemy_engine()
# サマリーテーブルに挿入
summary_df = pd.DataFrame([overall_summary])
# 既存データを削除(べき等性)
with engine.connect() as conn:
delete_query = f”DELETE FROM daily_summary WHERE date = ‘{execution_date}'”
conn.execute(delete_query)
conn.commit()
# 挿入
summary_df.to_sql(
‘daily_summary’,
engine,
if_exists=’append’,
index=False
)
print(“サマリーテーブルへのロード完了”)
return True
⚠️ べき等性の重要性
ロード処理では必ず既存データを削除してから挿入します。
これにより、何度実行しても同じ結果になります(べき等性)。
べき等性がないと、再実行時にデータが重複してしまいます!
📊 6. レポート生成とS3アップロード
6-1. タスク8:レポート生成
# ===== レポート生成タスク =====
def generate_csv_report(**context):
“””経営陣向けのCSVレポートを生成”””
execution_date = context[‘ds’]
ti = context[‘ti’]
# 集計データを取得
category_file = ti.xcom_pull(task_ids=’create_summary’, key=’category_summary_file’)
region_file = ti.xcom_pull(task_ids=’create_summary’, key=’region_summary_file’)
product_file = ti.xcom_pull(task_ids=’create_summary’, key=’product_summary_file’)
overall_summary = ti.xcom_pull(task_ids=’create_summary’, key=’overall_summary’)
# データを読み込み
category_df = pd.read_csv(category_file)
region_df = pd.read_csv(region_file)
product_df = pd.read_csv(product_file)
# 統合レポートを作成
with open(f’/tmp/daily_report_{execution_date}.txt’, ‘w’, encoding=’utf-8′) as f:
f.write(“=” * 60 + “\n”)
f.write(f”日次売上レポート – {execution_date}\n”)
f.write(“=” * 60 + “\n\n”)
f.write(“【全体サマリー】\n”)
f.write(f”総売上: ¥{overall_summary[‘total_sales’]:,.0f}\n”)
f.write(f”注文数: {overall_summary[‘total_orders’]:,}件\n”)
f.write(f”販売数量: {overall_summary[‘total_quantity’]:,}個\n”)
f.write(f”平均注文額: ¥{overall_summary[‘avg_order_value’]:,.0f}\n”)
f.write(f”利用顧客数: {overall_summary[‘unique_customers’]:,}人\n\n”)
f.write(“【カテゴリ別売上】\n”)
f.write(category_df.to_string(index=False))
f.write(“\n\n”)
f.write(“【地域別売上】\n”)
f.write(region_df.to_string(index=False))
f.write(“\n\n”)
f.write(“【売れ筋商品TOP10】\n”)
f.write(product_df.to_string(index=False))
f.write(“\n”)
# レポートファイルのパスを保存
context[‘ti’].xcom_push(key=’report_files’, value=[
f’/tmp/daily_report_{execution_date}.txt’,
f’/tmp/category_summary_{execution_date}.csv’,
f’/tmp/region_summary_{execution_date}.csv’,
f’/tmp/product_summary_{execution_date}.csv’
])
print(“レポート生成完了”)
return True
6-2. タスク9:S3にアップロード
# ===== S3アップロードタスク =====
import boto3
from botocore.exceptions import ClientError
def upload_to_s3(**context):
“””レポートファイルをS3にアップロード”””
execution_date = context[‘ds’]
ti = context[‘ti’]
# レポートファイルのリストを取得
report_files = ti.xcom_pull(task_ids=’generate_csv_report’, key=’report_files’)
# S3クライアントを作成
s3_client = boto3.client(‘s3’)
bucket_name = ‘my-company-reports’
uploaded_files = []
for local_file in report_files:
# S3のキー(パス)を決定
file_name = local_file.split(‘/’)[-1]
s3_key = f’daily_reports/{execution_date}/{file_name}’
try:
# アップロード
s3_client.upload_file(local_file, bucket_name, s3_key)
print(f”アップロード成功: {s3_key}”)
uploaded_files.append(s3_key)
except ClientError as e:
print(f”アップロード失敗: {local_file}”)
raise
# アップロード結果を保存
context[‘ti’].xcom_push(key=’s3_files’, value=uploaded_files)
print(f”S3アップロード完了: {len(uploaded_files)}ファイル”)
return uploaded_files
📢 7. 通知タスク
7-1. タスク10:Slack通知
# ===== Slack通知タスク =====
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
def create_slack_message(**context):
“””Slack通知用のメッセージを作成”””
execution_date = context[‘ds’]
ti = context[‘ti’]
# 集計結果を取得
overall_summary = ti.xcom_pull(task_ids=’create_summary’, key=’overall_summary’)
orders_count = ti.xcom_pull(task_ids=’extract_orders’, key=’orders_count’)
transformed_count = ti.xcom_pull(task_ids=’transform_data’, key=’transformed_count’)
s3_files = ti.xcom_pull(task_ids=’upload_to_s3′, key=’s3_files’)
# メッセージを作成
message = f”””
:chart_with_upwards_trend: *日次売上パイプライン完了*
*実行日:* {execution_date}
*処理結果:*
• 抽出件数: {orders_count:,}件
• 変換件数: {transformed_count:,}件
• DWHロード: 完了
*売上サマリー:*
• 総売上: ¥{overall_summary[‘total_sales’]:,.0f}
• 注文数: {overall_summary[‘total_orders’]:,}件
• 顧客数: {overall_summary[‘unique_customers’]:,}人
• 平均注文額: ¥{overall_summary[‘avg_order_value’]:,.0f}
*レポート:*
• S3にアップロード済み ({len(s3_files)}ファイル)
“””
# Slackに送信
hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’)
hook.send_text(message)
return message
7-2. エラー通知
# ===== エラー通知用コールバック =====
def send_error_notification(context):
“””エラー発生時の通知”””
task_instance = context[‘task_instance’]
exception = context.get(‘exception’)
execution_date = context[‘ds’]
error_message = f”””
:x: *パイプラインでエラーが発生しました*
*実行日:* {execution_date}
*DAG:* `{task_instance.dag_id}`
*失敗タスク:* `{task_instance.task_id}`
*エラー内容:* {exception}
<!channel> 確認と対応をお願いします。
“””
hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’)
hook.send_text(error_message)
🔗 8. DAG全体の構築
8-1. 完全なDAGコード
# ===== 完全なDAGファイル: sales_daily_pipeline.py =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# デフォルト設定
default_args = {
‘owner’: ‘data_team’,
‘depends_on_past’: False,
‘email’: [‘data-team@company.com’],
‘email_on_failure’: True,
‘email_on_retry’: False,
‘retries’: 2,
‘retry_delay’: timedelta(minutes=5),
‘execution_timeout’: timedelta(minutes=30),
‘on_failure_callback’: send_error_notification
}
# DAGの定義
with DAG(
dag_id=’sales_daily_pipeline’,
default_args=default_args,
description=’売上データの日次集計パイプライン’,
schedule_interval=’0 6 * * *’, # 毎日朝6時
start_date=datetime(2024, 1, 1),
catchup=False,
tags=[‘production’, ‘etl’, ‘sales’]
) as dag:
# ===== Extract Tasks =====
extract_orders_task = PythonOperator(
task_id=’extract_orders’,
python_callable=extract_orders
)
extract_products_task = PythonOperator(
task_id=’extract_products’,
python_callable=extract_products
)
extract_customers_task = PythonOperator(
task_id=’extract_customers’,
python_callable=extract_customers
)
# ===== Transform Tasks =====
transform_task = PythonOperator(
task_id=’transform_data’,
python_callable=transform_data
)
create_summary_task = PythonOperator(
task_id=’create_summary’,
python_callable=create_summary
)
# ===== Load Tasks =====
load_dwh_task = PythonOperator(
task_id=’load_to_dwh’,
python_callable=load_to_dwh
)
load_summary_task = PythonOperator(
task_id=’load_summary_to_dwh’,
python_callable=load_summary_to_dwh
)
# ===== Report Tasks =====
generate_report_task = PythonOperator(
task_id=’generate_csv_report’,
python_callable=generate_csv_report
)
upload_s3_task = PythonOperator(
task_id=’upload_to_s3′,
python_callable=upload_to_s3
)
# ===== Notification Tasks =====
notify_task = PythonOperator(
task_id=’notify_slack’,
python_callable=create_slack_message
)
# ===== タスク依存関係の定義 =====
# Extract (並列実行)
[extract_orders_task, extract_products_task, extract_customers_task] >> transform_task
# Transform → Load
transform_task >> create_summary_task
# Load (並列実行)
create_summary_task >> [load_dwh_task, load_summary_task]
# Report
create_summary_task >> generate_report_task >> upload_s3_task
# Notification (全タスク完了後)
[load_dwh_task, load_summary_task, upload_s3_task] >> notify_task
タスク依存関係図
┌────────────────┐ ┌─────────────────┐ ┌───────────────────┐
│extract_orders │ │extract_products │ │extract_customers │
└───────┬────────┘ └────────┬────────┘ └─────────┬─────────┘
│ │ │
└────────────────────┼─────────────────────┘
│
▼
┌────────────────┐
│transform_data │
└───────┬────────┘
│
▼
┌─────────────────┐
│create_summary │
└───────┬─────────┘
│
┌────────────────┼────────────────┐
│ │ │
▼ ▼ ▼
┌────────────┐ ┌─────────────┐ ┌─────────────────┐
│load_to_dwh │ │load_summary │ │generate_report │
└──────┬─────┘ └──────┬──────┘ └────────┬────────┘
│ │ │
│ │ ▼
│ │ ┌────────────────┐
│ │ │upload_to_s3 │
│ │ └────────┬───────┘
│ │ │
└───────────────┼──────────────────┘
│
▼
┌────────────────┐
│notify_slack │
└────────────────┘
🎯 タスク依存関係のポイント
- Extract:3つのタスクを並列実行(時間短縮)
- Transform:Extractが全て完了してから実行
- Load:DWHとサマリーを並列でロード
- Report:集計完了後にレポート生成
- Notification:全タスク完了後に通知
🧪 9. テストと検証
9-1. DAG検証コマンド
# DAGファイルの構文チェック
python /path/to/sales_daily_pipeline.py
# DAGが正しく認識されるか確認
airflow dags list
# DAGの構造を確認
airflow dags show sales_daily_pipeline
# 特定の日付でテスト実行
airflow dags test sales_daily_pipeline 2024-01-15
9-2. データ品質チェック
# ===== データ検証タスク =====
def validate_pipeline_output(**context):
“””パイプライン出力を検証”””
execution_date = context[‘ds’]
# DWHに接続
hook = PostgresHook(postgres_conn_id=’dwh_db’)
engine = hook.get_sqlalchemy_engine()
# 検証1: データ件数
count_query = f”SELECT COUNT(*) FROM daily_sales WHERE DATE(order_date) = ‘{execution_date}'”
with engine.connect() as conn:
count = conn.execute(count_query).fetchone()[0]
if count == 0:
raise ValueError(“ロードされたデータが0件です”)
# 検証2: NULL値チェック
null_query = f”””
SELECT
SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) as null_customers,
SUM(CASE WHEN product_id IS NULL THEN 1 ELSE 0 END) as null_products
FROM daily_sales
WHERE DATE(order_date) = ‘{execution_date}’
“””
with engine.connect() as conn:
result = conn.execute(null_query).fetchone()
if any(result):
raise ValueError(f”NULL値が検出されました: {result}”)
print(“✅ データ検証完了: すべて正常”)
return True
📝 STEP 25 のまとめ
✅ このステップで学んだこと
- 複数データソースからの並列データ抽出
- Pandasによるデータ統合と変換
- 集計処理と多様なレポート作成
- DWHへのべき等なデータロード
- S3へのファイルアップロード
- Slack通知とエラーハンドリング
- 完全なETLパイプラインの構築
💡 本番運用のチェックリスト
- ✅ べき等性:何度実行しても同じ結果
- ✅ 並列処理:Extractタスクを並列化して高速化
- ✅ データ検証:各ステップで品質チェック
- ✅ エラーハンドリング:リトライと通知
- ✅ 監視:Slackでリアルタイム通知
- ✅ ログ:詳細なログ出力でトラブルシューティング
🎯 次のステップの予告
次のSTEP 26では、「最終プロジェクト」として、さらに複雑な多ソース統合パイプラインに挑戦します!
- API、データベース、CSVファイルなど様々なデータソース
- データ品質チェックの実装
- 複雑なワークフローの設計
- 完全な本番運用パイプライン