STEP 30:プロジェクト② ETLパイプライン構築

🔄 STEP 30: プロジェクト② ETLパイプライン構築

マルチソースからのデータ統合 – 本番レベルのETLパイプラインを構築しよう

📋 このプロジェクトで学ぶこと

  • ETL(Extract/Transform/Load)の本格的な実装
  • 複数データソースからのデータ抽出
  • データクレンジングと品質チェック
  • 複雑なデータ変換・JOIN・集計
  • パーティション最適化とParquet書き込み
  • エラーハンドリングとログ出力
  • EMR/Dataprocでの実行

前提条件: STEP 1-29の全知識

🎯 1. プロジェクト概要 – ETLとは何か

💡 ETLパイプラインとは?

ETLは「Extract(抽出)→ Transform(変換)→ Load(格納)」の略で、データを収集・加工・保存する一連の処理フローです。

  • Extract:さまざまなソースからデータを取り出す
  • Transform:データをクリーニング・結合・集計して分析可能な形に変換する
  • Load:処理済みデータを分析用データベースやデータウェアハウスに保存する

現代のデータエンジニアリングにおいて、ETLパイプライン構築は最も重要なスキルの1つです。

📊 あなたのミッション

あなたはデータエンジニアです。
複数のデータソース(売上、顧客、商品、在庫)を統合し、分析用のデータマートを構築してください:

  1. S3/GCSから複数のデータソースを読み込む
  2. データクレンジングと品質チェック
  3. データを結合・集計して分析用テーブル作成
  4. 最適化されたParquet形式で保存
  5. エラーハンドリングと詳細ログ出力

1.1 データソースの構成

データソース 形式 レコード数 サイズ
売上データ CSV 1000万件 5 GB
顧客データ JSON 100万件 500 MB
商品データ Parquet 10万件 50 MB
在庫データ CSV 10万件 10 MB

📥 2. プロジェクトのセットアップ

💡 ロギングの重要性

本番環境では「いつ」「何が」「どうなったか」を追跡できることが非常に重要です。Pythonのloggingモジュールを使うと、処理の進捗やエラー情報を体系的に記録できます。

2.1 必要なライブラリのインポートとロギング設定

# etl_pipeline_project.py
# ===================================
# 必要なライブラリのインポート
# ===================================

# SparkSession: Spark処理の入り口
from pyspark.sql import SparkSession

# functions: データ変換で使う関数群(col, sum, avg等)
from pyspark.sql.functions import *

# types: スキーマ定義で使う型(String, Integer等)
from pyspark.sql.types import *

# sys: コマンドライン引数の処理用
import sys

# logging: 処理状況をログに記録するため
import logging

# time: 処理時間の計測用
import time

# ===================================
# ロギング設定
# ===================================
# format: ログの出力形式(時刻 - レベル - メッセージ)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# このスクリプト専用のロガーを作成
logger = logging.getLogger(__name__)

📝 ポイント:loggingの使い分け

  • logger.info():通常の処理状況を記録
  • logger.warning():注意が必要な状況を記録
  • logger.error():エラー発生時に記録

2.2 SparkSession作成関数

💡 関数化する理由

SparkSessionの作成を関数化することで、設定の変更が容易になり、コードの再利用性も高まります。また、テスト時に異なる設定で実行することも簡単になります。

# ===================================
# SparkSession作成関数
# ===================================
def create_spark_session(app_name="ETL-Pipeline"):
    """
    Spark Sessionを作成する関数
    
    引数:
        app_name: アプリケーション名(Spark UIで表示される)
    
    戻り値:
        設定済みのSparkSessionオブジェクト
    """
    return SparkSession.builder \
        .appName(app_name) \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .getOrCreate()

# 【設定の解説】
# spark.sql.adaptive.enabled: 
#   → Sparkが実行時にクエリプランを動的に最適化
#   → データの偏りに応じてパーティション調整
# 
# spark.sql.adaptive.coalescePartitions.enabled:
#   → 小さすぎるパーティションを自動で結合
#   → シャッフル後のパフォーマンス向上

📥 3. データ抽出(Extract)

💡 スキーマを明示的に定義する理由

スキーマを明示的に定義することで以下のメリットがあります:

  • パフォーマンス向上:推論処理が不要になり読み込みが高速化
  • 型の厳密化:想定外のデータ型を早期に検出
  • NULL処理の制御:各カラムでNULLを許容するか指定可能

3.1 データ抽出関数

# ===================================
# データ抽出関数
# ===================================
def extract_data(spark, base_path, date):
    """
    複数のデータソースからデータを抽出する
    
    引数:
        spark: SparkSessionオブジェクト
        base_path: データが格納されているベースパス(gs://bucket/path)
        date: 処理対象日付(例: 2024-11-20)
    
    戻り値:
        4つのDataFrame(売上、顧客、商品、在庫)
    """
    logger.info("=== データ抽出開始 ===")
    
    try:
        # ----------------------------------------
        # 1. 売上データ(CSV形式)の読み込み
        # ----------------------------------------
        logger.info("売上データ読み込み中...")
        
        # スキーマを明示的に定義(型推論よりも高速)
        sales_schema = StructType([
            StructField("sale_id", StringType(), True),       # 売上ID
            StructField("sale_date", DateType(), True),       # 売上日
            StructField("customer_id", StringType(), True),   # 顧客ID
            StructField("product_id", StringType(), True),    # 商品ID
            StructField("quantity", IntegerType(), True),     # 数量
            StructField("unit_price", DoubleType(), True),    # 単価
            StructField("total_amount", DoubleType(), True)   # 合計金額
        ])
        
        # CSVファイルを読み込み
        # ワイルドカード(*.csv)で複数ファイルを一度に読み込み
        sales = spark.read \
            .schema(sales_schema) \
            .csv(f"{base_path}/sales/date={date}/*.csv", header=True)
        
        logger.info(f"売上データ: {sales.count():,}件")
        
        # ----------------------------------------
        # 2. 顧客データ(JSON形式)の読み込み
        # ----------------------------------------
        logger.info("顧客データ読み込み中...")
        
        # JSONはスキーマ自動推論が高速なのでそのまま読み込み
        customers = spark.read \
            .json(f"{base_path}/customers/*.json")
        
        logger.info(f"顧客データ: {customers.count():,}件")
        
        # ----------------------------------------
        # 3. 商品データ(Parquet形式)の読み込み
        # ----------------------------------------
        logger.info("商品データ読み込み中...")
        
        # Parquetはスキーマ情報を持っているので指定不要
        products = spark.read \
            .parquet(f"{base_path}/products/")
        
        logger.info(f"商品データ: {products.count():,}件")
        
        # ----------------------------------------
        # 4. 在庫データ(CSV形式)の読み込み
        # ----------------------------------------
        logger.info("在庫データ読み込み中...")
        
        # 小さいデータなので型推論を使用
        inventory = spark.read \
            .csv(f"{base_path}/inventory/", header=True, inferSchema=True)
        
        logger.info(f"在庫データ: {inventory.count():,}件")
        
        logger.info("=== データ抽出完了 ===\n")
        return sales, customers, products, inventory
        
    except Exception as e:
        # エラー発生時はログに記録して再スロー
        logger.error(f"データ抽出エラー: {str(e)}")
        raise

⚠️ 注意:try-exceptの重要性

本番環境では、ファイルが存在しない、形式が不正など、様々なエラーが発生します。try-exceptで適切にエラーをキャッチし、ログに記録することで、問題の原因特定が容易になります。

🧹 4. データ品質チェック

💡 なぜ品質チェックが必要か?

「Garbage In, Garbage Out」という格言があるように、入力データの品質が低いと、どんなに優れた分析も意味を持ちません。データ品質チェックは、問題を早期に検出し、信頼性の高い分析結果を保証するために不可欠です。

4.1 データ品質チェック関数

# ===================================
# データ品質チェック関数
# ===================================
def validate_data_quality(df, df_name):
    """
    DataFrameのデータ品質をチェックする
    
    引数:
        df: チェック対象のDataFrame
        df_name: データ名(ログ出力用)
    
    戻り値:
        True(チェック通過)/ エラー発生時は例外をスロー
    """
    logger.info(f"=== {df_name} データ品質チェック ===")
    
    # ----------------------------------------
    # 1. レコード数のチェック
    # ----------------------------------------
    count = df.count()
    logger.info(f"レコード数: {count:,}件")
    
    # 0件の場合はエラー
    if count == 0:
        raise ValueError(f"{df_name}にデータがありません")
    
    # ----------------------------------------
    # 2. NULL値のチェック
    # ----------------------------------------
    # 各カラムのNULL数を集計
    null_counts = df.select([
        sum(col(c).isNull().cast('int')).alias(c)
        for c in df.columns
    ]).collect()[0].asDict()
    
    # 結果を表示
    for col_name, null_count in null_counts.items():
        if null_count > 0:
            null_rate = null_count / count
            logger.warning(f"{col_name}: {null_count:,}個のNULL ({null_rate:.2%})")
            
            # NULL率10%以上は深刻な問題
            if null_rate > 0.1:
                logger.error(f"{col_name}のNULL率が高すぎます")
    
    # ----------------------------------------
    # 3. 重複チェック
    # ----------------------------------------
    # IDカラムがあれば重複をチェック
    if 'id' in df.columns or df.columns[0].endswith('_id'):
        id_col = 'id' if 'id' in df.columns else df.columns[0]
        duplicate_count = count - df.select(id_col).distinct().count()
        
        if duplicate_count > 0:
            logger.warning(f"重複レコード: {duplicate_count:,}件")
    
    logger.info(f"=== {df_name} チェック完了 ===\n")
    return True

🧹 5. データクレンジング(Transform – Part 1)

💡 データクレンジングとは?

データクレンジングは、分析に適した形式にデータを整える作業です。具体的には以下の処理を行います:

  • NULL値の処理(削除または補完)
  • 不正な値の除外(負の数量など)
  • データの整合性チェック
  • フォーマットの統一

5.1 データクレンジング関数

# ===================================
# データクレンジング関数
# ===================================
def clean_data(sales, customers, products, inventory):
    """
    各データソースをクレンジングする
    
    引数:
        sales: 売上データ
        customers: 顧客データ
        products: 商品データ
        inventory: 在庫データ
    
    戻り値:
        クレンジング済みの4つのDataFrame
    """
    logger.info("=== データクレンジング開始 ===")
    
    # ----------------------------------------
    # 1. 売上データのクレンジング
    # ----------------------------------------
    logger.info("売上データクレンジング中...")
    
    sales_clean = sales \
        .dropna(subset=['sale_id', 'customer_id', 'product_id']) \
        .filter(col('quantity') > 0) \
        .filter(col('unit_price') > 0) \
        .filter(col('total_amount') > 0) \
        .withColumn('total_amount_calc', col('quantity') * col('unit_price')) \
        .filter(abs(col('total_amount') - col('total_amount_calc')) < 0.01) \
        .drop('total_amount_calc')
    
    # 【処理の解説】
    # dropna: 必須フィールドがNULLのレコードを除外
    # filter(quantity > 0): 数量が0以下のレコードを除外
    # filter(unit_price > 0): 単価が0以下のレコードを除外
    # withColumn: 数量×単価で合計金額を再計算
    # filter(abs(...)): 計算値と記録値の差が0.01以上のレコードを除外
    #   → データの整合性チェック
    
    logger.info(f"クレンジング後: {sales_clean.count():,}件")
    
    # ----------------------------------------
    # 2. 顧客データのクレンジング
    # ----------------------------------------
    logger.info("顧客データクレンジング中...")
    
    customers_clean = customers \
        .dropna(subset=['customer_id']) \
        .withColumn('email_lower', lower(col('email'))) \
        .withColumn('phone_clean', regexp_replace('phone', '[^0-9]', ''))
    
    # 【処理の解説】
    # lower(email): メールアドレスを小文字に統一
    # regexp_replace: 電話番号から数字以外を除去
    
    # ----------------------------------------
    # 3. 商品データのクレンジング
    # ----------------------------------------
    logger.info("商品データクレンジング中...")
    
    products_clean = products \
        .dropna(subset=['product_id']) \
        .filter(col('price') > 0)
    
    # ----------------------------------------
    # 4. 在庫データのクレンジング
    # ----------------------------------------
    logger.info("在庫データクレンジング中...")
    
    inventory_clean = inventory \
        .dropna(subset=['product_id']) \
        .filter(col('stock_quantity') >= 0)
    
    logger.info("=== データクレンジング完了 ===\n")
    return sales_clean, customers_clean, products_clean, inventory_clean

🔗 6. データ変換・JOIN・集計(Transform – Part 2)

💡 JOINの戦略

複数のテーブルを結合する際は、以下の点を考慮します:

  • 結合順序:小さいテーブルから結合するとパフォーマンスが向上
  • 結合タイプ:left joinはNULLが発生する可能性がある
  • キーの選択:一意性のあるキーで結合する

6.1 データ変換・結合関数

# ===================================
# データ変換・結合関数
# ===================================
def transform_data(sales, customers, products, inventory):
    """
    データを結合し、分析用のカラムを追加する
    
    引数:
        sales: クレンジング済み売上データ
        customers: クレンジング済み顧客データ
        products: クレンジング済み商品データ
        inventory: クレンジング済み在庫データ
    
    戻り値:
        結合・変換済みのDataFrame
    """
    logger.info("=== データ変換開始 ===")
    
    # ----------------------------------------
    # 1. 売上 × 顧客 を結合
    # ----------------------------------------
    logger.info("売上×顧客 結合中...")
    
    # left join: 売上データを基準に顧客情報を追加
    # 顧客情報がない売上も保持される
    sales_with_customer = sales.join(
        customers,
        on='customer_id',
        how='left'
    )
    
    # ----------------------------------------
    # 2. 結果 × 商品 を結合
    # ----------------------------------------
    logger.info("売上×商品 結合中...")
    
    sales_enriched = sales_with_customer.join(
        products,
        on='product_id',
        how='left'
    )
    
    # ----------------------------------------
    # 3. 結果 × 在庫 を結合
    # ----------------------------------------
    logger.info("売上×在庫 結合中...")
    
    sales_complete = sales_enriched.join(
        inventory,
        on='product_id',
        how='left'
    )
    
    # ----------------------------------------
    # 4. 分析用カラムの追加
    # ----------------------------------------
    logger.info("カラム生成中...")
    
    sales_final = sales_complete \
        .withColumn('year', year('sale_date')) \
        .withColumn('month', month('sale_date')) \
        .withColumn('day', dayofmonth('sale_date')) \
        .withColumn('revenue', col('quantity') * col('unit_price')) \
        .withColumn('profit',
            col('revenue') - (col('quantity') * col('cost_price'))
        )
    
    # 【追加カラムの解説】
    # year, month, day: 日付から抽出(パーティション用)
    # revenue: 売上金額(数量 × 単価)
    # profit: 利益(売上 - 原価)
    
    logger.info(f"変換後: {sales_final.count():,}件")
    logger.info("=== データ変換完了 ===\n")
    
    return sales_final

6.2 集計関数

# ===================================
# データ集計関数
# ===================================
def aggregate_data(sales_final):
    """
    各種サマリーテーブルを作成する
    
    引数:
        sales_final: 変換済みの売上データ
    
    戻り値:
        3つのサマリーDataFrame(日次、商品別、顧客別)
    """
    logger.info("=== データ集計開始 ===")
    
    # ----------------------------------------
    # 1. 日次サマリー
    # ----------------------------------------
    logger.info("日次サマリー作成中...")
    
    daily_summary = sales_final \
        .groupBy('year', 'month', 'day', 'category', 'region') \
        .agg(
            count('*').alias('order_count'),
            sum('revenue').alias('total_revenue'),
            sum('profit').alias('total_profit'),
            avg('revenue').alias('avg_order_value'),
            countDistinct('customer_id').alias('unique_customers'),
            sum('quantity').alias('total_quantity')
        )
    
    # 【集計内容】
    # order_count: 注文数
    # total_revenue: 売上合計
    # total_profit: 利益合計
    # avg_order_value: 平均注文額
    # unique_customers: ユニーク顧客数
    # total_quantity: 販売数量合計
    
    # ----------------------------------------
    # 2. 商品別サマリー
    # ----------------------------------------
    logger.info("商品別サマリー作成中...")
    
    product_summary = sales_final \
        .groupBy('product_id', 'product_name', 'category') \
        .agg(
            count('*').alias('sales_count'),
            sum('quantity').alias('total_quantity'),
            sum('revenue').alias('total_revenue'),
            avg('unit_price').alias('avg_price')
        ) \
        .orderBy(desc('total_revenue'))
    
    # ----------------------------------------
    # 3. 顧客別サマリー
    # ----------------------------------------
    logger.info("顧客別サマリー作成中...")
    
    customer_summary = sales_final \
        .groupBy('customer_id', 'customer_name', 'region') \
        .agg(
            count('*').alias('purchase_count'),
            sum('revenue').alias('total_spent'),
            avg('revenue').alias('avg_order_value'),
            max('sale_date').alias('last_purchase_date')
        )
    
    logger.info("=== データ集計完了 ===\n")
    return daily_summary, product_summary, customer_summary

💾 7. データ格納(Load)

💡 Parquet形式と圧縮の選択

Parquetは列指向フォーマットで、分析クエリに最適です:

  • 特定の列だけ読み込めるので高速
  • 高い圧縮率でストレージコスト削減
  • スキーマ情報を保持

Snappy圧縮は、圧縮率と速度のバランスが良く、Sparkでのデフォルト推奨です。

7.1 データ保存関数

# ===================================
# データ保存関数
# ===================================
def load_data(sales_final, daily_summary, product_summary, 
              customer_summary, output_path):
    """
    処理済みデータをParquet形式で保存する
    
    引数:
        sales_final: 詳細データ
        daily_summary: 日次サマリー
        product_summary: 商品別サマリー
        customer_summary: 顧客別サマリー
        output_path: 出力先パス
    """
    logger.info("=== データ保存開始 ===")
    
    try:
        # ----------------------------------------
        # 1. 詳細データ(パーティション分割)
        # ----------------------------------------
        logger.info("詳細データ保存中...")
        
        sales_final \
            .repartition(100, 'year', 'month') \
            .write \
            .mode('overwrite') \
            .partitionBy('year', 'month') \
            .option('compression', 'snappy') \
            .parquet(f"{output_path}/sales_detail/")
        
        # 【処理の解説】
        # repartition(100, 'year', 'month'):
        #   → 年・月でパーティション、各100ファイル程度に分割
        # mode('overwrite'):
        #   → 既存データを上書き
        # partitionBy('year', 'month'):
        #   → year=2024/month=11/ のようなディレクトリ構造で保存
        # option('compression', 'snappy'):
        #   → Snappy圧縮を使用(高速・まずまずの圧縮率)
        
        logger.info(f"保存完了: {output_path}/sales_detail/")
        
        # ----------------------------------------
        # 2. 日次サマリー
        # ----------------------------------------
        logger.info("日次サマリー保存中...")
        
        daily_summary \
            .write \
            .mode('overwrite') \
            .partitionBy('year', 'month') \
            .parquet(f"{output_path}/daily_summary/")
        
        # ----------------------------------------
        # 3. 商品別サマリー
        # ----------------------------------------
        logger.info("商品別サマリー保存中...")
        
        product_summary \
            .write \
            .mode('overwrite') \
            .parquet(f"{output_path}/product_summary/")
        
        # ----------------------------------------
        # 4. 顧客別サマリー
        # ----------------------------------------
        logger.info("顧客別サマリー保存中...")
        
        customer_summary \
            .write \
            .mode('overwrite') \
            .partitionBy('region') \
            .parquet(f"{output_path}/customer_summary/")
        
        logger.info("=== データ保存完了 ===\n")
        
    except Exception as e:
        logger.error(f"データ保存エラー: {str(e)}")
        raise

7.2 メタデータ保存関数

💡 メタデータの重要性

メタデータは「データについてのデータ」です。処理の実行日時、所要時間、成否などを記録することで、後からの監査やトラブルシューティングに役立ちます。

# ===================================
# メタデータ保存関数
# ===================================
def save_metadata(spark, output_path, start_time, end_time):
    """
    処理のメタデータを保存する
    
    引数:
        spark: SparkSessionオブジェクト
        output_path: 出力先パス
        start_time: 処理開始時刻(エポック秒)
        end_time: 処理終了時刻(エポック秒)
    """
    from datetime import datetime
    
    # メタデータの作成
    metadata = {
        "pipeline_name": "ETL-Pipeline",
        "execution_date": datetime.now().isoformat(),
        "start_time": start_time,
        "end_time": end_time,
        "duration_seconds": end_time - start_time,
        "output_path": output_path,
        "status": "SUCCESS"
    }
    
    # DataFrameに変換して保存
    metadata_df = spark.createDataFrame([metadata])
    metadata_df.write \
        .mode('overwrite') \
        .json(f"{output_path}/metadata/")

⚡ 8. メイン処理とエラーハンドリング

8.1 メイン関数

# ===================================
# メイン処理
# ===================================
def main():
    """
    ETLパイプラインのメイン処理
    
    使い方:
        spark-submit etl_pipeline.py <input_path> <output_path> <date>
    
    例:
        spark-submit etl_pipeline.py gs://bucket/input gs://bucket/output 2024-11-20
    """
    # ----------------------------------------
    # コマンドライン引数のチェック
    # ----------------------------------------
    if len(sys.argv) != 4:
        logger.error("使い方: spark-submit etl.py   ")
        sys.exit(1)
    
    input_path = sys.argv[1]   # 入力パス
    output_path = sys.argv[2]  # 出力パス
    date = sys.argv[3]         # 処理日付
    
    start_time = time.time()
    
    try:
        # ----------------------------------------
        # 1. Spark Session作成
        # ----------------------------------------
        spark = create_spark_session()
        
        # ----------------------------------------
        # 2. Extract: データ抽出
        # ----------------------------------------
        sales, customers, products, inventory = extract_data(
            spark, input_path, date
        )
        
        # ----------------------------------------
        # 3. データ品質チェック
        # ----------------------------------------
        validate_data_quality(sales, "売上データ")
        validate_data_quality(customers, "顧客データ")
        validate_data_quality(products, "商品データ")
        validate_data_quality(inventory, "在庫データ")
        
        # ----------------------------------------
        # 4. Transform: クレンジング
        # ----------------------------------------
        sales_clean, customers_clean, products_clean, inventory_clean = \
            clean_data(sales, customers, products, inventory)
        
        # ----------------------------------------
        # 5. Transform: 変換・結合
        # ----------------------------------------
        sales_final = transform_data(
            sales_clean, customers_clean, products_clean, inventory_clean
        )
        
        # ----------------------------------------
        # 6. Transform: 集計
        # ----------------------------------------
        daily_summary, product_summary, customer_summary = \
            aggregate_data(sales_final)
        
        # ----------------------------------------
        # 7. Load: データ保存
        # ----------------------------------------
        load_data(sales_final, daily_summary, product_summary,
                  customer_summary, output_path)
        
        end_time = time.time()
        
        # ----------------------------------------
        # 8. メタデータ保存
        # ----------------------------------------
        save_metadata(spark, output_path, start_time, end_time)
        
        # ----------------------------------------
        # 9. 完了サマリー
        # ----------------------------------------
        logger.info("=" * 50)
        logger.info("ETLパイプライン実行完了")
        logger.info("=" * 50)
        logger.info(f"処理時間: {end_time - start_time:.2f}秒")
        logger.info(f"出力先: {output_path}")
        logger.info("=" * 50)
        
        return 0  # 成功
        
    except Exception as e:
        logger.error(f"ETLパイプライン失敗: {str(e)}")
        return 1  # 失敗
        
    finally:
        # SparkSessionを確実に終了
        if 'spark' in locals():
            spark.stop()


# スクリプト実行のエントリーポイント
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)

☁️ 9. クラウド環境での実行

9.1 AWS EMRでの実行

# AWS EMRでジョブを投入
aws emr add-steps \
    --cluster-id j-XXXXXXXXXXXXX \
    --steps Type=Spark,Name="ETL-Pipeline",\
ActionOnFailure=CONTINUE,\
Args=[--deploy-mode,cluster,\
--master,yarn,\
gs://my-bucket/scripts/etl_pipeline_project.py,\
gs://my-bucket/input,\
gs://my-bucket/output,\
2024-11-20]

# 【パラメータ解説】
# --cluster-id: EMRクラスターID
# Type=Spark: Sparkジョブとして実行
# ActionOnFailure=CONTINUE: 失敗しても次のステップを実行
# --deploy-mode cluster: Driverをクラスター上で実行
# --master yarn: YARNをクラスターマネージャーとして使用

9.2 GCP Dataprocでの実行

# GCP Dataprocでジョブを投入
gcloud dataproc jobs submit pyspark \
    gs://my-bucket/scripts/etl_pipeline_project.py \
    --cluster=my-cluster \
    --region=us-central1 \
    -- \
    gs://my-bucket/input \
    gs://my-bucket/output \
    2024-11-20

# 【パラメータ解説】
# gs://...: スクリプトのGCSパス
# --cluster: Dataprocクラスター名
# --region: クラスターのリージョン
# -- 以降: スクリプトへの引数

📝 まとめ

✅ このプロジェクトで学んだこと

  • ETLパイプラインの本格的な実装
  • 複数データソースの統合
  • データ品質チェックの自動化
  • 複雑な変換・JOIN・集計
  • パーティション最適化
  • エラーハンドリングとロギング
  • クラウド環境での実行

🎯 成果物チェックリスト

  • □ 売上詳細データ(Parquet、パーティション分割)
  • □ 日次サマリー
  • □ 商品別サマリー
  • □ 顧客別サマリー
  • □ メタデータ
  • □ 実行ログ

💡 次のプロジェクトへ

お疲れ様でした!ETLパイプライン構築を完了しました。
次のSTEP 31では、機械学習向けデータ前処理に挑戦します。

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 30

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE