STEP 26:実践的なSparkジョブ

🛠️ STEP 26: 実践的なSparkジョブ設計

本番環境を想定したエラーハンドリングとログ設計をマスターしよう

📋 このステップで学ぶこと

  • ジョブスクリプトの基本構成
  • エラーハンドリングの実装
  • ログ出力の設計
  • 設定ファイルの管理
  • 本番ジョブのベストプラクティス
💡 このステップの重要性

本番環境では、「動けばOK」では不十分です。
エラー時の対応、ログの確認、再実行の容易さなど、運用を見据えた設計が必要です。

📁 1. ジョブスクリプトの基本構成

1-1. 本番ジョブの構成要素

本番環境で使用するSparkジョブには、以下の要素が必要です。

要素 目的 重要度
引数処理 入出力パス、日付などを外部から指定 ★★★★★
エラーハンドリング 例外発生時の適切な処理 ★★★★★
ログ出力 処理状況の記録、デバッグ情報 ★★★★☆
バリデーション 入力データの検証 ★★★★☆
メトリクス記録 処理件数、所要時間の記録 ★★★☆☆

1-2. 基本テンプレート

# =====================================================
# 本番用Sparkジョブ テンプレート
# =====================================================

import sys
import logging
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# ロガーの設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def create_spark_session(app_name):
    """SparkSessionを作成"""
    return SparkSession.builder \
        .appName(app_name) \
        .getOrCreate()


def validate_input(spark, input_path):
    """入力データの検証"""
    try:
        df = spark.read.parquet(input_path)
        count = df.count()
        logger.info(f"入力データ件数: {count}")
        
        if count == 0:
            raise ValueError("入力データが空です")
        
        return df
    except Exception as e:
        logger.error(f"入力データの読み込みに失敗: {e}")
        raise


def process_data(df):
    """メインの処理ロジック"""
    logger.info("データ処理を開始...")
    
    # ここに実際の処理を記述
    result = df.groupBy("category").agg(
        F.sum("amount").alias("total_amount"),
        F.count("*").alias("count")
    )
    
    logger.info("データ処理完了")
    return result


def save_output(df, output_path):
    """結果の保存"""
    logger.info(f"出力先: {output_path}")
    
    df.write.parquet(output_path, mode="overwrite")
    
    logger.info("出力完了")


def main(input_path, output_path):
    """メイン処理"""
    start_time = datetime.now()
    logger.info(f"ジョブ開始: {start_time}")
    
    spark = None
    exit_code = 0
    
    try:
        # SparkSession作成
        spark = create_spark_session("MySparkJob")
        
        # 入力データ読み込み・検証
        df = validate_input(spark, input_path)
        
        # 処理実行
        result = process_data(df)
        
        # 結果保存
        save_output(result, output_path)
        
        logger.info("ジョブ正常終了")
        
    except Exception as e:
        logger.error(f"ジョブ失敗: {e}")
        exit_code = 1
        
    finally:
        if spark:
            spark.stop()
        
        end_time = datetime.now()
        duration = end_time - start_time
        logger.info(f"処理時間: {duration}")
    
    sys.exit(exit_code)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: spark-submit job.py <input_path> <output_path>")
        sys.exit(1)
    
    input_path = sys.argv[1]
    output_path = sys.argv[2]
    
    main(input_path, output_path)

1-3. ジョブの実行

# ジョブの実行例

# ローカル実行
spark-submit job.py \
    /path/to/input \
    /path/to/output

# EMR / Dataprocでの実行
spark-submit job.py \
    s3://bucket/input \
    s3://bucket/output

# または
spark-submit job.py \
    gs://bucket/input \
    gs://bucket/output

🛡️ 2. エラーハンドリング

2-1. 想定されるエラーの種類

エラー種類 原因 対処法
入力データエラー ファイルが存在しない、形式が違う 事前チェック、明確なエラーメッセージ
スキーマエラー カラムが不足、型が違う スキーマ検証、デフォルト値設定
メモリエラー データが大きすぎる パーティション調整、キャッシュ削除
出力エラー 権限がない、ディスク不足 事前権限チェック、リトライ

2-2. try-except-finallyパターン

# 適切なエラーハンドリング

def robust_data_load(spark, path, schema=None):
    """堅牢なデータ読み込み"""
    try:
        if schema:
            df = spark.read.schema(schema).parquet(path)
        else:
            df = spark.read.parquet(path)
        
        # 空データチェック
        if df.rdd.isEmpty():
            logger.warning(f"警告: データが空です - {path}")
        
        return df
        
    except AnalysisException as e:
        # ファイルが見つからない場合
        logger.error(f"ファイルが見つかりません: {path}")
        raise FileNotFoundError(f"入力ファイルが存在しません: {path}")
        
    except Exception as e:
        logger.error(f"データ読み込みエラー: {e}")
        raise

2-3. リトライ機能の実装

import time

def retry_operation(func, max_retries=3, delay=10):
    """リトライ機能付き関数実行"""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt < max_retries - 1:
                logger.warning(
                    f"処理失敗({attempt + 1}/{max_retries}): {e}"
                    f" {delay}秒後にリトライ..."
                )
                time.sleep(delay)
            else:
                logger.error(f"最大リトライ回数に達しました: {e}")
                raise

# 使用例
def save_with_retry(df, output_path):
    def _save():
        df.write.parquet(output_path, mode="overwrite")
    
    retry_operation(_save, max_retries=3, delay=30)

2-4. データ品質チェック

def validate_data_quality(df, checks):
    """データ品質チェック"""
    errors = []
    
    # NULL値チェック
    if "no_nulls" in checks:
        for col_name in checks["no_nulls"]:
            null_count = df.filter(F.col(col_name).isNull()).count()
            if null_count > 0:
                errors.append(f"{col_name}にNULL値が{null_count}件あります")
    
    # 必須カラムチェック
    if "required_columns" in checks:
        existing_cols = set(df.columns)
        for col_name in checks["required_columns"]:
            if col_name not in existing_cols:
                errors.append(f"必須カラム {col_name} がありません")
    
    # 重複チェック
    if "unique_columns" in checks:
        unique_cols = checks["unique_columns"]
        dup_count = df.groupBy(unique_cols).count() \
                      .filter(F.col("count") > 1).count()
        if dup_count > 0:
            errors.append(f"重複レコードが{dup_count}件あります")
    
    if errors:
        for error in errors:
            logger.error(error)
        raise ValueError("データ品質チェックに失敗しました")
    
    logger.info("データ品質チェックOK")
    return True

# 使用例
checks = {
    "no_nulls": ["user_id", "order_date"],
    "required_columns": ["user_id", "amount", "order_date"],
    "unique_columns": ["order_id"]
}
validate_data_quality(df, checks)

📋 3. ログ設計

3-1. ログレベルの使い分け

レベル 用途
DEBUG デバッグ用の詳細情報 変数の値、中間結果
INFO 処理の進捗、正常動作 処理開始/終了、件数
WARNING 注意が必要だが継続可能 データが空、遅延発生
ERROR エラー発生、処理失敗 例外発生、検証失敗

3-2. 構造化ログの実装

import json
from datetime import datetime

class StructuredLogger:
    """構造化ログを出力するクラス"""
    
    def __init__(self, job_name):
        self.job_name = job_name
        self.job_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    
    def _log(self, level, message, **kwargs):
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "level": level,
            "job_name": self.job_name,
            "job_id": self.job_id,
            "message": message,
            **kwargs
        }
        print(json.dumps(log_entry, ensure_ascii=False))
    
    def info(self, message, **kwargs):
        self._log("INFO", message, **kwargs)
    
    def error(self, message, **kwargs):
        self._log("ERROR", message, **kwargs)
    
    def metrics(self, **kwargs):
        """メトリクスを記録"""
        self._log("METRICS", "metrics_recorded", **kwargs)

# 使用例
logger = StructuredLogger("SalesAnalysisJob")

logger.info("処理開始", input_path="s3://bucket/input")
logger.info("データ読み込み完了", record_count=1000000)
logger.metrics(
    input_records=1000000,
    output_records=50000,
    duration_seconds=120
)
{"timestamp": "2024-01-15T10:30:00", "level": "INFO", "job_name": "SalesAnalysisJob", "job_id": "20240115_103000", "message": "処理開始", "input_path": "s3://bucket/input"}
{"timestamp": "2024-01-15T10:32:00", "level": "INFO", "job_name": "SalesAnalysisJob", "job_id": "20240115_103000", "message": "データ読み込み完了", "record_count": 1000000}
{"timestamp": "2024-01-15T10:34:00", "level": "METRICS", "job_name": "SalesAnalysisJob", "job_id": "20240115_103000", "message": "metrics_recorded", "input_records": 1000000, "output_records": 50000, "duration_seconds": 120}
💡 構造化ログのメリット

検索しやすい:CloudWatch Logs、BigQueryなどで検索可能
分析しやすい:メトリクスの可視化が容易
アラート設定:特定のエラーで通知可能

⚙️ 4. 設定ファイルの管理

4-1. 環境別設定の分離

# config.yaml(設定ファイル)

development:
  input_path: /home/user/data/input
  output_path: /home/user/data/output
  spark:
    master: local[*]
    executor_memory: 2g

staging:
  input_path: s3://staging-bucket/input
  output_path: s3://staging-bucket/output
  spark:
    master: yarn
    executor_memory: 4g

production:
  input_path: s3://prod-bucket/input
  output_path: s3://prod-bucket/output
  spark:
    master: yarn
    executor_memory: 8g
    num_executors: 10

4-2. 設定を読み込むコード

import yaml
import os

def load_config(env=None):
    """環境に応じた設定を読み込む"""
    env = env or os.getenv("SPARK_ENV", "development")
    
    config_path = os.path.join(
        os.path.dirname(__file__), 
        "config.yaml"
    )
    
    with open(config_path, 'r') as f:
        all_config = yaml.safe_load(f)
    
    if env not in all_config:
        raise ValueError(f"Unknown environment: {env}")
    
    return all_config[env]

# 使用例
config = load_config("production")
print(config["input_path"])  # s3://prod-bucket/input

4-3. コマンドライン引数との組み合わせ

import argparse

def parse_args():
    """コマンドライン引数をパース"""
    parser = argparse.ArgumentParser(description='Spark Job')
    
    parser.add_argument(
        '--env', 
        default='development',
        choices=['development', 'staging', 'production'],
        help='実行環境'
    )
    parser.add_argument(
        '--date',
        required=True,
        help='処理対象日 (YYYY-MM-DD)'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='ドライラン(保存しない)'
    )
    
    return parser.parse_args()

# 実行例:
# spark-submit job.py --env production --date 2024-01-15

✅ 5. 本番ジョブのベストプラクティス

5-1. ジョブ設計のチェックリスト

📋 本番投入前のチェックリスト

□ 引数処理
 ・入出力パスを外部から指定可能
 ・日付パラメータで再実行が容易

□ エラーハンドリング
 ・入力データの存在チェック
 ・スキーマ検証
 ・適切な例外処理

□ ログ
 ・処理開始/終了の記録
 ・入出力件数の記録
 ・エラー時の詳細情報

□ 冪等性(べきとうせい)
 ・何度実行しても同じ結果
 ・mode=”overwrite”の使用

□ テスト
 ・小さいデータでの動作確認
 ・エッジケースのテスト

5-2. 冪等性(べきとうせい)の確保

冪等性とは、「何度実行しても同じ結果になる」という性質です。
本番ジョブでは、再実行時にデータが重複しないよう設計が必要です。

# ❌ 悪い例:追記モード(再実行でデータ重複)
df.write.parquet(output_path, mode="append")

# ✅ 良い例:上書きモード(再実行しても同じ結果)
df.write.parquet(output_path, mode="overwrite")

# ✅ 良い例:日付パーティションで上書き
df.write.partitionBy("date").parquet(
    output_path, 
    mode="overwrite"
)

5-3. 完成版:本番用ジョブテンプレート

# =====================================================
# 本番用Sparkジョブ 完成版テンプレート
# =====================================================

import sys
import argparse
import logging
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# ロガー設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)


class SalesAnalysisJob:
    """売上分析ジョブ"""
    
    def __init__(self, args):
        self.args = args
        self.spark = None
        self.start_time = datetime.now()
        self.metrics = {}
    
    def run(self):
        """ジョブのメイン処理"""
        exit_code = 0
        
        try:
            logger.info(f"ジョブ開始: date={self.args.date}")
            
            # 1. SparkSession作成
            self.spark = self._create_spark_session()
            
            # 2. データ読み込み
            df = self._load_data()
            self.metrics['input_count'] = df.count()
            logger.info(f"入力件数: {self.metrics['input_count']}")
            
            # 3. データ処理
            result = self._process(df)
            self.metrics['output_count'] = result.count()
            
            # 4. 結果保存
            if not self.args.dry_run:
                self._save(result)
            else:
                logger.info("ドライラン: 保存をスキップ")
            
            logger.info("ジョブ正常終了")
            
        except Exception as e:
            logger.error(f"ジョブ失敗: {e}", exc_info=True)
            exit_code = 1
            
        finally:
            self._cleanup()
        
        return exit_code
    
    def _create_spark_session(self):
        return SparkSession.builder \
            .appName(f"SalesAnalysis_{self.args.date}") \
            .getOrCreate()
    
    def _load_data(self):
        input_path = f"{self.args.input_path}/date={self.args.date}"
        logger.info(f"読み込み: {input_path}")
        return self.spark.read.parquet(input_path)
    
    def _process(self, df):
        logger.info("データ処理開始...")
        result = df.groupBy("category").agg(
            F.sum("amount").alias("total_sales")
        )
        return result
    
    def _save(self, df):
        output_path = f"{self.args.output_path}/date={self.args.date}"
        logger.info(f"保存先: {output_path}")
        df.write.parquet(output_path, mode="overwrite")
    
    def _cleanup(self):
        if self.spark:
            self.spark.stop()
        
        duration = datetime.now() - self.start_time
        self.metrics['duration_seconds'] = duration.total_seconds()
        logger.info(f"処理時間: {duration}")
        logger.info(f"メトリクス: {self.metrics}")


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--date', required=True, help='処理日 (YYYY-MM-DD)')
    parser.add_argument('--input-path', required=True, help='入力パス')
    parser.add_argument('--output-path', required=True, help='出力パス')
    parser.add_argument('--dry-run', action='store_true', help='ドライラン')
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    job = SalesAnalysisJob(args)
    sys.exit(job.run())
# 実行コマンド
spark-submit sales_job.py \
    --date 2024-01-15 \
    --input-path s3://bucket/sales \
    --output-path s3://bucket/summary

# ログ出力例
2024-01-15 10:00:00 [INFO] ジョブ開始: date=2024-01-15
2024-01-15 10:00:01 [INFO] 読み込み: s3://bucket/sales/date=2024-01-15
2024-01-15 10:00:30 [INFO] 入力件数: 1000000
2024-01-15 10:00:31 [INFO] データ処理開始...
2024-01-15 10:02:00 [INFO] 保存先: s3://bucket/summary/date=2024-01-15
2024-01-15 10:02:30 [INFO] ジョブ正常終了
2024-01-15 10:02:30 [INFO] 処理時間: 0:02:30
2024-01-15 10:02:30 [INFO] メトリクス: {'input_count': 1000000, 'output_count': 50, 'duration_seconds': 150.0}

📝 練習問題

問題 1 基礎

エラーハンドリング

データフレームが空の場合に警告ログを出力し、Trueを返す関数を書いてください。

【解答】
def is_empty_df(df):
    if df.rdd.isEmpty():
        logger.warning("データフレームが空です")
        return True
    return False
問題 2 応用

引数パース

argparseを使って、–input-path(必須)、–output-path(必須)、–env(デフォルト:development)を受け取るparse_args関数を書いてください。

【解答】
import argparse

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-path', required=True)
    parser.add_argument('--output-path', required=True)
    parser.add_argument('--env', default='development')
    return parser.parse_args()
問題 3 応用

リトライ機能

最大3回まで10秒間隔でリトライする関数を実装してください。

【解答】
import time

def retry(func, max_retries=3, delay=10):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt < max_retries - 1:
                logger.warning(f"リトライ {attempt + 1}/{max_retries}")
                time.sleep(delay)
            else:
                raise
問題 4 実践

データ品質チェック

指定されたカラムにNULL値がないことを検証し、NULL値があればエラーメッセージをログに出力する関数を書いてください。

【解答】
def check_no_nulls(df, columns):
    """指定カラムにNULL値がないかチェック"""
    errors = []
    
    for col_name in columns:
        null_count = df.filter(F.col(col_name).isNull()).count()
        if null_count > 0:
            errors.append(f"{col_name}: {null_count}件のNULL")
            logger.error(f"{col_name}にNULL値が{null_count}件あります")
    
    if errors:
        raise ValueError(f"NULL値チェック失敗: {errors}")
    
    logger.info("NULL値チェックOK")
    return True

❓ よくある質問

Q1: ジョブが失敗したとき、どうやって原因を調べる?
1. ログを確認(S3/GCSのログバケット)、2. Spark Web UIでステージを確認3. エラーメッセージを検索の順で調査します。
Q2: 冪等性はなぜ重要?
本番環境では失敗→再実行が日常的に発生します。冪等性がないと、再実行でデータが重複したり、不整合が発生したりします。
Q3: ドライランはどういう時に使う?
本番データで処理をテストしたい時に使います。データは読み込んで処理するが、保存はスキップするので、安全に動作確認できます。
Q4: クラスでジョブを書くメリットは?
テストしやすい(モック化が容易)、再利用しやすい(継承でカスタマイズ)、状態管理しやすい(メトリクスなど)といったメリットがあります。

📝 STEP 26 のまとめ

✅ このステップで学んだこと

ジョブ構成:引数処理、エラーハンドリング、ログ、メトリクス
エラーハンドリング:try-except-finally、リトライ、検証
ログ設計:レベル使い分け、構造化ログ
設定管理:環境別設定、コマンドライン引数
冪等性:何度実行しても同じ結果

🎉 Part 6(Spark on Cloud)完了!

STEP 23〜26で、クラウド環境でのSpark実行を学びました。
・AWS EMR / GCP Dataprocの使い方
・コスト最適化(スポットインスタンス、自動スケーリング)
・本番用ジョブの設計

🎯 次のステップの予告

次のPart 7では、「高度なトピック」を学びます。
Spark MLlibによる機械学習と、構造化ストリーミングを習得しましょう!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 26

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE