🔄 STEP 22: データパイプラインの統合実践
PostgreSQL + Airflow + Sparkで完全なETL環境を構築
📋 このステップで学ぶこと
- PostgreSQL + Airflow + Sparkの統合環境構築
- エンドツーエンドのETLパイプライン実装
- コンテナ間データフロー設計
- パフォーマンスモニタリング
- 実践演習:CSVデータ取り込み→集計→出力
🔧 0. このステップの前提知識
📚 これまでの学習の復習
- Docker Compose:複数コンテナの管理(STEP 15-16)
- ネットワーキング:サービス間通信(STEP 17-18)
- Airflow:DAGの作成と実行(STEP 19)
- Spark:クラスター構成とPySpark(STEP 20)
- 環境変数:.envファイルの活用(STEP 21)
0-1. 作業ディレクトリの準備
mkdir -p ~/docker-practice/step22
cd ~/docker-practice/step22
pwd
0-2. 統合環境のアーキテクチャ
【統合ETL環境アーキテクチャ】
┌─────────────────────────────────────────────────────────────────────┐
│ ユーザー │
│ ┌───────────────┴───────────────┐ │
│ ↓ ↓ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Airflow Web UI │ │ Spark Master │ │
│ │ :8080 │ │ UI :8081 │ │
│ └────────┬────────┘ └─────────────────┘ │
│ │ │
├─────────────┼────────────────────────────────────────────────────────┤
│ │ etl-network │
│ ↓ │
│ ┌─────────────────┐ │
│ │ Airflow │ タスク実行 ┌─────────────────┐ │
│ │ ┌───────────┐ │───────────────→│ Spark Cluster │ │
│ │ │ Scheduler │ │ │ Master + Worker │ │
│ │ └───────────┘ │ └─────────────────┘ │
│ └────────┬────────┘ │ │
│ │ メタデータ/データ │ JDBC │
│ ↓ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ PostgreSQL :5432 │ │
│ │ メタデータDB / 生データ / 集計結果 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ↓ │
│ ┌─────────────────┐ │
│ │ 共有ボリューム │ │
│ │ ./data (CSV/Parquet) │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
0-3. ETLパイプラインの流れ
【ETLパイプライン データフロー】
① EXTRACT(抽出)
CSVファイル → Airflow extract_task → PostgreSQL sales_raw
② TRANSFORM(変換)
PostgreSQL sales_raw → Airflow transform_task → PostgreSQL sales_summary
(GROUP BY、SUM、AVG などの集計処理)
③ LOAD(出力)
PostgreSQL sales_summary → Airflow export_task → CSV/レポート
0-4. 使用するコンテナ一覧
| コンテナ | イメージ | ポート | 役割 |
|---|---|---|---|
| postgres | postgres:13 | 5432 | データベース |
| airflow-webserver | apache/airflow:2.7.0 | 8080 | Web UI |
| airflow-scheduler | apache/airflow:2.7.0 | – | スケジューリング |
| spark-master | bitnami/spark:3.4 | 7077, 8081 | Sparkマスター |
| spark-worker | bitnami/spark:3.4 | – | Sparkワーカー |
🏗️ 1. 統合環境の構築
1-1. プロジェクト構造の作成
mkdir -p ~/docker-practice/step22/{dags,logs,plugins,data/output,init-db}
cd ~/docker-practice/step22
1-2. 環境変数ファイルの作成
cat > .env << ‘EOF’
AIRFLOW_UID=50000
POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_DB=airflow
SPARK_MASTER_URL=spark://spark-master:7077
EOF
1-3. docker-compose.ymlの作成
cat > docker-compose.yml << ‘EOF’
version: ‘3.8’
x-airflow-common: &airflow-common
image: apache/airflow:2.7.0-python3.9
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ”
AIRFLOW__CORE__LOAD_EXAMPLES: ‘false’
AIRFLOW__WEBSERVER__SECRET_KEY: ‘secret-key-here’
_PIP_ADDITIONAL_REQUIREMENTS: ‘pandas psycopg2-binary’
volumes:
– ./dags:/opt/airflow/dags
– ./logs:/opt/airflow/logs
– ./plugins:/opt/airflow/plugins
– ./data:/opt/airflow/data
depends_on:
postgres:
condition: service_healthy
networks:
– etl-network
services:
postgres:
image: postgres:13
container_name: etl-postgres
environment:
POSTGRES_USER: ${POSTGRES_USER:-airflow}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-airflow}
POSTGRES_DB: ${POSTGRES_DB:-airflow}
volumes:
– postgres_data:/var/lib/postgresql/data
– ./init-db:/docker-entrypoint-initdb.d
ports:
– “5432:5432”
healthcheck:
test: [“CMD”, “pg_isready”, “-U”, “airflow”]
interval: 5s
retries: 5
networks:
– etl-network
airflow-webserver:
<<: *airflow-common
container_name: airflow-webserver
command: webserver
ports:
– “8080:8080”
airflow-scheduler:
<<: *airflow-common
container_name: airflow-scheduler
command: scheduler
airflow-init:
<<: *airflow-common
container_name: airflow-init
entrypoint: /bin/bash
command:
– -c
– |
airflow db init
airflow users create –username admin –firstname Admin –lastname User –role Admin –email admin@example.com –password admin
restart: “no”
spark-master:
image: bitnami/spark:3.4
container_name: spark-master
environment:
– SPARK_MODE=master
ports:
– “7077:7077”
– “8081:8080”
volumes:
– ./data:/data
networks:
– etl-network
spark-worker:
image: bitnami/spark:3.4
container_name: spark-worker
depends_on:
– spark-master
environment:
– SPARK_MODE=worker
– SPARK_MASTER_URL=spark://spark-master:7077
– SPARK_WORKER_MEMORY=2G
volumes:
– ./data:/data
networks:
– etl-network
networks:
etl-network:
driver: bridge
volumes:
postgres_data:
EOF
📊 2. サンプルデータとDB初期化
2-1. サンプルCSVデータの作成
cat > data/sales.csv << ‘EOF’
date,product,category,quantity,price
2024-01-01,ノートPC,Electronics,2,89000
2024-01-01,マウス,Electronics,5,2500
2024-01-02,モニター,Electronics,1,45000
2024-01-02,ノートPC,Electronics,1,89000
2024-01-03,デスク,Furniture,2,35000
2024-01-03,チェア,Furniture,4,25000
2024-01-04,マウス,Electronics,8,2500
2024-01-04,キーボード,Electronics,5,8000
2024-01-05,USBメモリ,Electronics,15,1500
2024-01-05,デスク,Furniture,1,35000
EOF
2-2. データベース初期化スクリプト
cat > init-db/01-init.sql << ‘EOF’
CREATE TABLE IF NOT EXISTS sales_raw (
id SERIAL PRIMARY KEY,
date DATE NOT NULL,
product VARCHAR(100) NOT NULL,
category VARCHAR(50) NOT NULL,
quantity INT NOT NULL,
price DECIMAL(10, 2) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS sales_summary (
id SERIAL PRIMARY KEY,
category VARCHAR(50),
product VARCHAR(100),
total_quantity BIGINT,
total_sales DECIMAL(15, 2),
avg_price DECIMAL(10, 2),
transaction_count BIGINT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
EOF
📝 3. ETL DAGの実装
3-1. DAGファイルの作成
cat > dags/etl_pipeline.py << ‘EOF’
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta
import pandas as pd
default_args = {
‘owner’: ‘airflow’,
‘start_date’: datetime(2024, 1, 1),
‘retries’: 1,
‘retry_delay’: timedelta(minutes=5),
}
def extract_csv_to_postgres(**context):
“””CSVをPostgreSQLに取り込む”””
df = pd.read_csv(‘/opt/airflow/data/sales.csv’)
hook = PostgresHook(postgres_conn_id=’postgres_default’)
hook.run(“TRUNCATE TABLE sales_raw;”)
for _, row in df.iterrows():
hook.run(f”””
INSERT INTO sales_raw (date, product, category, quantity, price)
VALUES (‘{row[‘date’]}’, ‘{row[‘product’]}’, ‘{row[‘category’]}’,
{row[‘quantity’]}, {row[‘price’]});
“””)
print(f”✅ {len(df)}件のデータを挿入しました”)
return len(df)
def transform_aggregate(**context):
“””データを集計”””
hook = PostgresHook(postgres_conn_id=’postgres_default’)
hook.run(“TRUNCATE TABLE sales_summary;”)
hook.run(“””
INSERT INTO sales_summary (category, product, total_quantity, total_sales, avg_price, transaction_count)
SELECT category, product, SUM(quantity), SUM(quantity * price), AVG(price), COUNT(*)
FROM sales_raw GROUP BY category, product ORDER BY SUM(quantity * price) DESC;
“””)
result = hook.get_records(“SELECT COUNT(*) FROM sales_summary;”)
print(f”✅ {result[0][0]}件の集計結果を保存しました”)
return result[0][0]
def export_to_csv(**context):
“””集計結果をCSVに出力”””
hook = PostgresHook(postgres_conn_id=’postgres_default’)
df = hook.get_pandas_df(“SELECT * FROM sales_summary ORDER BY total_sales DESC;”)
df.to_csv(‘/opt/airflow/data/output/sales_summary.csv’, index=False)
print(f”✅ CSVに出力しました”)
def generate_report(**context):
“””レポート生成”””
ti = context[‘ti’]
extract_count = ti.xcom_pull(task_ids=’extract_csv_to_postgres’)
transform_count = ti.xcom_pull(task_ids=’transform_aggregate’)
hook = PostgresHook(postgres_conn_id=’postgres_default’)
total = hook.get_records(“SELECT SUM(total_sales) FROM sales_summary;”)[0][0]
report = f”””
ETLパイプライン実行レポート
==========================
実行日時: {datetime.now()}
抽出レコード数: {extract_count}件
集計カテゴリ数: {transform_count}件
総売上金額: ¥{total:,.0f}
“””
print(report)
with open(‘/opt/airflow/data/output/etl_report.txt’, ‘w’) as f:
f.write(report)
with DAG(‘etl_pipeline’, default_args=default_args, schedule_interval=’@daily’, catchup=False, tags=[‘etl’]) as dag:
extract = PythonOperator(task_id=’extract_csv_to_postgres’, python_callable=extract_csv_to_postgres)
transform = PythonOperator(task_id=’transform_aggregate’, python_callable=transform_aggregate)
export = PythonOperator(task_id=’export_to_csv’, python_callable=export_to_csv)
report = PythonOperator(task_id=’generate_report’, python_callable=generate_report)
extract >> transform >> export >> report
EOF
🚀 4. 環境の起動と実行
4-1. 権限設定と起動
chmod -R 777 logs data
docker-compose up airflow-init
docker-compose up -d
docker-compose ps
4-2. Airflow接続設定
🔌 PostgreSQL接続の設定(Airflow Web UI)
- http://localhost:8080 にアクセス(admin/admin)
- Admin → Connections → 「+」ボタン
- 設定:
- Connection Id:
postgres_default - Connection Type:
Postgres - Host:
postgres - Schema:
airflow - Login:
airflow - Password:
airflow - Port:
5432
- Connection Id:
- 「Test」→「Save」
4-3. DAGの実行
📊 DAG実行手順
- DAGs画面で
etl_pipelineを探す - トグルをONにして有効化
- 「▶」→「Trigger DAG」で実行
- 「Graph」タブで進行状況を確認
4-4. 結果の確認
docker-compose exec postgres psql -U airflow -d airflow -c “SELECT * FROM sales_summary ORDER BY total_sales DESC;”
cat data/output/sales_summary.csv
cat data/output/etl_report.txt
📈 5. モニタリングとトラブルシューティング
🌐 Airflow Web UI
http://localhost:8080
DAG管理、実行監視、ログ確認
⚡ Spark Master UI
http://localhost:8081
クラスター状態、リソース監視
5-1. ログ確認
docker-compose logs -f airflow-scheduler
docker-compose logs postgres | grep -i error
5-2. よくある問題
問題: DAGが表示されない
docker-compose exec airflow-scheduler python /opt/airflow/dags/etl_pipeline.py
docker-compose restart airflow-scheduler
5-3. クリーンアップ
docker-compose down -v
rm -rf logs/* data/output/*
💪 6. 練習問題
練習問題 1
基礎
ETLの3つのステップを説明してください。
- Extract(抽出):データソースからデータを取得
- Transform(変換):データの整形、クレンジング、集計
- Load(読み込み):変換後のデータを目的地に保存
練習問題 2
基礎
YAMLアンカー(&と*)を使う利点は何ですか?
- コードの重複削減:共通設定を1か所にまとめる
- 保守性の向上:変更が1か所で済む
- 可読性の向上:各サービスの固有設定が明確になる
練習問題 3
応用
XComを使ってタスク間でデータを渡す方法を説明してください。
# 渡す側:returnで自動push
def task_a():
return 100
# 受け取る側:xcom_pullで取得
def task_b(**context):
ti = context[‘ti’]
value = ti.xcom_pull(task_ids=’task_a’)
print(f”受け取った値: {value}”)
練習問題 4
応用
カテゴリ別の売上合計を計算するSQLを書いてください。
SELECT category, SUM(quantity * price) as total_sales, COUNT(*) as count
FROM sales_raw
GROUP BY category
ORDER BY total_sales DESC;
練習問題 5
発展
日次で前日のデータのみを処理する増分ETLを設計してください。
def process_daily(**context):
execution_date = context[‘ds’] # 実行日
hook = PostgresHook(postgres_conn_id=’postgres_default’)
df = hook.get_pandas_df(f”””
SELECT * FROM sales_raw
WHERE date = ‘{execution_date}’::date – interval ‘1 day’
“””)
# 前日分のみを処理…
📝 STEP 22 のまとめ
✅ このステップで学んだこと
- 統合環境:PostgreSQL + Airflow + Sparkの連携
- ETLパイプライン:Extract → Transform → Load の実装
- YAMLアンカー:共通設定の再利用
- XCom:タスク間のデータ共有
- モニタリング:UIとログでの監視
🎯 次のステップの予告
次のSTEP 23では、「CI/CD統合(GitHub Actions)」を学びます。
- GitHub ActionsでDockerイメージを自動ビルド
- Docker Hubへの自動プッシュ
- セキュリティスキャン(Trivy)の導入
学習メモ
Docker・コンテナ技術入門 - Step 22
📋 過去のメモ一覧
▼