📋 このステップで学ぶこと
- XCom(クロスコミュニケーション)の使い方
- TaskFlow APIの使い方
- Variables(変数)の管理
- Connections(接続情報)の設定
- 実践演習:タスク間でデータを引き継ぐDAG
⏱️ 学習時間の目安:3時間
📝 練習問題:10問(基礎4問・応用4問・発展2問)
🎯 1. タスク間データ共有の必要性
1-1. なぜデータ共有が必要か?
ETLパイプラインでは、前のタスクの結果を次のタスクで使いたいことがよくあります。
📚 例え話:リレー走
タスク間のデータ受け渡しはリレー走のバトンに似ています。
・タスクA(第1走者):データを抽出してバトン(ファイルパス)を渡す
・タスクB(第2走者):バトンを受け取り、データを変換して次へ
・タスクC(第3走者):バトンを受け取り、最終処理
バトンがなければリレーは成立しません。
同様に、データの受け渡しがなければパイプラインは機能しません。
1-2. Airflowのデータ共有方法
データ共有の4つの方法
| 方法 |
特徴 |
データサイズ |
用途 |
| XCom |
タスク間で小さなデータを受け渡し |
〜数MB |
結果値、パス |
| ファイル |
大きなデータはファイルに保存 |
無制限 |
大規模データ |
| Variables |
DAG全体で使う設定値 |
〜数KB |
設定、URL |
| Connections |
DB・APIの接続情報 |
– |
認証情報 |
データ共有の流れ
【XComによるタスク間データ共有】
┌─────────┐ XCom ┌─────────┐ XCom ┌─────────┐
│ Extract │ ─────────────→ │Transform│ ─────────────→ │ Load │
│ │ ファイルパス │ │ 変換後パス │ │
└─────────┘ └─────────┘ └─────────┘
│ │ │
▼ ▼ ▼
/tmp/raw.csv /tmp/trans.csv /tmp/final.csv
【Variables と Connections】
┌─────────────────────────────────────────────────────────────────┐
│ Airflow メタデータDB │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Variables │ │ Connections │ │
│ │ ・S3バケット │ │ ・DB接続情報 │ │
│ │ ・API URL │ │ ・API認証 │ │
│ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓ 参照 ↓ 参照
全タスクから利用可能 全タスクから利用可能
📤 2. XCom(クロスコミュニケーション)
2-1. XComとは?
XCom(Cross Communication)は、タスク間で小さなデータを受け渡す仕組みです。
文字列、数値、リスト、辞書などを渡せます。
💡 XComの制限
- サイズ制限:数KB〜数MB(DBによる)
- 大きなデータ(DataFrame全体など)は不向き
- ファイルパスや集計結果など小さなデータに使う
2-2. XComの基本的な使い方
# ===== XComの基本 =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def push_data(**context):
“””データをプッシュ(送信)”””
# データを返すと自動的にXComにプッシュされる
return “Hello from task1”
def pull_data(**context):
“””データをプル(取得)”””
ti = context[‘ti’] # TaskInstance
# 前のタスクからデータを取得
data = ti.xcom_pull(task_ids=’push_task’)
print(f”受信したデータ: {data}”)
with DAG(
dag_id=’xcom_example’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
task1 = PythonOperator(
task_id=’push_task’,
python_callable=push_data
)
task2 = PythonOperator(
task_id=’pull_task’,
python_callable=pull_data
)
task1 >> task2
【実行ログ】
[push_task] タスク完了、戻り値: “Hello from task1”
[pull_task] 受信したデータ: Hello from task1
2-3. 様々なデータ型を渡す
# ===== 様々なデータ型をXComで渡す =====
def push_various_data(**context):
“””様々なデータ型をプッシュ”””
ti = context[‘ti’]
# 文字列
ti.xcom_push(key=’string_data’, value=’Hello’)
# 数値
ti.xcom_push(key=’number_data’, value=123)
# リスト
ti.xcom_push(key=’list_data’, value=[1, 2, 3, 4, 5])
# 辞書
ti.xcom_push(key=’dict_data’, value={
‘name’: ‘田中’,
‘age’: 30,
‘city’: ‘東京’
})
# デフォルトのreturn値もプッシュされる
return “All data pushed”
def pull_various_data(**context):
“””様々なデータ型をプル”””
ti = context[‘ti’]
# キーを指定してプル
string_val = ti.xcom_pull(task_ids=’push_task’, key=’string_data’)
number_val = ti.xcom_pull(task_ids=’push_task’, key=’number_data’)
list_val = ti.xcom_pull(task_ids=’push_task’, key=’list_data’)
dict_val = ti.xcom_pull(task_ids=’push_task’, key=’dict_data’)
# キーを指定しない場合はreturn値を取得
default_val = ti.xcom_pull(task_ids=’push_task’)
print(f”String: {string_val}”)
print(f”Number: {number_val}”)
print(f”List: {list_val}”)
print(f”Dict: {dict_val}”)
print(f”Default: {default_val}”)
【実行ログ】
String: Hello
Number: 123
List: [1, 2, 3, 4, 5]
Dict: {‘name’: ‘田中’, ‘age’: 30, ‘city’: ‘東京’}
Default: All data pushed
2-4. 実用例:ファイルパスの受け渡し
# ===== ファイルパスの受け渡し =====
import pandas as pd
def extract_data(**context):
“””データを抽出してファイルパスを返す”””
df = pd.DataFrame({
‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’],
‘価格’: [150, 100, 80]
})
file_path = ‘/tmp/extracted_data.csv’
df.to_csv(file_path, index=False)
print(f”データを保存: {file_path}”)
# ファイルパスを返す(XComに自動プッシュ)
return file_path
def transform_data(**context):
“””ファイルを読み込んで変換”””
ti = context[‘ti’]
# 前のタスクからファイルパスを取得
file_path = ti.xcom_pull(task_ids=’extract’)
print(f”ファイルを読み込み: {file_path}”)
df = pd.read_csv(file_path)
df[‘税込価格’] = (df[‘価格’] * 1.1).astype(int)
output_path = ‘/tmp/transformed_data.csv’
df.to_csv(output_path, index=False)
print(f”変換後データを保存: {output_path}”)
return output_path
with DAG(
dag_id=’file_path_xcom’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
extract = PythonOperator(
task_id=’extract’,
python_callable=extract_data
)
transform = PythonOperator(
task_id=’transform’,
python_callable=transform_data
)
extract >> transform
【実行ログ】
[extract] データを保存: /tmp/extracted_data.csv
[transform] ファイルを読み込み: /tmp/extracted_data.csv
[transform] 変換後データを保存: /tmp/transformed_data.csv
⚡ 3. TaskFlow API(推奨)
3-1. TaskFlow APIとは?
TaskFlow APIは、XComをもっと簡単に使うための新しいAPI(Airflow 2.0以降)です。
デコレータ(@task)を使って、Pythonの関数をそのままタスクにできます。
3-2. 従来の方法 vs TaskFlow API
# ===== 従来の方法 vs TaskFlow API =====
# ❌ 従来の方法(冗長)
def extract(**context):
ti = context[‘ti’]
data = “some data”
ti.xcom_push(key=’data’, value=data)
def transform(**context):
ti = context[‘ti’]
data = ti.xcom_pull(task_ids=’extract’, key=’data’)
# 処理…
# ✅ TaskFlow API(シンプル)
from airflow.decorators import task
@task
def extract():
return “some data” # 自動でXComにプッシュ
@task
def transform(data): # 引数で自動的に受け取り
# 処理…
pass
# 実行(関数呼び出しのように書ける)
data = extract()
transform(data)
3-3. TaskFlow APIの基本
# ===== TaskFlow APIの基本 =====
from airflow.decorators import dag, task
from datetime import datetime
@dag(
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
def taskflow_example():
@task
def extract():
“””データ抽出”””
data = [1, 2, 3, 4, 5]
print(f”抽出: {data}”)
return data
@task
def transform(data):
“””データ変換”””
transformed = [x * 2 for x in data]
print(f”変換: {transformed}”)
return transformed
@task
def load(data):
“””データロード”””
print(f”ロード: {data}”)
total = sum(data)
print(f”合計: {total}”)
return total
# DAGの実行フロー(関数呼び出しのように書ける)
extracted_data = extract()
transformed_data = transform(extracted_data)
load(transformed_data)
# DAGを登録
dag_instance = taskflow_example()
【実行ログ】
[extract] 抽出: [1, 2, 3, 4, 5]
[transform] 変換: [2, 4, 6, 8, 10]
[load] ロード: [2, 4, 6, 8, 10]
[load] 合計: 30
🎯 TaskFlow APIのメリット
- XComのpush/pull を意識しなくて良い
- Pythonの関数のように書ける
- コードがシンプルで読みやすい
- 型ヒントが使える
3-4. 実用例:ETLパイプライン
# ===== TaskFlow APIを使ったETLパイプライン =====
from airflow.decorators import dag, task
from datetime import datetime
import pandas as pd
@dag(
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False,
tags=[‘etl’, ‘taskflow’]
)
def etl_with_taskflow():
“””TaskFlow APIを使ったETLパイプライン”””
@task
def extract() -> str:
“””データ抽出”””
df = pd.DataFrame({
‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’, ‘ぶどう’],
‘価格’: [150, 100, 80, 300],
‘個数’: [10, 20, 15, 5]
})
file_path = ‘/tmp/extracted.csv’
df.to_csv(file_path, index=False)
print(f”✅ データ抽出: {len(df)}件”)
return file_path
@task
def transform(file_path: str) -> str:
“””データ変換”””
df = pd.read_csv(file_path)
df[‘税込価格’] = (df[‘価格’] * 1.1).astype(int)
df[‘合計金額’] = df[‘税込価格’] * df[‘個数’]
output_path = ‘/tmp/transformed.csv’
df.to_csv(output_path, index=False)
print(f”✅ データ変換: {len(df)}件”)
return output_path
@task
def load(file_path: str) -> dict:
“””データロード”””
df = pd.read_csv(file_path)
final_path = ‘/tmp/final_output.csv’
df.to_csv(final_path, index=False)
summary = {
‘件数’: len(df),
‘総売上’: int(df[‘合計金額’].sum()),
‘ファイル’: final_path
}
print(f”✅ データロード: {summary}”)
return summary
# ETLフローの実行
extracted = extract()
transformed = transform(extracted)
result = load(transformed)
# DAGを登録
dag_instance = etl_with_taskflow()
【実行ログ】
[extract] ✅ データ抽出: 4件
[transform] ✅ データ変換: 4件
[load] ✅ データロード: {‘件数’: 4, ‘総売上’: 5775, ‘ファイル’: ‘/tmp/final_output.csv’}
⚙️ 4. Variables(変数)
4-1. Variablesとは?
Variablesは、DAG全体で使う設定値を保存する仕組みです。
環境変数のように使えます。
📊 Variables の使い所
- APIのエンドポイントURL
- S3バケット名
- メール送信先アドレス
- 環境別の設定(dev/prod)
4-2. Web UIでVariableを設定
# ===== Web UIでの設定方法 =====
# 1. Web UI (http://localhost:8080) にアクセス
# 2. 上部メニュー「Admin」→「Variables」をクリック
# 3. 「+」ボタンで新規作成
# 4. Keyとvalueを入力
# 例: Key=”s3_bucket”, Val=”my-data-bucket”
# 5. 「Save」をクリック
4-3. PythonコードでVariableを使う
# ===== PythonでVariableを使う =====
from airflow.models import Variable
from airflow.decorators import dag, task
from datetime import datetime
@dag(
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
def use_variables():
@task
def get_config():
“””Variableから設定を取得”””
# Variableを取得
s3_bucket = Variable.get(‘s3_bucket’)
api_url = Variable.get(‘api_url’)
print(f”S3バケット: {s3_bucket}”)
print(f”API URL: {api_url}”)
# デフォルト値を指定
timeout = Variable.get(‘timeout’, default_var=30)
print(f”タイムアウト: {timeout}秒”)
get_config()
dag_instance = use_variables()
【実行ログ】
S3バケット: my-data-bucket
API URL: https://api.example.com
タイムアウト: 30秒
4-4. JSONでVariableを保存
# ===== JSONでVariableを保存 =====
from airflow.models import Variable
# Variable(Web UIで設定):
# Key: “app_config”
# Val: {“database”: {“host”: “localhost”, “port”: 5432}, “s3”: {“bucket”: “my-bucket”}}
@task
def use_json_variable():
# JSONとして取得
config = Variable.get(‘app_config’, deserialize_json=True)
print(f”DB Host: {config[‘database’][‘host’]}”)
print(f”DB Port: {config[‘database’][‘port’]}”)
print(f”S3 Bucket: {config[‘s3’][‘bucket’]}”)
4-5. CLIでVariableを設定
# Variable を設定
airflow variables set s3_bucket my-data-bucket
# Variable を取得
airflow variables get s3_bucket
# Variable を削除
airflow variables delete s3_bucket
# JSONファイルからインポート
airflow variables import variables.json
# 全Variableをエクスポート
airflow variables export variables.json
⚠️ 機密情報の取り扱い
パスワードやAPIキーなど、機密情報は暗号化して保存しましょう。
Variable名に以下のキーワードを含めると自動で暗号化されます:
password, secret, passwd, api_key, apikey, access_token
🔌 5. Connections(接続情報)
5-1. Connectionsとは?
Connectionsは、データベースやAPIの接続情報を保存する仕組みです。
Connectionの主要フィールド
| フィールド |
説明 |
例 |
| Conn Id |
接続の名前(一意) |
my_postgres |
| Conn Type |
接続タイプ |
Postgres, MySQL, HTTP |
| Host |
ホスト名 |
localhost |
| Schema |
データベース名 |
mydb |
| Login |
ユーザー名 |
postgres |
| Password |
パスワード |
mypassword |
| Port |
ポート番号 |
5432 |
5-2. PythonコードでConnectionを使う
# ===== Connectionを使ってDBに接続 =====
from airflow.hooks.base import BaseHook
from airflow.decorators import dag, task
from datetime import datetime
import pandas as pd
@dag(
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
def use_connection():
@task
def query_database():
“””Connectionを使ってDBに接続”””
# Connectionを取得
conn = BaseHook.get_connection(‘my_postgres’)
# 接続文字列を作成
connection_string = (
f”postgresql://{conn.login}:{conn.password}”
f”@{conn.host}:{conn.port}/{conn.schema}”
)
print(f”接続先: {conn.host}:{conn.port}/{conn.schema}”)
# SQLAlchemyで接続
from sqlalchemy import create_engine
engine = create_engine(connection_string)
# SQLクエリを実行
df = pd.read_sql(“SELECT * FROM users LIMIT 10″, engine)
print(f”取得件数: {len(df)}”)
return len(df)
query_database()
dag_instance = use_connection()
【実行ログ】
接続先: localhost:5432/mydb
取得件数: 10
5-3. CLIでConnectionを設定
# Connection を設定
airflow connections add my_postgres \
–conn-type postgres \
–conn-host localhost \
–conn-schema mydb \
–conn-login postgres \
–conn-password mypassword \
–conn-port 5432
# Connection を取得
airflow connections get my_postgres
# Connection を削除
airflow connections delete my_postgres
💼 6. 実践演習:完全なETLパイプライン
6-1. 要件
- Variablesから設定を取得
- TaskFlow APIでETLを実装
- タスク間でデータを受け渡し
- 結果をログに出力
6-2. 完全な実装
# ===== 完全なETLパイプライン =====
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime, timedelta
import pandas as pd
import logging
logger = logging.getLogger(__name__)
@dag(
dag_id=’complete_etl_pipeline’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False,
tags=[‘etl’, ‘taskflow’, ‘complete’]
)
def complete_etl_pipeline():
“””Variables + TaskFlow APIを使った完全なETLパイプライン”””
@task
def get_config() -> dict:
“””Variablesから設定を取得”””
config = {
‘output_dir’: Variable.get(‘output_dir’, default_var=’/tmp’),
‘tax_rate’: float(Variable.get(‘tax_rate’, default_var=’0.1′))
}
logger.info(f”設定を取得: {config}”)
return config
@task
def extract(config: dict) -> str:
“””データ抽出”””
df = pd.DataFrame({
‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’, ‘ぶどう’, ‘いちご’],
‘価格’: [150, 100, 80, 300, 250],
‘個数’: [10, 20, 15, 5, 8]
})
file_path = f”{config[‘output_dir’]}/extracted.csv”
df.to_csv(file_path, index=False)
logger.info(f”✅ Extract完了: {len(df)}件 → {file_path}”)
return file_path
@task
def transform(file_path: str, config: dict) -> str:
“””データ変換”””
df = pd.read_csv(file_path)
tax_rate = config[‘tax_rate’]
df[‘税込価格’] = (df[‘価格’] * (1 + tax_rate)).astype(int)
df[‘合計金額’] = df[‘税込価格’] * df[‘個数’]
output_path = f”{config[‘output_dir’]}/transformed.csv”
df.to_csv(output_path, index=False)
logger.info(f”✅ Transform完了: {len(df)}件 → {output_path}”)
return output_path
@task
def load(file_path: str, config: dict) -> dict:
“””データロード”””
df = pd.read_csv(file_path)
final_path = f”{config[‘output_dir’]}/final_report.csv”
df.to_csv(final_path, index=False)
summary = {
‘件数’: len(df),
‘総売上’: int(df[‘合計金額’].sum()),
‘平均単価’: int(df[‘価格’].mean()),
‘ファイル’: final_path
}
logger.info(f”✅ Load完了: {summary}”)
return summary
@task
def notify(summary: dict):
“””完了通知”””
logger.info(“=” * 50)
logger.info(“📊 ETLパイプライン完了レポート”)
logger.info(“=” * 50)
logger.info(f” 処理件数: {summary[‘件数’]}件”)
logger.info(f” 総売上: ¥{summary[‘総売上’]:,}”)
logger.info(f” 平均単価: ¥{summary[‘平均単価’]:,}”)
logger.info(f” 出力先: {summary[‘ファイル’]}”)
logger.info(“=” * 50)
# ETLフローの実行
config = get_config()
extracted = extract(config)
transformed = transform(extracted, config)
summary = load(transformed, config)
notify(summary)
# DAGを登録
dag_instance = complete_etl_pipeline()
【実行ログ】
[get_config] 設定を取得: {‘output_dir’: ‘/tmp’, ‘tax_rate’: 0.1}
[extract] ✅ Extract完了: 5件 → /tmp/extracted.csv
[transform] ✅ Transform完了: 5件 → /tmp/transformed.csv
[load] ✅ Load完了: {‘件数’: 5, ‘総売上’: 7865, ‘平均単価’: 176, ‘ファイル’: ‘/tmp/final_report.csv’}
[notify] ==================================================
[notify] 📊 ETLパイプライン完了レポート
[notify] ==================================================
[notify] 処理件数: 5件
[notify] 総売上: ¥7,865
[notify] 平均単価: ¥176
[notify] 出力先: /tmp/final_report.csv
[notify] ==================================================
📝 STEP 22 のまとめ
✅ このステップで学んだこと
- XCom:タスク間で小さなデータを受け渡し
- TaskFlow API:XComをシンプルに使える(@task)
- Variables:DAG全体で使う設定値
- Connections:データベースやAPIの接続情報
💡 重要ポイント
- 小さなデータはXCom、大きなデータはファイル
- TaskFlow APIを使うとコードが簡潔
- 設定値はVariablesで管理
- 接続情報はConnectionsで管理
- 機密情報は暗号化して保存
🎯 次のステップの予告
次のSTEP 23では、「Operatorの活用」を学びます。
- PythonOperatorの詳細
- BashOperatorの活用
- EmailOperator(通知)
- カスタムOperatorの作成
📝 練習問題
問題 1
基礎
XComを使って、タスクAからタスクBに文字列「Hello」を渡すDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def push_hello():
return “Hello”
def pull_hello(**context):
ti = context[‘ti’]
message = ti.xcom_pull(task_ids=’task_a’)
print(f”受信: {message}”)
with DAG(
dag_id=’xcom_hello’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
task_a = PythonOperator(
task_id=’task_a’,
python_callable=push_hello
)
task_b = PythonOperator(
task_id=’task_b’,
python_callable=pull_hello
)
task_a >> task_b
問題 2
基礎
TaskFlow APIを使って、数値を2倍にするシンプルなDAGを作成してください。
【解答】
from airflow.decorators import dag, task
from datetime import datetime
@dag(
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
def double_number():
@task
def get_number():
return 5
@task
def double(num):
result = num * 2
print(f”結果: {result}”)
return result
number = get_number()
double(number)
dag_instance = double_number()
問題 3
基礎
xcom_pushでキー「count」と値100を送信し、xcom_pullで受け取るDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def push_count(**context):
ti = context[‘ti’]
ti.xcom_push(key=’count’, value=100)
def pull_count(**context):
ti = context[‘ti’]
count = ti.xcom_pull(task_ids=’push_task’, key=’count’)
print(f”受信したcount: {count}”)
with DAG(
dag_id=’xcom_key_value’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
push_task = PythonOperator(
task_id=’push_task’,
python_callable=push_count
)
pull_task = PythonOperator(
task_id=’pull_task’,
python_callable=pull_count
)
push_task >> pull_task
問題 4
基礎
Variable.getでデフォルト値を指定して設定を取得するタスクを作成してください。
【解答】
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime
@dag(
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
def use_variable_default():
@task
def get_settings():
# デフォルト値を指定
timeout = Variable.get(‘timeout’, default_var=30)
batch_size = Variable.get(‘batch_size’, default_var=1000)
print(f”タイムアウト: {timeout}秒”)
print(f”バッチサイズ: {batch_size}”)
get_settings()
dag_instance = use_variable_default()
問題 5
応用
TaskFlow APIで、リストの合計を計算するETLパイプラインを作成してください。
【解答】
from airflow.decorators import dag, task
from datetime import datetime
@dag(
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
def sum_pipeline():
@task
def extract() -> list:
“””データ抽出”””
data = [10, 20, 30, 40, 50]
print(f”抽出: {data}”)
return data
@task
def transform(data: list) -> list:
“””データ変換(2倍)”””
transformed = [x * 2 for x in data]
print(f”変換: {transformed}”)
return transformed
@task
def load(data: list) -> int:
“””データロード(合計計算)”””
total = sum(data)
print(f”合計: {total}”)
return total
extracted = extract()
transformed = transform(extracted)
load(transformed)
dag_instance = sum_pipeline()
問題 6
応用
辞書をXComで渡して、受信側で各キーの値を出力するDAGを作成してください。
【解答】
from airflow.decorators import dag, task
from datetime import datetime
@dag(
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
def dict_xcom():
@task
def create_dict() -> dict:
“””辞書を作成”””
return {
‘name’: ‘田中’,
‘age’: 30,
‘city’: ‘東京’
}
@task
def print_dict(data: dict):
“””辞書の内容を出力”””
print(f”名前: {data[‘name’]}”)
print(f”年齢: {data[‘age’]}”)
print(f”都市: {data[‘city’]}”)
data = create_dict()
print_dict(data)
dag_instance = dict_xcom()
問題 7
応用
ファイルパスをXComで渡し、次のタスクでそのファイルを読み込むDAGを作成してください。
【解答】
from airflow.decorators import dag, task
from datetime import datetime
import pandas as pd
@dag(
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
def file_path_pass():
@task
def create_file() -> str:
“””ファイルを作成”””
df = pd.DataFrame({
‘商品’: [‘A’, ‘B’, ‘C’],
‘価格’: [100, 200, 300]
})
file_path = ‘/tmp/data.csv’
df.to_csv(file_path, index=False)
print(f”ファイル作成: {file_path}”)
return file_path
@task
def read_file(file_path: str):
“””ファイルを読み込み”””
df = pd.read_csv(file_path)
print(f”読み込み件数: {len(df)}”)
print(df)
path = create_file()
read_file(path)
dag_instance = file_path_pass()
問題 8
応用
複数のタスクからの結果を1つのタスクで集約するDAGを作成してください。
【解答】
from airflow.decorators import dag, task
from datetime import datetime
@dag(
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
def aggregate_results():
@task
def get_sales_a() -> int:
return 1000
@task
def get_sales_b() -> int:
return 2000
@task
def get_sales_c() -> int:
return 1500
@task
def aggregate(a: int, b: int, c: int):
total = a + b + c
print(f”A: {a}, B: {b}, C: {c}”)
print(f”合計: {total}”)
sales_a = get_sales_a()
sales_b = get_sales_b()
sales_c = get_sales_c()
aggregate(sales_a, sales_b, sales_c)
dag_instance = aggregate_results()
問題 9
発展
Variablesから設定を取得し、その設定に基づいてデータを処理するETLパイプラインを作成してください。
【解答】
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime
import pandas as pd
@dag(
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
def config_based_etl():
@task
def get_config() -> dict:
“””設定を取得”””
return {
‘tax_rate’: float(Variable.get(‘tax_rate’, default_var=’0.1′)),
‘output_dir’: Variable.get(‘output_dir’, default_var=’/tmp’)
}
@task
def extract(config: dict) -> str:
“””データ抽出”””
df = pd.DataFrame({
‘商品’: [‘A’, ‘B’, ‘C’],
‘価格’: [100, 200, 300]
})
path = f”{config[‘output_dir’]}/raw.csv”
df.to_csv(path, index=False)
return path
@task
def transform(path: str, config: dict) -> str:
“””データ変換”””
df = pd.read_csv(path)
df[‘税込’] = (df[‘価格’] * (1 + config[‘tax_rate’])).astype(int)
output_path = f”{config[‘output_dir’]}/processed.csv”
df.to_csv(output_path, index=False)
return output_path
@task
def load(path: str):
“””データロード”””
df = pd.read_csv(path)
print(f”処理完了: {len(df)}件”)
print(df)
config = get_config()
raw_path = extract(config)
processed_path = transform(raw_path, config)
load(processed_path)
dag_instance = config_based_etl()
問題 10
発展
複数のデータソースから並列でデータを抽出し、マージしてからロードするETLパイプラインを作成してください。
【解答】
from airflow.decorators import dag, task
from datetime import datetime
import pandas as pd
@dag(
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
def multi_source_etl():
@task
def extract_source_a() -> str:
“””ソースAからデータ抽出”””
df = pd.DataFrame({
‘id’: [1, 2, 3],
‘name’: [‘田中’, ‘鈴木’, ‘佐藤’]
})
path = ‘/tmp/source_a.csv’
df.to_csv(path, index=False)
return path
@task
def extract_source_b() -> str:
“””ソースBからデータ抽出”””
df = pd.DataFrame({
‘id’: [1, 2, 3],
‘sales’: [1000, 2000, 1500]
})
path = ‘/tmp/source_b.csv’
df.to_csv(path, index=False)
return path
@task
def merge_data(path_a: str, path_b: str) -> str:
“””データをマージ”””
df_a = pd.read_csv(path_a)
df_b = pd.read_csv(path_b)
merged = pd.merge(df_a, df_b, on=’id’)
output_path = ‘/tmp/merged.csv’
merged.to_csv(output_path, index=False)
print(f”マージ完了: {len(merged)}件”)
return output_path
@task
def load(path: str):
“””データロード”””
df = pd.read_csv(path)
print(“最終データ:”)
print(df)
# 並列で抽出
path_a = extract_source_a()
path_b = extract_source_b()
# マージしてロード
merged_path = merge_data(path_a, path_b)
load(merged_path)
dag_instance = multi_source_etl()