🔄 STEP 30: プロジェクト② ETLパイプライン構築
マルチソースからのデータ統合 – 本番レベルのETLパイプラインを構築しよう
📋 このプロジェクトで学ぶこと
- ETL(Extract/Transform/Load)の本格的な実装
- 複数データソースからのデータ抽出
- データクレンジングと品質チェック
- 複雑なデータ変換・JOIN・集計
- パーティション最適化とParquet書き込み
- エラーハンドリングとログ出力
- EMR/Dataprocでの実行
前提条件: STEP 1-29の全知識
🎯 1. プロジェクト概要 – ETLとは何か
💡 ETLパイプラインとは?
ETLは「Extract(抽出)→ Transform(変換)→ Load(格納)」の略で、データを収集・加工・保存する一連の処理フローです。
- Extract:さまざまなソースからデータを取り出す
- Transform:データをクリーニング・結合・集計して分析可能な形に変換する
- Load:処理済みデータを分析用データベースやデータウェアハウスに保存する
現代のデータエンジニアリングにおいて、ETLパイプライン構築は最も重要なスキルの1つです。
📊 あなたのミッション
あなたはデータエンジニアです。
複数のデータソース(売上、顧客、商品、在庫)を統合し、分析用のデータマートを構築してください:
- S3/GCSから複数のデータソースを読み込む
- データクレンジングと品質チェック
- データを結合・集計して分析用テーブル作成
- 最適化されたParquet形式で保存
- エラーハンドリングと詳細ログ出力
1.1 データソースの構成
| データソース | 形式 | レコード数 | サイズ |
|---|---|---|---|
| 売上データ | CSV | 1000万件 | 5 GB |
| 顧客データ | JSON | 100万件 | 500 MB |
| 商品データ | Parquet | 10万件 | 50 MB |
| 在庫データ | CSV | 10万件 | 10 MB |
📥 2. プロジェクトのセットアップ
💡 ロギングの重要性
本番環境では「いつ」「何が」「どうなったか」を追跡できることが非常に重要です。Pythonのloggingモジュールを使うと、処理の進捗やエラー情報を体系的に記録できます。
2.1 必要なライブラリのインポートとロギング設定
# etl_pipeline_project.py
# ===================================
# 必要なライブラリのインポート
# ===================================
# SparkSession: Spark処理の入り口
from pyspark.sql import SparkSession
# functions: データ変換で使う関数群(col, sum, avg等)
from pyspark.sql.functions import *
# types: スキーマ定義で使う型(String, Integer等)
from pyspark.sql.types import *
# sys: コマンドライン引数の処理用
import sys
# logging: 処理状況をログに記録するため
import logging
# time: 処理時間の計測用
import time
# ===================================
# ロギング設定
# ===================================
# format: ログの出力形式(時刻 - レベル - メッセージ)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# このスクリプト専用のロガーを作成
logger = logging.getLogger(__name__)
📝 ポイント:loggingの使い分け
logger.info():通常の処理状況を記録logger.warning():注意が必要な状況を記録logger.error():エラー発生時に記録
2.2 SparkSession作成関数
💡 関数化する理由
SparkSessionの作成を関数化することで、設定の変更が容易になり、コードの再利用性も高まります。また、テスト時に異なる設定で実行することも簡単になります。
# ===================================
# SparkSession作成関数
# ===================================
def create_spark_session(app_name="ETL-Pipeline"):
"""
Spark Sessionを作成する関数
引数:
app_name: アプリケーション名(Spark UIで表示される)
戻り値:
設定済みのSparkSessionオブジェクト
"""
return SparkSession.builder \
.appName(app_name) \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.getOrCreate()
# 【設定の解説】
# spark.sql.adaptive.enabled:
# → Sparkが実行時にクエリプランを動的に最適化
# → データの偏りに応じてパーティション調整
#
# spark.sql.adaptive.coalescePartitions.enabled:
# → 小さすぎるパーティションを自動で結合
# → シャッフル後のパフォーマンス向上
📥 3. データ抽出(Extract)
💡 スキーマを明示的に定義する理由
スキーマを明示的に定義することで以下のメリットがあります:
- パフォーマンス向上:推論処理が不要になり読み込みが高速化
- 型の厳密化:想定外のデータ型を早期に検出
- NULL処理の制御:各カラムでNULLを許容するか指定可能
3.1 データ抽出関数
# ===================================
# データ抽出関数
# ===================================
def extract_data(spark, base_path, date):
"""
複数のデータソースからデータを抽出する
引数:
spark: SparkSessionオブジェクト
base_path: データが格納されているベースパス(gs://bucket/path)
date: 処理対象日付(例: 2024-11-20)
戻り値:
4つのDataFrame(売上、顧客、商品、在庫)
"""
logger.info("=== データ抽出開始 ===")
try:
# ----------------------------------------
# 1. 売上データ(CSV形式)の読み込み
# ----------------------------------------
logger.info("売上データ読み込み中...")
# スキーマを明示的に定義(型推論よりも高速)
sales_schema = StructType([
StructField("sale_id", StringType(), True), # 売上ID
StructField("sale_date", DateType(), True), # 売上日
StructField("customer_id", StringType(), True), # 顧客ID
StructField("product_id", StringType(), True), # 商品ID
StructField("quantity", IntegerType(), True), # 数量
StructField("unit_price", DoubleType(), True), # 単価
StructField("total_amount", DoubleType(), True) # 合計金額
])
# CSVファイルを読み込み
# ワイルドカード(*.csv)で複数ファイルを一度に読み込み
sales = spark.read \
.schema(sales_schema) \
.csv(f"{base_path}/sales/date={date}/*.csv", header=True)
logger.info(f"売上データ: {sales.count():,}件")
# ----------------------------------------
# 2. 顧客データ(JSON形式)の読み込み
# ----------------------------------------
logger.info("顧客データ読み込み中...")
# JSONはスキーマ自動推論が高速なのでそのまま読み込み
customers = spark.read \
.json(f"{base_path}/customers/*.json")
logger.info(f"顧客データ: {customers.count():,}件")
# ----------------------------------------
# 3. 商品データ(Parquet形式)の読み込み
# ----------------------------------------
logger.info("商品データ読み込み中...")
# Parquetはスキーマ情報を持っているので指定不要
products = spark.read \
.parquet(f"{base_path}/products/")
logger.info(f"商品データ: {products.count():,}件")
# ----------------------------------------
# 4. 在庫データ(CSV形式)の読み込み
# ----------------------------------------
logger.info("在庫データ読み込み中...")
# 小さいデータなので型推論を使用
inventory = spark.read \
.csv(f"{base_path}/inventory/", header=True, inferSchema=True)
logger.info(f"在庫データ: {inventory.count():,}件")
logger.info("=== データ抽出完了 ===\n")
return sales, customers, products, inventory
except Exception as e:
# エラー発生時はログに記録して再スロー
logger.error(f"データ抽出エラー: {str(e)}")
raise
⚠️ 注意:try-exceptの重要性
本番環境では、ファイルが存在しない、形式が不正など、様々なエラーが発生します。try-exceptで適切にエラーをキャッチし、ログに記録することで、問題の原因特定が容易になります。
🧹 4. データ品質チェック
💡 なぜ品質チェックが必要か?
「Garbage In, Garbage Out」という格言があるように、入力データの品質が低いと、どんなに優れた分析も意味を持ちません。データ品質チェックは、問題を早期に検出し、信頼性の高い分析結果を保証するために不可欠です。
4.1 データ品質チェック関数
# ===================================
# データ品質チェック関数
# ===================================
def validate_data_quality(df, df_name):
"""
DataFrameのデータ品質をチェックする
引数:
df: チェック対象のDataFrame
df_name: データ名(ログ出力用)
戻り値:
True(チェック通過)/ エラー発生時は例外をスロー
"""
logger.info(f"=== {df_name} データ品質チェック ===")
# ----------------------------------------
# 1. レコード数のチェック
# ----------------------------------------
count = df.count()
logger.info(f"レコード数: {count:,}件")
# 0件の場合はエラー
if count == 0:
raise ValueError(f"{df_name}にデータがありません")
# ----------------------------------------
# 2. NULL値のチェック
# ----------------------------------------
# 各カラムのNULL数を集計
null_counts = df.select([
sum(col(c).isNull().cast('int')).alias(c)
for c in df.columns
]).collect()[0].asDict()
# 結果を表示
for col_name, null_count in null_counts.items():
if null_count > 0:
null_rate = null_count / count
logger.warning(f"{col_name}: {null_count:,}個のNULL ({null_rate:.2%})")
# NULL率10%以上は深刻な問題
if null_rate > 0.1:
logger.error(f"{col_name}のNULL率が高すぎます")
# ----------------------------------------
# 3. 重複チェック
# ----------------------------------------
# IDカラムがあれば重複をチェック
if 'id' in df.columns or df.columns[0].endswith('_id'):
id_col = 'id' if 'id' in df.columns else df.columns[0]
duplicate_count = count - df.select(id_col).distinct().count()
if duplicate_count > 0:
logger.warning(f"重複レコード: {duplicate_count:,}件")
logger.info(f"=== {df_name} チェック完了 ===\n")
return True
🧹 5. データクレンジング(Transform – Part 1)
💡 データクレンジングとは?
データクレンジングは、分析に適した形式にデータを整える作業です。具体的には以下の処理を行います:
- NULL値の処理(削除または補完)
- 不正な値の除外(負の数量など)
- データの整合性チェック
- フォーマットの統一
5.1 データクレンジング関数
# ===================================
# データクレンジング関数
# ===================================
def clean_data(sales, customers, products, inventory):
"""
各データソースをクレンジングする
引数:
sales: 売上データ
customers: 顧客データ
products: 商品データ
inventory: 在庫データ
戻り値:
クレンジング済みの4つのDataFrame
"""
logger.info("=== データクレンジング開始 ===")
# ----------------------------------------
# 1. 売上データのクレンジング
# ----------------------------------------
logger.info("売上データクレンジング中...")
sales_clean = sales \
.dropna(subset=['sale_id', 'customer_id', 'product_id']) \
.filter(col('quantity') > 0) \
.filter(col('unit_price') > 0) \
.filter(col('total_amount') > 0) \
.withColumn('total_amount_calc', col('quantity') * col('unit_price')) \
.filter(abs(col('total_amount') - col('total_amount_calc')) < 0.01) \
.drop('total_amount_calc')
# 【処理の解説】
# dropna: 必須フィールドがNULLのレコードを除外
# filter(quantity > 0): 数量が0以下のレコードを除外
# filter(unit_price > 0): 単価が0以下のレコードを除外
# withColumn: 数量×単価で合計金額を再計算
# filter(abs(...)): 計算値と記録値の差が0.01以上のレコードを除外
# → データの整合性チェック
logger.info(f"クレンジング後: {sales_clean.count():,}件")
# ----------------------------------------
# 2. 顧客データのクレンジング
# ----------------------------------------
logger.info("顧客データクレンジング中...")
customers_clean = customers \
.dropna(subset=['customer_id']) \
.withColumn('email_lower', lower(col('email'))) \
.withColumn('phone_clean', regexp_replace('phone', '[^0-9]', ''))
# 【処理の解説】
# lower(email): メールアドレスを小文字に統一
# regexp_replace: 電話番号から数字以外を除去
# ----------------------------------------
# 3. 商品データのクレンジング
# ----------------------------------------
logger.info("商品データクレンジング中...")
products_clean = products \
.dropna(subset=['product_id']) \
.filter(col('price') > 0)
# ----------------------------------------
# 4. 在庫データのクレンジング
# ----------------------------------------
logger.info("在庫データクレンジング中...")
inventory_clean = inventory \
.dropna(subset=['product_id']) \
.filter(col('stock_quantity') >= 0)
logger.info("=== データクレンジング完了 ===\n")
return sales_clean, customers_clean, products_clean, inventory_clean
🔗 6. データ変換・JOIN・集計(Transform – Part 2)
💡 JOINの戦略
複数のテーブルを結合する際は、以下の点を考慮します:
- 結合順序:小さいテーブルから結合するとパフォーマンスが向上
- 結合タイプ:left joinはNULLが発生する可能性がある
- キーの選択:一意性のあるキーで結合する
6.1 データ変換・結合関数
# ===================================
# データ変換・結合関数
# ===================================
def transform_data(sales, customers, products, inventory):
"""
データを結合し、分析用のカラムを追加する
引数:
sales: クレンジング済み売上データ
customers: クレンジング済み顧客データ
products: クレンジング済み商品データ
inventory: クレンジング済み在庫データ
戻り値:
結合・変換済みのDataFrame
"""
logger.info("=== データ変換開始 ===")
# ----------------------------------------
# 1. 売上 × 顧客 を結合
# ----------------------------------------
logger.info("売上×顧客 結合中...")
# left join: 売上データを基準に顧客情報を追加
# 顧客情報がない売上も保持される
sales_with_customer = sales.join(
customers,
on='customer_id',
how='left'
)
# ----------------------------------------
# 2. 結果 × 商品 を結合
# ----------------------------------------
logger.info("売上×商品 結合中...")
sales_enriched = sales_with_customer.join(
products,
on='product_id',
how='left'
)
# ----------------------------------------
# 3. 結果 × 在庫 を結合
# ----------------------------------------
logger.info("売上×在庫 結合中...")
sales_complete = sales_enriched.join(
inventory,
on='product_id',
how='left'
)
# ----------------------------------------
# 4. 分析用カラムの追加
# ----------------------------------------
logger.info("カラム生成中...")
sales_final = sales_complete \
.withColumn('year', year('sale_date')) \
.withColumn('month', month('sale_date')) \
.withColumn('day', dayofmonth('sale_date')) \
.withColumn('revenue', col('quantity') * col('unit_price')) \
.withColumn('profit',
col('revenue') - (col('quantity') * col('cost_price'))
)
# 【追加カラムの解説】
# year, month, day: 日付から抽出(パーティション用)
# revenue: 売上金額(数量 × 単価)
# profit: 利益(売上 - 原価)
logger.info(f"変換後: {sales_final.count():,}件")
logger.info("=== データ変換完了 ===\n")
return sales_final
6.2 集計関数
# ===================================
# データ集計関数
# ===================================
def aggregate_data(sales_final):
"""
各種サマリーテーブルを作成する
引数:
sales_final: 変換済みの売上データ
戻り値:
3つのサマリーDataFrame(日次、商品別、顧客別)
"""
logger.info("=== データ集計開始 ===")
# ----------------------------------------
# 1. 日次サマリー
# ----------------------------------------
logger.info("日次サマリー作成中...")
daily_summary = sales_final \
.groupBy('year', 'month', 'day', 'category', 'region') \
.agg(
count('*').alias('order_count'),
sum('revenue').alias('total_revenue'),
sum('profit').alias('total_profit'),
avg('revenue').alias('avg_order_value'),
countDistinct('customer_id').alias('unique_customers'),
sum('quantity').alias('total_quantity')
)
# 【集計内容】
# order_count: 注文数
# total_revenue: 売上合計
# total_profit: 利益合計
# avg_order_value: 平均注文額
# unique_customers: ユニーク顧客数
# total_quantity: 販売数量合計
# ----------------------------------------
# 2. 商品別サマリー
# ----------------------------------------
logger.info("商品別サマリー作成中...")
product_summary = sales_final \
.groupBy('product_id', 'product_name', 'category') \
.agg(
count('*').alias('sales_count'),
sum('quantity').alias('total_quantity'),
sum('revenue').alias('total_revenue'),
avg('unit_price').alias('avg_price')
) \
.orderBy(desc('total_revenue'))
# ----------------------------------------
# 3. 顧客別サマリー
# ----------------------------------------
logger.info("顧客別サマリー作成中...")
customer_summary = sales_final \
.groupBy('customer_id', 'customer_name', 'region') \
.agg(
count('*').alias('purchase_count'),
sum('revenue').alias('total_spent'),
avg('revenue').alias('avg_order_value'),
max('sale_date').alias('last_purchase_date')
)
logger.info("=== データ集計完了 ===\n")
return daily_summary, product_summary, customer_summary
💾 7. データ格納(Load)
💡 Parquet形式と圧縮の選択
Parquetは列指向フォーマットで、分析クエリに最適です:
- 特定の列だけ読み込めるので高速
- 高い圧縮率でストレージコスト削減
- スキーマ情報を保持
Snappy圧縮は、圧縮率と速度のバランスが良く、Sparkでのデフォルト推奨です。
7.1 データ保存関数
# ===================================
# データ保存関数
# ===================================
def load_data(sales_final, daily_summary, product_summary,
customer_summary, output_path):
"""
処理済みデータをParquet形式で保存する
引数:
sales_final: 詳細データ
daily_summary: 日次サマリー
product_summary: 商品別サマリー
customer_summary: 顧客別サマリー
output_path: 出力先パス
"""
logger.info("=== データ保存開始 ===")
try:
# ----------------------------------------
# 1. 詳細データ(パーティション分割)
# ----------------------------------------
logger.info("詳細データ保存中...")
sales_final \
.repartition(100, 'year', 'month') \
.write \
.mode('overwrite') \
.partitionBy('year', 'month') \
.option('compression', 'snappy') \
.parquet(f"{output_path}/sales_detail/")
# 【処理の解説】
# repartition(100, 'year', 'month'):
# → 年・月でパーティション、各100ファイル程度に分割
# mode('overwrite'):
# → 既存データを上書き
# partitionBy('year', 'month'):
# → year=2024/month=11/ のようなディレクトリ構造で保存
# option('compression', 'snappy'):
# → Snappy圧縮を使用(高速・まずまずの圧縮率)
logger.info(f"保存完了: {output_path}/sales_detail/")
# ----------------------------------------
# 2. 日次サマリー
# ----------------------------------------
logger.info("日次サマリー保存中...")
daily_summary \
.write \
.mode('overwrite') \
.partitionBy('year', 'month') \
.parquet(f"{output_path}/daily_summary/")
# ----------------------------------------
# 3. 商品別サマリー
# ----------------------------------------
logger.info("商品別サマリー保存中...")
product_summary \
.write \
.mode('overwrite') \
.parquet(f"{output_path}/product_summary/")
# ----------------------------------------
# 4. 顧客別サマリー
# ----------------------------------------
logger.info("顧客別サマリー保存中...")
customer_summary \
.write \
.mode('overwrite') \
.partitionBy('region') \
.parquet(f"{output_path}/customer_summary/")
logger.info("=== データ保存完了 ===\n")
except Exception as e:
logger.error(f"データ保存エラー: {str(e)}")
raise
7.2 メタデータ保存関数
💡 メタデータの重要性
メタデータは「データについてのデータ」です。処理の実行日時、所要時間、成否などを記録することで、後からの監査やトラブルシューティングに役立ちます。
# ===================================
# メタデータ保存関数
# ===================================
def save_metadata(spark, output_path, start_time, end_time):
"""
処理のメタデータを保存する
引数:
spark: SparkSessionオブジェクト
output_path: 出力先パス
start_time: 処理開始時刻(エポック秒)
end_time: 処理終了時刻(エポック秒)
"""
from datetime import datetime
# メタデータの作成
metadata = {
"pipeline_name": "ETL-Pipeline",
"execution_date": datetime.now().isoformat(),
"start_time": start_time,
"end_time": end_time,
"duration_seconds": end_time - start_time,
"output_path": output_path,
"status": "SUCCESS"
}
# DataFrameに変換して保存
metadata_df = spark.createDataFrame([metadata])
metadata_df.write \
.mode('overwrite') \
.json(f"{output_path}/metadata/")
⚡ 8. メイン処理とエラーハンドリング
8.1 メイン関数
# ===================================
# メイン処理
# ===================================
def main():
"""
ETLパイプラインのメイン処理
使い方:
spark-submit etl_pipeline.py <input_path> <output_path> <date>
例:
spark-submit etl_pipeline.py gs://bucket/input gs://bucket/output 2024-11-20
"""
# ----------------------------------------
# コマンドライン引数のチェック
# ----------------------------------------
if len(sys.argv) != 4:
logger.error("使い方: spark-submit etl.py ")
sys.exit(1)
input_path = sys.argv[1] # 入力パス
output_path = sys.argv[2] # 出力パス
date = sys.argv[3] # 処理日付
start_time = time.time()
try:
# ----------------------------------------
# 1. Spark Session作成
# ----------------------------------------
spark = create_spark_session()
# ----------------------------------------
# 2. Extract: データ抽出
# ----------------------------------------
sales, customers, products, inventory = extract_data(
spark, input_path, date
)
# ----------------------------------------
# 3. データ品質チェック
# ----------------------------------------
validate_data_quality(sales, "売上データ")
validate_data_quality(customers, "顧客データ")
validate_data_quality(products, "商品データ")
validate_data_quality(inventory, "在庫データ")
# ----------------------------------------
# 4. Transform: クレンジング
# ----------------------------------------
sales_clean, customers_clean, products_clean, inventory_clean = \
clean_data(sales, customers, products, inventory)
# ----------------------------------------
# 5. Transform: 変換・結合
# ----------------------------------------
sales_final = transform_data(
sales_clean, customers_clean, products_clean, inventory_clean
)
# ----------------------------------------
# 6. Transform: 集計
# ----------------------------------------
daily_summary, product_summary, customer_summary = \
aggregate_data(sales_final)
# ----------------------------------------
# 7. Load: データ保存
# ----------------------------------------
load_data(sales_final, daily_summary, product_summary,
customer_summary, output_path)
end_time = time.time()
# ----------------------------------------
# 8. メタデータ保存
# ----------------------------------------
save_metadata(spark, output_path, start_time, end_time)
# ----------------------------------------
# 9. 完了サマリー
# ----------------------------------------
logger.info("=" * 50)
logger.info("ETLパイプライン実行完了")
logger.info("=" * 50)
logger.info(f"処理時間: {end_time - start_time:.2f}秒")
logger.info(f"出力先: {output_path}")
logger.info("=" * 50)
return 0 # 成功
except Exception as e:
logger.error(f"ETLパイプライン失敗: {str(e)}")
return 1 # 失敗
finally:
# SparkSessionを確実に終了
if 'spark' in locals():
spark.stop()
# スクリプト実行のエントリーポイント
if __name__ == "__main__":
exit_code = main()
sys.exit(exit_code)
☁️ 9. クラウド環境での実行
9.1 AWS EMRでの実行
# AWS EMRでジョブを投入
aws emr add-steps \
--cluster-id j-XXXXXXXXXXXXX \
--steps Type=Spark,Name="ETL-Pipeline",\
ActionOnFailure=CONTINUE,\
Args=[--deploy-mode,cluster,\
--master,yarn,\
gs://my-bucket/scripts/etl_pipeline_project.py,\
gs://my-bucket/input,\
gs://my-bucket/output,\
2024-11-20]
# 【パラメータ解説】
# --cluster-id: EMRクラスターID
# Type=Spark: Sparkジョブとして実行
# ActionOnFailure=CONTINUE: 失敗しても次のステップを実行
# --deploy-mode cluster: Driverをクラスター上で実行
# --master yarn: YARNをクラスターマネージャーとして使用
9.2 GCP Dataprocでの実行
# GCP Dataprocでジョブを投入
gcloud dataproc jobs submit pyspark \
gs://my-bucket/scripts/etl_pipeline_project.py \
--cluster=my-cluster \
--region=us-central1 \
-- \
gs://my-bucket/input \
gs://my-bucket/output \
2024-11-20
# 【パラメータ解説】
# gs://...: スクリプトのGCSパス
# --cluster: Dataprocクラスター名
# --region: クラスターのリージョン
# -- 以降: スクリプトへの引数
📝 まとめ
✅ このプロジェクトで学んだこと
- ETLパイプラインの本格的な実装
- 複数データソースの統合
- データ品質チェックの自動化
- 複雑な変換・JOIN・集計
- パーティション最適化
- エラーハンドリングとロギング
- クラウド環境での実行
🎯 成果物チェックリスト
- □ 売上詳細データ(Parquet、パーティション分割)
- □ 日次サマリー
- □ 商品別サマリー
- □ 顧客別サマリー
- □ メタデータ
- □ 実行ログ
💡 次のプロジェクトへ
お疲れ様でした!ETLパイプライン構築を完了しました。
次のSTEP 31では、機械学習向けデータ前処理に挑戦します。
学習メモ
ビッグデータ処理(Apache Spark) - Step 30