STEP 25:実践ETLパイプライン構築

🚀 STEP 25: 実践ETLパイプライン構築

これまでの知識を総動員して、本格的な売上データ集計パイプラインを作ろう

🎯 実践プロジェクト

📋 このステップで学ぶこと

  • 複数データソースからのデータ抽出
  • Pandasによるデータ変換と集計
  • データウェアハウスへのロード
  • レポート生成とファイル出力
  • エラーハンドリングと通知
  • 完全なETLパイプラインの構築

⏱️ 学習時間の目安:3.5時間

📝 前提知識:STEP 1〜24の全内容

🎯 1. プロジェクト概要

📚 例え話:建築プロジェクト

ETLパイプラインの構築は建築プロジェクトに似ています。

設計図(DAG):全体の構成と各部品の関係を定義
資材調達(Extract):様々な業者から材料を集める
加工・組立(Transform):材料を加工して部品を作る
建設(Load):加工した部品を組み立てて建物を完成
検査(Validation):建物の品質をチェック
引き渡し(Report):完成報告と鍵の引き渡し

今回は、ECサイトの売上データを毎日集計する「データの家」を建てます!

1-1. プロジェクトの目的

ECサイトの売上データを日次で集計し、経営陣が見るダッシュボード用のデータを作成します。

📊 ビジネス要件
  • 毎日朝6時に自動実行
  • 複数のデータベースから売上、商品、顧客情報を取得
  • データを統合・変換して日次集計レポートを作成
  • 集計結果をデータウェアハウス(DWH)に格納
  • レポートをCSV形式でS3に保存
  • 処理完了をSlackで通知
  • エラー発生時は即座にアラート

1-2. データソース

データソース一覧
データソース DB種類 テーブル 主なカラム
注文データ PostgreSQL orders order_id, customer_id, product_id, quantity, amount, order_date
商品データ MySQL products product_id, product_name, category, price
顧客データ PostgreSQL customers customer_id, customer_name, region, registration_date

1-3. アーキテクチャ図

パイプライン構成図
┌─────────────────────────────────────────────────────────────────────────────────┐
│                              Airflow ETL Pipeline                               │
└─────────────────────────────────────────────────────────────────────────────────┘

┌──────────────┐                                                   ┌──────────────┐
│  PostgreSQL  │──┐                                                │    Slack     │
│  (注文DB)    │  │                                                │    通知      │
└──────────────┘  │                                                └──────────────┘
                  │   ┌──────────┐   ┌──────────┐   ┌──────────┐         ↑
┌──────────────┐  ├──→│ Extract  │──→│Transform │──→│   Load   │─────────┤
│    MySQL     │──┤   │ (並列)   │   │ (統合)   │   │  (DWH)   │         │
│  (商品DB)    │  │   └──────────┘   └──────────┘   └──────────┘         │
└──────────────┘  │                                      │               │
                  │                                      ↓               │
┌──────────────┐  │                              ┌──────────────┐       │
│  PostgreSQL  │──┘                              │  PostgreSQL  │       │
│  (顧客DB)    │                                 │    (DWH)     │       │
└──────────────┘                                 └──────────────┘       │
                                                        │               │
                                                        ↓               │
                                                 ┌──────────────┐       │
                                                 │   AWS S3     │───────┘
                                                 │  (レポート)   │
                                                 └──────────────┘
        
💡 このプロジェクトで使う技術
  • Apache Airflow:ワークフローオーケストレーション
  • Pandas:データ変換・集計
  • SQLAlchemy:データベース接続
  • boto3:S3へのファイルアップロード
  • Slack API:通知

🔧 2. 環境準備

2-1. 必要なライブラリのインストール

# ターミナルで実行 pip install apache-airflow pip install pandas pip install sqlalchemy pip install psycopg2-binary pip install pymysql pip install boto3 pip install apache-airflow-providers-slack pip install apache-airflow-providers-postgres pip install apache-airflow-providers-mysql

2-2. データベース接続設定

Airflow UIのAdmin > Connectionsから以下を追加します:

接続設定一覧
Conn Id Conn Type Host Schema Port 用途
orders_db Postgres localhost orders_database 5432 注文データ
products_db MySQL localhost products_database 3306 商品データ
customers_db Postgres localhost customers_database 5432 顧客データ
dwh_db Postgres localhost data_warehouse 5432 DWH
slack_webhook HTTP hooks.slack.com Slack通知

📥 3. データ抽出タスク(Extract)

3-1. タスク1:注文データの抽出

# ===== 注文データ抽出タスク ===== from airflow.providers.postgres.hooks.postgres import PostgresHook import pandas as pd def extract_orders(**context): “””注文データを抽出””” execution_date = context[‘ds’] # YYYY-MM-DD形式 # PostgreSQLに接続 hook = PostgresHook(postgres_conn_id=’orders_db’) engine = hook.get_sqlalchemy_engine() # 前日の注文データを取得 query = f””” SELECT order_id, customer_id, product_id, quantity, amount, order_date FROM orders WHERE DATE(order_date) = ‘{execution_date}’ “”” df = pd.read_sql(query, engine) print(f”抽出件数: {len(df)}件”) # 検証 if len(df) == 0: raise ValueError(“注文データが0件です。データソースを確認してください。”) # 一時ファイルに保存 output_path = f’/tmp/orders_{execution_date}.csv’ df.to_csv(output_path, index=False) # 次のタスクに情報を渡す context[‘ti’].xcom_push(key=’orders_file’, value=output_path) context[‘ti’].xcom_push(key=’orders_count’, value=len(df)) return output_path
【実行ログ】 [2024-01-15 06:00:05] PostgreSQLに接続中… [2024-01-15 06:00:06] クエリ実行中: SELECT order_id, customer_id, … [2024-01-15 06:00:08] 抽出件数: 1,523件 [2024-01-15 06:00:09] ファイル保存: /tmp/orders_2024-01-15.csv

3-2. タスク2:商品データの抽出

# ===== 商品データ抽出タスク ===== from airflow.providers.mysql.hooks.mysql import MySqlHook def extract_products(**context): “””商品データを抽出””” execution_date = context[‘ds’] # MySQLに接続 hook = MySqlHook(mysql_conn_id=’products_db’) engine = hook.get_sqlalchemy_engine() # 全商品データを取得 query = “”” SELECT product_id, product_name, category, price FROM products WHERE is_active = 1 “”” df = pd.read_sql(query, engine) print(f”商品データ抽出: {len(df)}件”) # 一時ファイルに保存 output_path = f’/tmp/products_{execution_date}.csv’ df.to_csv(output_path, index=False) context[‘ti’].xcom_push(key=’products_file’, value=output_path) context[‘ti’].xcom_push(key=’products_count’, value=len(df)) return output_path

3-3. タスク3:顧客データの抽出

# ===== 顧客データ抽出タスク ===== def extract_customers(**context): “””顧客データを抽出””” execution_date = context[‘ds’] # PostgreSQLに接続 hook = PostgresHook(postgres_conn_id=’customers_db’) engine = hook.get_sqlalchemy_engine() # 顧客データを取得 query = “”” SELECT customer_id, customer_name, region, registration_date FROM customers “”” df = pd.read_sql(query, engine) print(f”顧客データ抽出: {len(df)}件”) # 一時ファイルに保存 output_path = f’/tmp/customers_{execution_date}.csv’ df.to_csv(output_path, index=False) context[‘ti’].xcom_push(key=’customers_file’, value=output_path) context[‘ti’].xcom_push(key=’customers_count’, value=len(df)) return output_path
🎯 Extract タスクのポイント
  • 並列実行:3つのExtractタスクは並列で実行可能
  • XCom:ファイルパスと件数を次のタスクに渡す
  • 検証:データが0件の場合はエラーを発生させる
  • 一時ファイル:大きなデータはXComではなくファイルで渡す

🔄 4. データ変換タスク(Transform)

4-1. タスク4:データ統合と変換

# ===== データ統合・変換タスク ===== def transform_data(**context): “””3つのデータソースを統合して変換””” execution_date = context[‘ds’] ti = context[‘ti’] # 抽出したファイルのパスを取得 orders_file = ti.xcom_pull(task_ids=’extract_orders’, key=’orders_file’) products_file = ti.xcom_pull(task_ids=’extract_products’, key=’products_file’) customers_file = ti.xcom_pull(task_ids=’extract_customers’, key=’customers_file’) # CSVファイルを読み込み orders_df = pd.read_csv(orders_file) products_df = pd.read_csv(products_file) customers_df = pd.read_csv(customers_file) print(f”注文: {len(orders_df)}件, 商品: {len(products_df)}件, 顧客: {len(customers_df)}件”) # データ結合 # 1. 注文 + 商品情報 merged_df = orders_df.merge( products_df, on=’product_id’, how=’left’ ) # 2. 顧客情報を追加 merged_df = merged_df.merge( customers_df, on=’customer_id’, how=’left’ ) # データ変換 # 日付型に変換 merged_df[‘order_date’] = pd.to_datetime(merged_df[‘order_date’]) # 売上金額を計算(数量 × 単価) merged_df[‘total_amount’] = merged_df[‘quantity’] * merged_df[‘price’] # カラムを整理 final_df = merged_df[[ ‘order_id’, ‘order_date’, ‘customer_id’, ‘customer_name’, ‘region’, ‘product_id’, ‘product_name’, ‘category’, ‘quantity’, ‘price’, ‘total_amount’ ]] # データ品質チェック null_count = final_df.isnull().sum().sum() if null_count > 0: print(f”警告: NULL値が{null_count}件含まれています”) # 変換後のデータを保存 output_path = f’/tmp/transformed_{execution_date}.csv’ final_df.to_csv(output_path, index=False) print(f”変換完了: {len(final_df)}件”) context[‘ti’].xcom_push(key=’transformed_file’, value=output_path) context[‘ti’].xcom_push(key=’transformed_count’, value=len(final_df)) return output_path
【実行ログ】 [2024-01-15 06:00:15] 注文: 1,523件, 商品: 850件, 顧客: 12,450件 [2024-01-15 06:00:16] データ結合中… [2024-01-15 06:00:17] 変換完了: 1,523件 [2024-01-15 06:00:18] ファイル保存: /tmp/transformed_2024-01-15.csv

4-2. タスク5:集計レポートの作成

# ===== 集計レポート作成タスク ===== def create_summary(**context): “””日次集計レポートを作成””” execution_date = context[‘ds’] ti = context[‘ti’] # 変換済みデータを読み込み transformed_file = ti.xcom_pull(task_ids=’transform_data’, key=’transformed_file’) df = pd.read_csv(transformed_file) # 集計1: カテゴリ別売上 category_summary = df.groupby(‘category’).agg({ ‘total_amount’: ‘sum’, ‘order_id’: ‘count’, ‘quantity’: ‘sum’ }).reset_index() category_summary.columns = [‘category’, ‘total_sales’, ‘order_count’, ‘total_quantity’] category_summary = category_summary.sort_values(‘total_sales’, ascending=False) # 集計2: 地域別売上 region_summary = df.groupby(‘region’).agg({ ‘total_amount’: ‘sum’, ‘order_id’: ‘count’, ‘customer_id’: ‘nunique’ }).reset_index() region_summary.columns = [‘region’, ‘total_sales’, ‘order_count’, ‘unique_customers’] region_summary = region_summary.sort_values(‘total_sales’, ascending=False) # 集計3: 売れ筋商品TOP10 product_summary = df.groupby([‘product_id’, ‘product_name’]).agg({ ‘total_amount’: ‘sum’, ‘quantity’: ‘sum’ }).reset_index() product_summary = product_summary.sort_values(‘total_amount’, ascending=False).head(10) # 全体サマリー overall_summary = { ‘date’: execution_date, ‘total_sales’: df[‘total_amount’].sum(), ‘total_orders’: len(df), ‘total_quantity’: df[‘quantity’].sum(), ‘avg_order_value’: df[‘total_amount’].mean(), ‘unique_customers’: df[‘customer_id’].nunique() } # 結果を保存 category_summary.to_csv(f’/tmp/category_summary_{execution_date}.csv’, index=False) region_summary.to_csv(f’/tmp/region_summary_{execution_date}.csv’, index=False) product_summary.to_csv(f’/tmp/product_summary_{execution_date}.csv’, index=False) # XComに保存 context[‘ti’].xcom_push(key=’overall_summary’, value=overall_summary) context[‘ti’].xcom_push(key=’category_summary_file’, value=f’/tmp/category_summary_{execution_date}.csv’) context[‘ti’].xcom_push(key=’region_summary_file’, value=f’/tmp/region_summary_{execution_date}.csv’) context[‘ti’].xcom_push(key=’product_summary_file’, value=f’/tmp/product_summary_{execution_date}.csv’) print(“集計完了:”) print(f” 総売上: ¥{overall_summary[‘total_sales’]:,.0f}”) print(f” 注文数: {overall_summary[‘total_orders’]:,}件”) print(f” 顧客数: {overall_summary[‘unique_customers’]:,}人”) return overall_summary
【集計結果】 集計完了: 総売上: ¥15,234,500 注文数: 1,523件 顧客数: 1,245人

💾 5. データロードタスク(Load)

5-1. タスク6:DWHへのロード

# ===== DWHロードタスク ===== def load_to_dwh(**context): “””データウェアハウスにデータをロード””” execution_date = context[‘ds’] ti = context[‘ti’] # 変換済みデータを取得 transformed_file = ti.xcom_pull(task_ids=’transform_data’, key=’transformed_file’) df = pd.read_csv(transformed_file) # DWHに接続 hook = PostgresHook(postgres_conn_id=’dwh_db’) engine = hook.get_sqlalchemy_engine() # べき等性を確保するため、既存データを削除 with engine.connect() as conn: delete_query = f”DELETE FROM daily_sales WHERE DATE(order_date) = ‘{execution_date}'” conn.execute(delete_query) conn.commit() # 新しいデータを挿入 df.to_sql( ‘daily_sales’, engine, if_exists=’append’, index=False, method=’multi’, chunksize=1000 ) print(f”DWHにロード完了: {len(df)}件”) # 検証: 挿入されたデータ件数を確認 verify_query = f”SELECT COUNT(*) FROM daily_sales WHERE DATE(order_date) = ‘{execution_date}'” with engine.connect() as conn: result = conn.execute(verify_query).fetchone() loaded_count = result[0] if loaded_count != len(df): raise ValueError(f”ロード件数が一致しません: 期待{len(df)}件 vs 実際{loaded_count}件”) print(f”検証OK: {loaded_count}件”) return loaded_count

5-2. タスク7:サマリーテーブルへのロード

# ===== サマリーテーブルロードタスク ===== def load_summary_to_dwh(**context): “””集計結果をDWHのサマリーテーブルにロード””” execution_date = context[‘ds’] ti = context[‘ti’] # 集計データを取得 overall_summary = ti.xcom_pull(task_ids=’create_summary’, key=’overall_summary’) # DWHに接続 hook = PostgresHook(postgres_conn_id=’dwh_db’) engine = hook.get_sqlalchemy_engine() # サマリーテーブルに挿入 summary_df = pd.DataFrame([overall_summary]) # 既存データを削除(べき等性) with engine.connect() as conn: delete_query = f”DELETE FROM daily_summary WHERE date = ‘{execution_date}'” conn.execute(delete_query) conn.commit() # 挿入 summary_df.to_sql( ‘daily_summary’, engine, if_exists=’append’, index=False ) print(“サマリーテーブルへのロード完了”) return True
⚠️ べき等性の重要性

ロード処理では必ず既存データを削除してから挿入します。
これにより、何度実行しても同じ結果になります(べき等性)。

べき等性がないと、再実行時にデータが重複してしまいます!

📊 6. レポート生成とS3アップロード

6-1. タスク8:レポート生成

# ===== レポート生成タスク ===== def generate_csv_report(**context): “””経営陣向けのCSVレポートを生成””” execution_date = context[‘ds’] ti = context[‘ti’] # 集計データを取得 category_file = ti.xcom_pull(task_ids=’create_summary’, key=’category_summary_file’) region_file = ti.xcom_pull(task_ids=’create_summary’, key=’region_summary_file’) product_file = ti.xcom_pull(task_ids=’create_summary’, key=’product_summary_file’) overall_summary = ti.xcom_pull(task_ids=’create_summary’, key=’overall_summary’) # データを読み込み category_df = pd.read_csv(category_file) region_df = pd.read_csv(region_file) product_df = pd.read_csv(product_file) # 統合レポートを作成 with open(f’/tmp/daily_report_{execution_date}.txt’, ‘w’, encoding=’utf-8′) as f: f.write(“=” * 60 + “\n”) f.write(f”日次売上レポート – {execution_date}\n”) f.write(“=” * 60 + “\n\n”) f.write(“【全体サマリー】\n”) f.write(f”総売上: ¥{overall_summary[‘total_sales’]:,.0f}\n”) f.write(f”注文数: {overall_summary[‘total_orders’]:,}件\n”) f.write(f”販売数量: {overall_summary[‘total_quantity’]:,}個\n”) f.write(f”平均注文額: ¥{overall_summary[‘avg_order_value’]:,.0f}\n”) f.write(f”利用顧客数: {overall_summary[‘unique_customers’]:,}人\n\n”) f.write(“【カテゴリ別売上】\n”) f.write(category_df.to_string(index=False)) f.write(“\n\n”) f.write(“【地域別売上】\n”) f.write(region_df.to_string(index=False)) f.write(“\n\n”) f.write(“【売れ筋商品TOP10】\n”) f.write(product_df.to_string(index=False)) f.write(“\n”) # レポートファイルのパスを保存 context[‘ti’].xcom_push(key=’report_files’, value=[ f’/tmp/daily_report_{execution_date}.txt’, f’/tmp/category_summary_{execution_date}.csv’, f’/tmp/region_summary_{execution_date}.csv’, f’/tmp/product_summary_{execution_date}.csv’ ]) print(“レポート生成完了”) return True

6-2. タスク9:S3にアップロード

# ===== S3アップロードタスク ===== import boto3 from botocore.exceptions import ClientError def upload_to_s3(**context): “””レポートファイルをS3にアップロード””” execution_date = context[‘ds’] ti = context[‘ti’] # レポートファイルのリストを取得 report_files = ti.xcom_pull(task_ids=’generate_csv_report’, key=’report_files’) # S3クライアントを作成 s3_client = boto3.client(‘s3’) bucket_name = ‘my-company-reports’ uploaded_files = [] for local_file in report_files: # S3のキー(パス)を決定 file_name = local_file.split(‘/’)[-1] s3_key = f’daily_reports/{execution_date}/{file_name}’ try: # アップロード s3_client.upload_file(local_file, bucket_name, s3_key) print(f”アップロード成功: {s3_key}”) uploaded_files.append(s3_key) except ClientError as e: print(f”アップロード失敗: {local_file}”) raise # アップロード結果を保存 context[‘ti’].xcom_push(key=’s3_files’, value=uploaded_files) print(f”S3アップロード完了: {len(uploaded_files)}ファイル”) return uploaded_files

📢 7. 通知タスク

7-1. タスク10:Slack通知

# ===== Slack通知タスク ===== from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook def create_slack_message(**context): “””Slack通知用のメッセージを作成””” execution_date = context[‘ds’] ti = context[‘ti’] # 集計結果を取得 overall_summary = ti.xcom_pull(task_ids=’create_summary’, key=’overall_summary’) orders_count = ti.xcom_pull(task_ids=’extract_orders’, key=’orders_count’) transformed_count = ti.xcom_pull(task_ids=’transform_data’, key=’transformed_count’) s3_files = ti.xcom_pull(task_ids=’upload_to_s3′, key=’s3_files’) # メッセージを作成 message = f””” :chart_with_upwards_trend: *日次売上パイプライン完了* *実行日:* {execution_date} *処理結果:* • 抽出件数: {orders_count:,}件 • 変換件数: {transformed_count:,}件 • DWHロード: 完了 *売上サマリー:* • 総売上: ¥{overall_summary[‘total_sales’]:,.0f} • 注文数: {overall_summary[‘total_orders’]:,}件 • 顧客数: {overall_summary[‘unique_customers’]:,}人 • 平均注文額: ¥{overall_summary[‘avg_order_value’]:,.0f} *レポート:* • S3にアップロード済み ({len(s3_files)}ファイル) “”” # Slackに送信 hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’) hook.send_text(message) return message

7-2. エラー通知

# ===== エラー通知用コールバック ===== def send_error_notification(context): “””エラー発生時の通知””” task_instance = context[‘task_instance’] exception = context.get(‘exception’) execution_date = context[‘ds’] error_message = f””” :x: *パイプラインでエラーが発生しました* *実行日:* {execution_date} *DAG:* `{task_instance.dag_id}` *失敗タスク:* `{task_instance.task_id}` *エラー内容:* {exception} <!channel> 確認と対応をお願いします。 “”” hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’) hook.send_text(error_message)

🔗 8. DAG全体の構築

8-1. 完全なDAGコード

# ===== 完全なDAGファイル: sales_daily_pipeline.py ===== from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta # デフォルト設定 default_args = { ‘owner’: ‘data_team’, ‘depends_on_past’: False, ‘email’: [‘data-team@company.com’], ‘email_on_failure’: True, ‘email_on_retry’: False, ‘retries’: 2, ‘retry_delay’: timedelta(minutes=5), ‘execution_timeout’: timedelta(minutes=30), ‘on_failure_callback’: send_error_notification } # DAGの定義 with DAG( dag_id=’sales_daily_pipeline’, default_args=default_args, description=’売上データの日次集計パイプライン’, schedule_interval=’0 6 * * *’, # 毎日朝6時 start_date=datetime(2024, 1, 1), catchup=False, tags=[‘production’, ‘etl’, ‘sales’] ) as dag: # ===== Extract Tasks ===== extract_orders_task = PythonOperator( task_id=’extract_orders’, python_callable=extract_orders ) extract_products_task = PythonOperator( task_id=’extract_products’, python_callable=extract_products ) extract_customers_task = PythonOperator( task_id=’extract_customers’, python_callable=extract_customers ) # ===== Transform Tasks ===== transform_task = PythonOperator( task_id=’transform_data’, python_callable=transform_data ) create_summary_task = PythonOperator( task_id=’create_summary’, python_callable=create_summary ) # ===== Load Tasks ===== load_dwh_task = PythonOperator( task_id=’load_to_dwh’, python_callable=load_to_dwh ) load_summary_task = PythonOperator( task_id=’load_summary_to_dwh’, python_callable=load_summary_to_dwh ) # ===== Report Tasks ===== generate_report_task = PythonOperator( task_id=’generate_csv_report’, python_callable=generate_csv_report ) upload_s3_task = PythonOperator( task_id=’upload_to_s3′, python_callable=upload_to_s3 ) # ===== Notification Tasks ===== notify_task = PythonOperator( task_id=’notify_slack’, python_callable=create_slack_message ) # ===== タスク依存関係の定義 ===== # Extract (並列実行) [extract_orders_task, extract_products_task, extract_customers_task] >> transform_task # Transform → Load transform_task >> create_summary_task # Load (並列実行) create_summary_task >> [load_dwh_task, load_summary_task] # Report create_summary_task >> generate_report_task >> upload_s3_task # Notification (全タスク完了後) [load_dwh_task, load_summary_task, upload_s3_task] >> notify_task
タスク依存関係図
┌────────────────┐  ┌─────────────────┐  ┌───────────────────┐
│extract_orders  │  │extract_products │  │extract_customers  │
└───────┬────────┘  └────────┬────────┘  └─────────┬─────────┘
        │                    │                     │
        └────────────────────┼─────────────────────┘
                             │
                             ▼
                    ┌────────────────┐
                    │transform_data  │
                    └───────┬────────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │create_summary   │
                   └───────┬─────────┘
                           │
          ┌────────────────┼────────────────┐
          │                │                │
          ▼                ▼                ▼
   ┌────────────┐  ┌─────────────┐  ┌─────────────────┐
   │load_to_dwh │  │load_summary │  │generate_report  │
   └──────┬─────┘  └──────┬──────┘  └────────┬────────┘
          │               │                  │
          │               │                  ▼
          │               │         ┌────────────────┐
          │               │         │upload_to_s3    │
          │               │         └────────┬───────┘
          │               │                  │
          └───────────────┼──────────────────┘
                          │
                          ▼
                 ┌────────────────┐
                 │notify_slack    │
                 └────────────────┘
        
🎯 タスク依存関係のポイント
  • Extract:3つのタスクを並列実行(時間短縮)
  • Transform:Extractが全て完了してから実行
  • Load:DWHとサマリーを並列でロード
  • Report:集計完了後にレポート生成
  • Notification:全タスク完了後に通知

🧪 9. テストと検証

9-1. DAG検証コマンド

# DAGファイルの構文チェック python /path/to/sales_daily_pipeline.py # DAGが正しく認識されるか確認 airflow dags list # DAGの構造を確認 airflow dags show sales_daily_pipeline # 特定の日付でテスト実行 airflow dags test sales_daily_pipeline 2024-01-15

9-2. データ品質チェック

# ===== データ検証タスク ===== def validate_pipeline_output(**context): “””パイプライン出力を検証””” execution_date = context[‘ds’] # DWHに接続 hook = PostgresHook(postgres_conn_id=’dwh_db’) engine = hook.get_sqlalchemy_engine() # 検証1: データ件数 count_query = f”SELECT COUNT(*) FROM daily_sales WHERE DATE(order_date) = ‘{execution_date}'” with engine.connect() as conn: count = conn.execute(count_query).fetchone()[0] if count == 0: raise ValueError(“ロードされたデータが0件です”) # 検証2: NULL値チェック null_query = f””” SELECT SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) as null_customers, SUM(CASE WHEN product_id IS NULL THEN 1 ELSE 0 END) as null_products FROM daily_sales WHERE DATE(order_date) = ‘{execution_date}’ “”” with engine.connect() as conn: result = conn.execute(null_query).fetchone() if any(result): raise ValueError(f”NULL値が検出されました: {result}”) print(“✅ データ検証完了: すべて正常”) return True

📝 STEP 25 のまとめ

✅ このステップで学んだこと
  • 複数データソースからの並列データ抽出
  • Pandasによるデータ統合と変換
  • 集計処理と多様なレポート作成
  • DWHへのべき等なデータロード
  • S3へのファイルアップロード
  • Slack通知とエラーハンドリング
  • 完全なETLパイプラインの構築
💡 本番運用のチェックリスト
  • べき等性:何度実行しても同じ結果
  • 並列処理:Extractタスクを並列化して高速化
  • データ検証:各ステップで品質チェック
  • エラーハンドリング:リトライと通知
  • 監視:Slackでリアルタイム通知
  • ログ:詳細なログ出力でトラブルシューティング
🎯 次のステップの予告

次のSTEP 26では、「最終プロジェクト」として、さらに複雑な多ソース統合パイプラインに挑戦します!

  • API、データベース、CSVファイルなど様々なデータソース
  • データ品質チェックの実装
  • 複雑なワークフローの設計
  • 完全な本番運用パイプライン
📝

学習メモ

ETL・データパイプライン構築 - Step 25

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE