STEP 19:PostgreSQL + Airflowのコンテナ化

🚀 STEP 19: PostgreSQL + Airflowのコンテナ化

データパイプラインの基盤を構築する

📋 このステップで学ぶこと

  • Apache Airflowの概要とコンポーネント
  • Airflowのマルチコンテナ構成
  • PostgreSQL(メタデータベース)の設定
  • Airflow Webserver、Scheduler、Workerの役割
  • DAGのボリュームマウント
  • コンテナ間接続とヘルスチェック
  • 実践演習:AirflowでETLパイプライン構築

🔧 0. このステップの前提知識

📚 STEP 15-18の復習

このステップでは、以下の知識を使います。忘れた場合は復習してから進めましょう。

  • Docker Compose:複数コンテナの管理(STEP 15-16)
  • サービス間通信:サービス名でのアクセス(STEP 17)
  • ボリュームマウント:データの永続化(STEP 10, 16)
  • healthcheck:サービスの準備完了確認(STEP 17)
  • depends_on:起動順序の制御(STEP 16)

0-1. 作業ディレクトリの準備

# 作業ディレクトリを作成して移動 mkdir -p ~/docker-practice/step19 cd ~/docker-practice/step19 # 現在の場所を確認 pwd

0-2. Airflowのアーキテクチャ

【Airflow マルチコンテナ構成】 ┌─────────────────────────────────────────────────────────────┐ │ ユーザー(ブラウザ) │ └───────────────────────────┬─────────────────────────────────┘ │ http://localhost:8080 ↓ ┌─────────────────────────────────────────────────────────────┐ │ Airflow Webserver │ │ (Web UI提供) │ │ • DAG一覧表示・管理 │ │ • タスク実行状況の監視 │ │ • ログの確認 │ └───────────────────────────┬─────────────────────────────────┘ │ ↓ ┌─────────────────────────────────────────────────────────────┐ │ PostgreSQL │ │ (メタデータベース) │ │ • DAG情報の保存 │ │ • タスク実行履歴 │ │ • 接続情報・変数 │ └───────────────────────────┬─────────────────────────────────┘ ↑ ┌───────────────────────────┴─────────────────────────────────┐ │ Airflow Scheduler │ │ (スケジュール管理) │ │ • DAGファイルの解析 │ │ • タスクの起動タイミング決定 │ │ • 依存関係の管理 │ └───────────────────────────┬─────────────────────────────────┘ │ タスク実行指示 ↓ ┌─────────────────────────────────────────────────────────────┐ │ Airflow Worker │ │ (タスク実行) │ │ • 実際のPythonコード実行 │ │ • データベース操作 │ │ • ファイル処理 │ └─────────────────────────────────────────────────────────────┘ │ ↓ ┌─────────────────────────────────────────────────────────────┐ │ 共有ボリューム │ │ ./dags → /opt/airflow/dags │ │ ./logs → /opt/airflow/logs │ │ ./plugins → /opt/airflow/plugins │ └─────────────────────────────────────────────────────────────┘

0-3. Airflowの主要コンポーネント

コンポーネント 役割 Docker設定
Webserver Web UIを提供、DAG管理・監視 command: webserver
Scheduler タスクのスケジュール管理、DAG解析 command: scheduler
Worker タスクの実際の実行(Celery使用時) command: celery worker
PostgreSQL メタデータ保存(DAG情報、履歴) image: postgres:13
Redis タスクキュー(Celery使用時) image: redis:alpine

0-4. Executorの種類

Airflow Executorの比較
Executor 特徴 用途
LocalExecutor 単一マシンで並列実行、シンプル 開発・小規模本番
CeleryExecutor Workerを分散配置、Redis必要 大規模本番環境
SequentialExecutor 1タスクずつ実行、SQLite可 テスト・学習用
💡 このステップでの構成

このステップではLocalExecutorを使用します。
シンプルな構成でAirflowの基本を学び、必要に応じてCeleryExecutorに拡張できます。

🎯 1. Apache Airflowの概要

1-1. Airflowとは?

Apache Airflowは、ワークフローを管理・スケジュール実行するためのプラットフォームです。

📊 Airflowの主な機能
  • DAG(Directed Acyclic Graph):タスクの依存関係を定義
  • スケジュール実行:定期的にパイプラインを自動実行
  • リトライ機能:失敗したタスクを自動再実行
  • Web UI:ブラウザでパイプラインを監視
  • 拡張性:カスタムオペレーターで機能拡張

1-2. なぜDockerでAirflow?

✅ Docker化のメリット
  • 環境統一:開発・本番で同じ環境
  • 簡単セットアップ:複雑な依存関係を解決
  • スケーラビリティ:Workerを簡単に増やせる
  • クリーンな環境:ホストマシンを汚さない
⚠️ 注意点
  • メモリ消費:最低4GB推奨
  • 初回起動:DB初期化に時間がかかる
  • ログ権限:適切な権限設定が必要

1-3. DAGの基本構造

【DAG(有向非巡回グラフ)の例】 ┌─────────────┐ │ extract │ ← データ抽出 │ (タスク1) │ └──────┬──────┘ │ ↓ ┌─────────────┐ │ transform │ ← データ変換 │ (タスク2) │ └──────┬──────┘ │ ↓ ┌─────────────┐ │ load │ ← データ読み込み │ (タスク3) │ └─────────────┘ タスク間の依存関係をコードで定義: extract >> transform >> load

🏗️ 2. docker-compose.ymlの構成

2-1. 必要なサービス(LocalExecutor版)

【LocalExecutor構成】 ┌─────────────────────────────────────────────────────────────┐ │ サービス一覧 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. postgres ← メタデータベース │ │ 2. airflow-init ← 初期化(1回のみ実行) │ │ 3. airflow-webserver ← Web UI(ポート8080) │ │ 4. airflow-scheduler ← スケジュール管理 │ │ │ │ ※ LocalExecutorではWorkerとRedisは不要 │ │ │ └─────────────────────────────────────────────────────────────┘

2-2. 主要な環境変数

Airflow環境変数
環境変数 説明
AIRFLOW__CORE__EXECUTOR Executorの種類(LocalExecutor等)
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN メタデータベースの接続文字列
AIRFLOW__CORE__FERNET_KEY 暗号化キー(接続情報の暗号化)
AIRFLOW__CORE__LOAD_EXAMPLES サンプルDAGの読み込み(false推奨)
AIRFLOW__WEBSERVER__SECRET_KEY Webサーバーのシークレットキー

2-3. ボリュームマウントの構成

【ボリュームマウント】 ホスト側 コンテナ内 ───────────────────────────────────────────────────────────── ./dags/ → /opt/airflow/dags/ └── my_dag.py └── my_dag.py (DAGファイル) ./logs/ → /opt/airflow/logs/ └── dag_id/ └── dag_id/ └── task_id/ └── task_id/ └── attempt.log └── attempt.log (実行ログ) ./plugins/ → /opt/airflow/plugins/ └── my_operator.py └── my_operator.py (カスタムプラグイン)

🛠️ 3. 実践演習:Airflow環境構築

3-1. プロジェクトディレクトリを作成

# プロジェクトディレクトリを作成 mkdir -p ~/docker-practice/airflow-docker/{dags,logs,plugins} cd ~/docker-practice/airflow-docker # 権限設定(重要!) chmod -R 777 logs # 現在の場所を確認 pwd
📁 完成時のプロジェクト構成
airflow-docker/ ├── docker-compose.yml ├── .env ├── dags/ │ └── sample_dag.py ├── logs/ └── plugins/

3-2. .envファイルを作成

# .envファイルを作成 cat > .env << ‘EOF’ # Airflow設定 AIRFLOW_UID=50000 AIRFLOW_GID=0 # PostgreSQL設定 POSTGRES_USER=airflow POSTGRES_PASSWORD=airflow POSTGRES_DB=airflow # Airflow管理者設定 _AIRFLOW_WWW_USER_USERNAME=admin _AIRFLOW_WWW_USER_PASSWORD=admin EOF # 内容を確認 cat .env

3-3. docker-compose.ymlを作成

# docker-compose.ymlを作成 cat > docker-compose.yml << ‘EOF’ version: ‘3.8’ # 共通の環境変数 x-airflow-common: &airflow-common image: apache/airflow:2.7.0 environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CORE__FERNET_KEY: ” AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: ‘true’ AIRFLOW__CORE__LOAD_EXAMPLES: ‘false’ AIRFLOW__WEBSERVER__SECRET_KEY: ‘your-secret-key-here’ volumes: – ./dags:/opt/airflow/dags – ./logs:/opt/airflow/logs – ./plugins:/opt/airflow/plugins user: “${AIRFLOW_UID:-50000}:0” depends_on: &airflow-common-depends-on postgres: condition: service_healthy services: # PostgreSQL(メタデータベース) postgres: image: postgres:13 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: – postgres_data:/var/lib/postgresql/data healthcheck: test: [“CMD”, “pg_isready”, “-U”, “airflow”] interval: 10s timeout: 5s retries: 5 start_period: 10s restart: unless-stopped # Airflow初期化(1回のみ実行) airflow-init: <<: *airflow-common entrypoint: /bin/bash command: – -c – | airflow db init airflow users create \ –username admin \ –firstname Admin \ –lastname User \ –role Admin \ –email admin@example.com \ –password admin echo “Airflow initialization completed!” environment: <<: *airflow-common-env restart: “no” # Airflow Webserver airflow-webserver: <<: *airflow-common command: webserver ports: – “8080:8080” healthcheck: test: [“CMD”, “curl”, “–fail”, “http://localhost:8080/health”] interval: 30s timeout: 10s retries: 5 start_period: 60s restart: unless-stopped depends_on: <<: *airflow-common-depends-on airflow-init: condition: service_completed_successfully # Airflow Scheduler airflow-scheduler: <<: *airflow-common command: scheduler healthcheck: test: [“CMD”, “curl”, “–fail”, “http://localhost:8974/health”] interval: 30s timeout: 10s retries: 5 start_period: 60s restart: unless-stopped depends_on: <<: *airflow-common-depends-on airflow-init: condition: service_completed_successfully volumes: postgres_data: EOF # 内容を確認 cat docker-compose.yml
🎯 docker-compose.ymlのポイント
  • x-airflow-common: YAMLアンカーで共通設定を定義
  • <<: *airflow-common 共通設定を継承
  • condition: service_healthy PostgreSQL準備完了を待つ
  • condition: service_completed_successfully 初期化完了を待つ

3-4. サンプルDAGを作成

# サンプルDAGを作成 cat > dags/sample_dag.py << ‘EOF’ from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator from datetime import datetime, timedelta # デフォルト引数 default_args = { ‘owner’: ‘airflow’, ‘depends_on_past’: False, ‘start_date’: datetime(2024, 1, 1), ‘email_on_failure’: False, ‘email_on_retry’: False, ‘retries’: 1, ‘retry_delay’: timedelta(minutes=5), } # タスク関数 def print_hello(): print(“Hello from Airflow!”) return “Hello World” def print_date(): current_date = datetime.now() print(f”Current date: {current_date}”) return str(current_date) def print_goodbye(): print(“Goodbye from Airflow!”) return “Goodbye” # DAG定義 with DAG( ‘sample_dag’, default_args=default_args, description=’A simple sample DAG’, schedule_interval=timedelta(days=1), catchup=False, tags=[‘example’, ‘sample’], ) as dag: # タスク1: Hello task_hello = PythonOperator( task_id=’print_hello’, python_callable=print_hello, ) # タスク2: 日付表示 task_date = PythonOperator( task_id=’print_date’, python_callable=print_date, ) # タスク3: Bashコマンド実行 task_bash = BashOperator( task_id=’run_bash’, bash_command=’echo “Running bash command!” && date’, ) # タスク4: Goodbye task_goodbye = PythonOperator( task_id=’print_goodbye’, python_callable=print_goodbye, ) # 依存関係: hello -> date -> bash -> goodbye task_hello >> task_date >> task_bash >> task_goodbye EOF # 内容を確認 cat dags/sample_dag.py

3-5. ファイル構成を確認

# ファイル構成を確認 find . -type f -name “*.py” -o -name “*.yml” -o -name “.env” | head -20
./.env ./docker-compose.yml ./dags/sample_dag.py

3-6. 起動

# 初期化と起動(初回は時間がかかります) docker-compose up -d # 起動状況を確認 docker-compose ps
NAME COMMAND STATUS PORTS airflow-docker-postgres-1 “docker-entrypoint.s…” Up (healthy) 5432/tcp airflow-docker-airflow-init-1 “/bin/bash -c ‘airfl…” Exited (0) airflow-docker-airflow-webserver-1 “/usr/bin/dumb-init …” Up (healthy) 0.0.0.0:8080->8080/tcp airflow-docker-airflow-scheduler-1 “/usr/bin/dumb-init …” Up (healthy) 8974/tcp
# ログを確認(Ctrl+Cで終了) docker-compose logs -f airflow-webserver

3-7. Web UIにアクセス

🌐 Airflow Web UIへのアクセス
  1. ブラウザでhttp://localhost:8080を開く
  2. ログイン:
    Username: admin
    Password: admin
  3. DAG一覧でsample_dagを確認

3-8. DAGを実行

📊 DAGを実行する手順
  1. DAG一覧でsample_dagの行を探す
  2. 左側のトグルスイッチをON(青色)にする
  3. 右側の「▶」(Trigger DAG)ボタンをクリック
  4. 「Trigger」をクリックして実行開始
  5. DAG名をクリックして詳細画面へ
  6. 「Graph」タブでタスクの実行状況を確認
  7. 緑=成功、赤=失敗、黄=実行中

3-9. クリーンアップ

# 停止 docker-compose down # ボリュームも削除(データが消える) docker-compose down -v

📁 4. PostgreSQLにデータを挿入するDAG

4-1. PostgreSQL操作用DAGの作成

AirflowにはPostgresOperatorPostgresHookが用意されています。

# PostgreSQL操作用DAGを作成 cat > dags/postgres_etl_dag.py << ‘EOF’ from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.postgres.operators.postgres import PostgresOperator from airflow.providers.postgres.hooks.postgres import PostgresHook from datetime import datetime, timedelta import random default_args = { ‘owner’: ‘airflow’, ‘start_date’: datetime(2024, 1, 1), ‘retries’: 1, ‘retry_delay’: timedelta(minutes=5), } def extract_data(**context): “””データ抽出(サンプルデータ生成)””” data = [] products = [‘ノートPC’, ‘マウス’, ‘キーボード’, ‘モニター’, ‘ヘッドセット’] for i in range(10): data.append({ ‘product’: random.choice(products), ‘amount’: random.randint(1000, 100000), ‘quantity’: random.randint(1, 10), }) # XComでデータを渡す context[‘ti’].xcom_push(key=’extracted_data’, value=data) print(f”Extracted {len(data)} records”) return data def load_data(**context): “””データをPostgreSQLに挿入””” # XComからデータを取得 data = context[‘ti’].xcom_pull(key=’extracted_data’, task_ids=’extract’) hook = PostgresHook(postgres_conn_id=’postgres_default’) for record in data: hook.run(f””” INSERT INTO sales (product, amount, quantity, sale_date) VALUES ( ‘{record[‘product’]}’, {record[‘amount’]}, {record[‘quantity’]}, CURRENT_DATE ); “””) print(f”Loaded {len(data)} records to PostgreSQL”) def check_data(): “””データ確認””” hook = PostgresHook(postgres_conn_id=’postgres_default’) records = hook.get_records(“SELECT COUNT(*) FROM sales;”) print(f”Total records in sales table: {records[0][0]}”) with DAG( ‘postgres_etl_dag’, default_args=default_args, description=’ETL pipeline with PostgreSQL’, schedule_interval=’@daily’, catchup=False, tags=[‘etl’, ‘postgres’], ) as dag: # テーブル作成 create_table = PostgresOperator( task_id=’create_table’, postgres_conn_id=’postgres_default’, sql=””” CREATE TABLE IF NOT EXISTS sales ( id SERIAL PRIMARY KEY, product VARCHAR(100), amount INTEGER, quantity INTEGER, sale_date DATE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); “””, ) # データ抽出 extract = PythonOperator( task_id=’extract’, python_callable=extract_data, ) # データ読み込み load = PythonOperator( task_id=’load’, python_callable=load_data, ) # データ確認 check = PythonOperator( task_id=’check’, python_callable=check_data, ) create_table >> extract >> load >> check EOF # 内容を確認 head -30 dags/postgres_etl_dag.py

4-2. Airflow接続設定

⚠️ PostgreSQL接続の設定が必要

postgres_etl_dagを実行するには、AirflowにPostgreSQL接続を設定する必要があります。

🔧 Web UIでの接続設定
  1. Airflow Web UI(http://localhost:8080)にログイン
  2. 上部メニュー「Admin」→「Connections」をクリック
  3. 「+」ボタンで新規接続を追加
  4. 以下を入力:
    • Connection Id: postgres_default
    • Connection Type: Postgres
    • Host: postgres(サービス名)
    • Schema: airflow
    • Login: airflow
    • Password: airflow
    • Port: 5432
  5. 「Test」ボタンで接続テスト
  6. 「Save」で保存

🔍 5. トラブルシューティング

5-1. よくある問題と解決策

❌ 問題1:DAGが表示されない

原因:

  • DAGファイルにSyntaxエラーがある
  • dagsディレクトリのパスが間違っている
  • Schedulerが起動していない

解決策:

# DAGファイルの構文チェック docker-compose exec airflow-webserver python /opt/airflow/dags/sample_dag.py # Schedulerのログを確認 docker-compose logs airflow-scheduler | grep -i error
❌ 問題2:Webserverが起動しない

原因:

  • データベース初期化が完了していない
  • PostgreSQLに接続できない

解決策:

# 初期化を再実行 docker-compose down -v docker-compose up airflow-init docker-compose up -d
❌ 問題3:Permission denied(ログファイル)

原因:

logsディレクトリの権限が不適切

解決策:

# 権限を変更 chmod -R 777 logs # または.envでUIDを設定 echo “AIRFLOW_UID=$(id -u)” >> .env

5-2. デバッグコマンド

# コンテナの状態確認 docker-compose ps # 各サービスのログ確認 docker-compose logs postgres docker-compose logs airflow-webserver docker-compose logs airflow-scheduler # コンテナ内でコマンド実行 docker-compose exec airflow-webserver airflow dags list docker-compose exec airflow-webserver airflow tasks list sample_dag # データベース接続テスト docker-compose exec airflow-webserver airflow db check

💪 6. 練習問題

練習問題 1 基礎

Airflowの3つの主要コンポーネントを挙げてください。

【解答】
  1. Webserver:Web UIを提供、DAG管理・監視
  2. Scheduler:タスクのスケジュール管理、DAG解析
  3. Worker:タスクの実際の実行(CeleryExecutor使用時)

※LocalExecutorの場合、SchedulerがWorkerの役割も兼ねます。

練習問題 2 基礎

LocalExecutorとCeleryExecutorの違いを説明してください。

【解答】
項目 LocalExecutor CeleryExecutor
実行方式 単一マシンで並列実行 複数Workerに分散実行
必要なもの PostgreSQL PostgreSQL + Redis + Worker
用途 開発・小規模本番 大規模本番環境
練習問題 3 応用

DAGの依存関係を定義する方法を2つ書いてください。

【解答】
# 方法1: ビットシフト演算子 task1 >> task2 >> task3 # 方法2: set_downstream / set_upstream task1.set_downstream(task2) task2.set_downstream(task3) # または task3.set_upstream(task2) task2.set_upstream(task1) # 並列実行の場合 task1 >> [task2, task3] >> task4
練習問題 4 応用

XComを使ってタスク間でデータを受け渡す方法を書いてください。

【解答】
def push_data(**context): “””データを送信””” data = {‘key’: ‘value’, ‘count’: 100} context[‘ti’].xcom_push(key=’my_data’, value=data) return data def pull_data(**context): “””データを受信””” data = context[‘ti’].xcom_pull( key=’my_data’, task_ids=’push_task’ ) print(f”Received: {data}”) with DAG(‘xcom_example’, …) as dag: push_task = PythonOperator( task_id=’push_task’, python_callable=push_data, ) pull_task = PythonOperator( task_id=’pull_task’, python_callable=pull_data, ) push_task >> pull_task
練習問題 5 発展

毎時実行で、現在時刻をファイルに追記するDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def write_timestamp(): timestamp = datetime.now().strftime(‘%Y-%m-%d %H:%M:%S’) with open(‘/opt/airflow/logs/timestamps.txt’, ‘a’) as f: f.write(f”{timestamp}\n”) print(f”Written: {timestamp}”) with DAG( ‘hourly_timestamp’, start_date=datetime(2024, 1, 1), schedule_interval=’@hourly’, # 毎時実行 catchup=False, ) as dag: write_task = PythonOperator( task_id=’write_timestamp’, python_callable=write_timestamp, )
練習問題 6 応用

DAGが表示されない場合の確認手順を3つ挙げてください。

【解答】
  1. DAGファイルの構文チェック
    docker-compose exec airflow-webserver python /opt/airflow/dags/my_dag.py
  2. Schedulerのログを確認
    docker-compose logs airflow-scheduler | grep -i error
  3. DAGリストを確認
    docker-compose exec airflow-webserver airflow dags list
練習問題 7 発展

BranchPythonOperatorを使った条件分岐DAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator, BranchPythonOperator from airflow.operators.empty import EmptyOperator from datetime import datetime import random def choose_branch(): “””ランダムに分岐先を選択””” if random.random() > 0.5: return ‘branch_a’ else: return ‘branch_b’ def task_a(): print(“Executing Branch A”) def task_b(): print(“Executing Branch B”) with DAG( ‘branching_dag’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False, ) as dag: start = EmptyOperator(task_id=’start’) branch = BranchPythonOperator( task_id=’branch’, python_callable=choose_branch, ) branch_a = PythonOperator( task_id=’branch_a’, python_callable=task_a, ) branch_b = PythonOperator( task_id=’branch_b’, python_callable=task_b, ) end = EmptyOperator( task_id=’end’, trigger_rule=’none_failed_min_one_success’ ) start >> branch >> [branch_a, branch_b] >> end
練習問題 8 基礎

Airflowで使用される主なボリュームマウント先を3つ挙げてください。

【解答】
  1. /opt/airflow/dags – DAGファイルの配置場所
  2. /opt/airflow/logs – タスク実行ログの保存場所
  3. /opt/airflow/plugins – カスタムプラグインの配置場所

📝 STEP 19 のまとめ

✅ このステップで学んだこと
  • Airflowのコンポーネント:Webserver、Scheduler、Worker
  • Docker構成:PostgreSQL + Airflowのマルチコンテナ
  • DAGのマウント:ホストのディレクトリをコンテナで使用
  • ヘルスチェック:依存関係を適切に管理
  • PostgreSQL操作:PostgresOperatorとPostgresHook
📊 Airflow Docker構成 早見表
サービス 役割 ポート
postgres メタデータベース 5432(内部)
airflow-webserver Web UI 8080:8080
airflow-scheduler スケジュール管理 8974(内部)
airflow-init 初期化(1回のみ)
💡 重要ポイント
  • LocalExecutorは開発・小規模本番に最適
  • logsディレクトリの権限に注意(777推奨)
  • airflow-initは初回のみ実行される
  • DAGファイルはホストで編集、即座に反映
  • PostgreSQL接続はWeb UIで設定
🎯 次のステップの予告

次のSTEP 20では、「Sparkクラスターのコンテナ化」を学びます。

  • Apache Sparkの概要
  • Spark Master + Worker構成
  • Jupyter NotebookからSpark接続
  • PySparkでのデータ処理

大規模データ処理の基盤を作りましょう!

📝

学習メモ

Docker・コンテナ技術入門 - Step 19

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE