🚀 STEP 20: 初めてのDAG作成
実際にDAGファイルを書いて、Airflowで実行してみよう
📋 このステップで学ぶこと
- DAGファイルの構造
- Operatorの種類(PythonOperator、BashOperator)
- タスクの定義方法
- タスク依存関係の設定
- 実践演習:シンプルなDAG作成
⏱️ 学習時間の目安:3時間
📝 練習問題:10問(基礎4問・応用4問・発展2問)
🎯 1. DAGファイルの基本構造
1-1. DAGファイルとは?
DAGファイルは、ワークフローを定義するPythonファイルです。
$AIRFLOW_HOME/dags/ フォルダに配置します。
📚 例え話:料理のレシピ
DAGファイルは料理のレシピに似ています。
・DAG = レシピ全体(カレーの作り方)
・タスク = 各手順(野菜を切る、炒める、煮込む)
・依存関係 = 手順の順番(切ってから炒める)
・Operator = 調理器具(包丁、フライパン、鍋)
レシピを見れば、誰でも同じ料理が作れますよね?
DAGファイルも、誰が実行しても同じ処理が行われます。
1-2. 最小構成のDAG
# ===== 最小構成のDAG =====
# ファイル名: hello_dag.py
# 配置場所: $AIRFLOW_HOME/dags/hello_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# 実行する関数
def hello():
print(“Hello, Airflow!”)
# DAGの定義
dag = DAG(
dag_id=’my_first_dag’, # DAGの名前(一意である必要がある)
start_date=datetime(2024, 1, 1), # 開始日
schedule_interval=’@daily’, # 実行スケジュール
catchup=False # 過去の実行をスキップ
)
# タスクの定義
task = PythonOperator(
task_id=’hello_task’, # タスクの名前
python_callable=hello, # 実行する関数
dag=dag # 所属するDAG
)
【Airflow UIでの表示】
DAG: my_first_dag
├── タスク: hello_task
└── スケジュール: @daily(毎日0時に実行)
1-3. DAGパラメータの詳細
DAGの主要パラメータ
| パラメータ | 説明 | 必須 |
|---|---|---|
| dag_id | DAGの一意な名前 | ✅ 必須 |
| start_date | DAGの開始日 | ✅ 必須 |
| schedule_interval | 実行スケジュール | 推奨 |
| catchup | 過去の未実行分を実行するか | 推奨(False) |
| default_args | 全タスク共通のデフォルト設定 | 任意 |
| description | DAGの説明文 | 任意 |
| tags | タグ(分類用) | 任意 |
1-4. schedule_interval の指定方法
スケジュールの指定方法
| プリセット | cron形式 | 意味 |
|---|---|---|
@once |
– | 一度だけ実行 |
@hourly |
0 * * * * | 毎時0分に実行 |
@daily |
0 0 * * * | 毎日0時に実行 |
@weekly |
0 0 * * 0 | 毎週日曜0時に実行 |
@monthly |
0 0 1 * * | 毎月1日0時に実行 |
None |
– | 手動実行のみ |
💡 cron形式の読み方
* * * * *
┬ ┬ ┬ ┬ ┬
│ │ │ │ └─ 曜日(0-6、0=日曜)
│ │ │ └─── 月(1-12)
│ │ └───── 日(1-31)
│ └─────── 時(0-23)
└───────── 分(0-59)
例:
‘0 9 * * *’ → 毎日9時0分
‘*/15 * * * *’ → 15分ごと
‘0 0 1 * *’ → 毎月1日0時0分
‘0 9 * * 1-5’ → 平日(月〜金)9時0分
🔧 2. Operatorの種類
2-1. Operatorとは?
Operatorは、タスクの実行内容を定義するクラスです。
Python関数を実行したり、Bashコマンドを実行したりできます。
主なOperator
| Operator | 用途 | 使用頻度 |
|---|---|---|
| PythonOperator | Python関数を実行 | ⭐⭐⭐(最頻出) |
| BashOperator | Bashコマンドを実行 | ⭐⭐⭐ |
| EmailOperator | メールを送信 | ⭐⭐ |
| PostgresOperator | PostgreSQLでSQL実行 | ⭐⭐ |
| DummyOperator | 何もしない(フロー制御用) | ⭐⭐ |
| HttpOperator | HTTPリクエストを送信 | ⭐ |
2-2. PythonOperator の使い方
# ===== PythonOperatorの基本 =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data():
“””データを抽出する関数”””
print(“データを抽出中…”)
data = [1, 2, 3, 4, 5]
print(f”データ抽出完了: {data}”)
return data
def transform_data():
“””データを変換する関数”””
print(“データを変換中…”)
print(“データ変換完了”)
dag = DAG(
dag_id=’python_operator_example’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
# タスク1: データ抽出
task1 = PythonOperator(
task_id=’extract’,
python_callable=extract_data,
dag=dag
)
# タスク2: データ変換
task2 = PythonOperator(
task_id=’transform’,
python_callable=transform_data,
dag=dag
)
# 依存関係: task1 → task2
task1 >> task2
【実行ログ】
[2024-01-15 09:00:01] INFO – データを抽出中…
[2024-01-15 09:00:01] INFO – データ抽出完了: [1, 2, 3, 4, 5]
[2024-01-15 09:00:02] INFO – データを変換中…
[2024-01-15 09:00:02] INFO – データ変換完了
2-3. BashOperator の使い方
# ===== BashOperatorの基本 =====
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
dag = DAG(
dag_id=’bash_operator_example’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
# タスク1: ファイルを作成
task1 = BashOperator(
task_id=’create_file’,
bash_command=’echo “Hello, Airflow!” > /tmp/hello.txt’,
dag=dag
)
# タスク2: ファイルの内容を表示
task2 = BashOperator(
task_id=’show_file’,
bash_command=’cat /tmp/hello.txt’,
dag=dag
)
# タスク3: ファイルを削除
task3 = BashOperator(
task_id=’delete_file’,
bash_command=’rm /tmp/hello.txt’,
dag=dag
)
# 依存関係: task1 → task2 → task3
task1 >> task2 >> task3
【実行ログ】
[create_file] echo “Hello, Airflow!” > /tmp/hello.txt
[show_file] Hello, Airflow!
[delete_file] rm /tmp/hello.txt
2-4. 引数を渡す(op_kwargs)
# ===== 引数を渡す =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def greet(name, age):
“””引数を受け取る関数”””
print(f”こんにちは、{name}さん!”)
print(f”あなたは{age}歳ですね。”)
dag = DAG(
dag_id=’with_arguments’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
task = PythonOperator(
task_id=’greet_task’,
python_callable=greet,
op_kwargs={‘name’: ‘田中’, ‘age’: 30}, # 引数を辞書で渡す
dag=dag
)
【実行ログ】
こんにちは、田中さん!
あなたは30歳ですね。
🔗 3. タスク依存関係の設定
3-1. 依存関係の書き方
タスクの実行順序は、矢印(>> または <<)で指定します。
依存関係のパターン
【線形(順番に実行)】
A >> B >> C >> D
A → B → C → D
【並列実行】
A >> [B, C, D]
┌→ B
A → ┼→ C
└→ D
【結合】
[A, B, C] >> D
A →┐
B →┼→ D
C →┘
【分岐と結合】
A >> [B, C] >> D
┌→ B →┐
A → ┤ ├→ D
└→ C →┘
3-2. 方法1: >> 演算子(推奨)
# ===== >> 演算子 =====
# task1 の後に task2 を実行
task1 >> task2
# task1 の後に task2、task2 の後に task3 を実行
task1 >> task2 >> task3
# task1 の後に task2 と task3 を並列実行
task1 >> [task2, task3]
# task2 と task3 の後に task4 を実行
[task2, task3] >> task4
3-3. 複雑な依存関係の例
# ===== 複雑な依存関係 =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def task_function(task_name):
print(f”{task_name} を実行中…”)
dag = DAG(
dag_id=’complex_dependencies’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
)
# タスクを定義
start = PythonOperator(
task_id=’start’,
python_callable=task_function,
op_kwargs={‘task_name’: ‘Start’},
dag=dag
)
extract1 = PythonOperator(
task_id=’extract1′,
python_callable=task_function,
op_kwargs={‘task_name’: ‘Extract1′},
dag=dag
)
extract2 = PythonOperator(
task_id=’extract2’,
python_callable=task_function,
op_kwargs={‘task_name’: ‘Extract2′},
dag=dag
)
transform = PythonOperator(
task_id=’transform’,
python_callable=task_function,
op_kwargs={‘task_name’: ‘Transform’},
dag=dag
)
load = PythonOperator(
task_id=’load’,
python_callable=task_function,
op_kwargs={‘task_name’: ‘Load’},
dag=dag
)
end = PythonOperator(
task_id=’end’,
python_callable=task_function,
op_kwargs={‘task_name’: ‘End’},
dag=dag
)
# 依存関係を設定
# start
# / \
# extract1 extract2
# \ /
# transform
# |
# load
# |
# end
start >> [extract1, extract2] >> transform >> load >> end
【Airflow UIでのGraph表示】
┌─────────┐
│ start │
└────┬────┘
┌─────┴─────┐
↓ ↓
┌──────────┐ ┌──────────┐
│ extract1 │ │ extract2 │
└─────┬────┘ └────┬─────┘
└─────┬─────┘
↓
┌──────────┐
│transform │
└────┬─────┘
↓
┌─────────┐
│ load │
└────┬────┘
↓
┌─────────┐
│ end │
└─────────┘
💼 4. 実践演習:ETLパイプラインのDAG作成
4-1. シナリオ
以下のETLパイプラインをAirflowで実装します:
- CSVファイルからデータを抽出
- データを変換(税込価格を計算)
- 結果をファイルに保存
- 一時ファイルをクリーンアップ
4-2. 完全なDAGファイル
# ===== シンプルなETLパイプライン =====
# ファイル名: simple_etl_dag.py
# 配置場所: $AIRFLOW_HOME/dags/simple_etl_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
import logging
import os
# ロガーを取得
logger = logging.getLogger(__name__)
# =====================================
# ETL関数の定義
# =====================================
def extract_data():
“””データ抽出”””
logger.info(“データ抽出を開始”)
# サンプルデータを作成
data = {
‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’, ‘ぶどう’],
‘価格’: [150, 100, 80, 300],
‘個数’: [10, 20, 15, 5]
}
df = pd.DataFrame(data)
df.to_csv(‘/tmp/extracted_data.csv’, index=False)
logger.info(f”データ抽出完了: {len(df)}件”)
return ‘/tmp/extracted_data.csv’
def transform_data():
“””データ変換”””
logger.info(“データ変換を開始”)
df = pd.read_csv(‘/tmp/extracted_data.csv’)
# 変換処理
df[‘税込価格’] = (df[‘価格’] * 1.1).astype(int)
df[‘合計金額’] = df[‘税込価格’] * df[‘個数’]
df.to_csv(‘/tmp/transformed_data.csv’, index=False)
logger.info(f”データ変換完了: {len(df)}件”)
return ‘/tmp/transformed_data.csv’
def load_data():
“””データロード”””
logger.info(“データロードを開始”)
df = pd.read_csv(‘/tmp/transformed_data.csv’)
output_path = ‘/tmp/final_output.csv’
df.to_csv(output_path, index=False)
logger.info(f”データロード完了: {output_path}”)
logger.info(f”\n{df.to_string()}”)
return output_path
def cleanup():
“””クリーンアップ”””
logger.info(“クリーンアップを開始”)
files_to_delete = [
‘/tmp/extracted_data.csv’,
‘/tmp/transformed_data.csv’
]
for file_path in files_to_delete:
if os.path.exists(file_path):
os.remove(file_path)
logger.info(f”削除: {file_path}”)
logger.info(“クリーンアップ完了”)
# =====================================
# DAGの定義
# =====================================
default_args = {
‘owner’: ‘data_engineer’,
‘retries’: 2,
‘retry_delay’: timedelta(minutes=5)
}
dag = DAG(
dag_id=’simple_etl_pipeline’,
default_args=default_args,
description=’シンプルなETLパイプライン’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False,
tags=[‘etl’, ‘example’]
)
# =====================================
# タスクの定義
# =====================================
extract_task = PythonOperator(
task_id=’extract_data’,
python_callable=extract_data,
dag=dag
)
transform_task = PythonOperator(
task_id=’transform_data’,
python_callable=transform_data,
dag=dag
)
load_task = PythonOperator(
task_id=’load_data’,
python_callable=load_data,
dag=dag
)
cleanup_task = PythonOperator(
task_id=’cleanup’,
python_callable=cleanup,
dag=dag
)
# =====================================
# 依存関係の設定
# =====================================
extract_task >> transform_task >> load_task >> cleanup_task
4-3. DAGの配置と実行
# 1. DAGファイルを配置
cp simple_etl_dag.py $AIRFLOW_HOME/dags/
# 2. DAGが認識されたか確認
airflow dags list | grep simple_etl
# 3. DAGの構文チェック
python $AIRFLOW_HOME/dags/simple_etl_dag.py
# 4. タスク一覧を確認
airflow tasks list simple_etl_pipeline
# 5. 特定のタスクをテスト実行
airflow tasks test simple_etl_pipeline extract_data 2024-01-01
# 6. DAG全体を手動実行
airflow dags trigger simple_etl_pipeline
【実行結果】
$ airflow tasks list simple_etl_pipeline
extract_data
transform_data
load_data
cleanup
$ airflow tasks test simple_etl_pipeline extract_data 2024-01-01
[2024-01-15 09:00:00] INFO – データ抽出を開始
[2024-01-15 09:00:01] INFO – データ抽出完了: 4件
🔄 5. Context Managerを使った書き方(推奨)
5-1. より簡潔な書き方
Python 3.7以降では、with文を使ってより簡潔に書けます。
# ===== Context Managerを使った書き方 =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract():
print(“Extract”)
def transform():
print(“Transform”)
def load():
print(“Load”)
# Context Managerを使った書き方
with DAG(
dag_id=’etl_with_context’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
# タスク定義(dagパラメータ不要!)
extract_task = PythonOperator(
task_id=’extract’,
python_callable=extract
)
transform_task = PythonOperator(
task_id=’transform’,
python_callable=transform
)
load_task = PythonOperator(
task_id=’load’,
python_callable=load
)
# 依存関係
extract_task >> transform_task >> load_task
🎯 Context Managerのメリット
- コードが簡潔になる
- 各Operatorにdag=dagを書かなくて良い
- Python らしい書き方
- 公式ドキュメントでも推奨
⚠️ よくあるエラーと対処法
- DAGが表示されない:構文エラーがないか確認(python ファイル名.py)
- import error:必要なライブラリがインストールされているか確認
- タスクが実行されない:DAGがONになっているか、スケジュールを確認
📝 STEP 20 のまとめ
✅ このステップで学んだこと
- DAGファイルの構造:DAGとタスクの定義
- Operator:PythonOperator、BashOperator
- タスク依存関係:>>演算子で指定
- 実践演習:ETLパイプラインのDAG作成
- Context Manager:with文を使った簡潔な書き方
💡 重要ポイント
- DAGファイルはPythonコード
- タスクはOperatorで定義
- 依存関係は>>演算子で指定
- DAGはdagsフォルダに配置
- with文を使うと簡潔に書ける
🎯 次のステップの予告
次のSTEP 21では、「スケジュール実行とトリガー」を学びます。
- cronライクなスケジュール設定
- start_dateとschedule_intervalの理解
- バックフィル(過去実行)
📝 練習問題
問題 1
基礎
「Hello, World!」を出力するシンプルなDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def hello():
print(“Hello, World!”)
with DAG(
dag_id=’hello_world_dag’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
task = PythonOperator(
task_id=’hello_task’,
python_callable=hello
)
問題 2
基礎
3つのタスク(A → B → C)を順番に実行するDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def task_a():
print(“タスクA実行”)
def task_b():
print(“タスクB実行”)
def task_c():
print(“タスクC実行”)
with DAG(
dag_id=’sequential_tasks’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
a = PythonOperator(task_id=’task_a’, python_callable=task_a)
b = PythonOperator(task_id=’task_b’, python_callable=task_b)
c = PythonOperator(task_id=’task_c’, python_callable=task_c)
a >> b >> c
問題 3
基礎
BashOperatorを使って、「Hello from Bash!」と出力するDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG(
dag_id=’bash_hello’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
task = BashOperator(
task_id=’hello_bash’,
bash_command=’echo “Hello from Bash!”‘
)
問題 4
基礎
引数(name)を受け取って「こんにちは、{name}さん!」と出力するDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def greet(name):
print(f”こんにちは、{name}さん!”)
with DAG(
dag_id=’greet_dag’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
task = PythonOperator(
task_id=’greet_task’,
python_callable=greet,
op_kwargs={‘name’: ‘太郎’}
)
問題 5
応用
タスクAの後に、タスクBとCを並列実行し、その後タスクDを実行するDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def task_function(name):
print(f”{name}実行”)
with DAG(
dag_id=’parallel_tasks’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
a = PythonOperator(
task_id=’task_a’,
python_callable=task_function,
op_kwargs={‘name’: ‘タスクA’}
)
b = PythonOperator(
task_id=’task_b’,
python_callable=task_function,
op_kwargs={‘name’: ‘タスクB’}
)
c = PythonOperator(
task_id=’task_c’,
python_callable=task_function,
op_kwargs={‘name’: ‘タスクC’}
)
d = PythonOperator(
task_id=’task_d’,
python_callable=task_function,
op_kwargs={‘name’: ‘タスクD’}
)
# A → [B, C] → D
a >> [b, c] >> d
問題 6
応用
BashOperatorを使って、ファイルを作成・表示・削除するDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG(
dag_id=’bash_file_operations’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
create = BashOperator(
task_id=’create_file’,
bash_command=’echo “Airflowのテスト” > /tmp/test.txt’
)
show = BashOperator(
task_id=’show_file’,
bash_command=’cat /tmp/test.txt’
)
delete = BashOperator(
task_id=’delete_file’,
bash_command=’rm /tmp/test.txt’
)
create >> show >> delete
問題 7
応用
毎週月曜日の朝9時に実行されるDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def weekly_task():
print(“週次タスク実行”)
with DAG(
dag_id=’weekly_monday_dag’,
start_date=datetime(2024, 1, 1),
schedule_interval=’0 9 * * 1′, # 毎週月曜9時(cron形式)
catchup=False
) as dag:
task = PythonOperator(
task_id=’weekly_task’,
python_callable=weekly_task
)
問題 8
応用
default_argsを使って、リトライ回数3回、リトライ間隔1分を設定したDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def my_task():
print(“タスク実行”)
default_args = {
‘owner’: ‘data_engineer’,
‘retries’: 3,
‘retry_delay’: timedelta(minutes=1)
}
with DAG(
dag_id=’retry_dag’,
default_args=default_args,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
task = PythonOperator(
task_id=’my_task’,
python_callable=my_task
)
問題 9
発展
ETLパイプライン(Extract → Transform → Load)で、各タスクがログを出力し、最終タスクで処理件数をログに出力するDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
def extract():
logger.info(“=== Extract開始 ===”)
data = list(range(100))
logger.info(f”抽出件数: {len(data)}件”)
return data
def transform():
logger.info(“=== Transform開始 ===”)
logger.info(“データ変換処理中…”)
logger.info(“変換完了”)
def load():
logger.info(“=== Load開始 ===”)
logger.info(“データロード処理中…”)
processed_count = 100
logger.info(f”処理完了: {processed_count}件”)
default_args = {
‘owner’: ‘data_engineer’,
‘retries’: 2,
‘retry_delay’: timedelta(minutes=5)
}
with DAG(
dag_id=’etl_with_logging’,
default_args=default_args,
description=’ログ出力付きETLパイプライン’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False,
tags=[‘etl’, ‘logging’]
) as dag:
extract_task = PythonOperator(
task_id=’extract’,
python_callable=extract
)
transform_task = PythonOperator(
task_id=’transform’,
python_callable=transform
)
load_task = PythonOperator(
task_id=’load’,
python_callable=load
)
extract_task >> transform_task >> load_task
問題 10
発展
複数のデータソース(DB1、DB2、API)から並列でデータを抽出し、統合してからロードするDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def extract_db1():
print(“DB1からデータ抽出”)
def extract_db2():
print(“DB2からデータ抽出”)
def extract_api():
print(“APIからデータ抽出”)
def merge_data():
print(“3つのソースを統合”)
def transform():
print(“データ変換”)
def load():
print(“データロード”)
default_args = {
‘owner’: ‘data_engineer’,
‘retries’: 2,
‘retry_delay’: timedelta(minutes=5)
}
with DAG(
dag_id=’multi_source_etl’,
default_args=default_args,
description=’複数ソースからのETLパイプライン’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False,
tags=[‘etl’, ‘multi-source’]
) as dag:
# 抽出タスク(並列)
extract_db1_task = PythonOperator(
task_id=’extract_db1′,
python_callable=extract_db1
)
extract_db2_task = PythonOperator(
task_id=’extract_db2′,
python_callable=extract_db2
)
extract_api_task = PythonOperator(
task_id=’extract_api’,
python_callable=extract_api
)
# 統合タスク
merge_task = PythonOperator(
task_id=’merge_data’,
python_callable=merge_data
)
# 変換タスク
transform_task = PythonOperator(
task_id=’transform’,
python_callable=transform
)
# ロードタスク
load_task = PythonOperator(
task_id=’load’,
python_callable=load
)
# 依存関係
# [DB1, DB2, API] → Merge → Transform → Load
[extract_db1_task, extract_db2_task, extract_api_task] >> merge_task >> transform_task >> load_task
学習メモ
ETL・データパイプライン構築 - Step 20
📋 過去のメモ一覧
▼