STEP 17:エラーハンドリングとロギング

🛡️ STEP 17: エラーハンドリングとロギング

エラーに強く、問題を追跡できるETLパイプラインを作ろう

📋 このステップで学ぶこと

  • try-except の実践的な使い方
  • loggingモジュールの活用
  • エラー発生時のリトライ戦略
  • ログファイルの設計
  • 実践演習:堅牢なETLスクリプト作成

⏱️ 学習時間の目安:2時間

📝 練習問題:10問(基礎4問・応用4問・発展2問)

🎯 1. なぜエラーハンドリングとロギングが重要なのか?

1-1. 実務でのよくある問題

ETLパイプラインは夜間バッチで自動実行されることが多いです。
朝出社したら「エラーで止まっていた…」なんてことも。

📚 例え話:飛行機のブラックボックス

飛行機には「ブラックボックス」という記録装置がありますよね。
事故が起きたとき、何が起きたかを後から調べるためです。

エラーハンドリング = トラブル時の「緊急対応マニュアル」
ロギング = 飛行機の「ブラックボックス」

どちらも「何か起きたときに対処できる」ための備えです。

😱 エラーハンドリングがないとこうなる
  • エラーが出ても気づかない
  • どこで止まったかわからない
  • 原因を調べるのに時間がかかる
  • データが中途半端な状態になる
✅ エラーハンドリングとロギングがあると
  • エラーが出ても処理を続行できる
  • 問題箇所がすぐにわかる
  • エラー内容が記録される
  • 自動でリトライできる
  • メールやSlackで通知できる

1-2. エラーの種類と対処方法

エラーの種類と対処方法
エラーの種類 対処方法 具体例
一時的なエラー リトライする ネットワークエラー、DB接続タイムアウト
修正可能なエラー 記録してスキップ データ型の不一致、欠損値の多いレコード
致命的なエラー 処理を停止 認証エラー、ディスク容量不足

🔧 2. try-except の実践的な使い方

2-1. 基本的な try-except

try-exceptは、エラーが出てもプログラムを止めない仕組みです。

# ===== エラーハンドリングの比較 ===== import pandas as pd # ❌ エラーハンドリングなし(エラーで止まる) def load_csv_bad(filename): df = pd.read_csv(filename) return df # ✅ エラーハンドリングあり(エラーでも止まらない) def load_csv_good(filename): try: df = pd.read_csv(filename) print(f”✅ ファイル読み込み成功: {filename}”) return df except FileNotFoundError: print(f”❌ ファイルが見つかりません: {filename}”) return None except pd.errors.EmptyDataError: print(f”❌ ファイルが空です: {filename}”) return None except Exception as e: print(f”❌ 予期しないエラー: {e}”) return None # 使用例 df = load_csv_good(‘存在しないファイル.csv’)
【実行結果】 ❌ ファイルが見つかりません: 存在しないファイル.csv

2-2. 複数の例外をキャッチ

# ===== 複数の例外をキャッチ ===== def safe_divide(a, b): “””安全な割り算””” try: result = a / b print(f”✅ 計算成功: {a} ÷ {b} = {result}”) return result except ZeroDivisionError: print(“❌ 0で割ることはできません”) return None except TypeError: print(“❌ 数値以外が入力されました”) return None except Exception as e: print(f”❌ 予期しないエラー: {e}”) return None # テスト safe_divide(10, 2) # 成功 safe_divide(10, 0) # ZeroDivisionError safe_divide(10, “a”) # TypeError
【実行結果】 ✅ 計算成功: 10 ÷ 2 = 5.0 ❌ 0で割ることはできません ❌ 数値以外が入力されました

2-3. finally で必ずクリーンアップ

finallyブロックは、エラーが出ても出なくても必ず実行されます。

# ===== finallyで必ずクリーンアップ ===== def read_file_with_cleanup(filename): “””ファイルを読んで必ずクローズする””” f = None try: f = open(filename, ‘r’) content = f.read() print(“✅ ファイル読み込み成功”) return content except FileNotFoundError: print(“❌ ファイルが見つかりません”) return None finally: # エラーが出ても出なくても、ファイルをクローズ if f: f.close() print(“🔒 ファイルをクローズしました”) # 使用例 read_file_with_cleanup(‘test.txt’)
【実行結果】 ❌ ファイルが見つかりません 🔒 ファイルをクローズしました

2-4. with文を使った自動クリーンアップ(推奨)

# ===== with文を使うと自動でクローズされる ===== def read_file_best(filename): try: with open(filename, ‘r’) as f: content = f.read() print(“✅ ファイル読み込み成功”) return content except FileNotFoundError: print(“❌ ファイルが見つかりません”) return None # ファイルは自動でクローズされる
💡 例外処理のベストプラクティス
  • 具体的な例外から先にキャッチする
  • Exceptionは最後に書く(汎用的すぎる)
  • エラーメッセージはわかりやすく
  • with文を使ってリソースを自動管理

2-5. カスタム例外の作成

# ===== カスタム例外クラス ===== class DataValidationError(Exception): “””データ検証エラー””” pass def validate_age(age): “””年齢を検証””” try: age = int(age) if age < 0 or age > 150: raise DataValidationError(f”年齢が範囲外です: {age}”) print(f”✅ 年齢検証OK: {age}”) return age except ValueError: print(f”❌ 年齢が数値ではありません: {age}”) return None except DataValidationError as e: print(f”❌ {e}”) return None # テスト validate_age(25) # OK validate_age(200) # 範囲外 validate_age(“abc”) # 数値ではない
【実行結果】 ✅ 年齢検証OK: 25 ❌ 年齢が範囲外です: 200 ❌ 年齢が数値ではありません: abc

📝 3. loggingモジュールの活用

3-1. なぜprintではなくloggingを使うのか?

printとloggingの比較
機能 print logging
ログレベル なし DEBUG, INFO, WARNING, ERROR, CRITICAL
ファイル保存 できない 簡単にできる
タイムスタンプ 手動で追加 自動で追加
本番/開発の切替 難しい 簡単

3-2. loggingの基本

# ===== loggingの基本 ===== import logging # ロガーの設定 logging.basicConfig( level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’ ) # ログ出力 logging.debug(“デバッグ情報”) # 表示されない(INFOレベル以上のみ) logging.info(“処理開始”) logging.warning(“警告メッセージ”) logging.error(“エラーが発生”) logging.critical(“致命的なエラー”)
【実行結果】 2024-01-15 10:30:00,123 – INFO – 処理開始 2024-01-15 10:30:00,124 – WARNING – 警告メッセージ 2024-01-15 10:30:00,125 – ERROR – エラーが発生 2024-01-15 10:30:00,126 – CRITICAL – 致命的なエラー

3-3. ログレベルの使い分け

ログレベルの意味と使い分け
レベル 数値 使い方
DEBUG 10 開発時のデバッグ情報(詳細すぎる情報)
INFO 20 通常の動作ログ(処理の進捗、件数など)
WARNING 30 警告(処理は続行するが注意が必要)
ERROR 40 エラー(処理は失敗したが続行可能)
CRITICAL 50 致命的エラー(システム停止レベル)

3-4. ファイルにログを保存

# ===== ファイルにログを保存 ===== import logging # ファイルにログを保存する設定 logging.basicConfig( level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’, handlers=[ logging.FileHandler(‘etl.log’, encoding=’utf-8′), # ファイルに保存 logging.StreamHandler() # コンソールにも表示 ] ) logging.info(“ETL処理を開始します”) logging.info(“データを読み込み中…”) logging.warning(“欠損値が見つかりました”) logging.info(“ETL処理が完了しました”) # ログファイルの内容を確認 print(“\n=== ログファイル(etl.log)の内容 ===”) with open(‘etl.log’, ‘r’, encoding=’utf-8′) as f: print(f.read())
【実行結果】 2024-01-15 10:30:00,123 – INFO – ETL処理を開始します 2024-01-15 10:30:00,124 – INFO – データを読み込み中… 2024-01-15 10:30:00,125 – WARNING – 欠損値が見つかりました 2024-01-15 10:30:00,126 – INFO – ETL処理が完了しました === ログファイル(etl.log)の内容 === 2024-01-15 10:30:00,123 – INFO – ETL処理を開始します 2024-01-15 10:30:00,124 – INFO – データを読み込み中… 2024-01-15 10:30:00,125 – WARNING – 欠損値が見つかりました 2024-01-15 10:30:00,126 – INFO – ETL処理が完了しました

3-5. ローテーション付きログ(実務向け)

ログファイルが大きくなりすぎないように、自動でローテーションさせます。

# ===== ローテーション付きログ ===== import logging from logging.handlers import RotatingFileHandler # ロガーを作成 logger = logging.getLogger(‘etl_pipeline’) logger.setLevel(logging.DEBUG) # フォーマットを定義 formatter = logging.Formatter( ‘%(asctime)s – %(name)s – %(levelname)s – %(funcName)s:%(lineno)d – %(message)s’, datefmt=’%Y-%m-%d %H:%M:%S’ ) # ファイルハンドラ(ローテーション付き) file_handler = RotatingFileHandler( ‘etl_pipeline.log’, maxBytes=10*1024*1024, # 10MB backupCount=5, # 5世代保持 encoding=’utf-8′ ) file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) # コンソールハンドラ console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) # コンソールはINFO以上のみ console_handler.setFormatter(formatter) # ロガーにハンドラを追加 logger.addHandler(file_handler) logger.addHandler(console_handler) # 使用例 logger.debug(“デバッグ情報(ファイルのみに記録)”) logger.info(“処理開始”) logger.error(“エラー発生”)
【実行結果】 2024-01-15 10:30:00 – etl_pipeline – INFO – <module>:25 – 処理開始 2024-01-15 10:30:00 – etl_pipeline – ERROR – <module>:26 – エラー発生
💡 ローテーションのメリット
  • ディスク容量の節約:古いログが自動削除される
  • ログの管理が楽:日付ごとに分かれる
  • パフォーマンス維持:巨大なログファイルを避けられる

🔄 4. リトライ戦略

4-1. なぜリトライが必要か?

ネットワークエラーやDB接続タイムアウトなど、一時的なエラー再試行すると成功することがあります。

4-2. 基本的なリトライ実装

# ===== 基本的なリトライ ===== import time import logging logging.basicConfig(level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’) logger = logging.getLogger(__name__) def retry_function(func, max_retries=3, delay=1): “”” 関数をリトライする Parameters: ———– func : function 実行する関数 max_retries : int 最大リトライ回数 delay : int リトライ間隔(秒) “”” for attempt in range(max_retries): try: result = func() logger.info(f”✅ 成功({attempt + 1}回目)”) return result except Exception as e: logger.warning(f”❌ 失敗({attempt + 1}回目): {e}”) if attempt < max_retries - 1: logger.info(f"⏳ {delay}秒後にリトライします...") time.sleep(delay) else: logger.error(f"❌ {max_retries}回リトライしましたが失敗しました") raise # テスト用の関数(3回目で成功) attempt_count = 0 def unstable_function(): global attempt_count attempt_count += 1 if attempt_count < 3: raise Exception(f"一時的なエラー({attempt_count}回目)") return "成功!" # 実行 result = retry_function(unstable_function, max_retries=3, delay=1)
【実行結果】 2024-01-15 10:30:00,123 – WARNING – ❌ 失敗(1回目): 一時的なエラー(1回目) 2024-01-15 10:30:00,124 – INFO – ⏳ 1秒後にリトライします… 2024-01-15 10:30:01,125 – WARNING – ❌ 失敗(2回目): 一時的なエラー(2回目) 2024-01-15 10:30:01,126 – INFO – ⏳ 1秒後にリトライします… 2024-01-15 10:30:02,127 – INFO – ✅ 成功(3回目)

4-3. デコレータを使ったリトライ(実務向け)

# ===== リトライデコレータ ===== import time import logging from functools import wraps logging.basicConfig(level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’) logger = logging.getLogger(__name__) def retry(max_retries=3, delay=1, backoff=2): “”” リトライデコレータ Parameters: ———– max_retries : int 最大リトライ回数 delay : int 初回リトライまでの待機時間(秒) backoff : int リトライごとの待機時間の倍率(指数バックオフ) “”” def decorator(func): @wraps(func) def wrapper(*args, **kwargs): current_delay = delay for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: logger.warning(f”❌ {func.__name__} 失敗({attempt + 1}回目): {e}”) if attempt < max_retries - 1: logger.info(f"⏳ {current_delay}秒後にリトライ...") time.sleep(current_delay) current_delay *= backoff # 待機時間を倍に else: logger.error(f"❌ {func.__name__} が{max_retries}回失敗しました") raise return wrapper return decorator # 使用例 @retry(max_retries=3, delay=1, backoff=2) def fetch_data_from_api(): """APIからデータ取得(リトライ付き)""" import random if random.random() < 0.7: # 70%の確率で失敗 raise Exception("API接続エラー") logger.info("✅ APIからデータ取得成功") return {"data": [1, 2, 3]} # 実行 data = fetch_data_from_api()
【実行結果】 2024-01-15 10:30:00,123 – WARNING – ❌ fetch_data_from_api 失敗(1回目): API接続エラー 2024-01-15 10:30:00,124 – INFO – ⏳ 1秒後にリトライ… 2024-01-15 10:30:01,125 – WARNING – ❌ fetch_data_from_api 失敗(2回目): API接続エラー 2024-01-15 10:30:01,126 – INFO – ⏳ 2秒後にリトライ… 2024-01-15 10:30:03,127 – INFO – ✅ APIからデータ取得成功

4-4. tenacityライブラリを使ったリトライ

pip install tenacity
# ===== tenacityライブラリを使用 ===== from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type import logging logging.basicConfig(level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’) logger = logging.getLogger(__name__) # 特定の例外のみリトライ @retry( stop=stop_after_attempt(3), # 最大3回 wait=wait_exponential(multiplier=1), # 指数バックオフ(1秒、2秒、4秒…) retry=retry_if_exception_type(ConnectionError), # ConnectionErrorのみリトライ reraise=True ) def connect_to_database(): “””データベース接続(リトライ付き)””” logger.info(“データベースに接続中…”) import random if random.random() < 0.5: raise ConnectionError("接続タイムアウト") logger.info("✅ データベース接続成功") return "connected" # 実行 try: result = connect_to_database() except ConnectionError: logger.error("データベース接続に失敗しました")
📊 リトライ戦略の比較
戦略 待機時間 特徴
固定間隔 1秒 → 1秒 → 1秒 シンプル、短期的なエラー向け
指数バックオフ 1秒 → 2秒 → 4秒 サーバー負荷軽減、推奨
ジッター付き 1.2秒 → 2.5秒 → 4.1秒 同時リトライの衝突回避

📋 5. 実践演習:堅牢なETLスクリプト

5-1. すべてを組み合わせた完全なETLパイプライン

# ===== 堅牢なETLパイプライン ===== import pandas as pd import logging from logging.handlers import RotatingFileHandler import time from functools import wraps # ===================================== # 1. ロギング設定 # ===================================== def setup_logger(name, log_file, level=logging.INFO): “””ロガーのセットアップ””” logger = logging.getLogger(name) logger.setLevel(level) # フォーマット formatter = logging.Formatter( ‘%(asctime)s – %(levelname)s – %(funcName)s – %(message)s’, datefmt=’%Y-%m-%d %H:%M:%S’ ) # ファイルハンドラ file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5, encoding=’utf-8′ ) file_handler.setFormatter(formatter) # コンソールハンドラ console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger # ロガーを作成 logger = setup_logger(‘etl_pipeline’, ‘etl_pipeline.log’) # ===================================== # 2. リトライデコレータ # ===================================== def retry(max_retries=3, delay=1, backoff=2): “””リトライデコレータ””” def decorator(func): @wraps(func) def wrapper(*args, **kwargs): current_delay = delay for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: logger.warning(f”リトライ {attempt + 1}/{max_retries}: {e}”) if attempt < max_retries - 1: time.sleep(current_delay) current_delay *= backoff else: raise return wrapper return decorator # ===================================== # 3. ETL関数 # ===================================== @retry(max_retries=3, delay=1) def extract_data(file_path): """データ抽出""" logger.info(f"データ抽出開始: {file_path}") try: df = pd.read_csv(file_path) logger.info(f"✅ データ抽出成功: {len(df)}件") return df except FileNotFoundError: logger.error(f"❌ ファイルが見つかりません: {file_path}") raise def transform_data(df): """データ変換""" logger.info("データ変換開始") try: # 欠損値チェック missing_count = df.isnull().sum().sum() if missing_count > 0: logger.warning(f”⚠️ 欠損値が{missing_count}個見つかりました”) df = df.fillna(0) logger.info(“欠損値を0で補完しました”) # 変換処理 if ‘金額’ in df.columns: df[‘金額’] = df[‘金額’] * 1.1 # 税込み価格に変換 logger.info(f”✅ データ変換成功: {len(df)}件”) return df except KeyError as e: logger.error(f”❌ 必要なカラムがありません: {e}”) raise @retry(max_retries=3, delay=2) def load_data(df, output_path): “””データロード””” logger.info(f”データロード開始: {output_path}”) try: df.to_csv(output_path, index=False, encoding=’utf-8-sig’) logger.info(f”✅ データロード成功: {len(df)}件”) return True except PermissionError: logger.error(f”❌ 書き込み権限がありません: {output_path}”) raise # ===================================== # 4. メイン処理 # ===================================== def run_etl_pipeline(input_file, output_file): “””ETLパイプライン実行””” logger.info(“=” * 50) logger.info(“ETLパイプライン開始”) logger.info(“=” * 50) start_time = time.time() try: # Extract df = extract_data(input_file) # Transform df = transform_data(df) # Load load_data(df, output_file) # 実行時間 elapsed_time = time.time() – start_time logger.info(“=” * 50) logger.info(f”✅ ETLパイプライン完了({elapsed_time:.2f}秒)”) logger.info(“=” * 50) return True except Exception as e: elapsed_time = time.time() – start_time logger.error(“=” * 50) logger.error(f”❌ ETLパイプライン失敗({elapsed_time:.2f}秒)”) logger.error(f”エラー内容: {e}”) logger.error(“=” * 50) return False # ===================================== # 5. 実行 # ===================================== if __name__ == ‘__main__’: # テストデータを作成 test_df = pd.DataFrame({ ‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’], ‘金額’: [100, 150, 80] }) test_df.to_csv(‘input.csv’, index=False) # ETL実行 success = run_etl_pipeline(‘input.csv’, ‘output.csv’)
【実行結果】 2024-01-15 10:30:00 – INFO – run_etl_pipeline – ================================================== 2024-01-15 10:30:00 – INFO – run_etl_pipeline – ETLパイプライン開始 2024-01-15 10:30:00 – INFO – run_etl_pipeline – ================================================== 2024-01-15 10:30:00 – INFO – extract_data – データ抽出開始: input.csv 2024-01-15 10:30:00 – INFO – extract_data – ✅ データ抽出成功: 3件 2024-01-15 10:30:00 – INFO – transform_data – データ変換開始 2024-01-15 10:30:00 – INFO – transform_data – ✅ データ変換成功: 3件 2024-01-15 10:30:00 – INFO – load_data – データロード開始: output.csv 2024-01-15 10:30:00 – INFO – load_data – ✅ データロード成功: 3件 2024-01-15 10:30:00 – INFO – run_etl_pipeline – ================================================== 2024-01-15 10:30:00 – INFO – run_etl_pipeline – ✅ ETLパイプライン完了(0.15秒) 2024-01-15 10:30:00 – INFO – run_etl_pipeline – ==================================================

📝 STEP 17 のまとめ

✅ このステップで学んだこと
  • try-except:エラーをキャッチして処理を続行
  • logging:ログレベルを使い分けてファイルに記録
  • リトライ:一時的なエラーに対して自動再試行
  • 堅牢なETL:すべてを組み合わせた実践的なコード
💡 重要ポイント
  • printではなくloggingモジュールを使う
  • 具体的な例外から順にキャッチする
  • 一時的なエラーはリトライする
  • ログファイルはローテーションさせる
  • エラーメッセージはわかりやすく
🎯 次のステップの予告

次のSTEP 18では、「パフォーマンス最適化」を学びます。

  • チャンクによる分割処理
  • マルチプロセス処理
  • メモリ効率化

📝 練習問題

問題 1 基礎

ファイルを読み込む関数にtry-exceptを追加して、FileNotFoundErrorをキャッチしてください。

def read_file(filename): # ここにコードを書いてください pass
【解答】
def read_file(filename): try: with open(filename, ‘r’, encoding=’utf-8′) as f: content = f.read() print(f”✅ ファイル読み込み成功: {filename}”) return content except FileNotFoundError: print(f”❌ ファイルが見つかりません: {filename}”) return None except Exception as e: print(f”❌ 予期しないエラー: {e}”) return None # テスト read_file(‘test.txt’)
問題 2 基礎

loggingモジュールを使って、INFO、WARNING、ERRORのログを出力してください。

【解答】
import logging # 基本設定 logging.basicConfig( level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’ ) # ログ出力 logging.info(“処理を開始しました”) logging.warning(“データに欠損値があります”) logging.error(“データベース接続に失敗しました”)
問題 3 基礎

ログをファイル(app.log)に保存する設定を書いてください。

【解答】
import logging # ファイルとコンソールに出力する設定 logging.basicConfig( level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’, handlers=[ logging.FileHandler(‘app.log’, encoding=’utf-8′), logging.StreamHandler() ] ) # テスト logging.info(“アプリケーションを開始しました”) logging.info(“処理完了”)
問題 4 基礎

基本的なリトライ処理を実装してください。最大3回リトライし、1秒間隔で待機します。

【解答】
import time def simple_retry(func, max_retries=3, delay=1): “””シンプルなリトライ処理””” for attempt in range(max_retries): try: result = func() print(f”✅ 成功({attempt + 1}回目)”) return result except Exception as e: print(f”❌ 失敗({attempt + 1}回目): {e}”) if attempt < max_retries - 1: print(f"⏳ {delay}秒後にリトライ...") time.sleep(delay) else: print("❌ 全てのリトライに失敗しました") raise # テスト def test_func(): import random if random.random() < 0.7: raise Exception("一時的なエラー") return "成功" simple_retry(test_func)
問題 5 応用

ローテーション付きのロガーを設定してください(最大5MB、3世代保持)。

【解答】
import logging from logging.handlers import RotatingFileHandler def setup_rotating_logger(name, log_file): “””ローテーション付きロガーのセットアップ””” logger = logging.getLogger(name) logger.setLevel(logging.DEBUG) formatter = logging.Formatter( ‘%(asctime)s – %(name)s – %(levelname)s – %(message)s’, datefmt=’%Y-%m-%d %H:%M:%S’ ) # ローテーションハンドラ file_handler = RotatingFileHandler( log_file, maxBytes=5*1024*1024, # 5MB backupCount=3, # 3世代 encoding=’utf-8′ ) file_handler.setFormatter(formatter) # コンソールハンドラ console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger # 使用例 logger = setup_rotating_logger(‘my_app’, ‘my_app.log’) logger.info(“アプリケーション開始”)
問題 6 応用

指数バックオフ付きのリトライデコレータを作成してください。

【解答】
import time import logging from functools import wraps logging.basicConfig(level=logging.INFO, format=’%(message)s’) logger = logging.getLogger(__name__) def retry_with_backoff(max_retries=3, initial_delay=1, backoff=2): “””指数バックオフ付きリトライデコレータ””” def decorator(func): @wraps(func) def wrapper(*args, **kwargs): delay = initial_delay for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: logger.warning(f”失敗({attempt + 1}回目): {e}”) if attempt < max_retries - 1: logger.info(f"⏳ {delay}秒後にリトライ...") time.sleep(delay) delay *= backoff # 指数バックオフ else: raise return wrapper return decorator # 使用例 @retry_with_backoff(max_retries=3, initial_delay=1, backoff=2) def unstable_api_call(): import random if random.random() < 0.6: raise ConnectionError("接続エラー") return "成功" result = unstable_api_call()
問題 7 応用

カスタム例外クラスを作成し、データ検証でエラーを発生させてください。

【解答】
class DataValidationError(Exception): “””データ検証エラー””” def __init__(self, message, field=None): self.message = message self.field = field super().__init__(self.message) def validate_user_data(data): “””ユーザーデータを検証””” try: # 名前の検証 if ‘name’ not in data or not data[‘name’]: raise DataValidationError(“名前は必須です”, field=’name’) # 年齢の検証 if ‘age’ in data: age = int(data[‘age’]) if age < 0 or age > 150: raise DataValidationError(f”年齢が範囲外です: {age}”, field=’age’) # メールの検証 if ‘email’ in data and ‘@’ not in data[‘email’]: raise DataValidationError(“メールアドレスの形式が不正です”, field=’email’) print(“✅ データ検証OK”) return True except DataValidationError as e: print(f”❌ 検証エラー [{e.field}]: {e.message}”) return False except ValueError as e: print(f”❌ 型変換エラー: {e}”) return False # テスト validate_user_data({‘name’: ‘田中’, ‘age’: 25, ‘email’: ‘tanaka@example.com’}) validate_user_data({‘name’: ”, ‘age’: 25}) validate_user_data({‘name’: ‘鈴木’, ‘age’: 200})
問題 8 応用

特定の例外のみリトライするデコレータを作成してください。

【解答】
import time from functools import wraps def retry_on_exception(exception_types, max_retries=3, delay=1): “””特定の例外のみリトライするデコレータ””” def decorator(func): @wraps(func) def wrapper(*args, **kwargs): for attempt in range(max_retries): try: return func(*args, **kwargs) except exception_types as e: print(f”リトライ対象エラー: {e}”) if attempt < max_retries - 1: print(f"⏳ {delay}秒後にリトライ...") time.sleep(delay) else: raise except Exception as e: # リトライ対象外の例外はそのまま発生 print(f"リトライ対象外エラー: {e}") raise return wrapper return decorator # 使用例(ConnectionErrorのみリトライ) @retry_on_exception((ConnectionError, TimeoutError), max_retries=3, delay=1) def fetch_data(): import random r = random.random() if r < 0.3: raise ConnectionError("接続エラー") elif r < 0.5: raise ValueError("値エラー") # これはリトライされない return "成功" fetch_data()
問題 9 発展

エラーハンドリングとロギングを組み込んだ完全なETLパイプラインを作成してください。

【解答】
import pandas as pd import logging from logging.handlers import RotatingFileHandler import time from functools import wraps class ETLPipeline: def __init__(self, log_file=’etl.log’): self.logger = self._setup_logger(log_file) def _setup_logger(self, log_file): logger = logging.getLogger(‘etl’) logger.setLevel(logging.DEBUG) formatter = logging.Formatter( ‘%(asctime)s – %(levelname)s – %(message)s’ ) file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5 ) file_handler.setFormatter(formatter) console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger def _retry(self, func, max_retries=3, delay=1): for attempt in range(max_retries): try: return func() except Exception as e: self.logger.warning(f”リトライ {attempt + 1}/{max_retries}: {e}”) if attempt < max_retries - 1: time.sleep(delay) else: raise def extract(self, file_path): self.logger.info(f"Extract開始: {file_path}") try: df = pd.read_csv(file_path) self.logger.info(f"✅ Extract完了: {len(df)}件") return df except Exception as e: self.logger.error(f"❌ Extractエラー: {e}") raise def transform(self, df): self.logger.info("Transform開始") try: missing = df.isnull().sum().sum() if missing > 0: self.logger.warning(f”欠損値: {missing}個”) df = df.fillna(0) self.logger.info(f”✅ Transform完了: {len(df)}件”) return df except Exception as e: self.logger.error(f”❌ Transformエラー: {e}”) raise def load(self, df, output_path): self.logger.info(f”Load開始: {output_path}”) try: df.to_csv(output_path, index=False, encoding=’utf-8-sig’) self.logger.info(f”✅ Load完了: {len(df)}件”) except Exception as e: self.logger.error(f”❌ Loadエラー: {e}”) raise def run(self, input_file, output_file): self.logger.info(“=” * 40) self.logger.info(“ETLパイプライン開始”) start = time.time() try: df = self.extract(input_file) df = self.transform(df) self.load(df, output_file) elapsed = time.time() – start self.logger.info(f”✅ 完了({elapsed:.2f}秒)”) self.logger.info(“=” * 40) return True except Exception as e: self.logger.error(f”❌ 失敗: {e}”) return False # 使用例 pipeline = ETLPipeline() pipeline.run(‘input.csv’, ‘output.csv’)
問題 10 発展

エラー通知機能(コンソール出力)を持つETLパイプラインを作成してください。

【解答】
import logging import time from datetime import datetime class ETLWithNotification: def __init__(self): self.logger = self._setup_logger() self.errors = [] self.warnings = [] def _setup_logger(self): logging.basicConfig( level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’ ) return logging.getLogger(‘etl_notify’) def _notify_error(self, error_message): “””エラー通知(実際はSlack/メール等)””” notification = f””” ======================================== 🚨 ETLエラー通知 —————————————- 時刻: {datetime.now().strftime(‘%Y-%m-%d %H:%M:%S’)} エラー: {error_message} ======================================== “”” print(notification) self.errors.append(error_message) def _notify_summary(self, success, elapsed_time): “””完了通知””” status = “✅ 成功” if success else “❌ 失敗” notification = f””” ======================================== 📊 ETL実行サマリー —————————————- ステータス: {status} 実行時間: {elapsed_time:.2f}秒 警告数: {len(self.warnings)}件 エラー数: {len(self.errors)}件 ======================================== “”” print(notification) def run(self, process_func): “””ETL実行(通知付き)””” start = time.time() success = False try: process_func() success = True except Exception as e: self._notify_error(str(e)) finally: elapsed = time.time() – start self._notify_summary(success, elapsed) return success # 使用例 etl = ETLWithNotification() def my_etl_process(): print(“処理中…”) # 意図的にエラーを発生 raise ValueError(“データ形式が不正です”) etl.run(my_etl_process)
📝

学習メモ

ETL・データパイプライン構築 - Step 17

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE