🛡️ STEP 17: エラーハンドリングとロギング
エラーに強く、問題を追跡できるETLパイプラインを作ろう
📋 このステップで学ぶこと
- try-except の実践的な使い方
- loggingモジュールの活用
- エラー発生時のリトライ戦略
- ログファイルの設計
- 実践演習:堅牢なETLスクリプト作成
⏱️ 学習時間の目安:2時間
📝 練習問題:10問(基礎4問・応用4問・発展2問)
🎯 1. なぜエラーハンドリングとロギングが重要なのか?
1-1. 実務でのよくある問題
ETLパイプラインは夜間バッチで自動実行されることが多いです。
朝出社したら「エラーで止まっていた…」なんてことも。
📚 例え話:飛行機のブラックボックス
飛行機には「ブラックボックス」という記録装置がありますよね。
事故が起きたとき、何が起きたかを後から調べるためです。
・エラーハンドリング = トラブル時の「緊急対応マニュアル」
・ロギング = 飛行機の「ブラックボックス」
どちらも「何か起きたときに対処できる」ための備えです。
😱 エラーハンドリングがないとこうなる
- エラーが出ても気づかない
- どこで止まったかわからない
- 原因を調べるのに時間がかかる
- データが中途半端な状態になる
✅ エラーハンドリングとロギングがあると
- エラーが出ても処理を続行できる
- 問題箇所がすぐにわかる
- エラー内容が記録される
- 自動でリトライできる
- メールやSlackで通知できる
1-2. エラーの種類と対処方法
エラーの種類と対処方法
| エラーの種類 | 対処方法 | 具体例 |
|---|---|---|
| 一時的なエラー | リトライする | ネットワークエラー、DB接続タイムアウト |
| 修正可能なエラー | 記録してスキップ | データ型の不一致、欠損値の多いレコード |
| 致命的なエラー | 処理を停止 | 認証エラー、ディスク容量不足 |
🔧 2. try-except の実践的な使い方
2-1. 基本的な try-except
try-exceptは、エラーが出てもプログラムを止めない仕組みです。
# ===== エラーハンドリングの比較 =====
import pandas as pd
# ❌ エラーハンドリングなし(エラーで止まる)
def load_csv_bad(filename):
df = pd.read_csv(filename)
return df
# ✅ エラーハンドリングあり(エラーでも止まらない)
def load_csv_good(filename):
try:
df = pd.read_csv(filename)
print(f”✅ ファイル読み込み成功: {filename}”)
return df
except FileNotFoundError:
print(f”❌ ファイルが見つかりません: {filename}”)
return None
except pd.errors.EmptyDataError:
print(f”❌ ファイルが空です: {filename}”)
return None
except Exception as e:
print(f”❌ 予期しないエラー: {e}”)
return None
# 使用例
df = load_csv_good(‘存在しないファイル.csv’)
【実行結果】
❌ ファイルが見つかりません: 存在しないファイル.csv
2-2. 複数の例外をキャッチ
# ===== 複数の例外をキャッチ =====
def safe_divide(a, b):
“””安全な割り算”””
try:
result = a / b
print(f”✅ 計算成功: {a} ÷ {b} = {result}”)
return result
except ZeroDivisionError:
print(“❌ 0で割ることはできません”)
return None
except TypeError:
print(“❌ 数値以外が入力されました”)
return None
except Exception as e:
print(f”❌ 予期しないエラー: {e}”)
return None
# テスト
safe_divide(10, 2) # 成功
safe_divide(10, 0) # ZeroDivisionError
safe_divide(10, “a”) # TypeError
【実行結果】
✅ 計算成功: 10 ÷ 2 = 5.0
❌ 0で割ることはできません
❌ 数値以外が入力されました
2-3. finally で必ずクリーンアップ
finallyブロックは、エラーが出ても出なくても必ず実行されます。
# ===== finallyで必ずクリーンアップ =====
def read_file_with_cleanup(filename):
“””ファイルを読んで必ずクローズする”””
f = None
try:
f = open(filename, ‘r’)
content = f.read()
print(“✅ ファイル読み込み成功”)
return content
except FileNotFoundError:
print(“❌ ファイルが見つかりません”)
return None
finally:
# エラーが出ても出なくても、ファイルをクローズ
if f:
f.close()
print(“🔒 ファイルをクローズしました”)
# 使用例
read_file_with_cleanup(‘test.txt’)
【実行結果】
❌ ファイルが見つかりません
🔒 ファイルをクローズしました
2-4. with文を使った自動クリーンアップ(推奨)
# ===== with文を使うと自動でクローズされる =====
def read_file_best(filename):
try:
with open(filename, ‘r’) as f:
content = f.read()
print(“✅ ファイル読み込み成功”)
return content
except FileNotFoundError:
print(“❌ ファイルが見つかりません”)
return None
# ファイルは自動でクローズされる
💡 例外処理のベストプラクティス
- 具体的な例外から先にキャッチする
- Exceptionは最後に書く(汎用的すぎる)
- エラーメッセージはわかりやすく
- with文を使ってリソースを自動管理
2-5. カスタム例外の作成
# ===== カスタム例外クラス =====
class DataValidationError(Exception):
“””データ検証エラー”””
pass
def validate_age(age):
“””年齢を検証”””
try:
age = int(age)
if age < 0 or age > 150:
raise DataValidationError(f”年齢が範囲外です: {age}”)
print(f”✅ 年齢検証OK: {age}”)
return age
except ValueError:
print(f”❌ 年齢が数値ではありません: {age}”)
return None
except DataValidationError as e:
print(f”❌ {e}”)
return None
# テスト
validate_age(25) # OK
validate_age(200) # 範囲外
validate_age(“abc”) # 数値ではない
【実行結果】
✅ 年齢検証OK: 25
❌ 年齢が範囲外です: 200
❌ 年齢が数値ではありません: abc
📝 3. loggingモジュールの活用
3-1. なぜprintではなくloggingを使うのか?
printとloggingの比較
| 機能 | logging | |
|---|---|---|
| ログレベル | なし | DEBUG, INFO, WARNING, ERROR, CRITICAL |
| ファイル保存 | できない | 簡単にできる |
| タイムスタンプ | 手動で追加 | 自動で追加 |
| 本番/開発の切替 | 難しい | 簡単 |
3-2. loggingの基本
# ===== loggingの基本 =====
import logging
# ロガーの設定
logging.basicConfig(
level=logging.INFO,
format=’%(asctime)s – %(levelname)s – %(message)s’
)
# ログ出力
logging.debug(“デバッグ情報”) # 表示されない(INFOレベル以上のみ)
logging.info(“処理開始”)
logging.warning(“警告メッセージ”)
logging.error(“エラーが発生”)
logging.critical(“致命的なエラー”)
【実行結果】
2024-01-15 10:30:00,123 – INFO – 処理開始
2024-01-15 10:30:00,124 – WARNING – 警告メッセージ
2024-01-15 10:30:00,125 – ERROR – エラーが発生
2024-01-15 10:30:00,126 – CRITICAL – 致命的なエラー
3-3. ログレベルの使い分け
ログレベルの意味と使い分け
| レベル | 数値 | 使い方 |
|---|---|---|
| DEBUG | 10 | 開発時のデバッグ情報(詳細すぎる情報) |
| INFO | 20 | 通常の動作ログ(処理の進捗、件数など) |
| WARNING | 30 | 警告(処理は続行するが注意が必要) |
| ERROR | 40 | エラー(処理は失敗したが続行可能) |
| CRITICAL | 50 | 致命的エラー(システム停止レベル) |
3-4. ファイルにログを保存
# ===== ファイルにログを保存 =====
import logging
# ファイルにログを保存する設定
logging.basicConfig(
level=logging.INFO,
format=’%(asctime)s – %(levelname)s – %(message)s’,
handlers=[
logging.FileHandler(‘etl.log’, encoding=’utf-8′), # ファイルに保存
logging.StreamHandler() # コンソールにも表示
]
)
logging.info(“ETL処理を開始します”)
logging.info(“データを読み込み中…”)
logging.warning(“欠損値が見つかりました”)
logging.info(“ETL処理が完了しました”)
# ログファイルの内容を確認
print(“\n=== ログファイル(etl.log)の内容 ===”)
with open(‘etl.log’, ‘r’, encoding=’utf-8′) as f:
print(f.read())
【実行結果】
2024-01-15 10:30:00,123 – INFO – ETL処理を開始します
2024-01-15 10:30:00,124 – INFO – データを読み込み中…
2024-01-15 10:30:00,125 – WARNING – 欠損値が見つかりました
2024-01-15 10:30:00,126 – INFO – ETL処理が完了しました
=== ログファイル(etl.log)の内容 ===
2024-01-15 10:30:00,123 – INFO – ETL処理を開始します
2024-01-15 10:30:00,124 – INFO – データを読み込み中…
2024-01-15 10:30:00,125 – WARNING – 欠損値が見つかりました
2024-01-15 10:30:00,126 – INFO – ETL処理が完了しました
3-5. ローテーション付きログ(実務向け)
ログファイルが大きくなりすぎないように、自動でローテーションさせます。
# ===== ローテーション付きログ =====
import logging
from logging.handlers import RotatingFileHandler
# ロガーを作成
logger = logging.getLogger(‘etl_pipeline’)
logger.setLevel(logging.DEBUG)
# フォーマットを定義
formatter = logging.Formatter(
‘%(asctime)s – %(name)s – %(levelname)s – %(funcName)s:%(lineno)d – %(message)s’,
datefmt=’%Y-%m-%d %H:%M:%S’
)
# ファイルハンドラ(ローテーション付き)
file_handler = RotatingFileHandler(
‘etl_pipeline.log’,
maxBytes=10*1024*1024, # 10MB
backupCount=5, # 5世代保持
encoding=’utf-8′
)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
# コンソールハンドラ
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO) # コンソールはINFO以上のみ
console_handler.setFormatter(formatter)
# ロガーにハンドラを追加
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# 使用例
logger.debug(“デバッグ情報(ファイルのみに記録)”)
logger.info(“処理開始”)
logger.error(“エラー発生”)
【実行結果】
2024-01-15 10:30:00 – etl_pipeline – INFO – <module>:25 – 処理開始
2024-01-15 10:30:00 – etl_pipeline – ERROR – <module>:26 – エラー発生
💡 ローテーションのメリット
- ディスク容量の節約:古いログが自動削除される
- ログの管理が楽:日付ごとに分かれる
- パフォーマンス維持:巨大なログファイルを避けられる
🔄 4. リトライ戦略
4-1. なぜリトライが必要か?
ネットワークエラーやDB接続タイムアウトなど、一時的なエラーは再試行すると成功することがあります。
4-2. 基本的なリトライ実装
# ===== 基本的なリトライ =====
import time
import logging
logging.basicConfig(level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’)
logger = logging.getLogger(__name__)
def retry_function(func, max_retries=3, delay=1):
“””
関数をリトライする
Parameters:
———–
func : function
実行する関数
max_retries : int
最大リトライ回数
delay : int
リトライ間隔(秒)
“””
for attempt in range(max_retries):
try:
result = func()
logger.info(f”✅ 成功({attempt + 1}回目)”)
return result
except Exception as e:
logger.warning(f”❌ 失敗({attempt + 1}回目): {e}”)
if attempt < max_retries - 1:
logger.info(f"⏳ {delay}秒後にリトライします...")
time.sleep(delay)
else:
logger.error(f"❌ {max_retries}回リトライしましたが失敗しました")
raise
# テスト用の関数(3回目で成功)
attempt_count = 0
def unstable_function():
global attempt_count
attempt_count += 1
if attempt_count < 3:
raise Exception(f"一時的なエラー({attempt_count}回目)")
return "成功!"
# 実行
result = retry_function(unstable_function, max_retries=3, delay=1)
【実行結果】
2024-01-15 10:30:00,123 – WARNING – ❌ 失敗(1回目): 一時的なエラー(1回目)
2024-01-15 10:30:00,124 – INFO – ⏳ 1秒後にリトライします…
2024-01-15 10:30:01,125 – WARNING – ❌ 失敗(2回目): 一時的なエラー(2回目)
2024-01-15 10:30:01,126 – INFO – ⏳ 1秒後にリトライします…
2024-01-15 10:30:02,127 – INFO – ✅ 成功(3回目)
4-3. デコレータを使ったリトライ(実務向け)
# ===== リトライデコレータ =====
import time
import logging
from functools import wraps
logging.basicConfig(level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’)
logger = logging.getLogger(__name__)
def retry(max_retries=3, delay=1, backoff=2):
“””
リトライデコレータ
Parameters:
———–
max_retries : int
最大リトライ回数
delay : int
初回リトライまでの待機時間(秒)
backoff : int
リトライごとの待機時間の倍率(指数バックオフ)
“””
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
current_delay = delay
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
logger.warning(f”❌ {func.__name__} 失敗({attempt + 1}回目): {e}”)
if attempt < max_retries - 1:
logger.info(f"⏳ {current_delay}秒後にリトライ...")
time.sleep(current_delay)
current_delay *= backoff # 待機時間を倍に
else:
logger.error(f"❌ {func.__name__} が{max_retries}回失敗しました")
raise
return wrapper
return decorator
# 使用例
@retry(max_retries=3, delay=1, backoff=2)
def fetch_data_from_api():
"""APIからデータ取得(リトライ付き)"""
import random
if random.random() < 0.7: # 70%の確率で失敗
raise Exception("API接続エラー")
logger.info("✅ APIからデータ取得成功")
return {"data": [1, 2, 3]}
# 実行
data = fetch_data_from_api()
【実行結果】
2024-01-15 10:30:00,123 – WARNING – ❌ fetch_data_from_api 失敗(1回目): API接続エラー
2024-01-15 10:30:00,124 – INFO – ⏳ 1秒後にリトライ…
2024-01-15 10:30:01,125 – WARNING – ❌ fetch_data_from_api 失敗(2回目): API接続エラー
2024-01-15 10:30:01,126 – INFO – ⏳ 2秒後にリトライ…
2024-01-15 10:30:03,127 – INFO – ✅ APIからデータ取得成功
4-4. tenacityライブラリを使ったリトライ
pip install tenacity
# ===== tenacityライブラリを使用 =====
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import logging
logging.basicConfig(level=logging.INFO, format=’%(asctime)s – %(levelname)s – %(message)s’)
logger = logging.getLogger(__name__)
# 特定の例外のみリトライ
@retry(
stop=stop_after_attempt(3), # 最大3回
wait=wait_exponential(multiplier=1), # 指数バックオフ(1秒、2秒、4秒…)
retry=retry_if_exception_type(ConnectionError), # ConnectionErrorのみリトライ
reraise=True
)
def connect_to_database():
“””データベース接続(リトライ付き)”””
logger.info(“データベースに接続中…”)
import random
if random.random() < 0.5:
raise ConnectionError("接続タイムアウト")
logger.info("✅ データベース接続成功")
return "connected"
# 実行
try:
result = connect_to_database()
except ConnectionError:
logger.error("データベース接続に失敗しました")
📊 リトライ戦略の比較
| 戦略 | 待機時間 | 特徴 |
|---|---|---|
| 固定間隔 | 1秒 → 1秒 → 1秒 | シンプル、短期的なエラー向け |
| 指数バックオフ | 1秒 → 2秒 → 4秒 | サーバー負荷軽減、推奨 |
| ジッター付き | 1.2秒 → 2.5秒 → 4.1秒 | 同時リトライの衝突回避 |
📋 5. 実践演習:堅牢なETLスクリプト
5-1. すべてを組み合わせた完全なETLパイプライン
# ===== 堅牢なETLパイプライン =====
import pandas as pd
import logging
from logging.handlers import RotatingFileHandler
import time
from functools import wraps
# =====================================
# 1. ロギング設定
# =====================================
def setup_logger(name, log_file, level=logging.INFO):
“””ロガーのセットアップ”””
logger = logging.getLogger(name)
logger.setLevel(level)
# フォーマット
formatter = logging.Formatter(
‘%(asctime)s – %(levelname)s – %(funcName)s – %(message)s’,
datefmt=’%Y-%m-%d %H:%M:%S’
)
# ファイルハンドラ
file_handler = RotatingFileHandler(
log_file,
maxBytes=10*1024*1024,
backupCount=5,
encoding=’utf-8′
)
file_handler.setFormatter(formatter)
# コンソールハンドラ
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
# ロガーを作成
logger = setup_logger(‘etl_pipeline’, ‘etl_pipeline.log’)
# =====================================
# 2. リトライデコレータ
# =====================================
def retry(max_retries=3, delay=1, backoff=2):
“””リトライデコレータ”””
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
current_delay = delay
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
logger.warning(f”リトライ {attempt + 1}/{max_retries}: {e}”)
if attempt < max_retries - 1:
time.sleep(current_delay)
current_delay *= backoff
else:
raise
return wrapper
return decorator
# =====================================
# 3. ETL関数
# =====================================
@retry(max_retries=3, delay=1)
def extract_data(file_path):
"""データ抽出"""
logger.info(f"データ抽出開始: {file_path}")
try:
df = pd.read_csv(file_path)
logger.info(f"✅ データ抽出成功: {len(df)}件")
return df
except FileNotFoundError:
logger.error(f"❌ ファイルが見つかりません: {file_path}")
raise
def transform_data(df):
"""データ変換"""
logger.info("データ変換開始")
try:
# 欠損値チェック
missing_count = df.isnull().sum().sum()
if missing_count > 0:
logger.warning(f”⚠️ 欠損値が{missing_count}個見つかりました”)
df = df.fillna(0)
logger.info(“欠損値を0で補完しました”)
# 変換処理
if ‘金額’ in df.columns:
df[‘金額’] = df[‘金額’] * 1.1 # 税込み価格に変換
logger.info(f”✅ データ変換成功: {len(df)}件”)
return df
except KeyError as e:
logger.error(f”❌ 必要なカラムがありません: {e}”)
raise
@retry(max_retries=3, delay=2)
def load_data(df, output_path):
“””データロード”””
logger.info(f”データロード開始: {output_path}”)
try:
df.to_csv(output_path, index=False, encoding=’utf-8-sig’)
logger.info(f”✅ データロード成功: {len(df)}件”)
return True
except PermissionError:
logger.error(f”❌ 書き込み権限がありません: {output_path}”)
raise
# =====================================
# 4. メイン処理
# =====================================
def run_etl_pipeline(input_file, output_file):
“””ETLパイプライン実行”””
logger.info(“=” * 50)
logger.info(“ETLパイプライン開始”)
logger.info(“=” * 50)
start_time = time.time()
try:
# Extract
df = extract_data(input_file)
# Transform
df = transform_data(df)
# Load
load_data(df, output_file)
# 実行時間
elapsed_time = time.time() – start_time
logger.info(“=” * 50)
logger.info(f”✅ ETLパイプライン完了({elapsed_time:.2f}秒)”)
logger.info(“=” * 50)
return True
except Exception as e:
elapsed_time = time.time() – start_time
logger.error(“=” * 50)
logger.error(f”❌ ETLパイプライン失敗({elapsed_time:.2f}秒)”)
logger.error(f”エラー内容: {e}”)
logger.error(“=” * 50)
return False
# =====================================
# 5. 実行
# =====================================
if __name__ == ‘__main__’:
# テストデータを作成
test_df = pd.DataFrame({
‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’],
‘金額’: [100, 150, 80]
})
test_df.to_csv(‘input.csv’, index=False)
# ETL実行
success = run_etl_pipeline(‘input.csv’, ‘output.csv’)
【実行結果】
2024-01-15 10:30:00 – INFO – run_etl_pipeline – ==================================================
2024-01-15 10:30:00 – INFO – run_etl_pipeline – ETLパイプライン開始
2024-01-15 10:30:00 – INFO – run_etl_pipeline – ==================================================
2024-01-15 10:30:00 – INFO – extract_data – データ抽出開始: input.csv
2024-01-15 10:30:00 – INFO – extract_data – ✅ データ抽出成功: 3件
2024-01-15 10:30:00 – INFO – transform_data – データ変換開始
2024-01-15 10:30:00 – INFO – transform_data – ✅ データ変換成功: 3件
2024-01-15 10:30:00 – INFO – load_data – データロード開始: output.csv
2024-01-15 10:30:00 – INFO – load_data – ✅ データロード成功: 3件
2024-01-15 10:30:00 – INFO – run_etl_pipeline – ==================================================
2024-01-15 10:30:00 – INFO – run_etl_pipeline – ✅ ETLパイプライン完了(0.15秒)
2024-01-15 10:30:00 – INFO – run_etl_pipeline – ==================================================
📝 STEP 17 のまとめ
✅ このステップで学んだこと
- try-except:エラーをキャッチして処理を続行
- logging:ログレベルを使い分けてファイルに記録
- リトライ:一時的なエラーに対して自動再試行
- 堅牢なETL:すべてを組み合わせた実践的なコード
💡 重要ポイント
- printではなくloggingモジュールを使う
- 具体的な例外から順にキャッチする
- 一時的なエラーはリトライする
- ログファイルはローテーションさせる
- エラーメッセージはわかりやすく
🎯 次のステップの予告
次のSTEP 18では、「パフォーマンス最適化」を学びます。
- チャンクによる分割処理
- マルチプロセス処理
- メモリ効率化
📝 練習問題
問題 1
基礎
ファイルを読み込む関数にtry-exceptを追加して、FileNotFoundErrorをキャッチしてください。
def read_file(filename):
# ここにコードを書いてください
pass
【解答】
def read_file(filename):
try:
with open(filename, ‘r’, encoding=’utf-8′) as f:
content = f.read()
print(f”✅ ファイル読み込み成功: {filename}”)
return content
except FileNotFoundError:
print(f”❌ ファイルが見つかりません: {filename}”)
return None
except Exception as e:
print(f”❌ 予期しないエラー: {e}”)
return None
# テスト
read_file(‘test.txt’)
問題 2
基礎
loggingモジュールを使って、INFO、WARNING、ERRORのログを出力してください。
【解答】
import logging
# 基本設定
logging.basicConfig(
level=logging.INFO,
format=’%(asctime)s – %(levelname)s – %(message)s’
)
# ログ出力
logging.info(“処理を開始しました”)
logging.warning(“データに欠損値があります”)
logging.error(“データベース接続に失敗しました”)
問題 3
基礎
ログをファイル(app.log)に保存する設定を書いてください。
【解答】
import logging
# ファイルとコンソールに出力する設定
logging.basicConfig(
level=logging.INFO,
format=’%(asctime)s – %(levelname)s – %(message)s’,
handlers=[
logging.FileHandler(‘app.log’, encoding=’utf-8′),
logging.StreamHandler()
]
)
# テスト
logging.info(“アプリケーションを開始しました”)
logging.info(“処理完了”)
問題 4
基礎
基本的なリトライ処理を実装してください。最大3回リトライし、1秒間隔で待機します。
【解答】
import time
def simple_retry(func, max_retries=3, delay=1):
“””シンプルなリトライ処理”””
for attempt in range(max_retries):
try:
result = func()
print(f”✅ 成功({attempt + 1}回目)”)
return result
except Exception as e:
print(f”❌ 失敗({attempt + 1}回目): {e}”)
if attempt < max_retries - 1:
print(f"⏳ {delay}秒後にリトライ...")
time.sleep(delay)
else:
print("❌ 全てのリトライに失敗しました")
raise
# テスト
def test_func():
import random
if random.random() < 0.7:
raise Exception("一時的なエラー")
return "成功"
simple_retry(test_func)
問題 5
応用
ローテーション付きのロガーを設定してください(最大5MB、3世代保持)。
【解答】
import logging
from logging.handlers import RotatingFileHandler
def setup_rotating_logger(name, log_file):
“””ローテーション付きロガーのセットアップ”””
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
‘%(asctime)s – %(name)s – %(levelname)s – %(message)s’,
datefmt=’%Y-%m-%d %H:%M:%S’
)
# ローテーションハンドラ
file_handler = RotatingFileHandler(
log_file,
maxBytes=5*1024*1024, # 5MB
backupCount=3, # 3世代
encoding=’utf-8′
)
file_handler.setFormatter(formatter)
# コンソールハンドラ
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
# 使用例
logger = setup_rotating_logger(‘my_app’, ‘my_app.log’)
logger.info(“アプリケーション開始”)
問題 6
応用
指数バックオフ付きのリトライデコレータを作成してください。
【解答】
import time
import logging
from functools import wraps
logging.basicConfig(level=logging.INFO, format=’%(message)s’)
logger = logging.getLogger(__name__)
def retry_with_backoff(max_retries=3, initial_delay=1, backoff=2):
“””指数バックオフ付きリトライデコレータ”””
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
delay = initial_delay
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
logger.warning(f”失敗({attempt + 1}回目): {e}”)
if attempt < max_retries - 1:
logger.info(f"⏳ {delay}秒後にリトライ...")
time.sleep(delay)
delay *= backoff # 指数バックオフ
else:
raise
return wrapper
return decorator
# 使用例
@retry_with_backoff(max_retries=3, initial_delay=1, backoff=2)
def unstable_api_call():
import random
if random.random() < 0.6:
raise ConnectionError("接続エラー")
return "成功"
result = unstable_api_call()
問題 7
応用
カスタム例外クラスを作成し、データ検証でエラーを発生させてください。
【解答】
class DataValidationError(Exception):
“””データ検証エラー”””
def __init__(self, message, field=None):
self.message = message
self.field = field
super().__init__(self.message)
def validate_user_data(data):
“””ユーザーデータを検証”””
try:
# 名前の検証
if ‘name’ not in data or not data[‘name’]:
raise DataValidationError(“名前は必須です”, field=’name’)
# 年齢の検証
if ‘age’ in data:
age = int(data[‘age’])
if age < 0 or age > 150:
raise DataValidationError(f”年齢が範囲外です: {age}”, field=’age’)
# メールの検証
if ‘email’ in data and ‘@’ not in data[‘email’]:
raise DataValidationError(“メールアドレスの形式が不正です”, field=’email’)
print(“✅ データ検証OK”)
return True
except DataValidationError as e:
print(f”❌ 検証エラー [{e.field}]: {e.message}”)
return False
except ValueError as e:
print(f”❌ 型変換エラー: {e}”)
return False
# テスト
validate_user_data({‘name’: ‘田中’, ‘age’: 25, ‘email’: ‘tanaka@example.com’})
validate_user_data({‘name’: ”, ‘age’: 25})
validate_user_data({‘name’: ‘鈴木’, ‘age’: 200})
問題 8
応用
特定の例外のみリトライするデコレータを作成してください。
【解答】
import time
from functools import wraps
def retry_on_exception(exception_types, max_retries=3, delay=1):
“””特定の例外のみリトライするデコレータ”””
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except exception_types as e:
print(f”リトライ対象エラー: {e}”)
if attempt < max_retries - 1:
print(f"⏳ {delay}秒後にリトライ...")
time.sleep(delay)
else:
raise
except Exception as e:
# リトライ対象外の例外はそのまま発生
print(f"リトライ対象外エラー: {e}")
raise
return wrapper
return decorator
# 使用例(ConnectionErrorのみリトライ)
@retry_on_exception((ConnectionError, TimeoutError), max_retries=3, delay=1)
def fetch_data():
import random
r = random.random()
if r < 0.3:
raise ConnectionError("接続エラー")
elif r < 0.5:
raise ValueError("値エラー") # これはリトライされない
return "成功"
fetch_data()
問題 9
発展
エラーハンドリングとロギングを組み込んだ完全なETLパイプラインを作成してください。
【解答】
import pandas as pd
import logging
from logging.handlers import RotatingFileHandler
import time
from functools import wraps
class ETLPipeline:
def __init__(self, log_file=’etl.log’):
self.logger = self._setup_logger(log_file)
def _setup_logger(self, log_file):
logger = logging.getLogger(‘etl’)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
‘%(asctime)s – %(levelname)s – %(message)s’
)
file_handler = RotatingFileHandler(
log_file, maxBytes=10*1024*1024, backupCount=5
)
file_handler.setFormatter(formatter)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
def _retry(self, func, max_retries=3, delay=1):
for attempt in range(max_retries):
try:
return func()
except Exception as e:
self.logger.warning(f”リトライ {attempt + 1}/{max_retries}: {e}”)
if attempt < max_retries - 1:
time.sleep(delay)
else:
raise
def extract(self, file_path):
self.logger.info(f"Extract開始: {file_path}")
try:
df = pd.read_csv(file_path)
self.logger.info(f"✅ Extract完了: {len(df)}件")
return df
except Exception as e:
self.logger.error(f"❌ Extractエラー: {e}")
raise
def transform(self, df):
self.logger.info("Transform開始")
try:
missing = df.isnull().sum().sum()
if missing > 0:
self.logger.warning(f”欠損値: {missing}個”)
df = df.fillna(0)
self.logger.info(f”✅ Transform完了: {len(df)}件”)
return df
except Exception as e:
self.logger.error(f”❌ Transformエラー: {e}”)
raise
def load(self, df, output_path):
self.logger.info(f”Load開始: {output_path}”)
try:
df.to_csv(output_path, index=False, encoding=’utf-8-sig’)
self.logger.info(f”✅ Load完了: {len(df)}件”)
except Exception as e:
self.logger.error(f”❌ Loadエラー: {e}”)
raise
def run(self, input_file, output_file):
self.logger.info(“=” * 40)
self.logger.info(“ETLパイプライン開始”)
start = time.time()
try:
df = self.extract(input_file)
df = self.transform(df)
self.load(df, output_file)
elapsed = time.time() – start
self.logger.info(f”✅ 完了({elapsed:.2f}秒)”)
self.logger.info(“=” * 40)
return True
except Exception as e:
self.logger.error(f”❌ 失敗: {e}”)
return False
# 使用例
pipeline = ETLPipeline()
pipeline.run(‘input.csv’, ‘output.csv’)
問題 10
発展
エラー通知機能(コンソール出力)を持つETLパイプラインを作成してください。
【解答】
import logging
import time
from datetime import datetime
class ETLWithNotification:
def __init__(self):
self.logger = self._setup_logger()
self.errors = []
self.warnings = []
def _setup_logger(self):
logging.basicConfig(
level=logging.INFO,
format=’%(asctime)s – %(levelname)s – %(message)s’
)
return logging.getLogger(‘etl_notify’)
def _notify_error(self, error_message):
“””エラー通知(実際はSlack/メール等)”””
notification = f”””
========================================
🚨 ETLエラー通知
—————————————-
時刻: {datetime.now().strftime(‘%Y-%m-%d %H:%M:%S’)}
エラー: {error_message}
========================================
“””
print(notification)
self.errors.append(error_message)
def _notify_summary(self, success, elapsed_time):
“””完了通知”””
status = “✅ 成功” if success else “❌ 失敗”
notification = f”””
========================================
📊 ETL実行サマリー
—————————————-
ステータス: {status}
実行時間: {elapsed_time:.2f}秒
警告数: {len(self.warnings)}件
エラー数: {len(self.errors)}件
========================================
“””
print(notification)
def run(self, process_func):
“””ETL実行(通知付き)”””
start = time.time()
success = False
try:
process_func()
success = True
except Exception as e:
self._notify_error(str(e))
finally:
elapsed = time.time() – start
self._notify_summary(success, elapsed)
return success
# 使用例
etl = ETLWithNotification()
def my_etl_process():
print(“処理中…”)
# 意図的にエラーを発生
raise ValueError(“データ形式が不正です”)
etl.run(my_etl_process)
学習メモ
ETL・データパイプライン構築 - Step 17
📋 過去のメモ一覧
▼