STEP 22:データパイプラインの統合実践

🔄 STEP 22: データパイプラインの統合実践

PostgreSQL + Airflow + Sparkで完全なETL環境を構築

📋 このステップで学ぶこと

  • PostgreSQL + Airflow + Sparkの統合環境構築
  • エンドツーエンドのETLパイプライン実装
  • コンテナ間データフロー設計
  • パフォーマンスモニタリング
  • 実践演習:CSVデータ取り込み→集計→出力

🔧 0. このステップの前提知識

📚 これまでの学習の復習
  • Docker Compose:複数コンテナの管理(STEP 15-16)
  • ネットワーキング:サービス間通信(STEP 17-18)
  • Airflow:DAGの作成と実行(STEP 19)
  • Spark:クラスター構成とPySpark(STEP 20)
  • 環境変数:.envファイルの活用(STEP 21)

0-1. 作業ディレクトリの準備

mkdir -p ~/docker-practice/step22 cd ~/docker-practice/step22 pwd

0-2. 統合環境のアーキテクチャ

【統合ETL環境アーキテクチャ】 ┌─────────────────────────────────────────────────────────────────────┐ │ ユーザー │ │ ┌───────────────┴───────────────┐ │ │ ↓ ↓ │ │ ┌─────────────────┐ ┌─────────────────┐ │ │ │ Airflow Web UI │ │ Spark Master │ │ │ │ :8080 │ │ UI :8081 │ │ │ └────────┬────────┘ └─────────────────┘ │ │ │ │ ├─────────────┼────────────────────────────────────────────────────────┤ │ │ etl-network │ │ ↓ │ │ ┌─────────────────┐ │ │ │ Airflow │ タスク実行 ┌─────────────────┐ │ │ │ ┌───────────┐ │───────────────→│ Spark Cluster │ │ │ │ │ Scheduler │ │ │ Master + Worker │ │ │ │ └───────────┘ │ └─────────────────┘ │ │ └────────┬────────┘ │ │ │ │ メタデータ/データ │ JDBC │ │ ↓ ↓ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ PostgreSQL :5432 │ │ │ │ メタデータDB / 生データ / 集計結果 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ │ ↓ │ │ ┌─────────────────┐ │ │ │ 共有ボリューム │ │ │ │ ./data (CSV/Parquet) │ │ └─────────────────┘ │ └─────────────────────────────────────────────────────────────────────┘

0-3. ETLパイプラインの流れ

【ETLパイプライン データフロー】 ① EXTRACT(抽出) CSVファイル → Airflow extract_task → PostgreSQL sales_raw ② TRANSFORM(変換) PostgreSQL sales_raw → Airflow transform_task → PostgreSQL sales_summary (GROUP BY、SUM、AVG などの集計処理) ③ LOAD(出力) PostgreSQL sales_summary → Airflow export_task → CSV/レポート

0-4. 使用するコンテナ一覧

コンテナイメージポート役割
postgrespostgres:135432データベース
airflow-webserverapache/airflow:2.7.08080Web UI
airflow-schedulerapache/airflow:2.7.0スケジューリング
spark-masterbitnami/spark:3.47077, 8081Sparkマスター
spark-workerbitnami/spark:3.4Sparkワーカー

🏗️ 1. 統合環境の構築

1-1. プロジェクト構造の作成

mkdir -p ~/docker-practice/step22/{dags,logs,plugins,data/output,init-db} cd ~/docker-practice/step22

1-2. 環境変数ファイルの作成

cat > .env << ‘EOF’ AIRFLOW_UID=50000 POSTGRES_USER=airflow POSTGRES_PASSWORD=airflow POSTGRES_DB=airflow SPARK_MASTER_URL=spark://spark-master:7077 EOF

1-3. docker-compose.ymlの作成

cat > docker-compose.yml << ‘EOF’ version: ‘3.8’ x-airflow-common: &airflow-common image: apache/airflow:2.7.0-python3.9 environment: AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CORE__FERNET_KEY: ” AIRFLOW__CORE__LOAD_EXAMPLES: ‘false’ AIRFLOW__WEBSERVER__SECRET_KEY: ‘secret-key-here’ _PIP_ADDITIONAL_REQUIREMENTS: ‘pandas psycopg2-binary’ volumes: – ./dags:/opt/airflow/dags – ./logs:/opt/airflow/logs – ./plugins:/opt/airflow/plugins – ./data:/opt/airflow/data depends_on: postgres: condition: service_healthy networks: – etl-network services: postgres: image: postgres:13 container_name: etl-postgres environment: POSTGRES_USER: ${POSTGRES_USER:-airflow} POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-airflow} POSTGRES_DB: ${POSTGRES_DB:-airflow} volumes: – postgres_data:/var/lib/postgresql/data – ./init-db:/docker-entrypoint-initdb.d ports: – “5432:5432” healthcheck: test: [“CMD”, “pg_isready”, “-U”, “airflow”] interval: 5s retries: 5 networks: – etl-network airflow-webserver: <<: *airflow-common container_name: airflow-webserver command: webserver ports: – “8080:8080” airflow-scheduler: <<: *airflow-common container_name: airflow-scheduler command: scheduler airflow-init: <<: *airflow-common container_name: airflow-init entrypoint: /bin/bash command: – -c – | airflow db init airflow users create –username admin –firstname Admin –lastname User –role Admin –email admin@example.com –password admin restart: “no” spark-master: image: bitnami/spark:3.4 container_name: spark-master environment: – SPARK_MODE=master ports: – “7077:7077” – “8081:8080” volumes: – ./data:/data networks: – etl-network spark-worker: image: bitnami/spark:3.4 container_name: spark-worker depends_on: – spark-master environment: – SPARK_MODE=worker – SPARK_MASTER_URL=spark://spark-master:7077 – SPARK_WORKER_MEMORY=2G volumes: – ./data:/data networks: – etl-network networks: etl-network: driver: bridge volumes: postgres_data: EOF

📊 2. サンプルデータとDB初期化

2-1. サンプルCSVデータの作成

cat > data/sales.csv << ‘EOF’ date,product,category,quantity,price 2024-01-01,ノートPC,Electronics,2,89000 2024-01-01,マウス,Electronics,5,2500 2024-01-02,モニター,Electronics,1,45000 2024-01-02,ノートPC,Electronics,1,89000 2024-01-03,デスク,Furniture,2,35000 2024-01-03,チェア,Furniture,4,25000 2024-01-04,マウス,Electronics,8,2500 2024-01-04,キーボード,Electronics,5,8000 2024-01-05,USBメモリ,Electronics,15,1500 2024-01-05,デスク,Furniture,1,35000 EOF

2-2. データベース初期化スクリプト

cat > init-db/01-init.sql << ‘EOF’ CREATE TABLE IF NOT EXISTS sales_raw ( id SERIAL PRIMARY KEY, date DATE NOT NULL, product VARCHAR(100) NOT NULL, category VARCHAR(50) NOT NULL, quantity INT NOT NULL, price DECIMAL(10, 2) NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS sales_summary ( id SERIAL PRIMARY KEY, category VARCHAR(50), product VARCHAR(100), total_quantity BIGINT, total_sales DECIMAL(15, 2), avg_price DECIMAL(10, 2), transaction_count BIGINT, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); EOF

📝 3. ETL DAGの実装

3-1. DAGファイルの作成

cat > dags/etl_pipeline.py << ‘EOF’ from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.postgres.hooks.postgres import PostgresHook from datetime import datetime, timedelta import pandas as pd default_args = { ‘owner’: ‘airflow’, ‘start_date’: datetime(2024, 1, 1), ‘retries’: 1, ‘retry_delay’: timedelta(minutes=5), } def extract_csv_to_postgres(**context): “””CSVをPostgreSQLに取り込む””” df = pd.read_csv(‘/opt/airflow/data/sales.csv’) hook = PostgresHook(postgres_conn_id=’postgres_default’) hook.run(“TRUNCATE TABLE sales_raw;”) for _, row in df.iterrows(): hook.run(f””” INSERT INTO sales_raw (date, product, category, quantity, price) VALUES (‘{row[‘date’]}’, ‘{row[‘product’]}’, ‘{row[‘category’]}’, {row[‘quantity’]}, {row[‘price’]}); “””) print(f”✅ {len(df)}件のデータを挿入しました”) return len(df) def transform_aggregate(**context): “””データを集計””” hook = PostgresHook(postgres_conn_id=’postgres_default’) hook.run(“TRUNCATE TABLE sales_summary;”) hook.run(“”” INSERT INTO sales_summary (category, product, total_quantity, total_sales, avg_price, transaction_count) SELECT category, product, SUM(quantity), SUM(quantity * price), AVG(price), COUNT(*) FROM sales_raw GROUP BY category, product ORDER BY SUM(quantity * price) DESC; “””) result = hook.get_records(“SELECT COUNT(*) FROM sales_summary;”) print(f”✅ {result[0][0]}件の集計結果を保存しました”) return result[0][0] def export_to_csv(**context): “””集計結果をCSVに出力””” hook = PostgresHook(postgres_conn_id=’postgres_default’) df = hook.get_pandas_df(“SELECT * FROM sales_summary ORDER BY total_sales DESC;”) df.to_csv(‘/opt/airflow/data/output/sales_summary.csv’, index=False) print(f”✅ CSVに出力しました”) def generate_report(**context): “””レポート生成””” ti = context[‘ti’] extract_count = ti.xcom_pull(task_ids=’extract_csv_to_postgres’) transform_count = ti.xcom_pull(task_ids=’transform_aggregate’) hook = PostgresHook(postgres_conn_id=’postgres_default’) total = hook.get_records(“SELECT SUM(total_sales) FROM sales_summary;”)[0][0] report = f””” ETLパイプライン実行レポート ========================== 実行日時: {datetime.now()} 抽出レコード数: {extract_count}件 集計カテゴリ数: {transform_count}件 総売上金額: ¥{total:,.0f} “”” print(report) with open(‘/opt/airflow/data/output/etl_report.txt’, ‘w’) as f: f.write(report) with DAG(‘etl_pipeline’, default_args=default_args, schedule_interval=’@daily’, catchup=False, tags=[‘etl’]) as dag: extract = PythonOperator(task_id=’extract_csv_to_postgres’, python_callable=extract_csv_to_postgres) transform = PythonOperator(task_id=’transform_aggregate’, python_callable=transform_aggregate) export = PythonOperator(task_id=’export_to_csv’, python_callable=export_to_csv) report = PythonOperator(task_id=’generate_report’, python_callable=generate_report) extract >> transform >> export >> report EOF

🚀 4. 環境の起動と実行

4-1. 権限設定と起動

chmod -R 777 logs data docker-compose up airflow-init docker-compose up -d docker-compose ps

4-2. Airflow接続設定

🔌 PostgreSQL接続の設定(Airflow Web UI)
  1. http://localhost:8080 にアクセス(admin/admin)
  2. Admin → Connections → 「+」ボタン
  3. 設定:
    • Connection Id: postgres_default
    • Connection Type: Postgres
    • Host: postgres
    • Schema: airflow
    • Login: airflow
    • Password: airflow
    • Port: 5432
  4. 「Test」→「Save」

4-3. DAGの実行

📊 DAG実行手順
  1. DAGs画面でetl_pipelineを探す
  2. トグルをONにして有効化
  3. 「▶」→「Trigger DAG」で実行
  4. 「Graph」タブで進行状況を確認

4-4. 結果の確認

docker-compose exec postgres psql -U airflow -d airflow -c “SELECT * FROM sales_summary ORDER BY total_sales DESC;”
cat data/output/sales_summary.csv cat data/output/etl_report.txt

📈 5. モニタリングとトラブルシューティング

🌐 Airflow Web UI

http://localhost:8080
DAG管理、実行監視、ログ確認

⚡ Spark Master UI

http://localhost:8081
クラスター状態、リソース監視

5-1. ログ確認

docker-compose logs -f airflow-scheduler docker-compose logs postgres | grep -i error

5-2. よくある問題

問題: DAGが表示されない
docker-compose exec airflow-scheduler python /opt/airflow/dags/etl_pipeline.py docker-compose restart airflow-scheduler

5-3. クリーンアップ

docker-compose down -v rm -rf logs/* data/output/*

💪 6. 練習問題

練習問題 1 基礎

ETLの3つのステップを説明してください。

  1. Extract(抽出):データソースからデータを取得
  2. Transform(変換):データの整形、クレンジング、集計
  3. Load(読み込み):変換後のデータを目的地に保存
練習問題 2 基礎

YAMLアンカー(&と*)を使う利点は何ですか?

  • コードの重複削減:共通設定を1か所にまとめる
  • 保守性の向上:変更が1か所で済む
  • 可読性の向上:各サービスの固有設定が明確になる
練習問題 3 応用

XComを使ってタスク間でデータを渡す方法を説明してください。

# 渡す側:returnで自動push def task_a(): return 100 # 受け取る側:xcom_pullで取得 def task_b(**context): ti = context[‘ti’] value = ti.xcom_pull(task_ids=’task_a’) print(f”受け取った値: {value}”)
練習問題 4 応用

カテゴリ別の売上合計を計算するSQLを書いてください。

SELECT category, SUM(quantity * price) as total_sales, COUNT(*) as count FROM sales_raw GROUP BY category ORDER BY total_sales DESC;
練習問題 5 発展

日次で前日のデータのみを処理する増分ETLを設計してください。

def process_daily(**context): execution_date = context[‘ds’] # 実行日 hook = PostgresHook(postgres_conn_id=’postgres_default’) df = hook.get_pandas_df(f””” SELECT * FROM sales_raw WHERE date = ‘{execution_date}’::date – interval ‘1 day’ “””) # 前日分のみを処理…

📝 STEP 22 のまとめ

✅ このステップで学んだこと
  • 統合環境:PostgreSQL + Airflow + Sparkの連携
  • ETLパイプライン:Extract → Transform → Load の実装
  • YAMLアンカー:共通設定の再利用
  • XCom:タスク間のデータ共有
  • モニタリング:UIとログでの監視
🎯 次のステップの予告

次のSTEP 23では、「CI/CD統合(GitHub Actions)」を学びます。

  • GitHub ActionsでDockerイメージを自動ビルド
  • Docker Hubへの自動プッシュ
  • セキュリティスキャン(Trivy)の導入
📝

学習メモ

Docker・コンテナ技術入門 - Step 22

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE