🔧 STEP 26: プロジェクト② ETLパイプラインのコンテナ化
既存のPythonスクリプトをDocker化して本番レディに
📋 このプロジェクトのゴール
- ローカルで動いているPythonスクリプトをDockerコンテナ化
- 環境変数で設定を柔軟に管理
- ヘルスチェックとリトライロジックの実装
- ログ出力の最適化(標準出力 + ファイル)
- データ品質チェックの組み込み
- エラーハンドリングの実装
目標:本番環境にデプロイ可能な堅牢なETLコンテナを構築
学習時間の目安: 3時間
🔧 0. このプロジェクトの前提知識
📚 これまでの学習の復習
- Dockerfile:マルチステージビルド(STEP 14)
- 環境変数:.envファイル管理(STEP 21)
- Docker Compose:ヘルスチェック、depends_on(STEP 15-18)
- リソース制限:メモリ、CPU制限(STEP 24)
- Python:pandas、SQLAlchemy基礎
0-1. 作業ディレクトリの準備
mkdir -p ~/docker-practice/step26
cd ~/docker-practice/step26
pwd
0-2. ETLパイプラインのアーキテクチャ
【ETLパイプライン アーキテクチャ】
┌─────────────────────────────────────────────────────────────────────┐
│ ETL パイプライン │
│ │
│ 【Extract】 【Transform】 【Load】 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ CSV │───────→│ 変換 │───────→│PostgreSQL│ │
│ │ ファイル │ │ 処理 │ │ DB │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │ │
│ ↓ ↓ ↓ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ファイル読込│ │品質チェック│ │バッチ挿入 │ │
│ │エンコード │ │重複削除 │ │リトライ │ │
│ │検証 │ │型変換 │ │トランザクション│ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ 【共通機能】 │
│ ├── 📝 ログ出力(標準出力 + ファイル) │
│ ├── ⚙️ 環境変数による設定管理 │
│ ├── 🔄 リトライロジック(3回まで自動再試行) │
│ └── 📊 集計レポート生成 │
│ │
└─────────────────────────────────────────────────────────────────────┘
0-3. プロジェクトシナリオ
📊 シナリオ設定
あなたは以下のようなPythonスクリプトをローカルで運用しています:
- CSVファイルから売上データを読み込み
- データクレンジングと変換処理
- PostgreSQLデータベースへ挿入
- 集計レポートの生成
これをDockerコンテナ化して、どこでも同じように動作する堅牢なシステムを構築します。
0-4. 実装するベストプラクティス一覧
| カテゴリ | 実装内容 | 目的 |
|---|---|---|
| 設定管理 | 環境変数 + .envファイル | 環境ごとの設定切り替え |
| エラー処理 | try-except + リトライ | 一時的な障害への対応 |
| ログ管理 | 標準出力 + ファイル出力 | 監視とデバッグ |
| 品質保証 | DataValidatorクラス | データ品質の担保 |
| セキュリティ | 非rootユーザー実行 | コンテナセキュリティ |
| 効率化 | マルチステージビルド | イメージサイズ削減 |
| 監視 | ヘルスチェック | コンテナ健全性確認 |
| リソース | CPU/メモリ制限 | リソース消費の制御 |
📂 1. プロジェクト構造の作成
1-1. ディレクトリ作成
# プロジェクトディレクトリ作成
mkdir -p etl-pipeline-docker
cd etl-pipeline-docker
# サブディレクトリ作成
mkdir -p src
mkdir -p data/input
mkdir -p data/output
mkdir -p logs
mkdir -p tests
# 構造確認
tree -L 2 . || find . -type d
📁 最終的なディレクトリ構造
etl-pipeline-docker/
├── Dockerfile # コンテナイメージ定義
├── docker-compose.yml # マルチコンテナ構成
├── .env # 環境変数(Git管理外)
├── .dockerignore # ビルド除外ファイル
├── .gitignore # Git除外ファイル
├── requirements.txt # Python依存関係
├── init.sql # DB初期化スクリプト
├── src/
│ ├── __init__.py # パッケージ初期化
│ ├── config.py # 設定管理
│ ├── logger.py # ログ管理
│ ├── validator.py # データ品質チェック
│ └── etl.py # メインETLロジック
├── data/
│ ├── input/ # 入力CSVファイル
│ │ └── sales_data.csv
│ └── output/ # 出力レポート
├── logs/ # ログファイル
└── tests/
└── test_etl.py # テストコード
🔐 2. 環境変数と依存関係の設定
2-1. 環境変数ファイル(.env)
# .envファイルを作成
cat > .env << ‘EOF’
# データベース設定
DB_HOST=postgres
DB_PORT=5432
DB_NAME=datawarehouse
DB_USER=dataeng
DB_PASSWORD=secure_password_123
# ETL設定
BATCH_SIZE=1000
MAX_RETRIES=3
# パス設定
INPUT_DIR=/data/input
OUTPUT_DIR=/data/output
LOG_DIR=/logs
EOF
cat .env
2-2. Python依存関係(requirements.txt)
# requirements.txtを作成
cat > requirements.txt << ‘EOF’
pandas==2.1.4
psycopg2-binary==2.9.9
python-dotenv==1.0.0
sqlalchemy==2.0.23
retry==0.9.2
EOF
cat requirements.txt
2-3. Git/Docker除外ファイル
# .gitignoreを作成
cat > .gitignore << ‘EOF’
.env
__pycache__/
*.pyc
logs/*
data/output/*
.pytest_cache/
EOF
# .dockerignoreを作成
cat > .dockerignore << ‘EOF’
.git
.gitignore
.env
__pycache__
*.pyc
*.pyo
.pytest_cache
.vscode
.idea
logs/*
data/output/*
tests/*
README.md
*.md
EOF
🐍 3. Pythonコードの作成
3-1. パッケージ初期化(src/__init__.py)
# __init__.pyを作成
cat > src/__init__.py << ‘EOF’
“””ETL Pipeline Package”””
__version__ = ‘1.0.0’
EOF
3-2. 設定管理(src/config.py)
# config.pyを作成
cat > src/config.py << ‘EOF’
“””設定管理モジュール”””
import os
from dotenv import load_dotenv
# .envファイルを読み込み
load_dotenv()
class Config:
“””環境変数から設定を読み込むクラス”””
# データベース設定
DB_HOST = os.getenv(‘DB_HOST’, ‘localhost’)
DB_PORT = int(os.getenv(‘DB_PORT’, 5432))
DB_NAME = os.getenv(‘DB_NAME’, ‘datawarehouse’)
DB_USER = os.getenv(‘DB_USER’, ‘dataeng’)
DB_PASSWORD = os.getenv(‘DB_PASSWORD’, ”)
# ファイルパス設定
INPUT_DIR = os.getenv(‘INPUT_DIR’, ‘/data/input’)
OUTPUT_DIR = os.getenv(‘OUTPUT_DIR’, ‘/data/output’)
LOG_DIR = os.getenv(‘LOG_DIR’, ‘/logs’)
# ETL設定
BATCH_SIZE = int(os.getenv(‘BATCH_SIZE’, 1000))
MAX_RETRIES = int(os.getenv(‘MAX_RETRIES’, 3))
@classmethod
def get_db_url(cls):
“””SQLAlchemy用のデータベース接続URLを生成”””
return (
f”postgresql://{cls.DB_USER}:{cls.DB_PASSWORD}”
f”@{cls.DB_HOST}:{cls.DB_PORT}/{cls.DB_NAME}”
)
@classmethod
def validate(cls):
“””必須設定の検証”””
required = [‘DB_PASSWORD’, ‘DB_HOST’, ‘DB_NAME’]
missing = [key for key in required if not os.getenv(key)]
if missing:
raise ValueError(
f”必須の環境変数が設定されていません: {‘, ‘.join(missing)}”
)
@classmethod
def print_config(cls):
“””設定内容を表示(デバッグ用)”””
print(“=== 設定情報 ===”)
print(f”DB_HOST: {cls.DB_HOST}”)
print(f”DB_PORT: {cls.DB_PORT}”)
print(f”DB_NAME: {cls.DB_NAME}”)
print(f”DB_USER: {cls.DB_USER}”)
print(f”BATCH_SIZE: {cls.BATCH_SIZE}”)
print(f”MAX_RETRIES: {cls.MAX_RETRIES}”)
print(“================”)
if __name__ == ‘__main__’:
Config.print_config()
EOF
3-3. ログ管理(src/logger.py)
# logger.pyを作成
cat > src/logger.py << ‘EOF’
“””ログ管理モジュール”””
import logging
import sys
from pathlib import Path
from datetime import datetime
# 設定をインポート(循環インポート防止のため遅延)
def get_log_dir():
from src.config import Config
return Config.LOG_DIR
def setup_logger(name=’etl’):
“””
ロガーのセットアップ
– 標準出力: INFO以上
– ファイル出力: DEBUG以上
“””
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
# すでにハンドラーが設定されていれば追加しない
if logger.handlers:
return logger
# ログフォーマット
formatter = logging.Formatter(
‘%(asctime)s – %(name)s – %(levelname)s – %(message)s’,
datefmt=’%Y-%m-%d %H:%M:%S’
)
# コンソール出力(標準出力)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# ファイル出力
try:
log_dir = Path(get_log_dir())
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / f”etl_{datetime.now().strftime(‘%Y%m%d’)}.log”
file_handler = logging.FileHandler(log_file, encoding=’utf-8′)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
except Exception as e:
logger.warning(f”ファイルログの設定に失敗: {e}”)
return logger
if __name__ == ‘__main__’:
logger = setup_logger(‘test’)
logger.info(“INFOレベルのログ”)
logger.debug(“DEBUGレベルのログ”)
logger.warning(“WARNINGレベルのログ”)
EOF
3-4. データ品質チェック(src/validator.py)
# validator.pyを作成
cat > src/validator.py << ‘EOF’
“””データ品質チェックモジュール”””
import pandas as pd
from src.logger import setup_logger
logger = setup_logger(‘validator’)
class DataValidator:
“””データ品質チェッククラス”””
@staticmethod
def validate_sales_data(df: pd.DataFrame) -> tuple:
“””
売上データの検証
Args:
df: 検証対象のDataFrame
Returns:
tuple: (is_valid: bool, errors: list)
“””
errors = []
# 1. 必須カラムチェック
required_columns = [‘sale_date’, ‘product_name’, ‘quantity’, ‘unit_price’]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
errors.append(f”必須カラムが不足: {missing_columns}”)
return False, errors # 必須カラムがなければ以降のチェック不可
# 2. NULL値チェック
null_counts = df[required_columns].isnull().sum()
null_cols = null_counts[null_counts > 0]
if len(null_cols) > 0:
errors.append(f”NULL値が存在: {null_cols.to_dict()}”)
# 3. 数値の妥当性チェック
if ‘quantity’ in df.columns:
invalid_qty = df[df[‘quantity’] <= 0]
if len(invalid_qty) > 0:
errors.append(f”無効な数量(0以下): {len(invalid_qty)}件”)
if ‘unit_price’ in df.columns:
invalid_price = df[df[‘unit_price’] < 0]
if len(invalid_price) > 0:
errors.append(f”負の単価: {len(invalid_price)}件”)
# 4. 日付フォーマットチェック
if ‘sale_date’ in df.columns:
try:
pd.to_datetime(df[‘sale_date’])
except Exception as e:
errors.append(f”日付フォーマットエラー: {str(e)}”)
# 結果判定
is_valid = len(errors) == 0
if is_valid:
logger.info(f”✅ データ品質チェック完了: {len(df)}件のレコードが正常”)
else:
logger.error(“❌ データ品質エラー検出:”)
for error in errors:
logger.error(f” – {error}”)
return is_valid, errors
@staticmethod
def remove_duplicates(df: pd.DataFrame) -> pd.DataFrame:
“””
重複データの削除
Args:
df: 対象のDataFrame
Returns:
DataFrame: 重複削除後のDataFrame
“””
initial_count = len(df)
df_clean = df.drop_duplicates()
removed_count = initial_count – len(df_clean)
if removed_count > 0:
logger.warning(f”⚠️ {removed_count}件の重複レコードを削除”)
else:
logger.info(“✅ 重複レコードなし”)
return df_clean
if __name__ == ‘__main__’:
# テスト用
test_data = pd.DataFrame({
‘sale_date’: [‘2024-01-01’, ‘2024-01-02’],
‘product_name’: [‘Test’, ‘Test2’],
‘quantity’: [1, 2],
‘unit_price’: [100, 200]
})
validator = DataValidator()
is_valid, errors = validator.validate_sales_data(test_data)
print(f”Valid: {is_valid}, Errors: {errors}”)
EOF
3-5. メインETLロジック(src/etl.py)
# etl.pyを作成
cat > src/etl.py << ‘EOF’
“””メインETLロジック”””
import pandas as pd
from sqlalchemy import create_engine, text
from pathlib import Path
from datetime import datetime
from retry import retry
from src.config import Config
from src.logger import setup_logger
from src.validator import DataValidator
logger = setup_logger(‘etl’)
class SalesETL:
“””売上データETLクラス”””
def __init__(self):
“””初期化”””
Config.validate()
self.engine = create_engine(Config.get_db_url())
self.validator = DataValidator()
logger.info(“📊 ETLパイプライン初期化完了”)
logger.info(f” – DB: {Config.DB_HOST}:{Config.DB_PORT}/{Config.DB_NAME}”)
logger.info(f” – バッチサイズ: {Config.BATCH_SIZE}”)
logger.info(f” – 最大リトライ: {Config.MAX_RETRIES}”)
def extract(self, filename: str) -> pd.DataFrame:
“””
データ抽出(Extract)
CSVファイルからデータを読み込む
“””
logger.info(f”📥 データ抽出開始: {filename}”)
input_path = Path(Config.INPUT_DIR) / filename
if not input_path.exists():
raise FileNotFoundError(f”ファイルが見つかりません: {input_path}”)
try:
df = pd.read_csv(input_path)
logger.info(f”✅ {len(df)}件のレコードを読み込み”)
logger.info(f” – カラム: {list(df.columns)}”)
return df
except Exception as e:
logger.error(f”❌ ファイル読み込みエラー: {str(e)}”)
raise
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
“””
データ変換(Transform)
クレンジング、検証、型変換を実行
“””
logger.info(“🔄 データ変換開始”)
# 1. データ品質チェック
is_valid, errors = self.validator.validate_sales_data(df)
if not is_valid:
raise ValueError(f”データ品質エラー: {errors}”)
# 2. 重複削除
df = self.validator.remove_duplicates(df)
# 3. 日付型変換
df[‘sale_date’] = pd.to_datetime(df[‘sale_date’])
logger.info(” – 日付型に変換完了”)
# 4. 合計金額計算(存在しない場合)
if ‘total_amount’ not in df.columns:
df[‘total_amount’] = df[‘quantity’] * df[‘unit_price’]
logger.info(” – total_amountカラムを計算”)
# 5. 文字列の標準化
if ‘category’ in df.columns:
df[‘category’] = df[‘category’].str.strip().str.title()
logger.info(” – categoryを標準化”)
if ‘region’ in df.columns:
df[‘region’] = df[‘region’].str.strip()
logger.info(” – regionを標準化”)
logger.info(f”✅ データ変換完了: {len(df)}件”)
return df
@retry(tries=3, delay=2, backoff=2, logger=logger)
def load(self, df: pd.DataFrame, table_name: str = ‘sales’):
“””
データロード(Load)
PostgreSQLにバッチ挿入(リトライ機能付き)
“””
logger.info(f”📤 データロード開始: {table_name}テーブル”)
try:
batch_size = Config.BATCH_SIZE
total_rows = len(df)
for i in range(0, total_rows, batch_size):
batch = df.iloc[i:i+batch_size]
batch.to_sql(
table_name,
self.engine,
if_exists=’append’,
index=False,
method=’multi’
)
progress = min(i + len(batch), total_rows)
logger.info(f” 📦 {progress}/{total_rows}件を挿入”)
logger.info(f”✅ データロード完了: {total_rows}件”)
except Exception as e:
logger.error(f”❌ データロードエラー: {str(e)}”)
raise
def generate_summary(self):
“””集計レポート生成”””
logger.info(“📊 集計レポート生成開始”)
try:
query = “””
SELECT
sale_date::date as date,
COUNT(*) as order_count,
SUM(total_amount) as total_sales,
ROUND(AVG(total_amount)::numeric, 2) as avg_order_value,
SUM(quantity) as total_quantity
FROM sales
GROUP BY sale_date::date
ORDER BY sale_date DESC
LIMIT 30
“””
with self.engine.connect() as conn:
df_summary = pd.read_sql(text(query), conn)
# レポートをCSVに保存
timestamp = datetime.now().strftime(‘%Y%m%d_%H%M%S’)
output_path = Path(Config.OUTPUT_DIR) / f”summary_{timestamp}.csv”
output_path.parent.mkdir(parents=True, exist_ok=True)
df_summary.to_csv(output_path, index=False)
logger.info(f”✅ レポート保存完了: {output_path}”)
# サマリーをログに表示
logger.info(“\n” + “=” * 50)
logger.info(“📈 最近の売上サマリー”)
logger.info(“=” * 50)
for _, row in df_summary.head(5).iterrows():
logger.info(
f” {row[‘date’]}: ”
f”¥{row[‘total_sales’]:,.0f} ”
f”({row[‘order_count’]}件)”
)
logger.info(“=” * 50)
return df_summary
except Exception as e:
logger.error(f”❌ レポート生成エラー: {str(e)}”)
raise
def run(self, filename: str):
“””
ETLパイプライン実行
Extract → Transform → Load → Summary
“””
start_time = datetime.now()
logger.info(“🚀 ” + “=” * 48)
logger.info(“🚀 ETLパイプライン開始”)
logger.info(“🚀 ” + “=” * 48)
logger.info(f”⏰ 開始時刻: {start_time.strftime(‘%Y-%m-%d %H:%M:%S’)}”)
logger.info(f”📄 入力ファイル: {filename}”)
try:
# Extract
df = self.extract(filename)
# Transform
df_transformed = self.transform(df)
# Load
self.load(df_transformed)
# Summary
self.generate_summary()
# 完了
end_time = datetime.now()
duration = (end_time – start_time).total_seconds()
logger.info(“=” * 50)
logger.info(“🎉 ETLパイプライン正常完了”)
logger.info(f”⏱️ 実行時間: {duration:.2f}秒”)
logger.info(“=” * 50)
return True
except Exception as e:
end_time = datetime.now()
duration = (end_time – start_time).total_seconds()
logger.error(“=” * 50)
logger.error(“💥 ETLパイプライン失敗”)
logger.error(f”エラー: {str(e)}”)
logger.error(f”⏱️ 実行時間: {duration:.2f}秒”)
logger.error(“=” * 50)
raise
def main():
“””エントリーポイント”””
import sys
if len(sys.argv) < 2:
print("使用方法: python -m src.etl “)
print(“例: python -m src.etl sales_data.csv”)
sys.exit(1)
filename = sys.argv[1]
etl = SalesETL()
etl.run(filename)
if __name__ == ‘__main__’:
main()
EOF
echo “✅ Pythonコード作成完了”
📊 4. データベースとサンプルデータ
4-1. データベース初期化スクリプト(init.sql)
# init.sqlを作成
cat > init.sql << ‘EOF’
— ============================================
— 売上データテーブル
— ============================================
CREATE TABLE IF NOT EXISTS sales (
id SERIAL PRIMARY KEY,
sale_date DATE NOT NULL,
product_name VARCHAR(100) NOT NULL,
category VARCHAR(50),
quantity INTEGER NOT NULL CHECK (quantity > 0),
unit_price DECIMAL(10, 2) NOT NULL CHECK (unit_price >= 0),
total_amount DECIMAL(10, 2) NOT NULL,
customer_id VARCHAR(50),
region VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
— インデックス作成
CREATE INDEX IF NOT EXISTS idx_sales_date ON sales(sale_date);
CREATE INDEX IF NOT EXISTS idx_sales_category ON sales(category);
CREATE INDEX IF NOT EXISTS idx_sales_region ON sales(region);
— 確認メッセージ
SELECT ‘Database initialized successfully’ as status;
EOF
echo “✅ init.sql 作成完了”
4-2. サンプルCSVデータ
# サンプルデータを作成
cat > data/input/sales_data.csv << ‘EOF’
sale_date,product_name,category,quantity,unit_price,customer_id,region
2024-01-15,ノートパソコン,電子機器,2,89000,C001,東京
2024-01-15,マウス,周辺機器,5,1500,C002,大阪
2024-01-15,USBケーブル,アクセサリ,10,500,C003,東京
2024-01-16,キーボード,周辺機器,3,3500,C003,東京
2024-01-16,モニター,電子機器,1,35000,C001,東京
2024-01-16,Webカメラ,周辺機器,4,5000,C004,福岡
2024-01-17,ヘッドセット,周辺機器,2,8000,C005,東京
2024-01-17,外付けHDD,ストレージ,3,12000,C002,大阪
2024-01-17,USBメモリ,ストレージ,10,1000,C006,名古屋
2024-01-18,タブレット,電子機器,1,45000,C007,福岡
2024-01-18,スマートフォン,電子機器,2,75000,C008,東京
2024-01-18,ケース,アクセサリ,5,2000,C008,東京
2024-01-19,イヤホン,アクセサリ,8,3000,C009,大阪
2024-01-19,充電器,アクセサリ,6,1500,C010,名古屋
2024-01-19,マウスパッド,アクセサリ,4,800,C011,札幌
EOF
echo “✅ sales_data.csv 作成完了”
echo “📊 データ件数: $(wc -l < data/input/sales_data.csv) 行(ヘッダー含む)"
🐳 5. Dockerfileの作成
# Dockerfileを作成
cat > Dockerfile << ‘EOF’
# ============================================
# マルチステージビルド: ビルダーステージ
# ============================================
FROM python:3.11-slim as builder
WORKDIR /build
# 依存関係のインストール
COPY requirements.txt .
RUN pip install –no-cache-dir –user -r requirements.txt
# ============================================
# 最終イメージ
# ============================================
FROM python:3.11-slim
# メタデータ
LABEL maintainer=”dataeng” \
version=”1.0″ \
description=”ETL Pipeline Container”
# 非rootユーザー作成
RUN useradd -m -u 1000 -s /bin/bash etluser
WORKDIR /app
# Pythonパッケージをビルダーからコピー
COPY –from=builder /root/.local /home/etluser/.local
# アプリケーションコードをコピー
COPY –chown=etluser:etluser src/ ./src/
# 必要なディレクトリを作成
RUN mkdir -p /data/input /data/output /logs && \
chown -R etluser:etluser /data /logs /app
# 環境変数設定
ENV PATH=/home/etluser/.local/bin:$PATH \
PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONPATH=/app
# 非rootユーザーに切り替え
USER etluser
# ヘルスチェック
HEALTHCHECK –interval=30s –timeout=10s –start-period=5s –retries=3 \
CMD python -c “from src.config import Config; Config.validate()” || exit 1
# デフォルトコマンド
CMD [“python”, “-m”, “src.etl”, “sales_data.csv”]
EOF
echo “✅ Dockerfile 作成完了”
💡 Dockerfileのポイント
- マルチステージビルド:ビルド用と実行用を分離してイメージサイズ削減
- 非rootユーザー:
etluserでセキュリティ向上 - PYTHONUNBUFFERED=1:ログをリアルタイムで出力
- ヘルスチェック:設定の検証でコンテナの健全性を確認
📦 6. docker-compose.yml作成
# docker-compose.ymlを作成
cat > docker-compose.yml << ‘EOF’
version: ‘3.8’
services:
# ============================================
# PostgreSQL データベース
# ============================================
postgres:
image: postgres:15
container_name: etl-postgres
environment:
POSTGRES_DB: ${DB_NAME}
POSTGRES_USER: ${DB_USER}
POSTGRES_PASSWORD: ${DB_PASSWORD}
ports:
– “5432:5432”
volumes:
– postgres_data:/var/lib/postgresql/data
– ./init.sql:/docker-entrypoint-initdb.d/init.sql
networks:
– etl-network
healthcheck:
test: [“CMD-SHELL”, “pg_isready -U ${DB_USER} -d ${DB_NAME}”]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
# ============================================
# ETL パイプライン
# ============================================
etl:
build:
context: .
dockerfile: Dockerfile
container_name: etl-pipeline
environment:
DB_HOST: postgres
DB_PORT: 5432
DB_NAME: ${DB_NAME}
DB_USER: ${DB_USER}
DB_PASSWORD: ${DB_PASSWORD}
INPUT_DIR: /data/input
OUTPUT_DIR: /data/output
LOG_DIR: /logs
BATCH_SIZE: ${BATCH_SIZE:-1000}
MAX_RETRIES: ${MAX_RETRIES:-3}
volumes:
– ./data/input:/data/input:ro # 読み取り専用
– ./data/output:/data/output
– ./logs:/logs
networks:
– etl-network
depends_on:
postgres:
condition: service_healthy
restart: “no” # 一度実行して終了
deploy:
resources:
limits:
cpus: ‘1.0’
memory: 512M
reservations:
cpus: ‘0.5’
memory: 256M
networks:
etl-network:
driver: bridge
volumes:
postgres_data:
EOF
echo “✅ docker-compose.yml 作成完了”
🚀 7. 実行手順
7-1. イメージビルド
# イメージをビルド
docker-compose build
# ビルド確認
docker images | grep etl
7-2. コンテナ起動とETL実行
# PostgreSQLを起動
docker-compose up -d postgres
# PostgreSQLの起動を待機
sleep 10
# ETLを実行(ログをリアルタイム表示)
docker-compose up etl
# または、バックグラウンドで実行してログを確認
# docker-compose up -d etl
# docker-compose logs -f etl
📊 ETLパイプライン初期化完了
– DB: postgres:5432/datawarehouse
– バッチサイズ: 1000
– 最大リトライ: 3
📥 データ抽出開始: sales_data.csv
✅ 15件のレコードを読み込み
– カラム: [‘sale_date’, ‘product_name’, ‘category’, …]
🔄 データ変換開始
✅ データ品質チェック完了: 15件のレコードが正常
✅ 重複レコードなし
– 日付型に変換完了
– total_amountカラムを計算
✅ データ変換完了: 15件
📤 データロード開始: salesテーブル
📦 15/15件を挿入
✅ データロード完了: 15件
📊 集計レポート生成開始
✅ レポート保存完了: /data/output/summary_20240120_123456.csv
==================================================
🎉 ETLパイプライン正常完了
⏱️ 実行時間: 2.34秒
==================================================
7-3. 結果確認
# 出力ファイル確認
ls -la data/output/
cat data/output/summary_*.csv
# ログファイル確認
ls -la logs/
cat logs/etl_$(date +%Y%m%d).log
# PostgreSQLでデータ確認
docker exec -it etl-postgres psql -U dataeng -d datawarehouse -c “SELECT COUNT(*) FROM sales;”
docker exec -it etl-postgres psql -U dataeng -d datawarehouse -c “SELECT * FROM sales LIMIT 5;”
7-4. 再実行方法
# 新しいCSVファイルで再実行
docker-compose run –rm etl python -m src.etl new_sales.csv
# 同じファイルで再実行
docker-compose run –rm etl
🔧 8. トラブルシューティング
問題1: データベース接続エラー
# PostgreSQLの状態確認
docker-compose ps postgres
# ヘルスチェック確認
docker inspect etl-postgres –format='{{.State.Health.Status}}’
# 接続テスト
docker exec -it etl-postgres pg_isready -U dataeng
# ログ確認
docker-compose logs postgres
問題2: ETLが失敗する
# ETLログ確認
docker-compose logs etl
cat logs/etl_$(date +%Y%m%d).log
# コンテナ内でデバッグ
docker-compose run –rm etl /bin/bash
# 設定確認
python -c “from src.config import Config; Config.print_config()”
# 手動でETL実行
python -m src.etl sales_data.csv
問題3: 権限エラー(Permission denied)
# ディレクトリの権限を確認
ls -la data/
ls -la logs/
# 権限を修正
chmod -R 777 data/output logs/
# 再実行
docker-compose run –rm etl
8-1. クリーンアップ
# コンテナを停止
docker-compose down
# ボリュームも含めて削除
docker-compose down -v
# イメージも削除
docker-compose down -v –rmi all
# 出力ファイルをクリア
rm -f data/output/*.csv
rm -f logs/*.log
💪 9. 練習問題
練習問題 1
基礎
ETLの3つのステップ(Extract, Transform, Load)それぞれの役割を説明してください。
- Extract(抽出):データソース(CSV、API、DB等)からデータを取得する
- Transform(変換):データのクレンジング、型変換、計算、標準化を行う
- Load(読み込み):変換したデータを目的のデータベースやファイルに保存する
練習問題 2
基礎
@retryデコレータを使うメリットは何ですか?
一時的な障害に対して自動的に再試行できます。
- ネットワークの一時的な切断
- データベースの一時的な過負荷
- タイムアウト
これにより、手動での再実行が不要になり、システムの堅牢性が向上します。
練習問題 3
応用
DataValidatorクラスに「金額の上限チェック(100万円以上はエラー)」を追加してください。
# validate_sales_data メソッドに追加
MAX_AMOUNT = 1000000 # 100万円
if ‘total_amount’ in df.columns:
over_limit = df[df[‘total_amount’] > MAX_AMOUNT]
if len(over_limit) > 0:
errors.append(
f”金額上限超過({MAX_AMOUNT:,}円以上): {len(over_limit)}件”
)
練習問題 4
応用
docker-compose.ymlにRedisコンテナを追加し、ETLの実行結果をキャッシュする設計を考えてください。
# docker-compose.yml に追加
redis:
image: redis:7-alpine
container_name: etl-redis
ports:
– “6379:6379”
networks:
– etl-network
healthcheck:
test: [“CMD”, “redis-cli”, “ping”]
interval: 10s
timeout: 5s
retries: 5
# ETLコンテナに環境変数追加
etl:
environment:
REDIS_HOST: redis
REDIS_PORT: 6379
活用例:処理済みファイルのハッシュをRedisに保存し、重複処理を防止
練習問題 5
発展
このETLパイプラインをAirflowのDAGとして実装する場合、タスクをどのように分割しますか?
【DAGタスク構成案】
start
│
├─→ check_source_file # ファイル存在確認
│ │
├─→ validate_data # データ品質チェック
│ │
├─→ transform_data # データ変換
│ │
├─→ load_to_database # DB挿入
│ │
├─→ generate_summary # レポート生成
│ │
└─→ send_notification # 完了通知(Slack等)
│
end
各タスクを分離することで、失敗時の再実行が特定のステップから可能になります。
📝 STEP 26 のまとめ
✅ このプロジェクトで習得したこと
- ETLのDocker化:PythonアプリケーションのコンテナKA
- 設定管理:環境変数とConfigクラスによる柔軟な設定
- ログ管理:標準出力 + ファイル出力の二重化
- 品質保証:DataValidatorによるデータ検証
- エラー処理:try-except + @retryによる堅牢な処理
- セキュリティ:非rootユーザーでの実行
- 最適化:マルチステージビルドでイメージサイズ削減
📊 実装したベストプラクティス
| カテゴリ | 実装内容 | ファイル |
|---|---|---|
| 設定管理 | 環境変数 + .envファイル | config.py |
| ログ管理 | 標準出力 + ファイル出力 | logger.py |
| 品質保証 | DataValidatorクラス | validator.py |
| エラー処理 | @retryデコレータ | etl.py |
| セキュリティ | 非rootユーザー | Dockerfile |
| リソース制限 | CPU/メモリ制限 | docker-compose.yml |
🎯 次のステップの予告
STEP 27では、このETLパイプラインをさらに発展させて「本番環境デプロイシミュレーション」を実践します!
- 開発環境と本番環境の設定分離
- イメージのタグ管理とバージョニング
- Docker Secretsによるシークレット管理
- バックアップ・リストア手順
学習メモ
Docker・コンテナ技術入門 - Step 26
📋 過去のメモ一覧
▼