STEP 19:Airflowとは何か

🌬️ STEP 19: Airflowとは何か

ワークフローオーケストレーションツールの基礎を学ぼう

📋 このステップで学ぶこと

  • ワークフローオーケストレーションの必要性
  • Airflowのアーキテクチャ
  • DAG(有向非巡回グラフ)の概念
  • Airflowのインストールと環境構築
  • Airflow UIの基本操作

⏱️ 学習時間の目安:2時間

📝 練習問題:10問(基礎4問・応用4問・発展2問)

🔗 Part 4へようこそ!ここまでの振り返り

STEP 1〜18で、ETLのすべての要素を学びました:

  • Part 1(STEP 1-6):ETLの基礎、データ抽出(DB、API、ファイル)
  • Part 2(STEP 7-13):データ変換(クリーニング、結合、集計、検証)
  • Part 3(STEP 14-18):データロード、エラーハンドリング、最適化

これらのスクリプトを毎日自動で実行したい場合、どうしますか?
ここからは、Apache Airflowを使ってパイプラインを自動化・管理する方法を学びます!

🎯 1. ワークフローオーケストレーションとは?

1-1. ETLパイプラインの課題

これまで学んできたETL処理を、毎日自動で実行したいとします。
しかし、手動では様々な問題があります。

📚 例え話:オーケストラの指揮者

ETLパイプラインはオーケストラの演奏に似ています。

・バイオリン → データ抽出(Extract)
・ピアノ → データ変換(Transform)
・チェロ → データロード(Load)

指揮者がいないとどうなるでしょうか?
・各楽器がバラバラに演奏して混乱
・誰かが遅れても気づかない
・間違えても最初からやり直し

Airflow = 指揮者です。
各タスク(楽器)のタイミングを指示し、
問題があればすぐに対応します。

😱 手動実行の問題点
  • 毎日決まった時間に手動で実行する必要がある
  • 複数の処理を順番に実行する必要がある
  • エラーが出たら通知してほしい
  • 失敗したら自動でリトライしてほしい
  • 処理の進捗を確認したい
  • 依存関係がある(Aが成功したらB、Bが成功したらC)

1-2. ワークフローオーケストレーションとは

ワークフローオーケストレーションとは、複数の処理を自動で管理・実行することです。

✅ Airflowができること
  • スケジュール実行:毎日朝9時に自動実行
  • 依存関係の管理:タスクAの後にタスクBを実行
  • リトライ:失敗したら自動で再試行
  • モニタリング:Web UIで進捗を確認
  • 通知:エラーが出たらSlack/メールで通知
  • 並列実行:複数のタスクを同時に実行
  • 履歴管理:過去の実行履歴を確認

1-3. cronとAirflowの比較

cronとAirflowの違い
機能 cron Airflow
スケジュール ⭕ 可能 ⭕ 可能
依存関係 ❌ 不可 ⭕ タスク間の依存関係を定義
リトライ ❌ 不可 ⭕ 自動リトライ
モニタリング ❌ ログを手動確認 ⭕ Web UIで確認
通知 ❌ 別途実装が必要 ⭕ 組み込み機能
並列実行 ❌ 不可 ⭕ 可能

🏗️ 2. Airflowのアーキテクチャ

2-1. Airflowの主要コンポーネント

Airflowを構成する5つのコンポーネント
コンポーネント 役割 例え
Webサーバー Web UIを提供(ダッシュボード) 📺 モニター画面
スケジューラー DAGを監視し、タスクを実行キューに追加 ⏰ 目覚まし時計
Executor タスクを実際に実行する 👷 作業員
メタデータDB DAG、タスク、実行履歴を保存 📁 記録ファイル
DAGファイル ワークフローの定義(Pythonコード) 📋 作業指示書

2-2. Airflowの動作フロー

Airflowの動作フロー
┌─────────────────┐
│  1. DAGファイル  │ ← Pythonで作成
│   をdagsフォルダ │
│   に配置        │
└────────┬────────┘
         ↓
┌─────────────────┐
│ 2. スケジューラー│
│   がDAGを読み込み│
└────────┬────────┘
         ↓
┌─────────────────┐
│ 3. スケジュール  │
│   時刻になったら │
│   タスクをキューへ│
└────────┬────────┘
         ↓
┌─────────────────┐
│ 4. Executorが   │
│   タスクを実行   │
└────────┬────────┘
         ↓
┌─────────────────┐
│ 5. 実行結果を    │
│   メタデータDBへ │
└────────┬────────┘
         ↓
┌─────────────────┐
│ 6. Web UIで     │
│   実行状況を確認 │
└─────────────────┘
        

2-3. Executorの種類

Executorの種類と使い分け
Executor 特徴 使い所
SequentialExecutor 1つずつ順番に実行 開発・テスト用
LocalExecutor ローカルマシンで並列実行 小規模向け
CeleryExecutor 分散実行(複数サーバー) 本番環境向け
KubernetesExecutor Kubernetes上で実行 クラウド環境向け

📊 3. DAG(有向非巡回グラフ)の概念

3-1. DAGとは?

DAG(Directed Acyclic Graph)とは、有向非巡回グラフのことです。
Airflowでは、ワークフロー全体をDAGと呼びます。

📖 DAGの3つの特徴
  • Directed(有向):矢印で方向が決まっている(A → B)
  • Acyclic(非巡回):ループしない(元に戻らない)
  • Graph(グラフ):タスク(ノード)と依存関係(エッジ)

3-2. DAGの例

【簡単なETLパイプライン】
    ┌─────────────────┐
    │ タスクA          │
    │ (データ抽出)    │
    └────────┬────────┘
             ↓
    ┌─────────────────┐
    │ タスクB          │
    │ (データ変換)    │
    └────────┬────────┘
             ↓
    ┌─────────────────┐
    │ タスクC          │
    │ (データロード)  │
    └─────────────────┘
        
【並列処理あり】
         ┌─────────────────┐
         │ タスクA          │
         │ (データ抽出)    │
         └────────┬────────┘
                  ↓
        ┌────────┴────────┐
        ↓                 ↓
┌───────────────┐ ┌───────────────┐
│ タスクB1      │ │ タスクB2      │
│(顧客データ)  │ │(商品データ)  │
└───────┬───────┘ └───────┬───────┘
        ↓                 ↓
        └────────┬────────┘
                 ↓
         ┌─────────────────┐
         │ タスクC          │
         │ (データ統合)    │
         └─────────────────┘
        

3-3. DAGファイルの基本構造

# ===== DAGファイルの基本構造 ===== from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime # タスクで実行する関数 def extract(): print(“データを抽出しています…”) return “extracted_data” def transform(): print(“データを変換しています…”) return “transformed_data” def load(): print(“データをロードしています…”) return “success” # DAGの定義 dag = DAG( dag_id=’my_first_dag’, # DAGの一意な名前 start_date=datetime(2024, 1, 1), # 開始日 schedule_interval=’@daily’, # 実行スケジュール(毎日) catchup=False # 過去の実行をスキップ ) # タスクの定義 task_extract = PythonOperator( task_id=’extract’, python_callable=extract, dag=dag ) task_transform = PythonOperator( task_id=’transform’, python_callable=transform, dag=dag ) task_load = PythonOperator( task_id=’load’, python_callable=load, dag=dag ) # 依存関係の定義(矢印演算子) task_extract >> task_transform >> task_load
💡 DAGのルール
  • DAGはループしてはいけない(非巡回)
  • タスクの依存関係は矢印(>>)で定義する
  • DAG IDは一意である必要がある
  • start_date必須

3-4. スケジュール間隔の指定方法

schedule_intervalの指定方法
プリセット cron式 意味
@once 1回だけ実行
@hourly 0 * * * * 毎時0分に実行
@daily 0 0 * * * 毎日0時に実行
@weekly 0 0 * * 0 毎週日曜0時に実行
@monthly 0 0 1 * * 毎月1日0時に実行
0 9 * * * 毎日9時に実行

💻 4. Airflowのインストール

4-1. システム要件

  • Python 3.8以上
  • pip(Pythonパッケージマネージャー)
  • 4GB以上のメモリ(推奨)

4-2. pipでインストール

# Airflowをインストール pip install apache-airflow # または、特定のバージョン pip install apache-airflow==2.8.0

4-3. 初期設定

# 1. 環境変数を設定(Airflowのホームディレクトリ) export AIRFLOW_HOME=~/airflow # 2. データベースを初期化 airflow db init # 3. 管理ユーザーを作成 airflow users create \ –username admin \ –firstname Admin \ –lastname User \ –role Admin \ –email [email protected] \ –password admin # 4. Webサーバーを起動(ターミナル1) airflow webserver –port 8080 # 5. スケジューラーを起動(ターミナル2 – 別のターミナルで) airflow scheduler
【実行結果】 # ターミナル1(Webサーバー) [2024-01-15 09:00:00] INFO – Starting the web server on host 0.0.0.0 [2024-01-15 09:00:01] INFO – Running on http://0.0.0.0:8080 ✅ Webサーバーが起動しました # ターミナル2(スケジューラー) [2024-01-15 09:00:00] INFO – Starting the scheduler [2024-01-15 09:00:01] INFO – Processing dag files… ✅ スケジューラーが起動しました

4-4. Docker Composeでのインストール(推奨)

# 1. docker-compose.yamlをダウンロード curl -LfO ‘https://airflow.apache.org/docs/apache-airflow/2.8.0/docker-compose.yaml’ # 2. 環境変数ファイルを作成 echo -e “AIRFLOW_UID=$(id -u)” > .env # 3. データベースを初期化 docker-compose up airflow-init # 4. Airflowを起動 docker-compose up -d # ブラウザで http://localhost:8080 にアクセス # ユーザー名: airflow # パスワード: airflow
🎯 Docker Composeのメリット
  • 環境構築が簡単(コマンド3つで完了)
  • 依存関係の問題がない
  • 本番環境に近い構成
  • 削除も簡単(docker-compose down)
⚠️ インストール時の注意点
  • WebサーバーとスケジューラーはTab ular別々のターミナルで起動
  • 初回起動時は数分かかる
  • ポート8080が使用中の場合は別のポートを指定

🖥️ 5. Airflow UIの基本操作

5-1. Web UIにアクセス

ブラウザで http://localhost:8080 にアクセスします。

5-2. DAGsビュー(トップページ)

📊 DAGsビューで確認できること
  • DAG一覧:登録されている全DAG
  • スケジュール:次回実行時刻
  • 最終実行結果:成功/失敗のステータス
  • ON/OFF切替:DAGの有効/無効を切り替え

5-3. DAG詳細ビューのタブ

DAG詳細画面のタブ
タブ名 機能 使い所
Graph タスクの依存関係を視覚的に表示 構造の確認
Grid 過去の実行履歴を時系列で表示 履歴の確認
Calendar カレンダー形式で実行履歴を表示 長期間の傾向
Code DAGファイルのソースコードを表示 コードの確認

5-4. タスクの実行状態

タスクの状態と色
状態 意味
success 🟢 緑 タスクが成功
failed 🔴 赤 タスクが失敗
running 🟡 黄緑 タスク実行中
queued ⚫ グレー 実行待ち
skipped 🟣 ピンク スキップされた
up_for_retry 🟠 オレンジ リトライ待ち

5-5. よく使う操作

🎮 Airflow UIでよく使う操作
  • DAGを手動実行:右上の「▶️ Trigger DAG」ボタン
  • タスクを再実行:タスクをクリック → 「Clear」
  • ログを確認:タスクをクリック → 「Log」
  • DAGを無効化:トグルスイッチをOFF
  • DAGを削除:DAGを選択 → 「Delete」(ファイル削除も必要)

📝 STEP 19 のまとめ

✅ このステップで学んだこと
  • ワークフローオーケストレーション:複数の処理を自動管理
  • Airflowのアーキテクチャ:Webサーバー、スケジューラー、Executor
  • DAG:有向非巡回グラフ、ワークフロー全体の定義
  • インストール:pip または Docker Compose
  • Web UI:DAGの確認、タスクの実行状態
💡 重要ポイント
  • Airflowはワークフローオーケストレーションツール
  • DAGはPythonコードで定義
  • タスクの依存関係は矢印(>>)で表現
  • Web UIで視覚的に確認できる
  • Docker Composeでのインストールが簡単
🎯 次のステップの予告

次のSTEP 20では、「初めてのDAG作成」を実践します。

  • DAGファイルの構造
  • Operatorの種類
  • タスクの定義と依存関係

📝 練習問題

問題 1 基礎

ワークフローオーケストレーションの主な機能を3つ挙げてください。

【解答】

以下のうち3つ:

  • スケジュール実行:決まった時間に自動実行
  • 依存関係の管理:タスク間の実行順序を制御
  • リトライ:失敗時の自動再試行
  • モニタリング:Web UIで進捗確認
  • 通知:エラー時の通知
  • 並列実行:複数タスクの同時実行
問題 2 基礎

DAG(Directed Acyclic Graph)の3つの特徴を説明してください。

【解答】
  • Directed(有向):矢印で方向が決まっている
  • Acyclic(非巡回):ループしない(元に戻らない)
  • Graph(グラフ):タスク(ノード)と依存関係(エッジ)の集合
問題 3 基礎

Airflowの主要コンポーネント(Webサーバー、スケジューラー、Executor、メタデータDB)の役割を説明してください。

【解答】
  • Webサーバー:Web UIを提供(ダッシュボード、タスク管理)
  • スケジューラー:DAGを監視し、タスクを実行キューに追加
  • Executor:タスクを実際に実行する
  • メタデータDB:DAG、タスク、実行履歴などを保存
問題 4 基礎

タスクの実行状態で、緑、赤、オレンジはそれぞれ何を意味しますか?

【解答】
  • 🟢 緑(success):タスクが成功
  • 🔴 赤(failed):タスクが失敗
  • 🟠 オレンジ(up_for_retry):リトライ待ち
問題 5 応用

以下のDAG定義で、task_a → task_b → task_cの順番で実行される依存関係を定義してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime dag = DAG( dag_id=’sequential_dag’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’ ) def task_a_func(): print(“Task A”) def task_b_func(): print(“Task B”) def task_c_func(): print(“Task C”) task_a = PythonOperator(task_id=’task_a’, python_callable=task_a_func, dag=dag) task_b = PythonOperator(task_id=’task_b’, python_callable=task_b_func, dag=dag) task_c = PythonOperator(task_id=’task_c’, python_callable=task_c_func, dag=dag) # 依存関係の定義 task_a >> task_b >> task_c
問題 6 応用

毎日朝9時に実行されるDAGを定義してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime dag = DAG( dag_id=’daily_9am_dag’, start_date=datetime(2024, 1, 1), schedule_interval=’0 9 * * *’, # 毎日9時(cron形式) catchup=False ) def my_task(): print(“毎日9時に実行されます”) task = PythonOperator( task_id=’daily_task’, python_callable=my_task, dag=dag )
問題 7 応用

並列実行されるDAG(task_aの後にtask_b1とtask_b2が並列実行され、両方完了後にtask_cが実行される)を定義してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime dag = DAG( dag_id=’parallel_dag’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’ ) def task_a_func(): print(“Task A – 開始”) def task_b1_func(): print(“Task B1 – 並列処理1”) def task_b2_func(): print(“Task B2 – 並列処理2”) def task_c_func(): print(“Task C – 最終処理”) task_a = PythonOperator(task_id=’task_a’, python_callable=task_a_func, dag=dag) task_b1 = PythonOperator(task_id=’task_b1′, python_callable=task_b1_func, dag=dag) task_b2 = PythonOperator(task_id=’task_b2′, python_callable=task_b2_func, dag=dag) task_c = PythonOperator(task_id=’task_c’, python_callable=task_c_func, dag=dag) # 依存関係の定義 task_a >> [task_b1, task_b2] >> task_c
問題 8 応用

cronとAirflowの違いを3つ説明してください。

【解答】
  • 依存関係:cronは不可、Airflowはタスク間の依存関係を定義可能
  • リトライ:cronは不可、Airflowは自動リトライ機能あり
  • モニタリング:cronはログを手動確認、AirflowはWeb UIで視覚的に確認
  • 並列実行:cronは不可、Airflowは複数タスクを同時実行可能
  • 通知:cronは別途実装が必要、Airflowは組み込み機能
問題 9 発展

ETLパイプライン(Extract → Transform → Load)のDAGを、適切なスケジュール設定とcatchup=Falseで定義してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime # DAGの定義 dag = DAG( dag_id=’etl_pipeline’, description=’ETLパイプライン(Extract → Transform → Load)’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, # 毎日実行 catchup=False, # 過去の実行をスキップ tags=[‘etl’, ‘production’] ) def extract(): “””データ抽出””” print(“=== Extract ===”) print(“データベースからデータを抽出中…”) # 実際の抽出処理 return “extracted_data” def transform(): “””データ変換””” print(“=== Transform ===”) print(“データを変換中…”) # 実際の変換処理 return “transformed_data” def load(): “””データロード””” print(“=== Load ===”) print(“データをロード中…”) # 実際のロード処理 return “success” # タスクの定義 task_extract = PythonOperator( task_id=’extract’, python_callable=extract, dag=dag ) task_transform = PythonOperator( task_id=’transform’, python_callable=transform, dag=dag ) task_load = PythonOperator( task_id=’load’, python_callable=load, dag=dag ) # 依存関係の定義 task_extract >> task_transform >> task_load
問題 10 発展

各Executorの特徴と適切な使用場面を説明してください。

【解答】
  • SequentialExecutor
    • 特徴:1つずつ順番に実行、並列処理なし
    • 使用場面:開発・テスト環境、リソースが限られた環境
  • LocalExecutor
    • 特徴:ローカルマシンで並列実行、PostgreSQL/MySQL必要
    • 使用場面:小〜中規模の本番環境、単一サーバー構成
  • CeleryExecutor
    • 特徴:分散実行、複数サーバーで処理を分散
    • 使用場面:大規模本番環境、高可用性が必要な場合
  • KubernetesExecutor
    • 特徴:各タスクをKubernetes Podとして実行
    • 使用場面:クラウド環境、コンテナベースのインフラ