STEP 25:プロジェクト① ローカルデータ基盤構築

🏗️ STEP 25: プロジェクト① ローカルデータ基盤構築

完全なデータ分析環境をDocker Composeで構築

📋 このプロジェクトで作るもの

  • PostgreSQL(メインデータベース)
  • pgAdmin(GUI管理ツール)
  • Jupyter Notebook(分析環境)
  • Apache Airflow(スケジューリング)

目標:4つのコンテナが連携する実用的なデータ分析環境を構築し、実際のデータ処理フローを体験する

学習時間の目安: 3時間

🔧 0. このプロジェクトの前提知識

📚 これまでの学習の復習
  • Docker Compose:マルチコンテナ管理(STEP 15-18)
  • PostgreSQL:データベースコンテナ(STEP 4, 17)
  • 環境変数:.envファイル管理(STEP 21)
  • Airflow:DAG作成(STEP 19, 22)
  • ボリューム:データ永続化(STEP 10)

0-1. 作業ディレクトリの準備

mkdir -p ~/docker-practice/step25 cd ~/docker-practice/step25 pwd

0-2. 全体アーキテクチャ

【ローカルデータ基盤 システム構成図】 ┌─────────────────────────────────────────────────────────────────────┐ │ ローカルデータ基盤 │ │ │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ Jupyter │───── 分析 ────────→│ PostgreSQL │ │ │ │ Notebook │ │ :5432 │ │ │ │ :8888 │ │ │ │ │ └─────────────┘ └─────────────┘ │ │ │ ↑ │ │ │ │ │ │ │ 可視化 ETL実行 │ │ │ ↓ │ │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ pgAdmin │───── 管理 ────────→│ Airflow │ │ │ │ :5050 │ │ :8080 │ │ │ └─────────────┘ └─────────────┘ │ │ │ │ 【共有ボリューム】 │ │ ├── postgres_data: PostgreSQLデータ永続化 │ │ ├── ./data: CSVファイル共有 │ │ ├── ./notebooks: Jupyterノートブック │ │ └── ./airflow/dags: AirflowのDAGファイル │ │ │ └─────────────────────────────────────────────────────────────────────┘

0-3. コンテナ構成一覧

サービスイメージポート役割
postgrespostgres:155432メインデータベース
pgadmindpage/pgadmin45050DB管理GUI
jupyterjupyter/scipy-notebook8888データ分析環境
airflowapache/airflow:2.7.38080ワークフロー管理

0-4. データ処理フロー

【データ処理の流れ】 ① データ投入 ② 定期集計 ③ 分析・可視化 ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ CSVファイル │───→│ Airflow │───→│ Jupyter │ │ ↓ │ │ DAG実行 │ │ Notebook │ │ PostgreSQL │ │ ↓ │ │ ↓ │ │ 初期データ │ │ 集計テーブル │ │ グラフ出力 │ └──────────────┘ └──────────────┘ └──────────────┘ ④ データベース管理 ┌──────────────┐ │ pgAdmin │ │ GUI操作 │ └──────────────┘

📂 1. プロジェクト構造の作成

1-1. ディレクトリ構成の作成

# プロジェクトディレクトリを作成 mkdir -p local-data-platform cd local-data-platform # 必要なディレクトリを作成 mkdir -p postgres/init mkdir -p airflow/dags mkdir -p airflow/logs mkdir -p airflow/plugins mkdir -p notebooks mkdir -p data # 構造を確認 tree -L 2 . || find . -type d | head -20
📁 最終的なディレクトリ構造
local-data-platform/ ├── docker-compose.yml # メイン設定ファイル ├── .env # 環境変数(パスワード等) ├── postgres/ │ └── init/ │ └── init.sql # DB初期化スクリプト ├── airflow/ │ ├── dags/ │ │ └── daily_aggregation.py # 日次集計DAG │ ├── logs/ # Airflowログ │ └── plugins/ # カスタムプラグイン ├── notebooks/ │ └── analysis.ipynb # 分析ノートブック └── data/ └── sample_sales.csv # サンプルデータ

🔐 2. 環境変数ファイル(.env)作成

# .envファイルを作成 cat > .env << ‘EOF’ # PostgreSQL設定 POSTGRES_PASSWORD=dataeng_password_123 # pgAdmin設定 PGADMIN_EMAIL=admin@example.com PGADMIN_PASSWORD=pgadmin123 # Airflow設定 AIRFLOW_USER=admin AIRFLOW_PASSWORD=airflow123 AIRFLOW_UID=50000 EOF # 確認 cat .env
⚠️ セキュリティ注意
  • 本番環境では強力なパスワードを使用
  • .envファイルは.gitignoreに追加
  • パスワードをGitにコミットしない
# .gitignoreに追加 echo “.env” >> .gitignore echo “airflow/logs/*” >> .gitignore

⚙️ 3. docker-compose.yml作成

# docker-compose.ymlを作成 cat > docker-compose.yml << ‘EOF’ version: ‘3.8’ services: # ============================================ # PostgreSQL データベース # ============================================ postgres: image: postgres:15 container_name: local-postgres environment: POSTGRES_DB: datawarehouse POSTGRES_USER: dataeng POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} ports: – “5432:5432” volumes: – postgres_data:/var/lib/postgresql/data – ./postgres/init:/docker-entrypoint-initdb.d – ./data:/data networks: – data-network healthcheck: test: [“CMD-SHELL”, “pg_isready -U dataeng -d datawarehouse”] interval: 10s timeout: 5s retries: 5 restart: unless-stopped # ============================================ # pgAdmin 管理ツール # ============================================ pgadmin: image: dpage/pgadmin4:latest container_name: local-pgadmin environment: PGADMIN_DEFAULT_EMAIL: ${PGADMIN_EMAIL} PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_PASSWORD} ports: – “5050:80” volumes: – pgadmin_data:/var/lib/pgadmin networks: – data-network depends_on: postgres: condition: service_healthy restart: unless-stopped # ============================================ # Jupyter Notebook # ============================================ jupyter: image: jupyter/scipy-notebook:latest container_name: local-jupyter environment: JUPYTER_ENABLE_LAB: “yes” ports: – “8888:8888″ volumes: – ./notebooks:/home/jovyan/work – ./data:/home/jovyan/data networks: – data-network depends_on: postgres: condition: service_healthy command: start-notebook.sh –NotebookApp.token=” restart: unless-stopped # ============================================ # Apache Airflow(スタンドアロン構成) # ============================================ airflow: image: apache/airflow:2.7.3 container_name: local-airflow environment: AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://dataeng:${POSTGRES_PASSWORD}@postgres/datawarehouse AIRFLOW__CORE__LOAD_EXAMPLES: ‘false’ AIRFLOW__CORE__FERNET_KEY: ” AIRFLOW__WEBSERVER__SECRET_KEY: ‘my_secret_key_12345’ _AIRFLOW_DB_MIGRATE: ‘true’ _AIRFLOW_WWW_USER_CREATE: ‘true’ _AIRFLOW_WWW_USER_USERNAME: ${AIRFLOW_USER} _AIRFLOW_WWW_USER_PASSWORD: ${AIRFLOW_PASSWORD} ports: – “8080:8080” volumes: – ./airflow/dags:/opt/airflow/dags – ./airflow/logs:/opt/airflow/logs – ./airflow/plugins:/opt/airflow/plugins – ./data:/data networks: – data-network depends_on: postgres: condition: service_healthy command: standalone healthcheck: test: [“CMD”, “curl”, “-f”, “http://localhost:8080/health”] interval: 30s timeout: 10s retries: 5 start_period: 60s restart: unless-stopped networks: data-network: driver: bridge volumes: postgres_data: pgadmin_data: EOF echo “✅ docker-compose.yml 作成完了”
💡 設定のポイント
  • ヘルスチェック:PostgreSQLが完全に起動してから他のサービスを開始
  • ネットワーク:すべてのコンテナがdata-networkで通信可能
  • ボリューム:名前付きボリュームでデータ永続化、バインドマウントでコード同期
  • restart:unless-stoppedで異常終了時に自動再起動

📊 4. PostgreSQL初期化スクリプト

# 初期化SQLを作成 cat > postgres/init/init.sql << ‘EOF’ — ============================================ — テーブル作成 — ============================================ — 売上データテーブル CREATE TABLE IF NOT EXISTS sales ( id SERIAL PRIMARY KEY, sale_date DATE NOT NULL, product_name VARCHAR(100) NOT NULL, category VARCHAR(50) NOT NULL, quantity INTEGER NOT NULL, unit_price DECIMAL(10, 2) NOT NULL, total_amount DECIMAL(10, 2) NOT NULL, customer_id VARCHAR(50) NOT NULL, region VARCHAR(50) NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); — 日次集計結果テーブル CREATE TABLE IF NOT EXISTS daily_summary ( summary_date DATE PRIMARY KEY, total_sales DECIMAL(12, 2) NOT NULL, total_orders INTEGER NOT NULL, avg_order_value DECIMAL(10, 2) NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); — ============================================ — インデックス作成 — ============================================ CREATE INDEX IF NOT EXISTS idx_sales_date ON sales(sale_date); CREATE INDEX IF NOT EXISTS idx_sales_category ON sales(category); CREATE INDEX IF NOT EXISTS idx_sales_region ON sales(region); — ============================================ — サンプルデータ投入 — ============================================ INSERT INTO sales (sale_date, product_name, category, quantity, unit_price, total_amount, customer_id, region) VALUES — 2024年1月1日 (‘2024-01-01’, ‘ノートパソコン’, ‘電子機器’, 2, 89000, 178000, ‘C001’, ‘東京’), (‘2024-01-01’, ‘マウス’, ‘周辺機器’, 5, 1500, 7500, ‘C002’, ‘大阪’), (‘2024-01-01’, ‘USBケーブル’, ‘アクセサリ’, 10, 500, 5000, ‘C003’, ‘東京’), — 2024年1月2日 (‘2024-01-02’, ‘キーボード’, ‘周辺機器’, 3, 3500, 10500, ‘C003’, ‘東京’), (‘2024-01-02’, ‘モニター’, ‘電子機器’, 1, 35000, 35000, ‘C001’, ‘東京’), (‘2024-01-02’, ‘Webカメラ’, ‘周辺機器’, 4, 5000, 20000, ‘C004’, ‘福岡’), — 2024年1月3日 (‘2024-01-03’, ‘タブレット’, ‘電子機器’, 2, 45000, 90000, ‘C005’, ‘大阪’), (‘2024-01-03’, ‘イヤホン’, ‘アクセサリ’, 8, 3000, 24000, ‘C002’, ‘大阪’), (‘2024-01-03’, ‘マウスパッド’, ‘アクセサリ’, 6, 800, 4800, ‘C006’, ‘名古屋’), — 2024年1月4日 (‘2024-01-04’, ‘スマートフォン’, ‘電子機器’, 3, 75000, 225000, ‘C007’, ‘東京’), (‘2024-01-04’, ‘ケース’, ‘アクセサリ’, 5, 2000, 10000, ‘C007’, ‘東京’), (‘2024-01-04’, ‘ヘッドセット’, ‘周辺機器’, 2, 8000, 16000, ‘C008’, ‘札幌’); — ============================================ — 初期集計実行 — ============================================ INSERT INTO daily_summary (summary_date, total_sales, total_orders, avg_order_value) SELECT sale_date, SUM(total_amount) as total_sales, COUNT(*) as total_orders, ROUND(AVG(total_amount), 2) as avg_order_value FROM sales GROUP BY sale_date ON CONFLICT (summary_date) DO UPDATE SET total_sales = EXCLUDED.total_sales, total_orders = EXCLUDED.total_orders, avg_order_value = EXCLUDED.avg_order_value, updated_at = CURRENT_TIMESTAMP; — 確認用クエリ SELECT ‘Sales records inserted: ‘ || COUNT(*)::TEXT FROM sales; SELECT ‘Summary records created: ‘ || COUNT(*)::TEXT FROM daily_summary; EOF echo “✅ init.sql 作成完了”

🔄 5. Airflow DAG作成

# Airflow DAGを作成 cat > airflow/dags/daily_aggregation.py << ‘EOF’ “”” 日次売上集計DAG 毎日午前1時に実行し、前日の売上を集計する “”” from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.postgres.hooks.postgres import PostgresHook from datetime import datetime, timedelta import logging # ロガー設定 logger = logging.getLogger(__name__) # デフォルト引数 default_args = { ‘owner’: ‘dataeng’, ‘depends_on_past’: False, ‘start_date’: datetime(2024, 1, 1), ‘email_on_failure’: False, ‘email_on_retry’: False, ‘retries’: 3, ‘retry_delay’: timedelta(minutes=5), } def check_data_quality(**context): “””データ品質チェック””” execution_date = context[‘ds’] logger.info(f”📋 データ品質チェック開始: {execution_date}”) pg_hook = PostgresHook(postgres_conn_id=’postgres_default’) # 異常値チェック sql = f””” SELECT COUNT(*) as total_records, SUM(CASE WHEN total_amount < 0 THEN 1 ELSE 0 END) as negative_amounts, SUM(CASE WHEN quantity <= 0 THEN 1 ELSE 0 END) as invalid_quantities, SUM(CASE WHEN unit_price < 0 THEN 1 ELSE 0 END) as negative_prices FROM sales WHERE sale_date = '{execution_date}'; """ result = pg_hook.get_first(sql) total, negative, invalid, neg_price = result logger.info(f" - 総レコード数: {total}") logger.info(f" - 負の金額: {negative}") logger.info(f" - 無効な数量: {invalid}") logger.info(f" - 負の単価: {neg_price}") if negative > 0 or invalid > 0 or neg_price > 0: raise ValueError( f”データ品質エラー: 負の金額={negative}, ” f”無効な数量={invalid}, 負の単価={neg_price}” ) logger.info(f”✅ データ品質チェック完了: {total}件”) return total def aggregate_daily_sales(**context): “””日次売上集計を実行””” execution_date = context[‘ds’] logger.info(f”📊 日次集計開始: {execution_date}”) pg_hook = PostgresHook(postgres_conn_id=’postgres_default’) # 集計クエリ sql = f””” INSERT INTO daily_summary (summary_date, total_sales, total_orders, avg_order_value, updated_at) SELECT sale_date, SUM(total_amount) as total_sales, COUNT(*) as total_orders, ROUND(AVG(total_amount), 2) as avg_order_value, CURRENT_TIMESTAMP FROM sales WHERE sale_date = ‘{execution_date}’ GROUP BY sale_date ON CONFLICT (summary_date) DO UPDATE SET total_sales = EXCLUDED.total_sales, total_orders = EXCLUDED.total_orders, avg_order_value = EXCLUDED.avg_order_value, updated_at = CURRENT_TIMESTAMP; “”” pg_hook.run(sql) # 結果確認 result = pg_hook.get_first( f”SELECT total_sales, total_orders FROM daily_summary ” f”WHERE summary_date = ‘{execution_date}'” ) if result: logger.info(f” – 売上合計: ¥{result[0]:,.0f}”) logger.info(f” – 注文数: {result[1]}件”) logger.info(f”✅ {execution_date}の集計完了”) def generate_report(**context): “””集計レポートを生成””” execution_date = context[‘ds’] logger.info(f”📝 レポート生成開始: {execution_date}”) pg_hook = PostgresHook(postgres_conn_id=’postgres_default’) # カテゴリ別集計 sql = f””” SELECT category, SUM(total_amount) as category_sales, COUNT(*) as order_count FROM sales WHERE sale_date = ‘{execution_date}’ GROUP BY category ORDER BY category_sales DESC; “”” results = pg_hook.get_records(sql) logger.info(“=== カテゴリ別売上 ===”) for row in results: logger.info(f” {row[0]}: ¥{row[1]:,.0f} ({row[2]}件)”) logger.info(f”✅ レポート生成完了”) # DAG定義 with DAG( ‘daily_sales_aggregation’, default_args=default_args, description=’日次売上集計とデータ品質チェック’, schedule_interval=’0 1 * * *’, # 毎日午前1時実行 catchup=False, tags=[‘sales’, ‘aggregation’, ‘daily’], ) as dag: # タスク定義 quality_check = PythonOperator( task_id=’check_data_quality’, python_callable=check_data_quality, ) aggregation = PythonOperator( task_id=’aggregate_daily_sales’, python_callable=aggregate_daily_sales, ) report = PythonOperator( task_id=’generate_report’, python_callable=generate_report, ) # タスク依存関係 quality_check >> aggregation >> report EOF echo “✅ daily_aggregation.py 作成完了”
📝 DAGの説明
  • check_data_quality:データの異常値(負の金額、無効な数量)をチェック
  • aggregate_daily_sales:日次売上の集計をdaily_summaryテーブルに保存
  • generate_report:カテゴリ別売上のレポートを生成
  • 依存関係:品質チェック → 集計 → レポート の順で実行

📓 6. Jupyter Notebook作成

# Jupyter Notebook用のPythonファイルを作成(参考用) cat > notebooks/analysis_script.py << ‘EOF’ “”” 売上データ分析スクリプト Jupyter Notebookで実行する内容の参考用 “”” # ============================================ # セル1: ライブラリインポート # ============================================ import pandas as pd from sqlalchemy import create_engine import matplotlib.pyplot as plt import warnings warnings.filterwarnings(‘ignore’) # 日本語フォント設定(環境に応じて変更) plt.rcParams[‘font.family’] = ‘DejaVu Sans’ print(“✅ ライブラリインポート完了”) # ============================================ # セル2: データベース接続 # ============================================ # コンテナ名 ‘postgres’ で接続(Docker Compose内) engine = create_engine( ‘postgresql://dataeng:dataeng_password_123@postgres:5432/datawarehouse’ ) print(“✅ データベース接続完了”) # ============================================ # セル3: 売上データ取得 # ============================================ query = “SELECT * FROM sales ORDER BY sale_date, id” df_sales = pd.read_sql(query, engine) print(f”📊 データ件数: {len(df_sales)}件”) print(df_sales.head(10)) # ============================================ # セル4: 基本統計 # ============================================ print(“\n=== 売上統計 ===”) print(f”合計売上: ¥{df_sales[‘total_amount’].sum():,.0f}”) print(f”平均単価: ¥{df_sales[‘unit_price’].mean():,.0f}”) print(f”平均注文金額: ¥{df_sales[‘total_amount’].mean():,.0f}”) print(f”最高売上: ¥{df_sales[‘total_amount’].max():,.0f}”) print(f”最低売上: ¥{df_sales[‘total_amount’].min():,.0f}”) # ============================================ # セル5: カテゴリ別売上 # ============================================ category_sales = df_sales.groupby(‘category’)[‘total_amount’].sum().sort_values(ascending=False) print(“\n=== カテゴリ別売上 ===”) for cat, sales in category_sales.items(): print(f” {cat}: ¥{sales:,.0f}”) # グラフ作成 plt.figure(figsize=(10, 6)) category_sales.plot(kind=’bar’, color=[‘#667eea’, ‘#764ba2’, ‘#f093fb’]) plt.title(‘Category Sales’, fontsize=16) plt.xlabel(‘Category’) plt.ylabel(‘Sales (JPY)’) plt.xticks(rotation=45) plt.tight_layout() plt.savefig(‘/home/jovyan/work/category_sales.png’, dpi=100) plt.show() print(“✅ グラフを category_sales.png に保存”) # ============================================ # セル6: 地域別分析 # ============================================ region_stats = df_sales.groupby(‘region’).agg({ ‘total_amount’: ‘sum’, ‘quantity’: ‘sum’, ‘id’: ‘count’ }).rename(columns={‘id’: ‘order_count’}) print(“\n=== 地域別統計 ===”) print(region_stats.sort_values(‘total_amount’, ascending=False)) # ============================================ # セル7: 日次集計結果確認 # ============================================ query_summary = “SELECT * FROM daily_summary ORDER BY summary_date” df_summary = pd.read_sql(query_summary, engine) print(“\n=== 日次集計結果 ===”) print(df_summary) # 日次売上推移グラフ plt.figure(figsize=(10, 6)) plt.plot(df_summary[‘summary_date’], df_summary[‘total_sales’], marker=’o’, linewidth=2, markersize=8) plt.title(‘Daily Sales Trend’, fontsize=16) plt.xlabel(‘Date’) plt.ylabel(‘Sales (JPY)’) plt.grid(True, alpha=0.3) plt.tight_layout() plt.savefig(‘/home/jovyan/work/daily_trend.png’, dpi=100) plt.show() print(“✅ グラフを daily_trend.png に保存”) print(“\n🎉 分析完了!”) EOF echo “✅ analysis_script.py 作成完了” echo “※ Jupyter Notebookで新規ノートブックを作成し、上記の内容をセルごとにコピーして実行してください”

🚀 7. システム起動手順

7-1. コンテナ起動

# すべてのコンテナを起動 docker-compose up -d # 起動確認(全サービスがUpになるまで待機) docker-compose ps # ログ確認(Ctrl+Cで終了) docker-compose logs -f
CONTAINER ID NAME STATUS PORTS xxx local-postgres Up (healthy) 0.0.0.0:5432->5432/tcp xxx local-pgadmin Up 0.0.0.0:5050->80/tcp xxx local-jupyter Up 0.0.0.0:8888->8888/tcp xxx local-airflow Up (healthy) 0.0.0.0:8080->8080/tcp

7-2. Airflow接続設定

# Airflowコンテナに入る docker exec -it local-airflow bash # PostgreSQL接続を追加 airflow connections add postgres_default \ –conn-type postgres \ –conn-host postgres \ –conn-login dataeng \ –conn-password dataeng_password_123 \ –conn-schema datawarehouse \ –conn-port 5432 # 接続確認 airflow connections list # コンテナから出る exit

7-3. 各サービスへのアクセス

サービスURLログイン情報
pgAdminhttp://localhost:5050admin@example.com / pgadmin123
Jupyterhttp://localhost:8888トークンなし(直接アクセス可)
Airflowhttp://localhost:8080admin / airflow123

7-4. pgAdminでデータベース登録

📋 pgAdmin設定手順
  1. ブラウザで http://localhost:5050 にアクセス
  2. ログイン(admin@example.com / pgadmin123)
  3. 左メニューで「Servers」を右クリック →「Register」→「Server」
  4. 「General」タブ: Name = Local PostgreSQL
  5. 「Connection」タブ:
    • Host name/address = postgres
    • Port = 5432
    • Username = dataeng
    • Password = dataeng_password_123
  6. 「Save」クリック

✅ 8. 動作確認

8-1. PostgreSQLデータ確認

# PostgreSQLに接続 docker exec -it local-postgres psql -U dataeng -d datawarehouse # テーブル一覧 \dt # 売上データ確認 SELECT * FROM sales ORDER BY sale_date LIMIT 10; # 日次集計確認 SELECT * FROM daily_summary; # カテゴリ別集計 SELECT category, SUM(total_amount) as total FROM sales GROUP BY category ORDER BY total DESC; # 終了 \q

8-2. AirflowでDAG実行

🔄 DAG実行手順
  1. ブラウザで http://localhost:8080 にアクセス
  2. ログイン(admin / airflow123)
  3. DAG一覧から「daily_sales_aggregation」を探す
  4. 左側のトグルスイッチをONにして有効化
  5. 右側の「▶」ボタンで「Trigger DAG」をクリック
  6. すべてのタスクが緑色になれば成功

8-3. Jupyter Notebookで分析

📓 分析手順
  1. ブラウザで http://localhost:8888 にアクセス
  2. 「work」フォルダに移動
  3. 「New」→「Python 3」で新規ノートブック作成
  4. analysis_script.pyの内容を参考にセルを実行
  5. グラフと統計情報が表示されることを確認

🔧 9. トラブルシューティング

問題1: Airflowが起動しない
# ログを確認 docker-compose logs airflow # データベース初期化を再実行 docker-compose exec airflow airflow db init # 再起動 docker-compose restart airflow
問題2: PostgreSQL接続エラー
# PostgreSQLが起動しているか確認 docker-compose ps postgres # ヘルスチェック状態確認 docker inspect local-postgres –format='{{.State.Health.Status}}’ # 手動で接続テスト docker exec -it local-postgres pg_isready -U dataeng
問題3: DAGが表示されない
# DAGファイルの構文チェック docker exec -it local-airflow python /opt/airflow/dags/daily_aggregation.py # DAGをリロード docker exec -it local-airflow airflow dags reserialize # Airflowを再起動 docker-compose restart airflow

9-1. クリーンアップ

# コンテナを停止 docker-compose down # ボリュームも含めて削除(データも消える) docker-compose down -v # すべてのリソースを削除 docker-compose down -v –rmi all

💪 10. 練習問題

練習問題 1 基礎

このプロジェクトで使用している4つのコンテナとその役割を答えてください。

  1. PostgreSQL:メインデータベース(売上データ、集計結果を保存)
  2. pgAdmin:データベース管理GUI(SQLの実行、テーブル確認)
  3. Jupyter Notebook:データ分析環境(Python、pandas、matplotlib)
  4. Airflow:ワークフロー管理(DAGによる定期集計の自動化)
練習問題 2 基礎

depends_onとhealthcheckを組み合わせる利点は何ですか?

PostgreSQLが完全に起動してから他のサービス(pgAdmin、Jupyter、Airflow)を開始できます。

単純なdepends_onだけでは、コンテナが起動しただけで(サービスが利用可能になる前に)次のコンテナが起動してしまいます。condition: service_healthyを使うことで、ヘルスチェックが成功するまで待機します。

練習問題 3 応用

新しい商品カテゴリ「ソフトウェア」の売上データを追加するSQLを書いてください。

INSERT INTO sales (sale_date, product_name, category, quantity, unit_price, total_amount, customer_id, region) VALUES (‘2024-01-05’, ‘Office 365’, ‘ソフトウェア’, 5, 12000, 60000, ‘C009’, ‘東京’), (‘2024-01-05’, ‘Adobe CC’, ‘ソフトウェア’, 2, 25000, 50000, ‘C010’, ‘大阪’);
練習問題 4 応用

DAGに「地域別売上集計」タスクを追加する場合、どのような実装になりますか?

def aggregate_by_region(**context): “””地域別売上集計””” execution_date = context[‘ds’] pg_hook = PostgresHook(postgres_conn_id=’postgres_default’) sql = f””” SELECT region, SUM(total_amount) as region_sales, COUNT(*) as order_count FROM sales WHERE sale_date = ‘{execution_date}’ GROUP BY region ORDER BY region_sales DESC; “”” results = pg_hook.get_records(sql) for row in results: logger.info(f”{row[0]}: ¥{row[1]:,.0f} ({row[2]}件)”) # タスク追加 region_aggregation = PythonOperator( task_id=’aggregate_by_region’, python_callable=aggregate_by_region, ) # 依存関係に追加 quality_check >> aggregation >> [report, region_aggregation]
練習問題 5 発展

このシステムにRedis(キャッシュ)を追加する場合、docker-compose.ymlにどのような設定を追加しますか?

services: # … 既存のサービス … redis: image: redis:7-alpine container_name: local-redis ports: – “6379:6379” volumes: – redis_data:/data networks: – data-network healthcheck: test: [“CMD”, “redis-cli”, “ping”] interval: 10s timeout: 5s retries: 5 restart: unless-stopped volumes: postgres_data: pgadmin_data: redis_data: # 追加

📝 STEP 25 のまとめ

✅ このプロジェクトで習得したこと
  • マルチコンテナ環境:4つのコンテナが連携するシステム構築
  • PostgreSQL初期化:init.sqlによるテーブル作成とデータ投入
  • Airflow DAG:データ品質チェック→集計→レポートのワークフロー
  • Jupyter分析:SQLAlchemyでのDB接続とmatplotlibでの可視化
  • ヘルスチェック:サービス間の依存関係管理
  • 環境変数管理:.envファイルでパスワード管理
🎯 次のステップの予告

STEP 26では、このローカル環境をさらに発展させて「ETLパイプラインの完全なコンテナ化」に挑戦します!

  • 既存のPythonスクリプトのDocker化
  • エラーハンドリングとリトライロジック
  • データ品質チェックの組み込み
  • 本番環境を意識した設計
📝

学習メモ

Docker・コンテナ技術入門 - Step 25

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE