🛠️ STEP 26: 実践的なSparkジョブ設計
本番環境を想定したエラーハンドリングとログ設計をマスターしよう
📋 このステップで学ぶこと
- ジョブスクリプトの基本構成
- エラーハンドリングの実装
- ログ出力の設計
- 設定ファイルの管理
- 本番ジョブのベストプラクティス
本番環境では、「動けばOK」では不十分です。
エラー時の対応、ログの確認、再実行の容易さなど、運用を見据えた設計が必要です。
📁 1. ジョブスクリプトの基本構成
1-1. 本番ジョブの構成要素
本番環境で使用するSparkジョブには、以下の要素が必要です。
| 要素 | 目的 | 重要度 |
|---|---|---|
| 引数処理 | 入出力パス、日付などを外部から指定 | ★★★★★ |
| エラーハンドリング | 例外発生時の適切な処理 | ★★★★★ |
| ログ出力 | 処理状況の記録、デバッグ情報 | ★★★★☆ |
| バリデーション | 入力データの検証 | ★★★★☆ |
| メトリクス記録 | 処理件数、所要時間の記録 | ★★★☆☆ |
1-2. 基本テンプレート
# ===================================================== # 本番用Sparkジョブ テンプレート # ===================================================== import sys import logging from datetime import datetime from pyspark.sql import SparkSession from pyspark.sql import functions as F # ロガーの設定 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def create_spark_session(app_name): """SparkSessionを作成""" return SparkSession.builder \ .appName(app_name) \ .getOrCreate() def validate_input(spark, input_path): """入力データの検証""" try: df = spark.read.parquet(input_path) count = df.count() logger.info(f"入力データ件数: {count}") if count == 0: raise ValueError("入力データが空です") return df except Exception as e: logger.error(f"入力データの読み込みに失敗: {e}") raise def process_data(df): """メインの処理ロジック""" logger.info("データ処理を開始...") # ここに実際の処理を記述 result = df.groupBy("category").agg( F.sum("amount").alias("total_amount"), F.count("*").alias("count") ) logger.info("データ処理完了") return result def save_output(df, output_path): """結果の保存""" logger.info(f"出力先: {output_path}") df.write.parquet(output_path, mode="overwrite") logger.info("出力完了") def main(input_path, output_path): """メイン処理""" start_time = datetime.now() logger.info(f"ジョブ開始: {start_time}") spark = None exit_code = 0 try: # SparkSession作成 spark = create_spark_session("MySparkJob") # 入力データ読み込み・検証 df = validate_input(spark, input_path) # 処理実行 result = process_data(df) # 結果保存 save_output(result, output_path) logger.info("ジョブ正常終了") except Exception as e: logger.error(f"ジョブ失敗: {e}") exit_code = 1 finally: if spark: spark.stop() end_time = datetime.now() duration = end_time - start_time logger.info(f"処理時間: {duration}") sys.exit(exit_code) if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: spark-submit job.py <input_path> <output_path>") sys.exit(1) input_path = sys.argv[1] output_path = sys.argv[2] main(input_path, output_path)
1-3. ジョブの実行
# ジョブの実行例 # ローカル実行 spark-submit job.py \ /path/to/input \ /path/to/output # EMR / Dataprocでの実行 spark-submit job.py \ s3://bucket/input \ s3://bucket/output # または spark-submit job.py \ gs://bucket/input \ gs://bucket/output
🛡️ 2. エラーハンドリング
2-1. 想定されるエラーの種類
| エラー種類 | 原因 | 対処法 |
|---|---|---|
| 入力データエラー | ファイルが存在しない、形式が違う | 事前チェック、明確なエラーメッセージ |
| スキーマエラー | カラムが不足、型が違う | スキーマ検証、デフォルト値設定 |
| メモリエラー | データが大きすぎる | パーティション調整、キャッシュ削除 |
| 出力エラー | 権限がない、ディスク不足 | 事前権限チェック、リトライ |
2-2. try-except-finallyパターン
# 適切なエラーハンドリング def robust_data_load(spark, path, schema=None): """堅牢なデータ読み込み""" try: if schema: df = spark.read.schema(schema).parquet(path) else: df = spark.read.parquet(path) # 空データチェック if df.rdd.isEmpty(): logger.warning(f"警告: データが空です - {path}") return df except AnalysisException as e: # ファイルが見つからない場合 logger.error(f"ファイルが見つかりません: {path}") raise FileNotFoundError(f"入力ファイルが存在しません: {path}") except Exception as e: logger.error(f"データ読み込みエラー: {e}") raise
2-3. リトライ機能の実装
import time def retry_operation(func, max_retries=3, delay=10): """リトライ機能付き関数実行""" for attempt in range(max_retries): try: return func() except Exception as e: if attempt < max_retries - 1: logger.warning( f"処理失敗({attempt + 1}/{max_retries}): {e}" f" {delay}秒後にリトライ..." ) time.sleep(delay) else: logger.error(f"最大リトライ回数に達しました: {e}") raise # 使用例 def save_with_retry(df, output_path): def _save(): df.write.parquet(output_path, mode="overwrite") retry_operation(_save, max_retries=3, delay=30)
2-4. データ品質チェック
def validate_data_quality(df, checks): """データ品質チェック""" errors = [] # NULL値チェック if "no_nulls" in checks: for col_name in checks["no_nulls"]: null_count = df.filter(F.col(col_name).isNull()).count() if null_count > 0: errors.append(f"{col_name}にNULL値が{null_count}件あります") # 必須カラムチェック if "required_columns" in checks: existing_cols = set(df.columns) for col_name in checks["required_columns"]: if col_name not in existing_cols: errors.append(f"必須カラム {col_name} がありません") # 重複チェック if "unique_columns" in checks: unique_cols = checks["unique_columns"] dup_count = df.groupBy(unique_cols).count() \ .filter(F.col("count") > 1).count() if dup_count > 0: errors.append(f"重複レコードが{dup_count}件あります") if errors: for error in errors: logger.error(error) raise ValueError("データ品質チェックに失敗しました") logger.info("データ品質チェックOK") return True # 使用例 checks = { "no_nulls": ["user_id", "order_date"], "required_columns": ["user_id", "amount", "order_date"], "unique_columns": ["order_id"] } validate_data_quality(df, checks)
📋 3. ログ設計
3-1. ログレベルの使い分け
| レベル | 用途 | 例 |
|---|---|---|
| DEBUG | デバッグ用の詳細情報 | 変数の値、中間結果 |
| INFO | 処理の進捗、正常動作 | 処理開始/終了、件数 |
| WARNING | 注意が必要だが継続可能 | データが空、遅延発生 |
| ERROR | エラー発生、処理失敗 | 例外発生、検証失敗 |
3-2. 構造化ログの実装
import json from datetime import datetime class StructuredLogger: """構造化ログを出力するクラス""" def __init__(self, job_name): self.job_name = job_name self.job_id = datetime.now().strftime("%Y%m%d_%H%M%S") def _log(self, level, message, **kwargs): log_entry = { "timestamp": datetime.now().isoformat(), "level": level, "job_name": self.job_name, "job_id": self.job_id, "message": message, **kwargs } print(json.dumps(log_entry, ensure_ascii=False)) def info(self, message, **kwargs): self._log("INFO", message, **kwargs) def error(self, message, **kwargs): self._log("ERROR", message, **kwargs) def metrics(self, **kwargs): """メトリクスを記録""" self._log("METRICS", "metrics_recorded", **kwargs) # 使用例 logger = StructuredLogger("SalesAnalysisJob") logger.info("処理開始", input_path="s3://bucket/input") logger.info("データ読み込み完了", record_count=1000000) logger.metrics( input_records=1000000, output_records=50000, duration_seconds=120 )
{"timestamp": "2024-01-15T10:30:00", "level": "INFO", "job_name": "SalesAnalysisJob", "job_id": "20240115_103000", "message": "処理開始", "input_path": "s3://bucket/input"}
{"timestamp": "2024-01-15T10:32:00", "level": "INFO", "job_name": "SalesAnalysisJob", "job_id": "20240115_103000", "message": "データ読み込み完了", "record_count": 1000000}
{"timestamp": "2024-01-15T10:34:00", "level": "METRICS", "job_name": "SalesAnalysisJob", "job_id": "20240115_103000", "message": "metrics_recorded", "input_records": 1000000, "output_records": 50000, "duration_seconds": 120}
・検索しやすい:CloudWatch Logs、BigQueryなどで検索可能
・分析しやすい:メトリクスの可視化が容易
・アラート設定:特定のエラーで通知可能
⚙️ 4. 設定ファイルの管理
4-1. 環境別設定の分離
# config.yaml(設定ファイル)
development:
input_path: /home/user/data/input
output_path: /home/user/data/output
spark:
master: local[*]
executor_memory: 2g
staging:
input_path: s3://staging-bucket/input
output_path: s3://staging-bucket/output
spark:
master: yarn
executor_memory: 4g
production:
input_path: s3://prod-bucket/input
output_path: s3://prod-bucket/output
spark:
master: yarn
executor_memory: 8g
num_executors: 104-2. 設定を読み込むコード
import yaml import os def load_config(env=None): """環境に応じた設定を読み込む""" env = env or os.getenv("SPARK_ENV", "development") config_path = os.path.join( os.path.dirname(__file__), "config.yaml" ) with open(config_path, 'r') as f: all_config = yaml.safe_load(f) if env not in all_config: raise ValueError(f"Unknown environment: {env}") return all_config[env] # 使用例 config = load_config("production") print(config["input_path"]) # s3://prod-bucket/input
4-3. コマンドライン引数との組み合わせ
import argparse def parse_args(): """コマンドライン引数をパース""" parser = argparse.ArgumentParser(description='Spark Job') parser.add_argument( '--env', default='development', choices=['development', 'staging', 'production'], help='実行環境' ) parser.add_argument( '--date', required=True, help='処理対象日 (YYYY-MM-DD)' ) parser.add_argument( '--dry-run', action='store_true', help='ドライラン(保存しない)' ) return parser.parse_args() # 実行例: # spark-submit job.py --env production --date 2024-01-15
✅ 5. 本番ジョブのベストプラクティス
5-1. ジョブ設計のチェックリスト
□ 引数処理
・入出力パスを外部から指定可能
・日付パラメータで再実行が容易
□ エラーハンドリング
・入力データの存在チェック
・スキーマ検証
・適切な例外処理
□ ログ
・処理開始/終了の記録
・入出力件数の記録
・エラー時の詳細情報
□ 冪等性(べきとうせい)
・何度実行しても同じ結果
・mode=”overwrite”の使用
□ テスト
・小さいデータでの動作確認
・エッジケースのテスト
5-2. 冪等性(べきとうせい)の確保
冪等性とは、「何度実行しても同じ結果になる」という性質です。
本番ジョブでは、再実行時にデータが重複しないよう設計が必要です。
# ❌ 悪い例:追記モード(再実行でデータ重複) df.write.parquet(output_path, mode="append") # ✅ 良い例:上書きモード(再実行しても同じ結果) df.write.parquet(output_path, mode="overwrite") # ✅ 良い例:日付パーティションで上書き df.write.partitionBy("date").parquet( output_path, mode="overwrite" )
5-3. 完成版:本番用ジョブテンプレート
# ===================================================== # 本番用Sparkジョブ 完成版テンプレート # ===================================================== import sys import argparse import logging from datetime import datetime from pyspark.sql import SparkSession from pyspark.sql import functions as F # ロガー設定 logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s' ) logger = logging.getLogger(__name__) class SalesAnalysisJob: """売上分析ジョブ""" def __init__(self, args): self.args = args self.spark = None self.start_time = datetime.now() self.metrics = {} def run(self): """ジョブのメイン処理""" exit_code = 0 try: logger.info(f"ジョブ開始: date={self.args.date}") # 1. SparkSession作成 self.spark = self._create_spark_session() # 2. データ読み込み df = self._load_data() self.metrics['input_count'] = df.count() logger.info(f"入力件数: {self.metrics['input_count']}") # 3. データ処理 result = self._process(df) self.metrics['output_count'] = result.count() # 4. 結果保存 if not self.args.dry_run: self._save(result) else: logger.info("ドライラン: 保存をスキップ") logger.info("ジョブ正常終了") except Exception as e: logger.error(f"ジョブ失敗: {e}", exc_info=True) exit_code = 1 finally: self._cleanup() return exit_code def _create_spark_session(self): return SparkSession.builder \ .appName(f"SalesAnalysis_{self.args.date}") \ .getOrCreate() def _load_data(self): input_path = f"{self.args.input_path}/date={self.args.date}" logger.info(f"読み込み: {input_path}") return self.spark.read.parquet(input_path) def _process(self, df): logger.info("データ処理開始...") result = df.groupBy("category").agg( F.sum("amount").alias("total_sales") ) return result def _save(self, df): output_path = f"{self.args.output_path}/date={self.args.date}" logger.info(f"保存先: {output_path}") df.write.parquet(output_path, mode="overwrite") def _cleanup(self): if self.spark: self.spark.stop() duration = datetime.now() - self.start_time self.metrics['duration_seconds'] = duration.total_seconds() logger.info(f"処理時間: {duration}") logger.info(f"メトリクス: {self.metrics}") def parse_args(): parser = argparse.ArgumentParser() parser.add_argument('--date', required=True, help='処理日 (YYYY-MM-DD)') parser.add_argument('--input-path', required=True, help='入力パス') parser.add_argument('--output-path', required=True, help='出力パス') parser.add_argument('--dry-run', action='store_true', help='ドライラン') return parser.parse_args() if __name__ == "__main__": args = parse_args() job = SalesAnalysisJob(args) sys.exit(job.run())
# 実行コマンド
spark-submit sales_job.py \
--date 2024-01-15 \
--input-path s3://bucket/sales \
--output-path s3://bucket/summary
# ログ出力例
2024-01-15 10:00:00 [INFO] ジョブ開始: date=2024-01-15
2024-01-15 10:00:01 [INFO] 読み込み: s3://bucket/sales/date=2024-01-15
2024-01-15 10:00:30 [INFO] 入力件数: 1000000
2024-01-15 10:00:31 [INFO] データ処理開始...
2024-01-15 10:02:00 [INFO] 保存先: s3://bucket/summary/date=2024-01-15
2024-01-15 10:02:30 [INFO] ジョブ正常終了
2024-01-15 10:02:30 [INFO] 処理時間: 0:02:30
2024-01-15 10:02:30 [INFO] メトリクス: {'input_count': 1000000, 'output_count': 50, 'duration_seconds': 150.0}📝 練習問題
エラーハンドリング
データフレームが空の場合に警告ログを出力し、Trueを返す関数を書いてください。
def is_empty_df(df): if df.rdd.isEmpty(): logger.warning("データフレームが空です") return True return False
引数パース
argparseを使って、–input-path(必須)、–output-path(必須)、–env(デフォルト:development)を受け取るparse_args関数を書いてください。
import argparse def parse_args(): parser = argparse.ArgumentParser() parser.add_argument('--input-path', required=True) parser.add_argument('--output-path', required=True) parser.add_argument('--env', default='development') return parser.parse_args()
リトライ機能
最大3回まで10秒間隔でリトライする関数を実装してください。
import time def retry(func, max_retries=3, delay=10): for attempt in range(max_retries): try: return func() except Exception as e: if attempt < max_retries - 1: logger.warning(f"リトライ {attempt + 1}/{max_retries}") time.sleep(delay) else: raise
データ品質チェック
指定されたカラムにNULL値がないことを検証し、NULL値があればエラーメッセージをログに出力する関数を書いてください。
def check_no_nulls(df, columns): """指定カラムにNULL値がないかチェック""" errors = [] for col_name in columns: null_count = df.filter(F.col(col_name).isNull()).count() if null_count > 0: errors.append(f"{col_name}: {null_count}件のNULL") logger.error(f"{col_name}にNULL値が{null_count}件あります") if errors: raise ValueError(f"NULL値チェック失敗: {errors}") logger.info("NULL値チェックOK") return True
❓ よくある質問
📝 STEP 26 のまとめ
・ジョブ構成:引数処理、エラーハンドリング、ログ、メトリクス
・エラーハンドリング:try-except-finally、リトライ、検証
・ログ設計:レベル使い分け、構造化ログ
・設定管理:環境別設定、コマンドライン引数
・冪等性:何度実行しても同じ結果
STEP 23〜26で、クラウド環境でのSpark実行を学びました。
・AWS EMR / GCP Dataprocの使い方
・コスト最適化(スポットインスタンス、自動スケーリング)
・本番用ジョブの設計
次のPart 7では、「高度なトピック」を学びます。
Spark MLlibによる機械学習と、構造化ストリーミングを習得しましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 26