STEP 22:タスク間のデータ受け渡し

🔄 STEP 22: タスク間のデータ受け渡し

XCom、Variables、Connectionsでデータを共有しよう

📋 このステップで学ぶこと

  • XCom(クロスコミュニケーション)の使い方
  • TaskFlow APIの使い方
  • Variables(変数)の管理
  • Connections(接続情報)の設定
  • 実践演習:タスク間でデータを引き継ぐDAG

⏱️ 学習時間の目安:3時間

📝 練習問題:10問(基礎4問・応用4問・発展2問)

🎯 1. タスク間データ共有の必要性

1-1. なぜデータ共有が必要か?

ETLパイプラインでは、前のタスクの結果次のタスクで使いたいことがよくあります。

📚 例え話:リレー走

タスク間のデータ受け渡しはリレー走のバトンに似ています。

タスクA(第1走者):データを抽出してバトン(ファイルパス)を渡す
タスクB(第2走者):バトンを受け取り、データを変換して次へ
タスクC(第3走者):バトンを受け取り、最終処理

バトンがなければリレーは成立しません。
同様に、データの受け渡しがなければパイプラインは機能しません。

1-2. Airflowのデータ共有方法

データ共有の4つの方法
方法 特徴 データサイズ 用途
XCom タスク間で小さなデータを受け渡し 〜数MB 結果値、パス
ファイル 大きなデータはファイルに保存 無制限 大規模データ
Variables DAG全体で使う設定値 〜数KB 設定、URL
Connections DB・APIの接続情報 認証情報
データ共有の流れ
【XComによるタスク間データ共有】

┌─────────┐      XCom       ┌─────────┐      XCom       ┌─────────┐
│ Extract │ ─────────────→ │Transform│ ─────────────→ │  Load   │
│         │  ファイルパス    │         │  変換後パス     │         │
└─────────┘                 └─────────┘                 └─────────┘
    │                           │                           │
    ▼                           ▼                           ▼
/tmp/raw.csv              /tmp/trans.csv              /tmp/final.csv

【Variables と Connections】

┌─────────────────────────────────────────────────────────────────┐
│                        Airflow メタデータDB                       │
│  ┌──────────────┐        ┌──────────────┐                      │
│  │  Variables   │        │ Connections  │                      │
│  │ ・S3バケット  │        │ ・DB接続情報  │                      │
│  │ ・API URL    │        │ ・API認証     │                      │
│  └──────────────┘        └──────────────┘                      │
└─────────────────────────────────────────────────────────────────┘
        ↓ 参照                     ↓ 参照
    全タスクから利用可能        全タスクから利用可能
        

📤 2. XCom(クロスコミュニケーション)

2-1. XComとは?

XCom(Cross Communication)は、タスク間で小さなデータを受け渡す仕組みです。
文字列、数値、リスト、辞書などを渡せます。

💡 XComの制限
  • サイズ制限:数KB〜数MB(DBによる)
  • 大きなデータ(DataFrame全体など)は不向き
  • ファイルパスや集計結果など小さなデータに使う

2-2. XComの基本的な使い方

# ===== XComの基本 ===== from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def push_data(**context): “””データをプッシュ(送信)””” # データを返すと自動的にXComにプッシュされる return “Hello from task1” def pull_data(**context): “””データをプル(取得)””” ti = context[‘ti’] # TaskInstance # 前のタスクからデータを取得 data = ti.xcom_pull(task_ids=’push_task’) print(f”受信したデータ: {data}”) with DAG( dag_id=’xcom_example’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: task1 = PythonOperator( task_id=’push_task’, python_callable=push_data ) task2 = PythonOperator( task_id=’pull_task’, python_callable=pull_data ) task1 >> task2
【実行ログ】 [push_task] タスク完了、戻り値: “Hello from task1” [pull_task] 受信したデータ: Hello from task1

2-3. 様々なデータ型を渡す

# ===== 様々なデータ型をXComで渡す ===== def push_various_data(**context): “””様々なデータ型をプッシュ””” ti = context[‘ti’] # 文字列 ti.xcom_push(key=’string_data’, value=’Hello’) # 数値 ti.xcom_push(key=’number_data’, value=123) # リスト ti.xcom_push(key=’list_data’, value=[1, 2, 3, 4, 5]) # 辞書 ti.xcom_push(key=’dict_data’, value={ ‘name’: ‘田中’, ‘age’: 30, ‘city’: ‘東京’ }) # デフォルトのreturn値もプッシュされる return “All data pushed” def pull_various_data(**context): “””様々なデータ型をプル””” ti = context[‘ti’] # キーを指定してプル string_val = ti.xcom_pull(task_ids=’push_task’, key=’string_data’) number_val = ti.xcom_pull(task_ids=’push_task’, key=’number_data’) list_val = ti.xcom_pull(task_ids=’push_task’, key=’list_data’) dict_val = ti.xcom_pull(task_ids=’push_task’, key=’dict_data’) # キーを指定しない場合はreturn値を取得 default_val = ti.xcom_pull(task_ids=’push_task’) print(f”String: {string_val}”) print(f”Number: {number_val}”) print(f”List: {list_val}”) print(f”Dict: {dict_val}”) print(f”Default: {default_val}”)
【実行ログ】 String: Hello Number: 123 List: [1, 2, 3, 4, 5] Dict: {‘name’: ‘田中’, ‘age’: 30, ‘city’: ‘東京’} Default: All data pushed

2-4. 実用例:ファイルパスの受け渡し

# ===== ファイルパスの受け渡し ===== import pandas as pd def extract_data(**context): “””データを抽出してファイルパスを返す””” df = pd.DataFrame({ ‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’], ‘価格’: [150, 100, 80] }) file_path = ‘/tmp/extracted_data.csv’ df.to_csv(file_path, index=False) print(f”データを保存: {file_path}”) # ファイルパスを返す(XComに自動プッシュ) return file_path def transform_data(**context): “””ファイルを読み込んで変換””” ti = context[‘ti’] # 前のタスクからファイルパスを取得 file_path = ti.xcom_pull(task_ids=’extract’) print(f”ファイルを読み込み: {file_path}”) df = pd.read_csv(file_path) df[‘税込価格’] = (df[‘価格’] * 1.1).astype(int) output_path = ‘/tmp/transformed_data.csv’ df.to_csv(output_path, index=False) print(f”変換後データを保存: {output_path}”) return output_path with DAG( dag_id=’file_path_xcom’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: extract = PythonOperator( task_id=’extract’, python_callable=extract_data ) transform = PythonOperator( task_id=’transform’, python_callable=transform_data ) extract >> transform
【実行ログ】 [extract] データを保存: /tmp/extracted_data.csv [transform] ファイルを読み込み: /tmp/extracted_data.csv [transform] 変換後データを保存: /tmp/transformed_data.csv

⚡ 3. TaskFlow API(推奨)

3-1. TaskFlow APIとは?

TaskFlow APIは、XComをもっと簡単に使うための新しいAPI(Airflow 2.0以降)です。
デコレータ(@task)を使って、Pythonの関数をそのままタスクにできます。

3-2. 従来の方法 vs TaskFlow API

# ===== 従来の方法 vs TaskFlow API ===== # ❌ 従来の方法(冗長) def extract(**context): ti = context[‘ti’] data = “some data” ti.xcom_push(key=’data’, value=data) def transform(**context): ti = context[‘ti’] data = ti.xcom_pull(task_ids=’extract’, key=’data’) # 処理… # ✅ TaskFlow API(シンプル) from airflow.decorators import task @task def extract(): return “some data” # 自動でXComにプッシュ @task def transform(data): # 引数で自動的に受け取り # 処理… pass # 実行(関数呼び出しのように書ける) data = extract() transform(data)

3-3. TaskFlow APIの基本

# ===== TaskFlow APIの基本 ===== from airflow.decorators import dag, task from datetime import datetime @dag( start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) def taskflow_example(): @task def extract(): “””データ抽出””” data = [1, 2, 3, 4, 5] print(f”抽出: {data}”) return data @task def transform(data): “””データ変換””” transformed = [x * 2 for x in data] print(f”変換: {transformed}”) return transformed @task def load(data): “””データロード””” print(f”ロード: {data}”) total = sum(data) print(f”合計: {total}”) return total # DAGの実行フロー(関数呼び出しのように書ける) extracted_data = extract() transformed_data = transform(extracted_data) load(transformed_data) # DAGを登録 dag_instance = taskflow_example()
【実行ログ】 [extract] 抽出: [1, 2, 3, 4, 5] [transform] 変換: [2, 4, 6, 8, 10] [load] ロード: [2, 4, 6, 8, 10] [load] 合計: 30
🎯 TaskFlow APIのメリット
  • XComのpush/pull を意識しなくて良い
  • Pythonの関数のように書ける
  • コードがシンプルで読みやすい
  • 型ヒントが使える

3-4. 実用例:ETLパイプライン

# ===== TaskFlow APIを使ったETLパイプライン ===== from airflow.decorators import dag, task from datetime import datetime import pandas as pd @dag( start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False, tags=[‘etl’, ‘taskflow’] ) def etl_with_taskflow(): “””TaskFlow APIを使ったETLパイプライン””” @task def extract() -> str: “””データ抽出””” df = pd.DataFrame({ ‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’, ‘ぶどう’], ‘価格’: [150, 100, 80, 300], ‘個数’: [10, 20, 15, 5] }) file_path = ‘/tmp/extracted.csv’ df.to_csv(file_path, index=False) print(f”✅ データ抽出: {len(df)}件”) return file_path @task def transform(file_path: str) -> str: “””データ変換””” df = pd.read_csv(file_path) df[‘税込価格’] = (df[‘価格’] * 1.1).astype(int) df[‘合計金額’] = df[‘税込価格’] * df[‘個数’] output_path = ‘/tmp/transformed.csv’ df.to_csv(output_path, index=False) print(f”✅ データ変換: {len(df)}件”) return output_path @task def load(file_path: str) -> dict: “””データロード””” df = pd.read_csv(file_path) final_path = ‘/tmp/final_output.csv’ df.to_csv(final_path, index=False) summary = { ‘件数’: len(df), ‘総売上’: int(df[‘合計金額’].sum()), ‘ファイル’: final_path } print(f”✅ データロード: {summary}”) return summary # ETLフローの実行 extracted = extract() transformed = transform(extracted) result = load(transformed) # DAGを登録 dag_instance = etl_with_taskflow()
【実行ログ】 [extract] ✅ データ抽出: 4件 [transform] ✅ データ変換: 4件 [load] ✅ データロード: {‘件数’: 4, ‘総売上’: 5775, ‘ファイル’: ‘/tmp/final_output.csv’}

⚙️ 4. Variables(変数)

4-1. Variablesとは?

Variablesは、DAG全体で使う設定値を保存する仕組みです。
環境変数のように使えます。

📊 Variables の使い所
  • APIのエンドポイントURL
  • S3バケット名
  • メール送信先アドレス
  • 環境別の設定(dev/prod)

4-2. Web UIでVariableを設定

# ===== Web UIでの設定方法 ===== # 1. Web UI (http://localhost:8080) にアクセス # 2. 上部メニュー「Admin」→「Variables」をクリック # 3. 「+」ボタンで新規作成 # 4. Keyとvalueを入力 # 例: Key=”s3_bucket”, Val=”my-data-bucket” # 5. 「Save」をクリック

4-3. PythonコードでVariableを使う

# ===== PythonでVariableを使う ===== from airflow.models import Variable from airflow.decorators import dag, task from datetime import datetime @dag( start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) def use_variables(): @task def get_config(): “””Variableから設定を取得””” # Variableを取得 s3_bucket = Variable.get(‘s3_bucket’) api_url = Variable.get(‘api_url’) print(f”S3バケット: {s3_bucket}”) print(f”API URL: {api_url}”) # デフォルト値を指定 timeout = Variable.get(‘timeout’, default_var=30) print(f”タイムアウト: {timeout}秒”) get_config() dag_instance = use_variables()
【実行ログ】 S3バケット: my-data-bucket API URL: https://api.example.com タイムアウト: 30秒

4-4. JSONでVariableを保存

# ===== JSONでVariableを保存 ===== from airflow.models import Variable # Variable(Web UIで設定): # Key: “app_config” # Val: {“database”: {“host”: “localhost”, “port”: 5432}, “s3”: {“bucket”: “my-bucket”}} @task def use_json_variable(): # JSONとして取得 config = Variable.get(‘app_config’, deserialize_json=True) print(f”DB Host: {config[‘database’][‘host’]}”) print(f”DB Port: {config[‘database’][‘port’]}”) print(f”S3 Bucket: {config[‘s3’][‘bucket’]}”)

4-5. CLIでVariableを設定

# Variable を設定 airflow variables set s3_bucket my-data-bucket # Variable を取得 airflow variables get s3_bucket # Variable を削除 airflow variables delete s3_bucket # JSONファイルからインポート airflow variables import variables.json # 全Variableをエクスポート airflow variables export variables.json
⚠️ 機密情報の取り扱い

パスワードやAPIキーなど、機密情報は暗号化して保存しましょう。
Variable名に以下のキーワードを含めると自動で暗号化されます:
password, secret, passwd, api_key, apikey, access_token

🔌 5. Connections(接続情報)

5-1. Connectionsとは?

Connectionsは、データベースやAPIの接続情報を保存する仕組みです。

Connectionの主要フィールド
フィールド 説明
Conn Id 接続の名前(一意) my_postgres
Conn Type 接続タイプ Postgres, MySQL, HTTP
Host ホスト名 localhost
Schema データベース名 mydb
Login ユーザー名 postgres
Password パスワード mypassword
Port ポート番号 5432

5-2. PythonコードでConnectionを使う

# ===== Connectionを使ってDBに接続 ===== from airflow.hooks.base import BaseHook from airflow.decorators import dag, task from datetime import datetime import pandas as pd @dag( start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) def use_connection(): @task def query_database(): “””Connectionを使ってDBに接続””” # Connectionを取得 conn = BaseHook.get_connection(‘my_postgres’) # 接続文字列を作成 connection_string = ( f”postgresql://{conn.login}:{conn.password}” f”@{conn.host}:{conn.port}/{conn.schema}” ) print(f”接続先: {conn.host}:{conn.port}/{conn.schema}”) # SQLAlchemyで接続 from sqlalchemy import create_engine engine = create_engine(connection_string) # SQLクエリを実行 df = pd.read_sql(“SELECT * FROM users LIMIT 10″, engine) print(f”取得件数: {len(df)}”) return len(df) query_database() dag_instance = use_connection()
【実行ログ】 接続先: localhost:5432/mydb 取得件数: 10

5-3. CLIでConnectionを設定

# Connection を設定 airflow connections add my_postgres \ –conn-type postgres \ –conn-host localhost \ –conn-schema mydb \ –conn-login postgres \ –conn-password mypassword \ –conn-port 5432 # Connection を取得 airflow connections get my_postgres # Connection を削除 airflow connections delete my_postgres

💼 6. 実践演習:完全なETLパイプライン

6-1. 要件

  • Variablesから設定を取得
  • TaskFlow APIでETLを実装
  • タスク間でデータを受け渡し
  • 結果をログに出力

6-2. 完全な実装

# ===== 完全なETLパイプライン ===== from airflow.decorators import dag, task from airflow.models import Variable from datetime import datetime, timedelta import pandas as pd import logging logger = logging.getLogger(__name__) @dag( dag_id=’complete_etl_pipeline’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False, tags=[‘etl’, ‘taskflow’, ‘complete’] ) def complete_etl_pipeline(): “””Variables + TaskFlow APIを使った完全なETLパイプライン””” @task def get_config() -> dict: “””Variablesから設定を取得””” config = { ‘output_dir’: Variable.get(‘output_dir’, default_var=’/tmp’), ‘tax_rate’: float(Variable.get(‘tax_rate’, default_var=’0.1′)) } logger.info(f”設定を取得: {config}”) return config @task def extract(config: dict) -> str: “””データ抽出””” df = pd.DataFrame({ ‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’, ‘ぶどう’, ‘いちご’], ‘価格’: [150, 100, 80, 300, 250], ‘個数’: [10, 20, 15, 5, 8] }) file_path = f”{config[‘output_dir’]}/extracted.csv” df.to_csv(file_path, index=False) logger.info(f”✅ Extract完了: {len(df)}件 → {file_path}”) return file_path @task def transform(file_path: str, config: dict) -> str: “””データ変換””” df = pd.read_csv(file_path) tax_rate = config[‘tax_rate’] df[‘税込価格’] = (df[‘価格’] * (1 + tax_rate)).astype(int) df[‘合計金額’] = df[‘税込価格’] * df[‘個数’] output_path = f”{config[‘output_dir’]}/transformed.csv” df.to_csv(output_path, index=False) logger.info(f”✅ Transform完了: {len(df)}件 → {output_path}”) return output_path @task def load(file_path: str, config: dict) -> dict: “””データロード””” df = pd.read_csv(file_path) final_path = f”{config[‘output_dir’]}/final_report.csv” df.to_csv(final_path, index=False) summary = { ‘件数’: len(df), ‘総売上’: int(df[‘合計金額’].sum()), ‘平均単価’: int(df[‘価格’].mean()), ‘ファイル’: final_path } logger.info(f”✅ Load完了: {summary}”) return summary @task def notify(summary: dict): “””完了通知””” logger.info(“=” * 50) logger.info(“📊 ETLパイプライン完了レポート”) logger.info(“=” * 50) logger.info(f” 処理件数: {summary[‘件数’]}件”) logger.info(f” 総売上: ¥{summary[‘総売上’]:,}”) logger.info(f” 平均単価: ¥{summary[‘平均単価’]:,}”) logger.info(f” 出力先: {summary[‘ファイル’]}”) logger.info(“=” * 50) # ETLフローの実行 config = get_config() extracted = extract(config) transformed = transform(extracted, config) summary = load(transformed, config) notify(summary) # DAGを登録 dag_instance = complete_etl_pipeline()
【実行ログ】 [get_config] 設定を取得: {‘output_dir’: ‘/tmp’, ‘tax_rate’: 0.1} [extract] ✅ Extract完了: 5件 → /tmp/extracted.csv [transform] ✅ Transform完了: 5件 → /tmp/transformed.csv [load] ✅ Load完了: {‘件数’: 5, ‘総売上’: 7865, ‘平均単価’: 176, ‘ファイル’: ‘/tmp/final_report.csv’} [notify] ================================================== [notify] 📊 ETLパイプライン完了レポート [notify] ================================================== [notify] 処理件数: 5件 [notify] 総売上: ¥7,865 [notify] 平均単価: ¥176 [notify] 出力先: /tmp/final_report.csv [notify] ==================================================

📝 STEP 22 のまとめ

✅ このステップで学んだこと
  • XCom:タスク間で小さなデータを受け渡し
  • TaskFlow API:XComをシンプルに使える(@task)
  • Variables:DAG全体で使う設定値
  • Connections:データベースやAPIの接続情報
💡 重要ポイント
  • 小さなデータはXCom、大きなデータはファイル
  • TaskFlow APIを使うとコードが簡潔
  • 設定値はVariablesで管理
  • 接続情報はConnectionsで管理
  • 機密情報は暗号化して保存
🎯 次のステップの予告

次のSTEP 23では、「Operatorの活用」を学びます。

  • PythonOperatorの詳細
  • BashOperatorの活用
  • EmailOperator(通知)
  • カスタムOperatorの作成

📝 練習問題

問題 1 基礎

XComを使って、タスクAからタスクBに文字列「Hello」を渡すDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def push_hello(): return “Hello” def pull_hello(**context): ti = context[‘ti’] message = ti.xcom_pull(task_ids=’task_a’) print(f”受信: {message}”) with DAG( dag_id=’xcom_hello’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: task_a = PythonOperator( task_id=’task_a’, python_callable=push_hello ) task_b = PythonOperator( task_id=’task_b’, python_callable=pull_hello ) task_a >> task_b
問題 2 基礎

TaskFlow APIを使って、数値を2倍にするシンプルなDAGを作成してください。

【解答】
from airflow.decorators import dag, task from datetime import datetime @dag( start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) def double_number(): @task def get_number(): return 5 @task def double(num): result = num * 2 print(f”結果: {result}”) return result number = get_number() double(number) dag_instance = double_number()
問題 3 基礎

xcom_pushでキー「count」と値100を送信し、xcom_pullで受け取るDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def push_count(**context): ti = context[‘ti’] ti.xcom_push(key=’count’, value=100) def pull_count(**context): ti = context[‘ti’] count = ti.xcom_pull(task_ids=’push_task’, key=’count’) print(f”受信したcount: {count}”) with DAG( dag_id=’xcom_key_value’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: push_task = PythonOperator( task_id=’push_task’, python_callable=push_count ) pull_task = PythonOperator( task_id=’pull_task’, python_callable=pull_count ) push_task >> pull_task
問題 4 基礎

Variable.getでデフォルト値を指定して設定を取得するタスクを作成してください。

【解答】
from airflow.decorators import dag, task from airflow.models import Variable from datetime import datetime @dag( start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) def use_variable_default(): @task def get_settings(): # デフォルト値を指定 timeout = Variable.get(‘timeout’, default_var=30) batch_size = Variable.get(‘batch_size’, default_var=1000) print(f”タイムアウト: {timeout}秒”) print(f”バッチサイズ: {batch_size}”) get_settings() dag_instance = use_variable_default()
問題 5 応用

TaskFlow APIで、リストの合計を計算するETLパイプラインを作成してください。

【解答】
from airflow.decorators import dag, task from datetime import datetime @dag( start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) def sum_pipeline(): @task def extract() -> list: “””データ抽出””” data = [10, 20, 30, 40, 50] print(f”抽出: {data}”) return data @task def transform(data: list) -> list: “””データ変換(2倍)””” transformed = [x * 2 for x in data] print(f”変換: {transformed}”) return transformed @task def load(data: list) -> int: “””データロード(合計計算)””” total = sum(data) print(f”合計: {total}”) return total extracted = extract() transformed = transform(extracted) load(transformed) dag_instance = sum_pipeline()
問題 6 応用

辞書をXComで渡して、受信側で各キーの値を出力するDAGを作成してください。

【解答】
from airflow.decorators import dag, task from datetime import datetime @dag( start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) def dict_xcom(): @task def create_dict() -> dict: “””辞書を作成””” return { ‘name’: ‘田中’, ‘age’: 30, ‘city’: ‘東京’ } @task def print_dict(data: dict): “””辞書の内容を出力””” print(f”名前: {data[‘name’]}”) print(f”年齢: {data[‘age’]}”) print(f”都市: {data[‘city’]}”) data = create_dict() print_dict(data) dag_instance = dict_xcom()
問題 7 応用

ファイルパスをXComで渡し、次のタスクでそのファイルを読み込むDAGを作成してください。

【解答】
from airflow.decorators import dag, task from datetime import datetime import pandas as pd @dag( start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) def file_path_pass(): @task def create_file() -> str: “””ファイルを作成””” df = pd.DataFrame({ ‘商品’: [‘A’, ‘B’, ‘C’], ‘価格’: [100, 200, 300] }) file_path = ‘/tmp/data.csv’ df.to_csv(file_path, index=False) print(f”ファイル作成: {file_path}”) return file_path @task def read_file(file_path: str): “””ファイルを読み込み””” df = pd.read_csv(file_path) print(f”読み込み件数: {len(df)}”) print(df) path = create_file() read_file(path) dag_instance = file_path_pass()
問題 8 応用

複数のタスクからの結果を1つのタスクで集約するDAGを作成してください。

【解答】
from airflow.decorators import dag, task from datetime import datetime @dag( start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) def aggregate_results(): @task def get_sales_a() -> int: return 1000 @task def get_sales_b() -> int: return 2000 @task def get_sales_c() -> int: return 1500 @task def aggregate(a: int, b: int, c: int): total = a + b + c print(f”A: {a}, B: {b}, C: {c}”) print(f”合計: {total}”) sales_a = get_sales_a() sales_b = get_sales_b() sales_c = get_sales_c() aggregate(sales_a, sales_b, sales_c) dag_instance = aggregate_results()
問題 9 発展

Variablesから設定を取得し、その設定に基づいてデータを処理するETLパイプラインを作成してください。

【解答】
from airflow.decorators import dag, task from airflow.models import Variable from datetime import datetime import pandas as pd @dag( start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) def config_based_etl(): @task def get_config() -> dict: “””設定を取得””” return { ‘tax_rate’: float(Variable.get(‘tax_rate’, default_var=’0.1′)), ‘output_dir’: Variable.get(‘output_dir’, default_var=’/tmp’) } @task def extract(config: dict) -> str: “””データ抽出””” df = pd.DataFrame({ ‘商品’: [‘A’, ‘B’, ‘C’], ‘価格’: [100, 200, 300] }) path = f”{config[‘output_dir’]}/raw.csv” df.to_csv(path, index=False) return path @task def transform(path: str, config: dict) -> str: “””データ変換””” df = pd.read_csv(path) df[‘税込’] = (df[‘価格’] * (1 + config[‘tax_rate’])).astype(int) output_path = f”{config[‘output_dir’]}/processed.csv” df.to_csv(output_path, index=False) return output_path @task def load(path: str): “””データロード””” df = pd.read_csv(path) print(f”処理完了: {len(df)}件”) print(df) config = get_config() raw_path = extract(config) processed_path = transform(raw_path, config) load(processed_path) dag_instance = config_based_etl()
問題 10 発展

複数のデータソースから並列でデータを抽出し、マージしてからロードするETLパイプラインを作成してください。

【解答】
from airflow.decorators import dag, task from datetime import datetime import pandas as pd @dag( start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) def multi_source_etl(): @task def extract_source_a() -> str: “””ソースAからデータ抽出””” df = pd.DataFrame({ ‘id’: [1, 2, 3], ‘name’: [‘田中’, ‘鈴木’, ‘佐藤’] }) path = ‘/tmp/source_a.csv’ df.to_csv(path, index=False) return path @task def extract_source_b() -> str: “””ソースBからデータ抽出””” df = pd.DataFrame({ ‘id’: [1, 2, 3], ‘sales’: [1000, 2000, 1500] }) path = ‘/tmp/source_b.csv’ df.to_csv(path, index=False) return path @task def merge_data(path_a: str, path_b: str) -> str: “””データをマージ””” df_a = pd.read_csv(path_a) df_b = pd.read_csv(path_b) merged = pd.merge(df_a, df_b, on=’id’) output_path = ‘/tmp/merged.csv’ merged.to_csv(output_path, index=False) print(f”マージ完了: {len(merged)}件”) return output_path @task def load(path: str): “””データロード””” df = pd.read_csv(path) print(“最終データ:”) print(df) # 並列で抽出 path_a = extract_source_a() path_b = extract_source_b() # マージしてロード merged_path = merge_data(path_a, path_b) load(merged_path) dag_instance = multi_source_etl()
📝

学習メモ

ETL・データパイプライン構築 - Step 22

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE