🏗️ STEP 25: プロジェクト① ローカルデータ基盤構築
完全なデータ分析環境をDocker Composeで構築
📋 このプロジェクトで作るもの
- PostgreSQL(メインデータベース)
- pgAdmin(GUI管理ツール)
- Jupyter Notebook(分析環境)
- Apache Airflow(スケジューリング)
目標:4つのコンテナが連携する実用的なデータ分析環境を構築し、実際のデータ処理フローを体験する
学習時間の目安: 3時間
🔧 0. このプロジェクトの前提知識
📚 これまでの学習の復習
- Docker Compose:マルチコンテナ管理(STEP 15-18)
- PostgreSQL:データベースコンテナ(STEP 4, 17)
- 環境変数:.envファイル管理(STEP 21)
- Airflow:DAG作成(STEP 19, 22)
- ボリューム:データ永続化(STEP 10)
0-1. 作業ディレクトリの準備
mkdir -p ~/docker-practice/step25
cd ~/docker-practice/step25
pwd
0-2. 全体アーキテクチャ
【ローカルデータ基盤 システム構成図】
┌─────────────────────────────────────────────────────────────────────┐
│ ローカルデータ基盤 │
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Jupyter │───── 分析 ────────→│ PostgreSQL │ │
│ │ Notebook │ │ :5432 │ │
│ │ :8888 │ │ │ │
│ └─────────────┘ └─────────────┘ │
│ │ ↑ │
│ │ │ │
│ │ 可視化 ETL実行 │ │
│ ↓ │ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ pgAdmin │───── 管理 ────────→│ Airflow │ │
│ │ :5050 │ │ :8080 │ │
│ └─────────────┘ └─────────────┘ │
│ │
│ 【共有ボリューム】 │
│ ├── postgres_data: PostgreSQLデータ永続化 │
│ ├── ./data: CSVファイル共有 │
│ ├── ./notebooks: Jupyterノートブック │
│ └── ./airflow/dags: AirflowのDAGファイル │
│ │
└─────────────────────────────────────────────────────────────────────┘
0-3. コンテナ構成一覧
| サービス | イメージ | ポート | 役割 |
|---|---|---|---|
| postgres | postgres:15 | 5432 | メインデータベース |
| pgadmin | dpage/pgadmin4 | 5050 | DB管理GUI |
| jupyter | jupyter/scipy-notebook | 8888 | データ分析環境 |
| airflow | apache/airflow:2.7.3 | 8080 | ワークフロー管理 |
0-4. データ処理フロー
【データ処理の流れ】
① データ投入 ② 定期集計 ③ 分析・可視化
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ CSVファイル │───→│ Airflow │───→│ Jupyter │
│ ↓ │ │ DAG実行 │ │ Notebook │
│ PostgreSQL │ │ ↓ │ │ ↓ │
│ 初期データ │ │ 集計テーブル │ │ グラフ出力 │
└──────────────┘ └──────────────┘ └──────────────┘
④ データベース管理
┌──────────────┐
│ pgAdmin │
│ GUI操作 │
└──────────────┘
📂 1. プロジェクト構造の作成
1-1. ディレクトリ構成の作成
# プロジェクトディレクトリを作成
mkdir -p local-data-platform
cd local-data-platform
# 必要なディレクトリを作成
mkdir -p postgres/init
mkdir -p airflow/dags
mkdir -p airflow/logs
mkdir -p airflow/plugins
mkdir -p notebooks
mkdir -p data
# 構造を確認
tree -L 2 . || find . -type d | head -20
📁 最終的なディレクトリ構造
local-data-platform/
├── docker-compose.yml # メイン設定ファイル
├── .env # 環境変数(パスワード等)
├── postgres/
│ └── init/
│ └── init.sql # DB初期化スクリプト
├── airflow/
│ ├── dags/
│ │ └── daily_aggregation.py # 日次集計DAG
│ ├── logs/ # Airflowログ
│ └── plugins/ # カスタムプラグイン
├── notebooks/
│ └── analysis.ipynb # 分析ノートブック
└── data/
└── sample_sales.csv # サンプルデータ
🔐 2. 環境変数ファイル(.env)作成
# .envファイルを作成
cat > .env << ‘EOF’
# PostgreSQL設定
POSTGRES_PASSWORD=dataeng_password_123
# pgAdmin設定
PGADMIN_EMAIL=admin@example.com
PGADMIN_PASSWORD=pgadmin123
# Airflow設定
AIRFLOW_USER=admin
AIRFLOW_PASSWORD=airflow123
AIRFLOW_UID=50000
EOF
# 確認
cat .env
⚠️ セキュリティ注意
- 本番環境では強力なパスワードを使用
- .envファイルは.gitignoreに追加
- パスワードをGitにコミットしない
# .gitignoreに追加
echo “.env” >> .gitignore
echo “airflow/logs/*” >> .gitignore
⚙️ 3. docker-compose.yml作成
# docker-compose.ymlを作成
cat > docker-compose.yml << ‘EOF’
version: ‘3.8’
services:
# ============================================
# PostgreSQL データベース
# ============================================
postgres:
image: postgres:15
container_name: local-postgres
environment:
POSTGRES_DB: datawarehouse
POSTGRES_USER: dataeng
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
ports:
– “5432:5432”
volumes:
– postgres_data:/var/lib/postgresql/data
– ./postgres/init:/docker-entrypoint-initdb.d
– ./data:/data
networks:
– data-network
healthcheck:
test: [“CMD-SHELL”, “pg_isready -U dataeng -d datawarehouse”]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
# ============================================
# pgAdmin 管理ツール
# ============================================
pgadmin:
image: dpage/pgadmin4:latest
container_name: local-pgadmin
environment:
PGADMIN_DEFAULT_EMAIL: ${PGADMIN_EMAIL}
PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_PASSWORD}
ports:
– “5050:80”
volumes:
– pgadmin_data:/var/lib/pgadmin
networks:
– data-network
depends_on:
postgres:
condition: service_healthy
restart: unless-stopped
# ============================================
# Jupyter Notebook
# ============================================
jupyter:
image: jupyter/scipy-notebook:latest
container_name: local-jupyter
environment:
JUPYTER_ENABLE_LAB: “yes”
ports:
– “8888:8888″
volumes:
– ./notebooks:/home/jovyan/work
– ./data:/home/jovyan/data
networks:
– data-network
depends_on:
postgres:
condition: service_healthy
command: start-notebook.sh –NotebookApp.token=”
restart: unless-stopped
# ============================================
# Apache Airflow(スタンドアロン構成)
# ============================================
airflow:
image: apache/airflow:2.7.3
container_name: local-airflow
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://dataeng:${POSTGRES_PASSWORD}@postgres/datawarehouse
AIRFLOW__CORE__LOAD_EXAMPLES: ‘false’
AIRFLOW__CORE__FERNET_KEY: ”
AIRFLOW__WEBSERVER__SECRET_KEY: ‘my_secret_key_12345’
_AIRFLOW_DB_MIGRATE: ‘true’
_AIRFLOW_WWW_USER_CREATE: ‘true’
_AIRFLOW_WWW_USER_USERNAME: ${AIRFLOW_USER}
_AIRFLOW_WWW_USER_PASSWORD: ${AIRFLOW_PASSWORD}
ports:
– “8080:8080”
volumes:
– ./airflow/dags:/opt/airflow/dags
– ./airflow/logs:/opt/airflow/logs
– ./airflow/plugins:/opt/airflow/plugins
– ./data:/data
networks:
– data-network
depends_on:
postgres:
condition: service_healthy
command: standalone
healthcheck:
test: [“CMD”, “curl”, “-f”, “http://localhost:8080/health”]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
restart: unless-stopped
networks:
data-network:
driver: bridge
volumes:
postgres_data:
pgadmin_data:
EOF
echo “✅ docker-compose.yml 作成完了”
💡 設定のポイント
- ヘルスチェック:PostgreSQLが完全に起動してから他のサービスを開始
- ネットワーク:すべてのコンテナが
data-networkで通信可能 - ボリューム:名前付きボリュームでデータ永続化、バインドマウントでコード同期
- restart:
unless-stoppedで異常終了時に自動再起動
📊 4. PostgreSQL初期化スクリプト
# 初期化SQLを作成
cat > postgres/init/init.sql << ‘EOF’
— ============================================
— テーブル作成
— ============================================
— 売上データテーブル
CREATE TABLE IF NOT EXISTS sales (
id SERIAL PRIMARY KEY,
sale_date DATE NOT NULL,
product_name VARCHAR(100) NOT NULL,
category VARCHAR(50) NOT NULL,
quantity INTEGER NOT NULL,
unit_price DECIMAL(10, 2) NOT NULL,
total_amount DECIMAL(10, 2) NOT NULL,
customer_id VARCHAR(50) NOT NULL,
region VARCHAR(50) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
— 日次集計結果テーブル
CREATE TABLE IF NOT EXISTS daily_summary (
summary_date DATE PRIMARY KEY,
total_sales DECIMAL(12, 2) NOT NULL,
total_orders INTEGER NOT NULL,
avg_order_value DECIMAL(10, 2) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
— ============================================
— インデックス作成
— ============================================
CREATE INDEX IF NOT EXISTS idx_sales_date ON sales(sale_date);
CREATE INDEX IF NOT EXISTS idx_sales_category ON sales(category);
CREATE INDEX IF NOT EXISTS idx_sales_region ON sales(region);
— ============================================
— サンプルデータ投入
— ============================================
INSERT INTO sales (sale_date, product_name, category, quantity, unit_price, total_amount, customer_id, region)
VALUES
— 2024年1月1日
(‘2024-01-01’, ‘ノートパソコン’, ‘電子機器’, 2, 89000, 178000, ‘C001’, ‘東京’),
(‘2024-01-01’, ‘マウス’, ‘周辺機器’, 5, 1500, 7500, ‘C002’, ‘大阪’),
(‘2024-01-01’, ‘USBケーブル’, ‘アクセサリ’, 10, 500, 5000, ‘C003’, ‘東京’),
— 2024年1月2日
(‘2024-01-02’, ‘キーボード’, ‘周辺機器’, 3, 3500, 10500, ‘C003’, ‘東京’),
(‘2024-01-02’, ‘モニター’, ‘電子機器’, 1, 35000, 35000, ‘C001’, ‘東京’),
(‘2024-01-02’, ‘Webカメラ’, ‘周辺機器’, 4, 5000, 20000, ‘C004’, ‘福岡’),
— 2024年1月3日
(‘2024-01-03’, ‘タブレット’, ‘電子機器’, 2, 45000, 90000, ‘C005’, ‘大阪’),
(‘2024-01-03’, ‘イヤホン’, ‘アクセサリ’, 8, 3000, 24000, ‘C002’, ‘大阪’),
(‘2024-01-03’, ‘マウスパッド’, ‘アクセサリ’, 6, 800, 4800, ‘C006’, ‘名古屋’),
— 2024年1月4日
(‘2024-01-04’, ‘スマートフォン’, ‘電子機器’, 3, 75000, 225000, ‘C007’, ‘東京’),
(‘2024-01-04’, ‘ケース’, ‘アクセサリ’, 5, 2000, 10000, ‘C007’, ‘東京’),
(‘2024-01-04’, ‘ヘッドセット’, ‘周辺機器’, 2, 8000, 16000, ‘C008’, ‘札幌’);
— ============================================
— 初期集計実行
— ============================================
INSERT INTO daily_summary (summary_date, total_sales, total_orders, avg_order_value)
SELECT
sale_date,
SUM(total_amount) as total_sales,
COUNT(*) as total_orders,
ROUND(AVG(total_amount), 2) as avg_order_value
FROM sales
GROUP BY sale_date
ON CONFLICT (summary_date) DO UPDATE SET
total_sales = EXCLUDED.total_sales,
total_orders = EXCLUDED.total_orders,
avg_order_value = EXCLUDED.avg_order_value,
updated_at = CURRENT_TIMESTAMP;
— 確認用クエリ
SELECT ‘Sales records inserted: ‘ || COUNT(*)::TEXT FROM sales;
SELECT ‘Summary records created: ‘ || COUNT(*)::TEXT FROM daily_summary;
EOF
echo “✅ init.sql 作成完了”
🔄 5. Airflow DAG作成
# Airflow DAGを作成
cat > airflow/dags/daily_aggregation.py << ‘EOF’
“””
日次売上集計DAG
毎日午前1時に実行し、前日の売上を集計する
“””
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta
import logging
# ロガー設定
logger = logging.getLogger(__name__)
# デフォルト引数
default_args = {
‘owner’: ‘dataeng’,
‘depends_on_past’: False,
‘start_date’: datetime(2024, 1, 1),
‘email_on_failure’: False,
‘email_on_retry’: False,
‘retries’: 3,
‘retry_delay’: timedelta(minutes=5),
}
def check_data_quality(**context):
“””データ品質チェック”””
execution_date = context[‘ds’]
logger.info(f”📋 データ品質チェック開始: {execution_date}”)
pg_hook = PostgresHook(postgres_conn_id=’postgres_default’)
# 異常値チェック
sql = f”””
SELECT
COUNT(*) as total_records,
SUM(CASE WHEN total_amount < 0 THEN 1 ELSE 0 END) as negative_amounts,
SUM(CASE WHEN quantity <= 0 THEN 1 ELSE 0 END) as invalid_quantities,
SUM(CASE WHEN unit_price < 0 THEN 1 ELSE 0 END) as negative_prices
FROM sales
WHERE sale_date = '{execution_date}';
"""
result = pg_hook.get_first(sql)
total, negative, invalid, neg_price = result
logger.info(f" - 総レコード数: {total}")
logger.info(f" - 負の金額: {negative}")
logger.info(f" - 無効な数量: {invalid}")
logger.info(f" - 負の単価: {neg_price}")
if negative > 0 or invalid > 0 or neg_price > 0:
raise ValueError(
f”データ品質エラー: 負の金額={negative}, ”
f”無効な数量={invalid}, 負の単価={neg_price}”
)
logger.info(f”✅ データ品質チェック完了: {total}件”)
return total
def aggregate_daily_sales(**context):
“””日次売上集計を実行”””
execution_date = context[‘ds’]
logger.info(f”📊 日次集計開始: {execution_date}”)
pg_hook = PostgresHook(postgres_conn_id=’postgres_default’)
# 集計クエリ
sql = f”””
INSERT INTO daily_summary (summary_date, total_sales, total_orders, avg_order_value, updated_at)
SELECT
sale_date,
SUM(total_amount) as total_sales,
COUNT(*) as total_orders,
ROUND(AVG(total_amount), 2) as avg_order_value,
CURRENT_TIMESTAMP
FROM sales
WHERE sale_date = ‘{execution_date}’
GROUP BY sale_date
ON CONFLICT (summary_date) DO UPDATE SET
total_sales = EXCLUDED.total_sales,
total_orders = EXCLUDED.total_orders,
avg_order_value = EXCLUDED.avg_order_value,
updated_at = CURRENT_TIMESTAMP;
“””
pg_hook.run(sql)
# 結果確認
result = pg_hook.get_first(
f”SELECT total_sales, total_orders FROM daily_summary ”
f”WHERE summary_date = ‘{execution_date}'”
)
if result:
logger.info(f” – 売上合計: ¥{result[0]:,.0f}”)
logger.info(f” – 注文数: {result[1]}件”)
logger.info(f”✅ {execution_date}の集計完了”)
def generate_report(**context):
“””集計レポートを生成”””
execution_date = context[‘ds’]
logger.info(f”📝 レポート生成開始: {execution_date}”)
pg_hook = PostgresHook(postgres_conn_id=’postgres_default’)
# カテゴリ別集計
sql = f”””
SELECT
category,
SUM(total_amount) as category_sales,
COUNT(*) as order_count
FROM sales
WHERE sale_date = ‘{execution_date}’
GROUP BY category
ORDER BY category_sales DESC;
“””
results = pg_hook.get_records(sql)
logger.info(“=== カテゴリ別売上 ===”)
for row in results:
logger.info(f” {row[0]}: ¥{row[1]:,.0f} ({row[2]}件)”)
logger.info(f”✅ レポート生成完了”)
# DAG定義
with DAG(
‘daily_sales_aggregation’,
default_args=default_args,
description=’日次売上集計とデータ品質チェック’,
schedule_interval=’0 1 * * *’, # 毎日午前1時実行
catchup=False,
tags=[‘sales’, ‘aggregation’, ‘daily’],
) as dag:
# タスク定義
quality_check = PythonOperator(
task_id=’check_data_quality’,
python_callable=check_data_quality,
)
aggregation = PythonOperator(
task_id=’aggregate_daily_sales’,
python_callable=aggregate_daily_sales,
)
report = PythonOperator(
task_id=’generate_report’,
python_callable=generate_report,
)
# タスク依存関係
quality_check >> aggregation >> report
EOF
echo “✅ daily_aggregation.py 作成完了”
📝 DAGの説明
- check_data_quality:データの異常値(負の金額、無効な数量)をチェック
- aggregate_daily_sales:日次売上の集計をdaily_summaryテーブルに保存
- generate_report:カテゴリ別売上のレポートを生成
- 依存関係:品質チェック → 集計 → レポート の順で実行
📓 6. Jupyter Notebook作成
# Jupyter Notebook用のPythonファイルを作成(参考用)
cat > notebooks/analysis_script.py << ‘EOF’
“””
売上データ分析スクリプト
Jupyter Notebookで実行する内容の参考用
“””
# ============================================
# セル1: ライブラリインポート
# ============================================
import pandas as pd
from sqlalchemy import create_engine
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings(‘ignore’)
# 日本語フォント設定(環境に応じて変更)
plt.rcParams[‘font.family’] = ‘DejaVu Sans’
print(“✅ ライブラリインポート完了”)
# ============================================
# セル2: データベース接続
# ============================================
# コンテナ名 ‘postgres’ で接続(Docker Compose内)
engine = create_engine(
‘postgresql://dataeng:dataeng_password_123@postgres:5432/datawarehouse’
)
print(“✅ データベース接続完了”)
# ============================================
# セル3: 売上データ取得
# ============================================
query = “SELECT * FROM sales ORDER BY sale_date, id”
df_sales = pd.read_sql(query, engine)
print(f”📊 データ件数: {len(df_sales)}件”)
print(df_sales.head(10))
# ============================================
# セル4: 基本統計
# ============================================
print(“\n=== 売上統計 ===”)
print(f”合計売上: ¥{df_sales[‘total_amount’].sum():,.0f}”)
print(f”平均単価: ¥{df_sales[‘unit_price’].mean():,.0f}”)
print(f”平均注文金額: ¥{df_sales[‘total_amount’].mean():,.0f}”)
print(f”最高売上: ¥{df_sales[‘total_amount’].max():,.0f}”)
print(f”最低売上: ¥{df_sales[‘total_amount’].min():,.0f}”)
# ============================================
# セル5: カテゴリ別売上
# ============================================
category_sales = df_sales.groupby(‘category’)[‘total_amount’].sum().sort_values(ascending=False)
print(“\n=== カテゴリ別売上 ===”)
for cat, sales in category_sales.items():
print(f” {cat}: ¥{sales:,.0f}”)
# グラフ作成
plt.figure(figsize=(10, 6))
category_sales.plot(kind=’bar’, color=[‘#667eea’, ‘#764ba2’, ‘#f093fb’])
plt.title(‘Category Sales’, fontsize=16)
plt.xlabel(‘Category’)
plt.ylabel(‘Sales (JPY)’)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(‘/home/jovyan/work/category_sales.png’, dpi=100)
plt.show()
print(“✅ グラフを category_sales.png に保存”)
# ============================================
# セル6: 地域別分析
# ============================================
region_stats = df_sales.groupby(‘region’).agg({
‘total_amount’: ‘sum’,
‘quantity’: ‘sum’,
‘id’: ‘count’
}).rename(columns={‘id’: ‘order_count’})
print(“\n=== 地域別統計 ===”)
print(region_stats.sort_values(‘total_amount’, ascending=False))
# ============================================
# セル7: 日次集計結果確認
# ============================================
query_summary = “SELECT * FROM daily_summary ORDER BY summary_date”
df_summary = pd.read_sql(query_summary, engine)
print(“\n=== 日次集計結果 ===”)
print(df_summary)
# 日次売上推移グラフ
plt.figure(figsize=(10, 6))
plt.plot(df_summary[‘summary_date’], df_summary[‘total_sales’],
marker=’o’, linewidth=2, markersize=8)
plt.title(‘Daily Sales Trend’, fontsize=16)
plt.xlabel(‘Date’)
plt.ylabel(‘Sales (JPY)’)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(‘/home/jovyan/work/daily_trend.png’, dpi=100)
plt.show()
print(“✅ グラフを daily_trend.png に保存”)
print(“\n🎉 分析完了!”)
EOF
echo “✅ analysis_script.py 作成完了”
echo “※ Jupyter Notebookで新規ノートブックを作成し、上記の内容をセルごとにコピーして実行してください”
🚀 7. システム起動手順
7-1. コンテナ起動
# すべてのコンテナを起動
docker-compose up -d
# 起動確認(全サービスがUpになるまで待機)
docker-compose ps
# ログ確認(Ctrl+Cで終了)
docker-compose logs -f
CONTAINER ID NAME STATUS PORTS
xxx local-postgres Up (healthy) 0.0.0.0:5432->5432/tcp
xxx local-pgadmin Up 0.0.0.0:5050->80/tcp
xxx local-jupyter Up 0.0.0.0:8888->8888/tcp
xxx local-airflow Up (healthy) 0.0.0.0:8080->8080/tcp
7-2. Airflow接続設定
# Airflowコンテナに入る
docker exec -it local-airflow bash
# PostgreSQL接続を追加
airflow connections add postgres_default \
–conn-type postgres \
–conn-host postgres \
–conn-login dataeng \
–conn-password dataeng_password_123 \
–conn-schema datawarehouse \
–conn-port 5432
# 接続確認
airflow connections list
# コンテナから出る
exit
7-3. 各サービスへのアクセス
| サービス | URL | ログイン情報 |
|---|---|---|
| pgAdmin | http://localhost:5050 | admin@example.com / pgadmin123 |
| Jupyter | http://localhost:8888 | トークンなし(直接アクセス可) |
| Airflow | http://localhost:8080 | admin / airflow123 |
7-4. pgAdminでデータベース登録
📋 pgAdmin設定手順
- ブラウザで
http://localhost:5050にアクセス - ログイン(admin@example.com / pgadmin123)
- 左メニューで「Servers」を右クリック →「Register」→「Server」
- 「General」タブ: Name =
Local PostgreSQL - 「Connection」タブ:
- Host name/address =
postgres - Port =
5432 - Username =
dataeng - Password =
dataeng_password_123
- Host name/address =
- 「Save」クリック
✅ 8. 動作確認
8-1. PostgreSQLデータ確認
# PostgreSQLに接続
docker exec -it local-postgres psql -U dataeng -d datawarehouse
# テーブル一覧
\dt
# 売上データ確認
SELECT * FROM sales ORDER BY sale_date LIMIT 10;
# 日次集計確認
SELECT * FROM daily_summary;
# カテゴリ別集計
SELECT category, SUM(total_amount) as total
FROM sales
GROUP BY category
ORDER BY total DESC;
# 終了
\q
8-2. AirflowでDAG実行
🔄 DAG実行手順
- ブラウザで
http://localhost:8080にアクセス - ログイン(admin / airflow123)
- DAG一覧から「daily_sales_aggregation」を探す
- 左側のトグルスイッチをONにして有効化
- 右側の「▶」ボタンで「Trigger DAG」をクリック
- すべてのタスクが緑色になれば成功
8-3. Jupyter Notebookで分析
📓 分析手順
- ブラウザで
http://localhost:8888にアクセス - 「work」フォルダに移動
- 「New」→「Python 3」で新規ノートブック作成
analysis_script.pyの内容を参考にセルを実行- グラフと統計情報が表示されることを確認
🔧 9. トラブルシューティング
問題1: Airflowが起動しない
# ログを確認
docker-compose logs airflow
# データベース初期化を再実行
docker-compose exec airflow airflow db init
# 再起動
docker-compose restart airflow
問題2: PostgreSQL接続エラー
# PostgreSQLが起動しているか確認
docker-compose ps postgres
# ヘルスチェック状態確認
docker inspect local-postgres –format='{{.State.Health.Status}}’
# 手動で接続テスト
docker exec -it local-postgres pg_isready -U dataeng
問題3: DAGが表示されない
# DAGファイルの構文チェック
docker exec -it local-airflow python /opt/airflow/dags/daily_aggregation.py
# DAGをリロード
docker exec -it local-airflow airflow dags reserialize
# Airflowを再起動
docker-compose restart airflow
9-1. クリーンアップ
# コンテナを停止
docker-compose down
# ボリュームも含めて削除(データも消える)
docker-compose down -v
# すべてのリソースを削除
docker-compose down -v –rmi all
💪 10. 練習問題
練習問題 1
基礎
このプロジェクトで使用している4つのコンテナとその役割を答えてください。
- PostgreSQL:メインデータベース(売上データ、集計結果を保存)
- pgAdmin:データベース管理GUI(SQLの実行、テーブル確認)
- Jupyter Notebook:データ分析環境(Python、pandas、matplotlib)
- Airflow:ワークフロー管理(DAGによる定期集計の自動化)
練習問題 2
基礎
depends_onとhealthcheckを組み合わせる利点は何ですか?
PostgreSQLが完全に起動してから他のサービス(pgAdmin、Jupyter、Airflow)を開始できます。
単純なdepends_onだけでは、コンテナが起動しただけで(サービスが利用可能になる前に)次のコンテナが起動してしまいます。condition: service_healthyを使うことで、ヘルスチェックが成功するまで待機します。
練習問題 3
応用
新しい商品カテゴリ「ソフトウェア」の売上データを追加するSQLを書いてください。
INSERT INTO sales
(sale_date, product_name, category, quantity, unit_price, total_amount, customer_id, region)
VALUES
(‘2024-01-05’, ‘Office 365’, ‘ソフトウェア’, 5, 12000, 60000, ‘C009’, ‘東京’),
(‘2024-01-05’, ‘Adobe CC’, ‘ソフトウェア’, 2, 25000, 50000, ‘C010’, ‘大阪’);
練習問題 4
応用
DAGに「地域別売上集計」タスクを追加する場合、どのような実装になりますか?
def aggregate_by_region(**context):
“””地域別売上集計”””
execution_date = context[‘ds’]
pg_hook = PostgresHook(postgres_conn_id=’postgres_default’)
sql = f”””
SELECT
region,
SUM(total_amount) as region_sales,
COUNT(*) as order_count
FROM sales
WHERE sale_date = ‘{execution_date}’
GROUP BY region
ORDER BY region_sales DESC;
“””
results = pg_hook.get_records(sql)
for row in results:
logger.info(f”{row[0]}: ¥{row[1]:,.0f} ({row[2]}件)”)
# タスク追加
region_aggregation = PythonOperator(
task_id=’aggregate_by_region’,
python_callable=aggregate_by_region,
)
# 依存関係に追加
quality_check >> aggregation >> [report, region_aggregation]
練習問題 5
発展
このシステムにRedis(キャッシュ)を追加する場合、docker-compose.ymlにどのような設定を追加しますか?
services:
# … 既存のサービス …
redis:
image: redis:7-alpine
container_name: local-redis
ports:
– “6379:6379”
volumes:
– redis_data:/data
networks:
– data-network
healthcheck:
test: [“CMD”, “redis-cli”, “ping”]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
volumes:
postgres_data:
pgadmin_data:
redis_data: # 追加
📝 STEP 25 のまとめ
✅ このプロジェクトで習得したこと
- マルチコンテナ環境:4つのコンテナが連携するシステム構築
- PostgreSQL初期化:init.sqlによるテーブル作成とデータ投入
- Airflow DAG:データ品質チェック→集計→レポートのワークフロー
- Jupyter分析:SQLAlchemyでのDB接続とmatplotlibでの可視化
- ヘルスチェック:サービス間の依存関係管理
- 環境変数管理:.envファイルでパスワード管理
🎯 次のステップの予告
STEP 26では、このローカル環境をさらに発展させて「ETLパイプラインの完全なコンテナ化」に挑戦します!
- 既存のPythonスクリプトのDocker化
- エラーハンドリングとリトライロジック
- データ品質チェックの組み込み
- 本番環境を意識した設計
学習メモ
Docker・コンテナ技術入門 - Step 25
📋 過去のメモ一覧
▼