STEP 26:プロジェクト② ETLパイプラインのコンテナ化

🔧 STEP 26: プロジェクト② ETLパイプラインのコンテナ化

既存のPythonスクリプトをDocker化して本番レディに

📋 このプロジェクトのゴール

  • ローカルで動いているPythonスクリプトをDockerコンテナ化
  • 環境変数で設定を柔軟に管理
  • ヘルスチェックとリトライロジックの実装
  • ログ出力の最適化(標準出力 + ファイル)
  • データ品質チェックの組み込み
  • エラーハンドリングの実装

目標:本番環境にデプロイ可能な堅牢なETLコンテナを構築

学習時間の目安: 3時間

🔧 0. このプロジェクトの前提知識

📚 これまでの学習の復習
  • Dockerfile:マルチステージビルド(STEP 14)
  • 環境変数:.envファイル管理(STEP 21)
  • Docker Compose:ヘルスチェック、depends_on(STEP 15-18)
  • リソース制限:メモリ、CPU制限(STEP 24)
  • Python:pandas、SQLAlchemy基礎

0-1. 作業ディレクトリの準備

mkdir -p ~/docker-practice/step26 cd ~/docker-practice/step26 pwd

0-2. ETLパイプラインのアーキテクチャ

【ETLパイプライン アーキテクチャ】 ┌─────────────────────────────────────────────────────────────────────┐ │ ETL パイプライン │ │ │ │ 【Extract】 【Transform】 【Load】 │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ CSV │───────→│ 変換 │───────→│PostgreSQL│ │ │ │ ファイル │ │ 処理 │ │ DB │ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │ │ │ │ │ ↓ ↓ ↓ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ファイル読込│ │品質チェック│ │バッチ挿入 │ │ │ │エンコード │ │重複削除 │ │リトライ │ │ │ │検証 │ │型変換 │ │トランザクション│ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │ │ 【共通機能】 │ │ ├── 📝 ログ出力(標準出力 + ファイル) │ │ ├── ⚙️ 環境変数による設定管理 │ │ ├── 🔄 リトライロジック(3回まで自動再試行) │ │ └── 📊 集計レポート生成 │ │ │ └─────────────────────────────────────────────────────────────────────┘

0-3. プロジェクトシナリオ

📊 シナリオ設定

あなたは以下のようなPythonスクリプトをローカルで運用しています:

  • CSVファイルから売上データを読み込み
  • データクレンジングと変換処理
  • PostgreSQLデータベースへ挿入
  • 集計レポートの生成

これをDockerコンテナ化して、どこでも同じように動作する堅牢なシステムを構築します。

0-4. 実装するベストプラクティス一覧

カテゴリ実装内容目的
設定管理環境変数 + .envファイル環境ごとの設定切り替え
エラー処理try-except + リトライ一時的な障害への対応
ログ管理標準出力 + ファイル出力監視とデバッグ
品質保証DataValidatorクラスデータ品質の担保
セキュリティ非rootユーザー実行コンテナセキュリティ
効率化マルチステージビルドイメージサイズ削減
監視ヘルスチェックコンテナ健全性確認
リソースCPU/メモリ制限リソース消費の制御

📂 1. プロジェクト構造の作成

1-1. ディレクトリ作成

# プロジェクトディレクトリ作成 mkdir -p etl-pipeline-docker cd etl-pipeline-docker # サブディレクトリ作成 mkdir -p src mkdir -p data/input mkdir -p data/output mkdir -p logs mkdir -p tests # 構造確認 tree -L 2 . || find . -type d
📁 最終的なディレクトリ構造
etl-pipeline-docker/ ├── Dockerfile # コンテナイメージ定義 ├── docker-compose.yml # マルチコンテナ構成 ├── .env # 環境変数(Git管理外) ├── .dockerignore # ビルド除外ファイル ├── .gitignore # Git除外ファイル ├── requirements.txt # Python依存関係 ├── init.sql # DB初期化スクリプト ├── src/ │ ├── __init__.py # パッケージ初期化 │ ├── config.py # 設定管理 │ ├── logger.py # ログ管理 │ ├── validator.py # データ品質チェック │ └── etl.py # メインETLロジック ├── data/ │ ├── input/ # 入力CSVファイル │ │ └── sales_data.csv │ └── output/ # 出力レポート ├── logs/ # ログファイル └── tests/ └── test_etl.py # テストコード

🔐 2. 環境変数と依存関係の設定

2-1. 環境変数ファイル(.env)

# .envファイルを作成 cat > .env << ‘EOF’ # データベース設定 DB_HOST=postgres DB_PORT=5432 DB_NAME=datawarehouse DB_USER=dataeng DB_PASSWORD=secure_password_123 # ETL設定 BATCH_SIZE=1000 MAX_RETRIES=3 # パス設定 INPUT_DIR=/data/input OUTPUT_DIR=/data/output LOG_DIR=/logs EOF cat .env

2-2. Python依存関係(requirements.txt)

# requirements.txtを作成 cat > requirements.txt << ‘EOF’ pandas==2.1.4 psycopg2-binary==2.9.9 python-dotenv==1.0.0 sqlalchemy==2.0.23 retry==0.9.2 EOF cat requirements.txt

2-3. Git/Docker除外ファイル

# .gitignoreを作成 cat > .gitignore << ‘EOF’ .env __pycache__/ *.pyc logs/* data/output/* .pytest_cache/ EOF # .dockerignoreを作成 cat > .dockerignore << ‘EOF’ .git .gitignore .env __pycache__ *.pyc *.pyo .pytest_cache .vscode .idea logs/* data/output/* tests/* README.md *.md EOF

🐍 3. Pythonコードの作成

3-1. パッケージ初期化(src/__init__.py)

# __init__.pyを作成 cat > src/__init__.py << ‘EOF’ “””ETL Pipeline Package””” __version__ = ‘1.0.0’ EOF

3-2. 設定管理(src/config.py)

# config.pyを作成 cat > src/config.py << ‘EOF’ “””設定管理モジュール””” import os from dotenv import load_dotenv # .envファイルを読み込み load_dotenv() class Config: “””環境変数から設定を読み込むクラス””” # データベース設定 DB_HOST = os.getenv(‘DB_HOST’, ‘localhost’) DB_PORT = int(os.getenv(‘DB_PORT’, 5432)) DB_NAME = os.getenv(‘DB_NAME’, ‘datawarehouse’) DB_USER = os.getenv(‘DB_USER’, ‘dataeng’) DB_PASSWORD = os.getenv(‘DB_PASSWORD’, ”) # ファイルパス設定 INPUT_DIR = os.getenv(‘INPUT_DIR’, ‘/data/input’) OUTPUT_DIR = os.getenv(‘OUTPUT_DIR’, ‘/data/output’) LOG_DIR = os.getenv(‘LOG_DIR’, ‘/logs’) # ETL設定 BATCH_SIZE = int(os.getenv(‘BATCH_SIZE’, 1000)) MAX_RETRIES = int(os.getenv(‘MAX_RETRIES’, 3)) @classmethod def get_db_url(cls): “””SQLAlchemy用のデータベース接続URLを生成””” return ( f”postgresql://{cls.DB_USER}:{cls.DB_PASSWORD}” f”@{cls.DB_HOST}:{cls.DB_PORT}/{cls.DB_NAME}” ) @classmethod def validate(cls): “””必須設定の検証””” required = [‘DB_PASSWORD’, ‘DB_HOST’, ‘DB_NAME’] missing = [key for key in required if not os.getenv(key)] if missing: raise ValueError( f”必須の環境変数が設定されていません: {‘, ‘.join(missing)}” ) @classmethod def print_config(cls): “””設定内容を表示(デバッグ用)””” print(“=== 設定情報 ===”) print(f”DB_HOST: {cls.DB_HOST}”) print(f”DB_PORT: {cls.DB_PORT}”) print(f”DB_NAME: {cls.DB_NAME}”) print(f”DB_USER: {cls.DB_USER}”) print(f”BATCH_SIZE: {cls.BATCH_SIZE}”) print(f”MAX_RETRIES: {cls.MAX_RETRIES}”) print(“================”) if __name__ == ‘__main__’: Config.print_config() EOF

3-3. ログ管理(src/logger.py)

# logger.pyを作成 cat > src/logger.py << ‘EOF’ “””ログ管理モジュール””” import logging import sys from pathlib import Path from datetime import datetime # 設定をインポート(循環インポート防止のため遅延) def get_log_dir(): from src.config import Config return Config.LOG_DIR def setup_logger(name=’etl’): “”” ロガーのセットアップ – 標準出力: INFO以上 – ファイル出力: DEBUG以上 “”” logger = logging.getLogger(name) logger.setLevel(logging.DEBUG) # すでにハンドラーが設定されていれば追加しない if logger.handlers: return logger # ログフォーマット formatter = logging.Formatter( ‘%(asctime)s – %(name)s – %(levelname)s – %(message)s’, datefmt=’%Y-%m-%d %H:%M:%S’ ) # コンソール出力(標準出力) console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(logging.INFO) console_handler.setFormatter(formatter) logger.addHandler(console_handler) # ファイル出力 try: log_dir = Path(get_log_dir()) log_dir.mkdir(parents=True, exist_ok=True) log_file = log_dir / f”etl_{datetime.now().strftime(‘%Y%m%d’)}.log” file_handler = logging.FileHandler(log_file, encoding=’utf-8′) file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) logger.addHandler(file_handler) except Exception as e: logger.warning(f”ファイルログの設定に失敗: {e}”) return logger if __name__ == ‘__main__’: logger = setup_logger(‘test’) logger.info(“INFOレベルのログ”) logger.debug(“DEBUGレベルのログ”) logger.warning(“WARNINGレベルのログ”) EOF

3-4. データ品質チェック(src/validator.py)

# validator.pyを作成 cat > src/validator.py << ‘EOF’ “””データ品質チェックモジュール””” import pandas as pd from src.logger import setup_logger logger = setup_logger(‘validator’) class DataValidator: “””データ品質チェッククラス””” @staticmethod def validate_sales_data(df: pd.DataFrame) -> tuple: “”” 売上データの検証 Args: df: 検証対象のDataFrame Returns: tuple: (is_valid: bool, errors: list) “”” errors = [] # 1. 必須カラムチェック required_columns = [‘sale_date’, ‘product_name’, ‘quantity’, ‘unit_price’] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: errors.append(f”必須カラムが不足: {missing_columns}”) return False, errors # 必須カラムがなければ以降のチェック不可 # 2. NULL値チェック null_counts = df[required_columns].isnull().sum() null_cols = null_counts[null_counts > 0] if len(null_cols) > 0: errors.append(f”NULL値が存在: {null_cols.to_dict()}”) # 3. 数値の妥当性チェック if ‘quantity’ in df.columns: invalid_qty = df[df[‘quantity’] <= 0] if len(invalid_qty) > 0: errors.append(f”無効な数量(0以下): {len(invalid_qty)}件”) if ‘unit_price’ in df.columns: invalid_price = df[df[‘unit_price’] < 0] if len(invalid_price) > 0: errors.append(f”負の単価: {len(invalid_price)}件”) # 4. 日付フォーマットチェック if ‘sale_date’ in df.columns: try: pd.to_datetime(df[‘sale_date’]) except Exception as e: errors.append(f”日付フォーマットエラー: {str(e)}”) # 結果判定 is_valid = len(errors) == 0 if is_valid: logger.info(f”✅ データ品質チェック完了: {len(df)}件のレコードが正常”) else: logger.error(“❌ データ品質エラー検出:”) for error in errors: logger.error(f” – {error}”) return is_valid, errors @staticmethod def remove_duplicates(df: pd.DataFrame) -> pd.DataFrame: “”” 重複データの削除 Args: df: 対象のDataFrame Returns: DataFrame: 重複削除後のDataFrame “”” initial_count = len(df) df_clean = df.drop_duplicates() removed_count = initial_count – len(df_clean) if removed_count > 0: logger.warning(f”⚠️ {removed_count}件の重複レコードを削除”) else: logger.info(“✅ 重複レコードなし”) return df_clean if __name__ == ‘__main__’: # テスト用 test_data = pd.DataFrame({ ‘sale_date’: [‘2024-01-01’, ‘2024-01-02’], ‘product_name’: [‘Test’, ‘Test2’], ‘quantity’: [1, 2], ‘unit_price’: [100, 200] }) validator = DataValidator() is_valid, errors = validator.validate_sales_data(test_data) print(f”Valid: {is_valid}, Errors: {errors}”) EOF

3-5. メインETLロジック(src/etl.py)

# etl.pyを作成 cat > src/etl.py << ‘EOF’ “””メインETLロジック””” import pandas as pd from sqlalchemy import create_engine, text from pathlib import Path from datetime import datetime from retry import retry from src.config import Config from src.logger import setup_logger from src.validator import DataValidator logger = setup_logger(‘etl’) class SalesETL: “””売上データETLクラス””” def __init__(self): “””初期化””” Config.validate() self.engine = create_engine(Config.get_db_url()) self.validator = DataValidator() logger.info(“📊 ETLパイプライン初期化完了”) logger.info(f” – DB: {Config.DB_HOST}:{Config.DB_PORT}/{Config.DB_NAME}”) logger.info(f” – バッチサイズ: {Config.BATCH_SIZE}”) logger.info(f” – 最大リトライ: {Config.MAX_RETRIES}”) def extract(self, filename: str) -> pd.DataFrame: “”” データ抽出(Extract) CSVファイルからデータを読み込む “”” logger.info(f”📥 データ抽出開始: {filename}”) input_path = Path(Config.INPUT_DIR) / filename if not input_path.exists(): raise FileNotFoundError(f”ファイルが見つかりません: {input_path}”) try: df = pd.read_csv(input_path) logger.info(f”✅ {len(df)}件のレコードを読み込み”) logger.info(f” – カラム: {list(df.columns)}”) return df except Exception as e: logger.error(f”❌ ファイル読み込みエラー: {str(e)}”) raise def transform(self, df: pd.DataFrame) -> pd.DataFrame: “”” データ変換(Transform) クレンジング、検証、型変換を実行 “”” logger.info(“🔄 データ変換開始”) # 1. データ品質チェック is_valid, errors = self.validator.validate_sales_data(df) if not is_valid: raise ValueError(f”データ品質エラー: {errors}”) # 2. 重複削除 df = self.validator.remove_duplicates(df) # 3. 日付型変換 df[‘sale_date’] = pd.to_datetime(df[‘sale_date’]) logger.info(” – 日付型に変換完了”) # 4. 合計金額計算(存在しない場合) if ‘total_amount’ not in df.columns: df[‘total_amount’] = df[‘quantity’] * df[‘unit_price’] logger.info(” – total_amountカラムを計算”) # 5. 文字列の標準化 if ‘category’ in df.columns: df[‘category’] = df[‘category’].str.strip().str.title() logger.info(” – categoryを標準化”) if ‘region’ in df.columns: df[‘region’] = df[‘region’].str.strip() logger.info(” – regionを標準化”) logger.info(f”✅ データ変換完了: {len(df)}件”) return df @retry(tries=3, delay=2, backoff=2, logger=logger) def load(self, df: pd.DataFrame, table_name: str = ‘sales’): “”” データロード(Load) PostgreSQLにバッチ挿入(リトライ機能付き) “”” logger.info(f”📤 データロード開始: {table_name}テーブル”) try: batch_size = Config.BATCH_SIZE total_rows = len(df) for i in range(0, total_rows, batch_size): batch = df.iloc[i:i+batch_size] batch.to_sql( table_name, self.engine, if_exists=’append’, index=False, method=’multi’ ) progress = min(i + len(batch), total_rows) logger.info(f” 📦 {progress}/{total_rows}件を挿入”) logger.info(f”✅ データロード完了: {total_rows}件”) except Exception as e: logger.error(f”❌ データロードエラー: {str(e)}”) raise def generate_summary(self): “””集計レポート生成””” logger.info(“📊 集計レポート生成開始”) try: query = “”” SELECT sale_date::date as date, COUNT(*) as order_count, SUM(total_amount) as total_sales, ROUND(AVG(total_amount)::numeric, 2) as avg_order_value, SUM(quantity) as total_quantity FROM sales GROUP BY sale_date::date ORDER BY sale_date DESC LIMIT 30 “”” with self.engine.connect() as conn: df_summary = pd.read_sql(text(query), conn) # レポートをCSVに保存 timestamp = datetime.now().strftime(‘%Y%m%d_%H%M%S’) output_path = Path(Config.OUTPUT_DIR) / f”summary_{timestamp}.csv” output_path.parent.mkdir(parents=True, exist_ok=True) df_summary.to_csv(output_path, index=False) logger.info(f”✅ レポート保存完了: {output_path}”) # サマリーをログに表示 logger.info(“\n” + “=” * 50) logger.info(“📈 最近の売上サマリー”) logger.info(“=” * 50) for _, row in df_summary.head(5).iterrows(): logger.info( f” {row[‘date’]}: ” f”¥{row[‘total_sales’]:,.0f} ” f”({row[‘order_count’]}件)” ) logger.info(“=” * 50) return df_summary except Exception as e: logger.error(f”❌ レポート生成エラー: {str(e)}”) raise def run(self, filename: str): “”” ETLパイプライン実行 Extract → Transform → Load → Summary “”” start_time = datetime.now() logger.info(“🚀 ” + “=” * 48) logger.info(“🚀 ETLパイプライン開始”) logger.info(“🚀 ” + “=” * 48) logger.info(f”⏰ 開始時刻: {start_time.strftime(‘%Y-%m-%d %H:%M:%S’)}”) logger.info(f”📄 入力ファイル: {filename}”) try: # Extract df = self.extract(filename) # Transform df_transformed = self.transform(df) # Load self.load(df_transformed) # Summary self.generate_summary() # 完了 end_time = datetime.now() duration = (end_time – start_time).total_seconds() logger.info(“=” * 50) logger.info(“🎉 ETLパイプライン正常完了”) logger.info(f”⏱️ 実行時間: {duration:.2f}秒”) logger.info(“=” * 50) return True except Exception as e: end_time = datetime.now() duration = (end_time – start_time).total_seconds() logger.error(“=” * 50) logger.error(“💥 ETLパイプライン失敗”) logger.error(f”エラー: {str(e)}”) logger.error(f”⏱️ 実行時間: {duration:.2f}秒”) logger.error(“=” * 50) raise def main(): “””エントリーポイント””” import sys if len(sys.argv) < 2: print("使用方法: python -m src.etl “) print(“例: python -m src.etl sales_data.csv”) sys.exit(1) filename = sys.argv[1] etl = SalesETL() etl.run(filename) if __name__ == ‘__main__’: main() EOF echo “✅ Pythonコード作成完了”

📊 4. データベースとサンプルデータ

4-1. データベース初期化スクリプト(init.sql)

# init.sqlを作成 cat > init.sql << ‘EOF’ — ============================================ — 売上データテーブル — ============================================ CREATE TABLE IF NOT EXISTS sales ( id SERIAL PRIMARY KEY, sale_date DATE NOT NULL, product_name VARCHAR(100) NOT NULL, category VARCHAR(50), quantity INTEGER NOT NULL CHECK (quantity > 0), unit_price DECIMAL(10, 2) NOT NULL CHECK (unit_price >= 0), total_amount DECIMAL(10, 2) NOT NULL, customer_id VARCHAR(50), region VARCHAR(50), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); — インデックス作成 CREATE INDEX IF NOT EXISTS idx_sales_date ON sales(sale_date); CREATE INDEX IF NOT EXISTS idx_sales_category ON sales(category); CREATE INDEX IF NOT EXISTS idx_sales_region ON sales(region); — 確認メッセージ SELECT ‘Database initialized successfully’ as status; EOF echo “✅ init.sql 作成完了”

4-2. サンプルCSVデータ

# サンプルデータを作成 cat > data/input/sales_data.csv << ‘EOF’ sale_date,product_name,category,quantity,unit_price,customer_id,region 2024-01-15,ノートパソコン,電子機器,2,89000,C001,東京 2024-01-15,マウス,周辺機器,5,1500,C002,大阪 2024-01-15,USBケーブル,アクセサリ,10,500,C003,東京 2024-01-16,キーボード,周辺機器,3,3500,C003,東京 2024-01-16,モニター,電子機器,1,35000,C001,東京 2024-01-16,Webカメラ,周辺機器,4,5000,C004,福岡 2024-01-17,ヘッドセット,周辺機器,2,8000,C005,東京 2024-01-17,外付けHDD,ストレージ,3,12000,C002,大阪 2024-01-17,USBメモリ,ストレージ,10,1000,C006,名古屋 2024-01-18,タブレット,電子機器,1,45000,C007,福岡 2024-01-18,スマートフォン,電子機器,2,75000,C008,東京 2024-01-18,ケース,アクセサリ,5,2000,C008,東京 2024-01-19,イヤホン,アクセサリ,8,3000,C009,大阪 2024-01-19,充電器,アクセサリ,6,1500,C010,名古屋 2024-01-19,マウスパッド,アクセサリ,4,800,C011,札幌 EOF echo “✅ sales_data.csv 作成完了” echo “📊 データ件数: $(wc -l < data/input/sales_data.csv) 行(ヘッダー含む)"

🐳 5. Dockerfileの作成

# Dockerfileを作成 cat > Dockerfile << ‘EOF’ # ============================================ # マルチステージビルド: ビルダーステージ # ============================================ FROM python:3.11-slim as builder WORKDIR /build # 依存関係のインストール COPY requirements.txt . RUN pip install –no-cache-dir –user -r requirements.txt # ============================================ # 最終イメージ # ============================================ FROM python:3.11-slim # メタデータ LABEL maintainer=”dataeng” \ version=”1.0″ \ description=”ETL Pipeline Container” # 非rootユーザー作成 RUN useradd -m -u 1000 -s /bin/bash etluser WORKDIR /app # Pythonパッケージをビルダーからコピー COPY –from=builder /root/.local /home/etluser/.local # アプリケーションコードをコピー COPY –chown=etluser:etluser src/ ./src/ # 必要なディレクトリを作成 RUN mkdir -p /data/input /data/output /logs && \ chown -R etluser:etluser /data /logs /app # 環境変数設定 ENV PATH=/home/etluser/.local/bin:$PATH \ PYTHONUNBUFFERED=1 \ PYTHONDONTWRITEBYTECODE=1 \ PYTHONPATH=/app # 非rootユーザーに切り替え USER etluser # ヘルスチェック HEALTHCHECK –interval=30s –timeout=10s –start-period=5s –retries=3 \ CMD python -c “from src.config import Config; Config.validate()” || exit 1 # デフォルトコマンド CMD [“python”, “-m”, “src.etl”, “sales_data.csv”] EOF echo “✅ Dockerfile 作成完了”
💡 Dockerfileのポイント
  • マルチステージビルド:ビルド用と実行用を分離してイメージサイズ削減
  • 非rootユーザー:etluserでセキュリティ向上
  • PYTHONUNBUFFERED=1:ログをリアルタイムで出力
  • ヘルスチェック:設定の検証でコンテナの健全性を確認

📦 6. docker-compose.yml作成

# docker-compose.ymlを作成 cat > docker-compose.yml << ‘EOF’ version: ‘3.8’ services: # ============================================ # PostgreSQL データベース # ============================================ postgres: image: postgres:15 container_name: etl-postgres environment: POSTGRES_DB: ${DB_NAME} POSTGRES_USER: ${DB_USER} POSTGRES_PASSWORD: ${DB_PASSWORD} ports: – “5432:5432” volumes: – postgres_data:/var/lib/postgresql/data – ./init.sql:/docker-entrypoint-initdb.d/init.sql networks: – etl-network healthcheck: test: [“CMD-SHELL”, “pg_isready -U ${DB_USER} -d ${DB_NAME}”] interval: 10s timeout: 5s retries: 5 restart: unless-stopped # ============================================ # ETL パイプライン # ============================================ etl: build: context: . dockerfile: Dockerfile container_name: etl-pipeline environment: DB_HOST: postgres DB_PORT: 5432 DB_NAME: ${DB_NAME} DB_USER: ${DB_USER} DB_PASSWORD: ${DB_PASSWORD} INPUT_DIR: /data/input OUTPUT_DIR: /data/output LOG_DIR: /logs BATCH_SIZE: ${BATCH_SIZE:-1000} MAX_RETRIES: ${MAX_RETRIES:-3} volumes: – ./data/input:/data/input:ro # 読み取り専用 – ./data/output:/data/output – ./logs:/logs networks: – etl-network depends_on: postgres: condition: service_healthy restart: “no” # 一度実行して終了 deploy: resources: limits: cpus: ‘1.0’ memory: 512M reservations: cpus: ‘0.5’ memory: 256M networks: etl-network: driver: bridge volumes: postgres_data: EOF echo “✅ docker-compose.yml 作成完了”

🚀 7. 実行手順

7-1. イメージビルド

# イメージをビルド docker-compose build # ビルド確認 docker images | grep etl

7-2. コンテナ起動とETL実行

# PostgreSQLを起動 docker-compose up -d postgres # PostgreSQLの起動を待機 sleep 10 # ETLを実行(ログをリアルタイム表示) docker-compose up etl # または、バックグラウンドで実行してログを確認 # docker-compose up -d etl # docker-compose logs -f etl
📊 ETLパイプライン初期化完了 – DB: postgres:5432/datawarehouse – バッチサイズ: 1000 – 最大リトライ: 3 📥 データ抽出開始: sales_data.csv ✅ 15件のレコードを読み込み – カラム: [‘sale_date’, ‘product_name’, ‘category’, …] 🔄 データ変換開始 ✅ データ品質チェック完了: 15件のレコードが正常 ✅ 重複レコードなし – 日付型に変換完了 – total_amountカラムを計算 ✅ データ変換完了: 15件 📤 データロード開始: salesテーブル 📦 15/15件を挿入 ✅ データロード完了: 15件 📊 集計レポート生成開始 ✅ レポート保存完了: /data/output/summary_20240120_123456.csv ================================================== 🎉 ETLパイプライン正常完了 ⏱️ 実行時間: 2.34秒 ==================================================

7-3. 結果確認

# 出力ファイル確認 ls -la data/output/ cat data/output/summary_*.csv # ログファイル確認 ls -la logs/ cat logs/etl_$(date +%Y%m%d).log # PostgreSQLでデータ確認 docker exec -it etl-postgres psql -U dataeng -d datawarehouse -c “SELECT COUNT(*) FROM sales;” docker exec -it etl-postgres psql -U dataeng -d datawarehouse -c “SELECT * FROM sales LIMIT 5;”

7-4. 再実行方法

# 新しいCSVファイルで再実行 docker-compose run –rm etl python -m src.etl new_sales.csv # 同じファイルで再実行 docker-compose run –rm etl

🔧 8. トラブルシューティング

問題1: データベース接続エラー
# PostgreSQLの状態確認 docker-compose ps postgres # ヘルスチェック確認 docker inspect etl-postgres –format='{{.State.Health.Status}}’ # 接続テスト docker exec -it etl-postgres pg_isready -U dataeng # ログ確認 docker-compose logs postgres
問題2: ETLが失敗する
# ETLログ確認 docker-compose logs etl cat logs/etl_$(date +%Y%m%d).log # コンテナ内でデバッグ docker-compose run –rm etl /bin/bash # 設定確認 python -c “from src.config import Config; Config.print_config()” # 手動でETL実行 python -m src.etl sales_data.csv
問題3: 権限エラー(Permission denied)
# ディレクトリの権限を確認 ls -la data/ ls -la logs/ # 権限を修正 chmod -R 777 data/output logs/ # 再実行 docker-compose run –rm etl

8-1. クリーンアップ

# コンテナを停止 docker-compose down # ボリュームも含めて削除 docker-compose down -v # イメージも削除 docker-compose down -v –rmi all # 出力ファイルをクリア rm -f data/output/*.csv rm -f logs/*.log

💪 9. 練習問題

練習問題 1 基礎

ETLの3つのステップ(Extract, Transform, Load)それぞれの役割を説明してください。

  1. Extract(抽出):データソース(CSV、API、DB等)からデータを取得する
  2. Transform(変換):データのクレンジング、型変換、計算、標準化を行う
  3. Load(読み込み):変換したデータを目的のデータベースやファイルに保存する
練習問題 2 基礎

@retryデコレータを使うメリットは何ですか?

一時的な障害に対して自動的に再試行できます。

  • ネットワークの一時的な切断
  • データベースの一時的な過負荷
  • タイムアウト

これにより、手動での再実行が不要になり、システムの堅牢性が向上します。

練習問題 3 応用

DataValidatorクラスに「金額の上限チェック(100万円以上はエラー)」を追加してください。

# validate_sales_data メソッドに追加 MAX_AMOUNT = 1000000 # 100万円 if ‘total_amount’ in df.columns: over_limit = df[df[‘total_amount’] > MAX_AMOUNT] if len(over_limit) > 0: errors.append( f”金額上限超過({MAX_AMOUNT:,}円以上): {len(over_limit)}件” )
練習問題 4 応用

docker-compose.ymlにRedisコンテナを追加し、ETLの実行結果をキャッシュする設計を考えてください。

# docker-compose.yml に追加 redis: image: redis:7-alpine container_name: etl-redis ports: – “6379:6379” networks: – etl-network healthcheck: test: [“CMD”, “redis-cli”, “ping”] interval: 10s timeout: 5s retries: 5 # ETLコンテナに環境変数追加 etl: environment: REDIS_HOST: redis REDIS_PORT: 6379

活用例:処理済みファイルのハッシュをRedisに保存し、重複処理を防止

練習問題 5 発展

このETLパイプラインをAirflowのDAGとして実装する場合、タスクをどのように分割しますか?

【DAGタスク構成案】 start │ ├─→ check_source_file # ファイル存在確認 │ │ ├─→ validate_data # データ品質チェック │ │ ├─→ transform_data # データ変換 │ │ ├─→ load_to_database # DB挿入 │ │ ├─→ generate_summary # レポート生成 │ │ └─→ send_notification # 完了通知(Slack等) │ end

各タスクを分離することで、失敗時の再実行が特定のステップから可能になります。

📝 STEP 26 のまとめ

✅ このプロジェクトで習得したこと
  • ETLのDocker化:PythonアプリケーションのコンテナKA
  • 設定管理:環境変数とConfigクラスによる柔軟な設定
  • ログ管理:標準出力 + ファイル出力の二重化
  • 品質保証:DataValidatorによるデータ検証
  • エラー処理:try-except + @retryによる堅牢な処理
  • セキュリティ:非rootユーザーでの実行
  • 最適化:マルチステージビルドでイメージサイズ削減
📊 実装したベストプラクティス
カテゴリ実装内容ファイル
設定管理環境変数 + .envファイルconfig.py
ログ管理標準出力 + ファイル出力logger.py
品質保証DataValidatorクラスvalidator.py
エラー処理@retryデコレータetl.py
セキュリティ非rootユーザーDockerfile
リソース制限CPU/メモリ制限docker-compose.yml
🎯 次のステップの予告

STEP 27では、このETLパイプラインをさらに発展させて「本番環境デプロイシミュレーション」を実践します!

  • 開発環境と本番環境の設定分離
  • イメージのタグ管理とバージョニング
  • Docker Secretsによるシークレット管理
  • バックアップ・リストア手順
📝

学習メモ

Docker・コンテナ技術入門 - Step 26

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE