🌬️ STEP 19: Airflowとは何か
ワークフローオーケストレーションツールの基礎を学ぼう
📋 このステップで学ぶこと
- ワークフローオーケストレーションの必要性
- Airflowのアーキテクチャ
- DAG(有向非巡回グラフ)の概念
- Airflowのインストールと環境構築
- Airflow UIの基本操作
⏱️ 学習時間の目安:2時間
📝 練習問題:10問(基礎4問・応用4問・発展2問)
STEP 1〜18で、ETLのすべての要素を学びました:
- Part 1(STEP 1-6):ETLの基礎、データ抽出(DB、API、ファイル)
- Part 2(STEP 7-13):データ変換(クリーニング、結合、集計、検証)
- Part 3(STEP 14-18):データロード、エラーハンドリング、最適化
これらのスクリプトを毎日自動で実行したい場合、どうしますか?
ここからは、Apache Airflowを使ってパイプラインを自動化・管理する方法を学びます!
🎯 1. ワークフローオーケストレーションとは?
1-1. ETLパイプラインの課題
これまで学んできたETL処理を、毎日自動で実行したいとします。
しかし、手動では様々な問題があります。
ETLパイプラインはオーケストラの演奏に似ています。
・バイオリン → データ抽出(Extract)
・ピアノ → データ変換(Transform)
・チェロ → データロード(Load)
指揮者がいないとどうなるでしょうか?
・各楽器がバラバラに演奏して混乱
・誰かが遅れても気づかない
・間違えても最初からやり直し
Airflow = 指揮者です。
各タスク(楽器)のタイミングを指示し、
問題があればすぐに対応します。
- 毎日決まった時間に手動で実行する必要がある
- 複数の処理を順番に実行する必要がある
- エラーが出たら通知してほしい
- 失敗したら自動でリトライしてほしい
- 処理の進捗を確認したい
- 依存関係がある(Aが成功したらB、Bが成功したらC)
1-2. ワークフローオーケストレーションとは
ワークフローオーケストレーションとは、複数の処理を自動で管理・実行することです。
- スケジュール実行:毎日朝9時に自動実行
- 依存関係の管理:タスクAの後にタスクBを実行
- リトライ:失敗したら自動で再試行
- モニタリング:Web UIで進捗を確認
- 通知:エラーが出たらSlack/メールで通知
- 並列実行:複数のタスクを同時に実行
- 履歴管理:過去の実行履歴を確認
1-3. cronとAirflowの比較
| 機能 | cron | Airflow |
|---|---|---|
| スケジュール | ⭕ 可能 | ⭕ 可能 |
| 依存関係 | ❌ 不可 | ⭕ タスク間の依存関係を定義 |
| リトライ | ❌ 不可 | ⭕ 自動リトライ |
| モニタリング | ❌ ログを手動確認 | ⭕ Web UIで確認 |
| 通知 | ❌ 別途実装が必要 | ⭕ 組み込み機能 |
| 並列実行 | ❌ 不可 | ⭕ 可能 |
🏗️ 2. Airflowのアーキテクチャ
2-1. Airflowの主要コンポーネント
| コンポーネント | 役割 | 例え |
|---|---|---|
| Webサーバー | Web UIを提供(ダッシュボード) | 📺 モニター画面 |
| スケジューラー | DAGを監視し、タスクを実行キューに追加 | ⏰ 目覚まし時計 |
| Executor | タスクを実際に実行する | 👷 作業員 |
| メタデータDB | DAG、タスク、実行履歴を保存 | 📁 記録ファイル |
| DAGファイル | ワークフローの定義(Pythonコード) | 📋 作業指示書 |
2-2. Airflowの動作フロー
┌─────────────────┐
│ 1. DAGファイル │ ← Pythonで作成
│ をdagsフォルダ │
│ に配置 │
└────────┬────────┘
↓
┌─────────────────┐
│ 2. スケジューラー│
│ がDAGを読み込み│
└────────┬────────┘
↓
┌─────────────────┐
│ 3. スケジュール │
│ 時刻になったら │
│ タスクをキューへ│
└────────┬────────┘
↓
┌─────────────────┐
│ 4. Executorが │
│ タスクを実行 │
└────────┬────────┘
↓
┌─────────────────┐
│ 5. 実行結果を │
│ メタデータDBへ │
└────────┬────────┘
↓
┌─────────────────┐
│ 6. Web UIで │
│ 実行状況を確認 │
└─────────────────┘
2-3. Executorの種類
| Executor | 特徴 | 使い所 |
|---|---|---|
| SequentialExecutor | 1つずつ順番に実行 | 開発・テスト用 |
| LocalExecutor | ローカルマシンで並列実行 | 小規模向け |
| CeleryExecutor | 分散実行(複数サーバー) | 本番環境向け |
| KubernetesExecutor | Kubernetes上で実行 | クラウド環境向け |
📊 3. DAG(有向非巡回グラフ)の概念
3-1. DAGとは?
DAG(Directed Acyclic Graph)とは、有向非巡回グラフのことです。
Airflowでは、ワークフロー全体をDAGと呼びます。
- Directed(有向):矢印で方向が決まっている(A → B)
- Acyclic(非巡回):ループしない(元に戻らない)
- Graph(グラフ):タスク(ノード)と依存関係(エッジ)
3-2. DAGの例
┌─────────────────┐
│ タスクA │
│ (データ抽出) │
└────────┬────────┘
↓
┌─────────────────┐
│ タスクB │
│ (データ変換) │
└────────┬────────┘
↓
┌─────────────────┐
│ タスクC │
│ (データロード) │
└─────────────────┘
┌─────────────────┐
│ タスクA │
│ (データ抽出) │
└────────┬────────┘
↓
┌────────┴────────┐
↓ ↓
┌───────────────┐ ┌───────────────┐
│ タスクB1 │ │ タスクB2 │
│(顧客データ) │ │(商品データ) │
└───────┬───────┘ └───────┬───────┘
↓ ↓
└────────┬────────┘
↓
┌─────────────────┐
│ タスクC │
│ (データ統合) │
└─────────────────┘
3-3. DAGファイルの基本構造
- DAGはループしてはいけない(非巡回)
- タスクの依存関係は矢印(>>)で定義する
- DAG IDは一意である必要がある
start_dateは必須
3-4. スケジュール間隔の指定方法
| プリセット | cron式 | 意味 |
|---|---|---|
@once |
– | 1回だけ実行 |
@hourly |
0 * * * * | 毎時0分に実行 |
@daily |
0 0 * * * | 毎日0時に実行 |
@weekly |
0 0 * * 0 | 毎週日曜0時に実行 |
@monthly |
0 0 1 * * | 毎月1日0時に実行 |
| – | 0 9 * * * | 毎日9時に実行 |
💻 4. Airflowのインストール
4-1. システム要件
- Python 3.8以上
- pip(Pythonパッケージマネージャー)
- 4GB以上のメモリ(推奨)
4-2. pipでインストール
4-3. 初期設定
4-4. Docker Composeでのインストール(推奨)
- 環境構築が簡単(コマンド3つで完了)
- 依存関係の問題がない
- 本番環境に近い構成
- 削除も簡単(docker-compose down)
- WebサーバーとスケジューラーはTab ular別々のターミナルで起動
- 初回起動時は数分かかる
- ポート8080が使用中の場合は別のポートを指定
🖥️ 5. Airflow UIの基本操作
5-1. Web UIにアクセス
ブラウザで http://localhost:8080 にアクセスします。
5-2. DAGsビュー(トップページ)
- DAG一覧:登録されている全DAG
- スケジュール:次回実行時刻
- 最終実行結果:成功/失敗のステータス
- ON/OFF切替:DAGの有効/無効を切り替え
5-3. DAG詳細ビューのタブ
| タブ名 | 機能 | 使い所 |
|---|---|---|
| Graph | タスクの依存関係を視覚的に表示 | 構造の確認 |
| Grid | 過去の実行履歴を時系列で表示 | 履歴の確認 |
| Calendar | カレンダー形式で実行履歴を表示 | 長期間の傾向 |
| Code | DAGファイルのソースコードを表示 | コードの確認 |
5-4. タスクの実行状態
| 状態 | 色 | 意味 |
|---|---|---|
| success | 🟢 緑 | タスクが成功 |
| failed | 🔴 赤 | タスクが失敗 |
| running | 🟡 黄緑 | タスク実行中 |
| queued | ⚫ グレー | 実行待ち |
| skipped | 🟣 ピンク | スキップされた |
| up_for_retry | 🟠 オレンジ | リトライ待ち |
5-5. よく使う操作
- DAGを手動実行:右上の「▶️ Trigger DAG」ボタン
- タスクを再実行:タスクをクリック → 「Clear」
- ログを確認:タスクをクリック → 「Log」
- DAGを無効化:トグルスイッチをOFF
- DAGを削除:DAGを選択 → 「Delete」(ファイル削除も必要)
📝 STEP 19 のまとめ
- ワークフローオーケストレーション:複数の処理を自動管理
- Airflowのアーキテクチャ:Webサーバー、スケジューラー、Executor
- DAG:有向非巡回グラフ、ワークフロー全体の定義
- インストール:pip または Docker Compose
- Web UI:DAGの確認、タスクの実行状態
- Airflowはワークフローオーケストレーションツール
- DAGはPythonコードで定義
- タスクの依存関係は矢印(>>)で表現
- Web UIで視覚的に確認できる
- Docker Composeでのインストールが簡単
次のSTEP 20では、「初めてのDAG作成」を実践します。
- DAGファイルの構造
- Operatorの種類
- タスクの定義と依存関係
📝 練習問題
ワークフローオーケストレーションの主な機能を3つ挙げてください。
以下のうち3つ:
- スケジュール実行:決まった時間に自動実行
- 依存関係の管理:タスク間の実行順序を制御
- リトライ:失敗時の自動再試行
- モニタリング:Web UIで進捗確認
- 通知:エラー時の通知
- 並列実行:複数タスクの同時実行
DAG(Directed Acyclic Graph)の3つの特徴を説明してください。
- Directed(有向):矢印で方向が決まっている
- Acyclic(非巡回):ループしない(元に戻らない)
- Graph(グラフ):タスク(ノード)と依存関係(エッジ)の集合
Airflowの主要コンポーネント(Webサーバー、スケジューラー、Executor、メタデータDB)の役割を説明してください。
- Webサーバー:Web UIを提供(ダッシュボード、タスク管理)
- スケジューラー:DAGを監視し、タスクを実行キューに追加
- Executor:タスクを実際に実行する
- メタデータDB:DAG、タスク、実行履歴などを保存
タスクの実行状態で、緑、赤、オレンジはそれぞれ何を意味しますか?
- 🟢 緑(success):タスクが成功
- 🔴 赤(failed):タスクが失敗
- 🟠 オレンジ(up_for_retry):リトライ待ち
以下のDAG定義で、task_a → task_b → task_cの順番で実行される依存関係を定義してください。
毎日朝9時に実行されるDAGを定義してください。
並列実行されるDAG(task_aの後にtask_b1とtask_b2が並列実行され、両方完了後にtask_cが実行される)を定義してください。
cronとAirflowの違いを3つ説明してください。
- 依存関係:cronは不可、Airflowはタスク間の依存関係を定義可能
- リトライ:cronは不可、Airflowは自動リトライ機能あり
- モニタリング:cronはログを手動確認、AirflowはWeb UIで視覚的に確認
- 並列実行:cronは不可、Airflowは複数タスクを同時実行可能
- 通知:cronは別途実装が必要、Airflowは組み込み機能
ETLパイプライン(Extract → Transform → Load)のDAGを、適切なスケジュール設定とcatchup=Falseで定義してください。
各Executorの特徴と適切な使用場面を説明してください。
- SequentialExecutor
- 特徴:1つずつ順番に実行、並列処理なし
- 使用場面:開発・テスト環境、リソースが限られた環境
- LocalExecutor
- 特徴:ローカルマシンで並列実行、PostgreSQL/MySQL必要
- 使用場面:小〜中規模の本番環境、単一サーバー構成
- CeleryExecutor
- 特徴:分散実行、複数サーバーで処理を分散
- 使用場面:大規模本番環境、高可用性が必要な場合
- KubernetesExecutor
- 特徴:各タスクをKubernetes Podとして実行
- 使用場面:クラウド環境、コンテナベースのインフラ