📋 このステップで学ぶこと
Cloud Composerとは何か(Airflowのマネージドサービス)
DAG(有向非巡回グラフ)の概念
Cloud ComposerとAirflowの違い
DAGの作成とデプロイ
GCPサービスとの連携
実践演習:BigQueryへの日次データロード
🎯 このステップのゴール
このステップを終えると、Cloud Composerを使ってデータパイプラインをスケジュール実行できるようになります 。「毎朝9時にETL処理を自動実行」のような複雑なワークフローを構築しましょう!
🎯 1. Cloud Composerとは?
Cloud Composerの基本
Cloud Composer(クラウド・コンポーザー) は、Apache Airflowのフルマネージド版 です。
💡 例え話:オーケストラの指揮者
【Cloud Composer = オーケストラの指揮者】
オーケストラ演奏(データパイプライン)を成功させるには:
🎻 各楽器(タスク)を正しい順番で演奏させる
・バイオリン → フルート → ドラム の順番
・データ取得 → 変換 → BigQuery書込 の順番
🎼 タイミングを制御する
・「ここで3秒待って」「ここから全員で」
・「毎朝9時に開始」「前の処理が終わったら次へ」
❌ 失敗したら対処する
・「バイオリンが音を外したら、もう一度」
・「タスクが失敗したら、3回まで再試行」
👀 全体を監視する
・「今どこを演奏中?」「どこで問題があった?」
・「このタスクは成功?失敗?」
Cloud Composer = この指揮者を雇うサービス
自分で指揮者を育てる(Airflow自前運用)より楽!
Apache Airflowとの関係
📝 Apache Beam と Dataflow の関係と同じ!
【STEP 20で学んだ関係と同じパターン】
Apache Beam(レシピ)→ Dataflow(調理場)
Apache Airflow(楽譜)→ Cloud Composer(コンサートホール)
┌─────────────────────────────────────────────────────┐
│ Apache Airflow(オープンソース) │
│ ├── Pythonでワークフローを定義 │
│ ├── DAG(有向非巡回グラフ)で処理を表現 │
│ └── 実行環境: │
│ ├── Cloud Composer ← 今回学ぶ(GCP) │
│ ├── MWAA(AWS版のマネージドAirflow) │
│ ├── Astronomer(専門サービス) │
│ └── 自分でサーバー構築(大変!) │
└─────────────────────────────────────────────────────┘
【Cloud Composerを使うメリット】
・Airflowの面倒な設定・運用をGoogleが全部やってくれる
・GCPサービス(BigQuery、Dataflow等)との連携が簡単
・Webブラウザで状態監視、ログ確認できる
Cloud Composerの特徴
💼 フルマネージド
Airflowのインストール・設定・運用をGoogleが全部やってくれる 。サーバー管理不要。
🔗 GCP統合
BigQuery、Dataflow、GCSなどGCPサービスと簡単連携 。認証も自動。
📊 Web UI
ブラウザでDAGの状態を確認 。実行履歴、ログも見られる。
🔄 再試行機能
失敗したタスクを自動で再試行 。回数・間隔も設定可能。
AWSのStep Functionsとの比較
項目
AWS Step Functions
Cloud Composer
ベース
AWS独自
Apache Airflow (オープンソース)
定義方法
JSON(ASL)
Python
柔軟性
限定的
非常に高い (Pythonで何でも書ける)
Web UI
シンプル
高機能 (グラフビュー、ログ等)
料金
ステップ数で課金(安い)
環境の稼働時間 (高め)
学習コスト
低い
やや高い(Airflowの知識必要)
おすすめ
シンプルなワークフロー
複雑なデータパイプライン
📊 2. DAG(有向非巡回グラフ)とは?
DAGの基本概念
DAG(Directed Acyclic Graph) とは、処理の流れを表すグラフ です。
💡 例え話:料理のレシピ
【DAG = 料理のレシピ(手順書)】
カレーを作るレシピ(DAG):
[タスク1] 野菜を切る ─────────────┐
↓
[タスク2] 肉を切る ──────────────→ [タスク4] 鍋で炒める
↓
[タスク3] お湯を沸かす ──────────→ [タスク5] 水を入れて煮込む
↓
[タスク6] ルーを入れる
↓
[タスク7] 完成!
【ポイント】
・タスク1,2,3は同時に実行できる(並列処理)
・タスク4は、1と2が両方終わってから実行
・タスク5は、3と4が両方終わってから実行
・必ず「完成」に向かう(ループしない = 非巡回)
これがDAGの考え方!
📝 図で理解するDAG
【データパイプラインのDAG例:日次レポート作成】
開始
↓
┌───────┴───────┐
↓ ↓
[タスク1] [タスク2]
GCSから APIから
データ取得 データ取得
│ │
└───────┬───────┘
↓
[タスク3]
データ結合
↓
[タスク4]
BigQueryに書込
↓
┌───────┴───────┐
↓ ↓
[タスク5] [タスク6]
集計クエリ バックアップ
実行 作成
│ │
└───────┬───────┘
↓
[タスク7]
メール送信
↓
終了
💡 タスク1と2は並列実行可能(依存関係がない)
💡 タスク3は、1と2の両方が成功してから実行
DAGの3つの要素
1️⃣ DAG(ワークフロー全体)
複数のタスクをまとめたもの 。スケジュールや設定を持つ。「日次売上レポート作成」などの名前を付ける。
2️⃣ Task(タスク)
1つの処理単位 。例:「GCSからデータ取得」「BigQueryにクエリ実行」など。
3️⃣ Dependency(依存関係)
タスクの実行順序 。「タスクAが終わったらタスクBを実行」のように定義。
なぜDAGと呼ぶのか?
📖 DAGの名前の意味
【DAG = Directed Acyclic Graph】
D = Directed(有向)
・矢印に「向き」がある
・A → B は「AからBへ」の一方通行
・処理の順序を表す
A = Acyclic(非巡回)
・ループしない
・A → B → C → A みたいな循環はNG
・必ず「終わり」がある
G = Graph(グラフ)
・ノード(点)とエッジ(線)で表現
・ノード = タスク
・エッジ = 依存関係
【なぜ「非巡回」が重要?】
❌ もしループがあったら…
A → B → C → A → B → C → A → …
永遠に終わらない!無限ループ!
✅ 非巡回なら…
A → B → C → 完了
必ず終わる!安心!
🛠️ 3. Cloud Composerの環境構築
⚠️ 重要:料金について
Cloud Composerは常時稼働 するため、月額数万円 かかります。
Small環境でも月額 $300〜500 (約45,000〜75,000円)
学習後は必ず環境を削除 してください
本番環境以外はローカルAirflowで開発 することを推奨
STEP 1: Cloud Composer環境の作成
📝 作成手順
GCPコンソール → 「Composer」を検索
「環境を作成」をクリック
「Composer 2」を選択(最新版、推奨)
以下の設定を入力
項目
設定値
名前
my-composer-env
ロケーション
asia-northeast1(東京)
イメージバージョン
最新版を選択(例:composer-2.x.x-airflow-2.x.x)
環境のサイズ
Small(学習用)
⏱️ 作成には時間がかかります
環境の作成には20〜30分 かかります。内部でGKE(Kubernetes)クラスタ、Cloud SQL、GCSバケットなどが自動作成されています。コーヒーでも飲みながら待ちましょう ☕
STEP 2: Airflow Web UIへのアクセス
📝 アクセス手順
環境が「利用可能」になったら、環境名をクリック
「Airflow ウェブサーバー」のリンクをクリック
ブラウザでAirflow UIが開きます
【Airflow Web UIでできること】
📋 DAG一覧
・登録されているDAGを一覧表示
・ON/OFFの切り替え
📊 実行履歴
・過去の実行結果を確認
・成功/失敗のステータス
📝 ログ
・各タスクの詳細ログを閲覧
・エラー原因の特定
▶️ 手動実行
・DAGを手動でトリガー
・テスト実行に便利
🔀 グラフビュー
・DAGの構造を視覚化
・依存関係を図で確認
💻 4. 最初のDAGを作ってみよう
シンプルなDAGの例
まずは、「Hello World」を出力するだけ の簡単なDAGを作ります。
“””
最初のDAG: hello_world.py
“””
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# デフォルト引数
default_args = {
‘owner’: ‘airflow’,
‘depends_on_past’: False,
‘start_date’: datetime(2024, 1, 1),
‘email_on_failure’: False,
‘email_on_retry’: False,
‘retries’: 1,
‘retry_delay’: timedelta(minutes=5),
}
# DAGを定義
dag = DAG(
‘hello_world’, # DAG ID
default_args=default_args,
description=’最初のDAG’,
schedule_interval=’@daily’, # 毎日実行
catchup=False, # 過去分は実行しない
)
# タスク1: Hello World を出力
def print_hello():
print(“Hello World!”)
return “Hello World!”
task1 = PythonOperator(
task_id=’print_hello’,
python_callable=print_hello,
dag=dag,
)
# タスク2: 現在時刻を出力
def print_date():
from datetime import datetime
print(f”現在時刻: {datetime.now()}”)
return datetime.now()
task2 = PythonOperator(
task_id=’print_date’,
python_callable=print_date,
dag=dag,
)
# 依存関係を設定(task1 → task2)
task1 >> task2
💡 default_args の各パラメータ詳細解説
【default_argsの全パラメータを理解しよう】
default_args = {
‘owner’: ‘airflow’,
‘depends_on_past’: False,
‘start_date’: datetime(2024, 1, 1),
‘email_on_failure’: False,
‘email_on_retry’: False,
‘retries’: 1,
‘retry_delay’: timedelta(minutes=5),
}
┌─────────────────────────────────────────────────────────┐
│ ‘owner’: ‘airflow’ │
├─────────────────────────────────────────────────────────┤
│ DAGの所有者(担当者名) │
│ ・誰が管理しているかを明示 │
│ ・’data-team’、’tanaka’ など任意の名前 │
│ ・Airflow UIで担当者でフィルタリング可能 │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ ‘depends_on_past’: False │
├─────────────────────────────────────────────────────────┤
│ 前回の実行結果に依存するか │
│ ・False → 前回が失敗でも今回は実行する │
│ ・True → 前回が成功していないと今回は実行しない │
│ ※ 通常はFalseで問題なし │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ ‘start_date’: datetime(2024, 1, 1) │
├─────────────────────────────────────────────────────────┤
│ DAGの開始日(スケジュールの基準日) │
│ ・この日以降からスケジュールが有効になる │
│ ・過去の日付を指定するのが一般的 │
│ ※ 未来の日付だと、その日まで実行されない │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ ‘email_on_failure’: False │
├─────────────────────────────────────────────────────────┤
│ タスク失敗時にメール通知するか │
│ ・True → 失敗したらメール送信 │
│ ・’email’: [‘alert@example.com’] と併用 │
│ ※ SMTPサーバーの設定が別途必要 │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ ‘retries’: 1 │
├─────────────────────────────────────────────────────────┤
│ 失敗時の再試行回数 │
│ ・1 → 失敗したら1回だけ再試行 │
│ ・3 → 最大3回まで再試行 │
│ ・0 → 再試行しない │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ ‘retry_delay’: timedelta(minutes=5) │
├─────────────────────────────────────────────────────────┤
│ 再試行までの待ち時間 │
│ ・timedelta(minutes=5) → 5分後に再試行 │
│ ・timedelta(seconds=30) → 30秒後に再試行 │
│ ・API制限などを考慮して適切な間隔を設定 │
└─────────────────────────────────────────────────────────┘
💡 DAG() の各パラメータ詳細解説
【DAG定義の各パラメータを理解しよう】
dag = DAG(
‘hello_world’,
default_args=default_args,
description=’最初のDAG’,
schedule_interval=’@daily’,
catchup=False,
)
┌─────────────────────────────────────────────────────────┐
│ ‘hello_world’ (第1引数) │
├─────────────────────────────────────────────────────────┤
│ DAG ID(ユニークな識別子) │
│ ・Airflow UI に表示される名前 │
│ ・英数字とアンダースコアのみ推奨 │
│ ・例:’daily_sales_report’, ‘hourly_etl_job’ │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ schedule_interval=’@daily’ │
├─────────────────────────────────────────────────────────┤
│ 実行スケジュール │
│ ・’@daily’ → 毎日0時0分 │
│ ・’@hourly’ → 毎時0分 │
│ ・’0 9 * * *’ → 毎日9時0分(Cron形式) │
│ ・None → 手動実行のみ │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ catchup=False │
├─────────────────────────────────────────────────────────┤
│ 過去分の実行をするか(重要!) │
│ │
│ 【例】start_date=2024/1/1、今日=2024/1/15 の場合 │
│ │
│ catchup=True: │
│ 1/1, 1/2, 1/3, … 1/15 の15日分を全部実行 │
│ → 過去分を一気に処理(バックフィル) │
│ │
│ catchup=False: │
│ 1/15 だけ実行 │
│ → 過去分は無視(通常はこちら) │
│ │
│ ※ 新規DAGでは catchup=False を推奨! │
│ Trueだと大量の過去分が実行されて大変なことに… │
└─────────────────────────────────────────────────────────┘
DAGのデプロイ
📝 デプロイ手順
上記のコードを hello_world.py として保存
ComposerのDAGフォルダを確認: GCPコンソール → Composer → 環境の詳細 → 「DAGフォルダ」の欄
DAGをアップロード:
# DAGフォルダの例
# gs://asia-northeast1-my-compo-xxxxx-bucket/dags
# DAGをアップロード
gsutil cp hello_world.py gs://asia-northeast1-my-compo-xxxxx-bucket/dags/
【実行結果】
✅ DAG ‘hello_world’ が正常に実行されました
[2024-01-15 09:00:00] タスク1: print_hello
Hello World!
✅ 成功
[2024-01-15 09:00:02] タスク2: print_date
現在時刻: 2024-01-15 09:00:02.123456
✅ 成功
⏱️ 総実行時間: 2秒
💡 依存関係の書き方
【依存関係の記法】
# 基本:task1 の後に task2 を実行
task1 >> task2
# チェーン:task1 → task2 → task3 の順
task1 >> task2 >> task3
# 分岐:task1 の後に task2 と task3 を並列実行
task1 >> [task2, task3]
# 合流:task1 と task2 の両方が終わったら task3
[task1, task2] >> task3
# 複雑な例:
# → task2 →
# task1 → task4
# → task3 →
task1 >> [task2, task3]
[task2, task3] >> task4
【別の書き方(set_downstream / set_upstream)】
# これも同じ意味
task1.set_downstream(task2) # task1 → task2
task2.set_upstream(task1) # task1 → task2
# >> の方がシンプルで読みやすいので推奨!
🔧 5. よく使うOperator
Operatorとは?
💡 例え話:工具箱の道具
【Operator = 工具箱の道具】
大工仕事(データパイプライン)をするときに、
作業内容に応じて道具(Operator)を選ぶ:
🔨 ハンマー = PythonOperator
・汎用的、何でもできる
・Python関数を実行
🪚 ノコギリ = BashOperator
・シェルコマンドを実行
・ファイル操作、スクリプト実行
📊 測定器 = BigQueryInsertJobOperator
・BigQuery専用
・SQLクエリを実行
🚀 特殊工具 = DataflowOperator
・大量データ処理専用
・Dataflowジョブを起動
【ポイント】
・作業に合った道具を選ぶと効率的!
・GCP専用Operatorは認証が自動で楽!
主要なOperator一覧
Operator
用途
使用例
PythonOperator
Python関数を実行
データ処理、計算、API呼び出し
BashOperator
Bashコマンド実行
ファイル操作、シェルスクリプト
BigQueryInsertJobOperator
BigQueryクエリ実行
データ集計、変換、テーブル作成
GCSToBigQueryOperator
GCS→BigQueryロード
CSVファイルのインポート
DataflowCreatePythonJobOperator
Dataflowジョブ実行
大量データのETL処理
GCSToGCSOperator
GCS間でファイルコピー
バックアップ、ファイル移動
EmailOperator
メール送信
完了通知、レポート送信
例1: BashOperatorを使う
from airflow.operators.bash import BashOperator
# Bashコマンドを実行
task_bash = BashOperator(
task_id=’run_bash_command’,
bash_command=’echo “Hello from Bash!” && date’,
dag=dag,
)
例2: BigQueryOperatorを使う
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
# BigQueryクエリを実行
task_bq = BigQueryInsertJobOperator(
task_id=’run_bigquery’,
configuration={
‘query’: {
‘query’: ”’
SELECT
order_date,
COUNT(*) AS order_count,
SUM(amount) AS total_amount
FROM `my-project.sales.orders`
WHERE order_date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY order_date
”’,
‘useLegacySql’: False,
‘destinationTable’: {
‘projectId’: ‘my-project’,
‘datasetId’: ‘reports’,
‘tableId’: ‘daily_summary’
},
‘writeDisposition’: ‘WRITE_TRUNCATE’
}
},
dag=dag,
)
💡 BigQueryInsertJobOperatorの設定解説
【configuration辞書の構造】
configuration={
‘query’: {
‘query’: ‘SELECT …’, # 実行するSQL
‘useLegacySql’: False, # 標準SQL使用(必ずFalse)
‘destinationTable’: {…}, # 結果の出力先(省略可)
‘writeDisposition’: ‘…’ # 書き込みモード
}
}
【writeDispositionの選択肢】
・’WRITE_TRUNCATE’ → 既存データを削除して上書き
・’WRITE_APPEND’ → 既存データに追加
・’WRITE_EMPTY’ → テーブルが空の時のみ書込
例3: ETLパイプライン(複数Operator)
“””
ETLパイプライン例:extract → transform → load
“””
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
# タスク1: Extract(データ抽出)
def extract_data(**context):
“””GCSからデータを取得する処理”””
print(“📥 データを抽出中…”)
# 実際はGCSからファイルを読み込む処理
file_path = “gs://my-bucket/raw/sales_20240115.csv”
print(f”✅ 抽出完了: {file_path}”)
# XComで次のタスクにファイルパスを渡す
context[‘ti’].xcom_push(key=’file_path’, value=file_path)
return file_path
task_extract = PythonOperator(
task_id=’extract’,
python_callable=extract_data,
dag=dag,
)
# タスク2: Transform(データ変換)
def transform_data(**context):
“””データをクリーニング・変換する処理”””
# 前のタスクからファイルパスを取得
file_path = context[‘ti’].xcom_pull(key=’file_path’, task_ids=’extract’)
print(f”🔄 変換中: {file_path}”)
# 実際は変換処理を行う
output_path = file_path.replace(‘raw’, ‘processed’)
print(f”✅ 変換完了: {output_path}”)
context[‘ti’].xcom_push(key=’output_path’, value=output_path)
return output_path
task_transform = PythonOperator(
task_id=’transform’,
python_callable=transform_data,
dag=dag,
)
# タスク3: Load(BigQueryへロード)
task_load = BigQueryInsertJobOperator(
task_id=’load’,
configuration={
‘query’: {
‘query’: ”’
INSERT INTO `my-project.sales.orders`
SELECT * FROM `my-project.staging.orders_temp`
”’,
‘useLegacySql’: False,
}
},
dag=dag,
)
# 依存関係: extract → transform → load
task_extract >> task_transform >> task_load
📤 6. XCom(タスク間データ受け渡し)
XComとは
XCom(Cross Communication) は、タスク間でデータを受け渡す仕組み です。
💡 例え話:リレー走のバトン
【XCom = リレー走のバトン】
リレー競技(データパイプライン):
🏃 走者1(タスク1)
・自分の区間を走る(処理を実行)
・バトン(データ)を持っている
🔄 バトンパス(XCom)
・走者1 → 走者2 にバトンを渡す
・xcom_push: バトンを持つ
・xcom_pull: バトンを受け取る
🏃 走者2(タスク2)
・バトンを受け取って走る
・前の走者のデータを使って処理
【コードで書くと】
# タスク1: バトンを持つ(push)
context[‘ti’].xcom_push(key=’result’, value=’処理結果’)
# タスク2: バトンを受け取る(pull)
data = context[‘ti’].xcom_pull(key=’result’, task_ids=’task1′)
📝 XComの使い方詳細
【XComの基本操作】
① データを保存する(push)
context[‘ti’].xcom_push(key=’キー名’, value=データ)
・key: データを識別する名前(文字列)
・value: 保存するデータ(JSON化可能なもの)
② データを取得する(pull)
data = context[‘ti’].xcom_pull(
key=’キー名’,
task_ids=’データを保存したタスクID’
)
【注意点】
⚠️ 大きなデータ(数MB以上)は保存しない!
→ XComはメタデータDB(Cloud SQL)に保存される
→ 大きなデータはGCSに保存してパスだけXComで渡す
⚠️ **context が必要
→ PythonOperatorで **context を受け取る必要がある
→ python_callable=関数名 だけでOK(自動で渡される)
【良い例と悪い例】
❌ 悪い例:大きなDataFrameを直接渡す
context[‘ti’].xcom_push(key=’data’, value=huge_dataframe)
✅ 良い例:ファイルパスを渡す
huge_dataframe.to_csv(‘gs://bucket/data.csv’)
context[‘ti’].xcom_push(key=’path’, value=’gs://bucket/data.csv’)
“””
XComの実践例
“””
def task_a(**context):
“””データを処理してXComに保存”””
result = {
‘record_count’: 1000,
‘file_path’: ‘gs://my-bucket/output.csv’,
‘processed_at’: ‘2024-01-15 10:00:00’
}
# XComに保存
context[‘ti’].xcom_push(key=’processing_result’, value=result)
print(f”✅ 処理完了: {result}”)
return result
def task_b(**context):
“””XComからデータを取得して使用”””
# XComから取得
result = context[‘ti’].xcom_pull(
key=’processing_result’,
task_ids=’task_a’
)
print(f”📥 受け取ったデータ: {result}”)
print(f” – レコード数: {result[‘record_count’]}”)
print(f” – ファイルパス: {result[‘file_path’]}”)
return result
task_a_op = PythonOperator(
task_id=’task_a’,
python_callable=task_a,
dag=dag,
)
task_b_op = PythonOperator(
task_id=’task_b’,
python_callable=task_b,
dag=dag,
)
task_a_op >> task_b_op
📅 7. スケジュール設定
schedule_intervalの書き方
設定値
意味
実行タイミング
None
手動実行のみ
自動実行しない(テスト用)
'@once'
1回だけ
最初の1回だけ実行
'@hourly'
毎時
毎時0分(1:00, 2:00, 3:00…)
'@daily'
毎日
毎日0時0分(UTC)
'@weekly'
毎週
毎週日曜0時0分(UTC)
'@monthly'
毎月
毎月1日0時0分(UTC)
Cron形式でのスケジュール
📝 Cron形式の完全解説
【Cron形式の書き方】
‘分 時 日 月 曜日’
│ │ │ │ │
│ │ │ │ └── 曜日(0-6、0=日曜)
│ │ │ └────── 月(1-12)
│ │ └───────── 日(1-31)
│ └──────────── 時(0-23)
└─────────────── 分(0-59)
【特殊文字】
* = すべての値(毎〜)
, = 複数指定(1,15 = 1日と15日)
– = 範囲指定(1-5 = 1〜5)
/ = 間隔指定(*/15 = 15分ごと)
【よく使う例】
‘0 9 * * *’ → 毎日9時0分
’30 14 * * *’ → 毎日14時30分
‘0 */6 * * *’ → 6時間ごと(0時、6時、12時、18時)
‘0 9 * * 1’ → 毎週月曜日9時0分
‘0 9 * * 1-5’ → 平日(月〜金)9時0分
‘0 0 1 * *’ → 毎月1日0時0分
‘0 9 1,15 * *’ → 毎月1日と15日の9時0分
‘*/15 * * * *’ → 15分ごと
‘0 2 * * *’ → 毎日午前2時(バッチ処理に最適)
【タイムゾーンに注意!】
Airflowのデフォルトは UTC です。
日本時間(JST)は UTC+9 なので:
日本時間9時に実行したい場合:
→ UTC 0時 = ‘0 0 * * *’
日本時間18時に実行したい場合:
→ UTC 9時 = ‘0 9 * * *’
🔗 8. 実践:BigQueryへの日次データロード
シナリオ
毎日午前2時(UTC)に、GCSのCSVファイルをBigQueryにロード するDAGを作ります。
📋 要件
毎日午前2時(UTC)に実行
前日のデータ(sales_YYYYMMDD.csv)をGCSから取得
BigQueryの sales テーブルにロード
データ品質チェック(0件でないか確認)
成功したら通知
“””
日次データロードDAG: daily_sales_load.py
“””
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperator
from datetime import datetime, timedelta
# ==========================================
# 1. 基本設定
# ==========================================
default_args = {
‘owner’: ‘data-team’,
‘depends_on_past’: False,
‘start_date’: datetime(2024, 1, 1),
‘email_on_failure’: True,
‘email’: [‘alert@example.com’],
‘retries’: 3, # 失敗時3回まで再試行
‘retry_delay’: timedelta(minutes=5), # 5分間隔で再試行
}
dag = DAG(
‘daily_sales_load’,
default_args=default_args,
description=’GCSからBigQueryへの日次売上データロード’,
schedule_interval=’0 2 * * *’, # 毎日午前2時(UTC)
catchup=False, # 過去分は実行しない
tags=[‘sales’, ‘daily’, ‘etl’],
)
# ==========================================
# 2. タスク定義
# ==========================================
# タスク1: 前日の日付を取得
def get_yesterday_date(**context):
“””前日の日付を取得してXComに保存”””
yesterday = (datetime.now() – timedelta(days=1)).strftime(‘%Y%m%d’)
print(f”📅 処理対象日: {yesterday}”)
context[‘ti’].xcom_push(key=’target_date’, value=yesterday)
return yesterday
task_get_date = PythonOperator(
task_id=’get_yesterday_date’,
python_callable=get_yesterday_date,
dag=dag,
)
# タスク2: GCSからBigQueryへロード
task_load = GCSToBigQueryOperator(
task_id=’load_to_bigquery’,
bucket=’my-data-bucket’,
source_objects=[‘sales/sales_{{ ti.xcom_pull(key=”target_date”) }}.csv’],
destination_project_dataset_table=’my-project:my_dataset.sales’,
source_format=’CSV’,
skip_leading_rows=1, # ヘッダー行スキップ
write_disposition=’WRITE_APPEND’, # 追記モード
autodetect=True, # スキーマ自動検出
dag=dag,
)
# タスク3: データ品質チェック
task_check = BigQueryCheckOperator(
task_id=’check_data_quality’,
sql=”’
SELECT COUNT(*) > 0
FROM `my-project.my_dataset.sales`
WHERE DATE(order_date) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
”’,
use_legacy_sql=False,
dag=dag,
)
# タスク4: 成功通知
def send_notification(**context):
“””成功通知を送る”””
target_date = context[‘ti’].xcom_pull(key=’target_date’)
print(f”✅ {target_date}のデータロードが完了しました!”)
print(f”📊 BigQueryテーブル: my-project.my_dataset.sales”)
# 実際はSlack APIなどで通知を送る
return “success”
task_notify = PythonOperator(
task_id=’send_notification’,
python_callable=send_notification,
dag=dag,
)
# ==========================================
# 3. 依存関係を設定
# ==========================================
task_get_date >> task_load >> task_check >> task_notify
【実行結果】
✅ DAG ‘daily_sales_load’ が正常に実行されました
[2024-01-15 02:00:00] タスク1: get_yesterday_date
📅 処理対象日: 20240114
✅ 成功 (0.5秒)
[2024-01-15 02:00:05] タスク2: load_to_bigquery
📥 ロード中: gs://my-data-bucket/sales/sales_20240114.csv
✅ 10,523行がロードされました (15秒)
[2024-01-15 02:00:25] タスク3: check_data_quality
🔍 データ品質チェック中…
✅ チェック合格: 10,523行 (3秒)
[2024-01-15 02:00:30] タスク4: send_notification
✅ 20240114のデータロードが完了しました!
📊 BigQueryテーブル: my-project.my_dataset.sales
⏱️ 総実行時間: 30秒
💰 9. Cloud Composerの料金
⚠️ 重要:料金は高めです
Cloud Composerは常時稼働 するため、他のGCPサービスより料金が高めです。使わない時は環境を削除することを強く推奨します。
項目
料金
説明
Composer環境
$0.20/vCPU/時間
環境の基本料金
GKEクラスタ
$0.10/vCPU/時間
Kubernetes環境
Cloud SQL
$0.017/時間〜
メタデータDB
GCS
$0.020/GB/月
DAG保存用
📊 環境サイズ別の月額目安
【月額料金の目安(24時間稼働の場合)】
┌─────────────────────────────────────────────────────┐
│ Small環境(学習・開発用) │
├─────────────────────────────────────────────────────┤
│ 月額: 約 $300〜500(約45,000〜75,000円) │
│ 用途: 学習、PoC、小規模ワークフロー │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ Medium環境(本番用) │
├─────────────────────────────────────────────────────┤
│ 月額: 約 $1,000〜1,500(約15〜22万円) │
│ 用途: 本番環境、中規模ワークフロー │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ Large環境(大規模用) │
├─────────────────────────────────────────────────────┤
│ 月額: 約 $2,000〜3,000(約30〜45万円) │
│ 用途: 大規模本番環境、多数の並列DAG │
└─────────────────────────────────────────────────────┘
💡 コスト削減のコツ
環境サイズ :最初はSmallで十分。必要に応じてスケールアップ
環境削除 :学習後は必ず環境を削除。再作成は20〜30分
ローカル開発 :開発・テストはローカルAirflowで。本番だけComposer
代替案検討 :シンプルなワークフローなら Cloud Scheduler + Cloud Functions が安い
📝 10. STEP 21 のまとめ
✅ このステップで学んだこと
Cloud Composer は Apache Airflow のフルマネージド版
DAG (有向非巡回グラフ)でワークフローを定義
Operator でタスクの種類を指定(Python、Bash、BigQuery等)
XCom でタスク間のデータ受け渡し
Cron形式 でスケジュール設定(毎日、毎時など)
GCSからBigQueryへの日次データロード を自動化
💡 次のステップへ
Cloud Composerは複雑なデータパイプラインの管理 に最適です。
次のSTEP 22では、コスト最適化戦略 を学びます。
クラウドの料金を50%削減するテクニックを身につけましょう!
📝 練習問題
問題 1
基礎
DAGとは何の略で、どういう意味ですか?
解答を見る
【解答】
DAG = Directed Acyclic Graph(有向非巡回グラフ)
Directed(有向) :矢印に向きがある(処理の順序がある)
Acyclic(非巡回) :ループしない(必ず終わる)
Graph(グラフ) :タスクと依存関係を表すグラフ構造
問題 2
基礎
以下のCron形式は、いつ実行されますか?'30 9 * * 1-5'
解答を見る
【解答】
平日(月〜金)の午前9時30分(UTC)
30:30分
9:9時
*:毎日
*:毎月
1-5:月曜〜金曜(0=日曜、6=土曜)
問題 3
応用
3つのタスク(A、B、C)があります。「AとBが両方完了したらCを実行」という依存関係を、Pythonコードで書いてください。
解答を見る
【解答】
# 方法1: リストで指定(推奨)
[task_a, task_b] >> task_c
# 方法2: 個別に指定
task_a >> task_c
task_b >> task_c
どちらも同じ意味です。AとBが両方成功したら、Cが実行されます。
問題 4
発展
catchup=False の意味を説明してください。また、なぜ新規DAGではFalseを推奨するのですか?
解答を見る
【解答】
catchup=False は、過去分の実行をスキップする設定です。
【例】start_date=2024/1/1、今日=2024/1/15 の場合
catchup=True:
1/1, 1/2, 1/3, … 1/15 の15日分を全部実行
→ 過去分を一気に処理(バックフィル)
catchup=False:
1/15 だけ実行
→ 過去分は無視
Falseを推奨する理由: Trueだと、DAGを初めてデプロイした時に大量の過去分が実行され、リソースを大量消費したり、意図しないデータ処理が発生する可能性があるためです。
❓ よくある質問
Q1: Cloud ComposerとCloud Schedulerの違いは何ですか?
複雑さの違いです。 Cloud Schedulerは単純なタスクを定期実行 するだけ(例:毎日同じCloud Functionを呼ぶ)。Cloud Composerは複数のタスクを組み合わせた複雑なワークフロー を管理できます。依存関係、条件分岐、リトライが必要ならComposer、シンプルならSchedulerで十分です。料金もSchedulerの方がずっと安いです。
Q2: DAGが表示されないのですが、どうすればいいですか?
以下を確認してください:
1. DAGファイルが正しいGCSバケット(DAGフォルダ)にアップロードされているか
2. Pythonの構文エラーがないか(ローカルでpython hello_world.pyを実行してみる)
3. DAG IDが既存のものと重複していないか
4. 5〜10分待ってもダメなら、Airflow UIの「ログ」でエラーを確認
5. dag変数が正しく定義されているか確認
Q3: ローカル環境でAirflowを動かすことはできますか?
はい、できます! 開発・テストはローカルで行い、本番環境だけCloud Composerを使うことで、コストを大幅に削減できます。
# インストール
pip install apache-airflow
# 初期化
airflow db init
# 起動(Webサーバーとスケジューラー)
airflow standalone
# ブラウザで http://localhost:8080 にアクセス
Q4: XComで大きなデータを渡したいのですが?
XComで大きなデータ(数MB以上)を直接渡すのは避けてください。 XComはメタデータDB(Cloud SQL)に保存されるため、大きなデータを入れるとパフォーマンスが低下します。代わりに、GCSにファイルを保存して、そのパス(URL)だけをXComで渡す のがベストプラクティスです。
Q5: 日本時間でスケジュールを設定したいのですが?
Airflowのデフォルトタイムゾーンは UTC です。 日本時間(JST)はUTC+9なので、9時間引いた時間で設定してください。例えば、日本時間の午前9時に実行したい場合は、UTC午前0時('0 0 * * *')と設定します。または、Composer環境の設定でタイムゾーンを変更することもできます。
Q6: Cloud Composerの料金が高すぎるのですが、代替案はありますか?
以下の代替案があります:
1. Cloud Scheduler + Cloud Functions :シンプルなワークフローなら十分。月額数ドル程度
2. Cloud Workflows :GCPの軽量ワークフローサービス。JSONで定義
3. ローカルAirflow + Compute Engine :自分でサーバーを管理する
4. Cloud Run Jobs :コンテナベースのバッチ処理。使った分だけ課金
5. 開発環境は使わない :本番環境だけComposerを使う
×
artnasekai
#artnasekai #学習メモ