STEP 20:初めてのDAG作成

🚀 STEP 20: 初めてのDAG作成

実際にDAGファイルを書いて、Airflowで実行してみよう

📋 このステップで学ぶこと

  • DAGファイルの構造
  • Operatorの種類(PythonOperator、BashOperator)
  • タスクの定義方法
  • タスク依存関係の設定
  • 実践演習:シンプルなDAG作成

⏱️ 学習時間の目安:3時間

📝 練習問題:10問(基礎4問・応用4問・発展2問)

🎯 1. DAGファイルの基本構造

1-1. DAGファイルとは?

DAGファイルは、ワークフローを定義するPythonファイルです。
$AIRFLOW_HOME/dags/ フォルダに配置します。

📚 例え話:料理のレシピ

DAGファイルは料理のレシピに似ています。

DAG = レシピ全体(カレーの作り方)
タスク = 各手順(野菜を切る、炒める、煮込む)
依存関係 = 手順の順番(切ってから炒める)
Operator = 調理器具(包丁、フライパン、鍋)

レシピを見れば、誰でも同じ料理が作れますよね?
DAGファイルも、誰が実行しても同じ処理が行われます。

1-2. 最小構成のDAG

# ===== 最小構成のDAG ===== # ファイル名: hello_dag.py # 配置場所: $AIRFLOW_HOME/dags/hello_dag.py from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime # 実行する関数 def hello(): print(“Hello, Airflow!”) # DAGの定義 dag = DAG( dag_id=’my_first_dag’, # DAGの名前(一意である必要がある) start_date=datetime(2024, 1, 1), # 開始日 schedule_interval=’@daily’, # 実行スケジュール catchup=False # 過去の実行をスキップ ) # タスクの定義 task = PythonOperator( task_id=’hello_task’, # タスクの名前 python_callable=hello, # 実行する関数 dag=dag # 所属するDAG )
【Airflow UIでの表示】 DAG: my_first_dag ├── タスク: hello_task └── スケジュール: @daily(毎日0時に実行)

1-3. DAGパラメータの詳細

DAGの主要パラメータ
パラメータ 説明 必須
dag_id DAGの一意な名前 ✅ 必須
start_date DAGの開始日 ✅ 必須
schedule_interval 実行スケジュール 推奨
catchup 過去の未実行分を実行するか 推奨(False)
default_args 全タスク共通のデフォルト設定 任意
description DAGの説明文 任意
tags タグ(分類用) 任意

1-4. schedule_interval の指定方法

スケジュールの指定方法
プリセット cron形式 意味
@once 一度だけ実行
@hourly 0 * * * * 毎時0分に実行
@daily 0 0 * * * 毎日0時に実行
@weekly 0 0 * * 0 毎週日曜0時に実行
@monthly 0 0 1 * * 毎月1日0時に実行
None 手動実行のみ
💡 cron形式の読み方
* * * * * ┬ ┬ ┬ ┬ ┬ │ │ │ │ └─ 曜日(0-6、0=日曜) │ │ │ └─── 月(1-12) │ │ └───── 日(1-31) │ └─────── 時(0-23) └───────── 分(0-59) 例: ‘0 9 * * *’ → 毎日9時0分 ‘*/15 * * * *’ → 15分ごと ‘0 0 1 * *’ → 毎月1日0時0分 ‘0 9 * * 1-5’ → 平日(月〜金)9時0分

🔧 2. Operatorの種類

2-1. Operatorとは?

Operatorは、タスクの実行内容を定義するクラスです。
Python関数を実行したり、Bashコマンドを実行したりできます。

主なOperator
Operator 用途 使用頻度
PythonOperator Python関数を実行 ⭐⭐⭐(最頻出)
BashOperator Bashコマンドを実行 ⭐⭐⭐
EmailOperator メールを送信 ⭐⭐
PostgresOperator PostgreSQLでSQL実行 ⭐⭐
DummyOperator 何もしない(フロー制御用) ⭐⭐
HttpOperator HTTPリクエストを送信

2-2. PythonOperator の使い方

# ===== PythonOperatorの基本 ===== from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def extract_data(): “””データを抽出する関数””” print(“データを抽出中…”) data = [1, 2, 3, 4, 5] print(f”データ抽出完了: {data}”) return data def transform_data(): “””データを変換する関数””” print(“データを変換中…”) print(“データ変換完了”) dag = DAG( dag_id=’python_operator_example’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) # タスク1: データ抽出 task1 = PythonOperator( task_id=’extract’, python_callable=extract_data, dag=dag ) # タスク2: データ変換 task2 = PythonOperator( task_id=’transform’, python_callable=transform_data, dag=dag ) # 依存関係: task1 → task2 task1 >> task2
【実行ログ】 [2024-01-15 09:00:01] INFO – データを抽出中… [2024-01-15 09:00:01] INFO – データ抽出完了: [1, 2, 3, 4, 5] [2024-01-15 09:00:02] INFO – データを変換中… [2024-01-15 09:00:02] INFO – データ変換完了

2-3. BashOperator の使い方

# ===== BashOperatorの基本 ===== from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime dag = DAG( dag_id=’bash_operator_example’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) # タスク1: ファイルを作成 task1 = BashOperator( task_id=’create_file’, bash_command=’echo “Hello, Airflow!” > /tmp/hello.txt’, dag=dag ) # タスク2: ファイルの内容を表示 task2 = BashOperator( task_id=’show_file’, bash_command=’cat /tmp/hello.txt’, dag=dag ) # タスク3: ファイルを削除 task3 = BashOperator( task_id=’delete_file’, bash_command=’rm /tmp/hello.txt’, dag=dag ) # 依存関係: task1 → task2 → task3 task1 >> task2 >> task3
【実行ログ】 [create_file] echo “Hello, Airflow!” > /tmp/hello.txt [show_file] Hello, Airflow! [delete_file] rm /tmp/hello.txt

2-4. 引数を渡す(op_kwargs)

# ===== 引数を渡す ===== from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def greet(name, age): “””引数を受け取る関数””” print(f”こんにちは、{name}さん!”) print(f”あなたは{age}歳ですね。”) dag = DAG( dag_id=’with_arguments’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) task = PythonOperator( task_id=’greet_task’, python_callable=greet, op_kwargs={‘name’: ‘田中’, ‘age’: 30}, # 引数を辞書で渡す dag=dag )
【実行ログ】 こんにちは、田中さん! あなたは30歳ですね。

🔗 3. タスク依存関係の設定

3-1. 依存関係の書き方

タスクの実行順序は、矢印(>> または <<)で指定します。

依存関係のパターン
【線形(順番に実行)】
A >> B >> C >> D

  A → B → C → D

【並列実行】
A >> [B, C, D]

      ┌→ B
  A → ┼→ C
      └→ D

【結合】
[A, B, C] >> D

  A →┐
  B →┼→ D
  C →┘

【分岐と結合】
A >> [B, C] >> D

      ┌→ B →┐
  A → ┤      ├→ D
      └→ C →┘
        

3-2. 方法1: >> 演算子(推奨)

# ===== >> 演算子 ===== # task1 の後に task2 を実行 task1 >> task2 # task1 の後に task2、task2 の後に task3 を実行 task1 >> task2 >> task3 # task1 の後に task2 と task3 を並列実行 task1 >> [task2, task3] # task2 と task3 の後に task4 を実行 [task2, task3] >> task4

3-3. 複雑な依存関係の例

# ===== 複雑な依存関係 ===== from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def task_function(task_name): print(f”{task_name} を実行中…”) dag = DAG( dag_id=’complex_dependencies’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) # タスクを定義 start = PythonOperator( task_id=’start’, python_callable=task_function, op_kwargs={‘task_name’: ‘Start’}, dag=dag ) extract1 = PythonOperator( task_id=’extract1′, python_callable=task_function, op_kwargs={‘task_name’: ‘Extract1′}, dag=dag ) extract2 = PythonOperator( task_id=’extract2’, python_callable=task_function, op_kwargs={‘task_name’: ‘Extract2′}, dag=dag ) transform = PythonOperator( task_id=’transform’, python_callable=task_function, op_kwargs={‘task_name’: ‘Transform’}, dag=dag ) load = PythonOperator( task_id=’load’, python_callable=task_function, op_kwargs={‘task_name’: ‘Load’}, dag=dag ) end = PythonOperator( task_id=’end’, python_callable=task_function, op_kwargs={‘task_name’: ‘End’}, dag=dag ) # 依存関係を設定 # start # / \ # extract1 extract2 # \ / # transform # | # load # | # end start >> [extract1, extract2] >> transform >> load >> end
【Airflow UIでのGraph表示】 ┌─────────┐ │ start │ └────┬────┘ ┌─────┴─────┐ ↓ ↓ ┌──────────┐ ┌──────────┐ │ extract1 │ │ extract2 │ └─────┬────┘ └────┬─────┘ └─────┬─────┘ ↓ ┌──────────┐ │transform │ └────┬─────┘ ↓ ┌─────────┐ │ load │ └────┬────┘ ↓ ┌─────────┐ │ end │ └─────────┘

💼 4. 実践演習:ETLパイプラインのDAG作成

4-1. シナリオ

以下のETLパイプラインをAirflowで実装します:

  1. CSVファイルからデータを抽出
  2. データを変換(税込価格を計算)
  3. 結果をファイルに保存
  4. 一時ファイルをクリーンアップ

4-2. 完全なDAGファイル

# ===== シンプルなETLパイプライン ===== # ファイル名: simple_etl_dag.py # 配置場所: $AIRFLOW_HOME/dags/simple_etl_dag.py from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta import pandas as pd import logging import os # ロガーを取得 logger = logging.getLogger(__name__) # ===================================== # ETL関数の定義 # ===================================== def extract_data(): “””データ抽出””” logger.info(“データ抽出を開始”) # サンプルデータを作成 data = { ‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’, ‘ぶどう’], ‘価格’: [150, 100, 80, 300], ‘個数’: [10, 20, 15, 5] } df = pd.DataFrame(data) df.to_csv(‘/tmp/extracted_data.csv’, index=False) logger.info(f”データ抽出完了: {len(df)}件”) return ‘/tmp/extracted_data.csv’ def transform_data(): “””データ変換””” logger.info(“データ変換を開始”) df = pd.read_csv(‘/tmp/extracted_data.csv’) # 変換処理 df[‘税込価格’] = (df[‘価格’] * 1.1).astype(int) df[‘合計金額’] = df[‘税込価格’] * df[‘個数’] df.to_csv(‘/tmp/transformed_data.csv’, index=False) logger.info(f”データ変換完了: {len(df)}件”) return ‘/tmp/transformed_data.csv’ def load_data(): “””データロード””” logger.info(“データロードを開始”) df = pd.read_csv(‘/tmp/transformed_data.csv’) output_path = ‘/tmp/final_output.csv’ df.to_csv(output_path, index=False) logger.info(f”データロード完了: {output_path}”) logger.info(f”\n{df.to_string()}”) return output_path def cleanup(): “””クリーンアップ””” logger.info(“クリーンアップを開始”) files_to_delete = [ ‘/tmp/extracted_data.csv’, ‘/tmp/transformed_data.csv’ ] for file_path in files_to_delete: if os.path.exists(file_path): os.remove(file_path) logger.info(f”削除: {file_path}”) logger.info(“クリーンアップ完了”) # ===================================== # DAGの定義 # ===================================== default_args = { ‘owner’: ‘data_engineer’, ‘retries’: 2, ‘retry_delay’: timedelta(minutes=5) } dag = DAG( dag_id=’simple_etl_pipeline’, default_args=default_args, description=’シンプルなETLパイプライン’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False, tags=[‘etl’, ‘example’] ) # ===================================== # タスクの定義 # ===================================== extract_task = PythonOperator( task_id=’extract_data’, python_callable=extract_data, dag=dag ) transform_task = PythonOperator( task_id=’transform_data’, python_callable=transform_data, dag=dag ) load_task = PythonOperator( task_id=’load_data’, python_callable=load_data, dag=dag ) cleanup_task = PythonOperator( task_id=’cleanup’, python_callable=cleanup, dag=dag ) # ===================================== # 依存関係の設定 # ===================================== extract_task >> transform_task >> load_task >> cleanup_task

4-3. DAGの配置と実行

# 1. DAGファイルを配置 cp simple_etl_dag.py $AIRFLOW_HOME/dags/ # 2. DAGが認識されたか確認 airflow dags list | grep simple_etl # 3. DAGの構文チェック python $AIRFLOW_HOME/dags/simple_etl_dag.py # 4. タスク一覧を確認 airflow tasks list simple_etl_pipeline # 5. 特定のタスクをテスト実行 airflow tasks test simple_etl_pipeline extract_data 2024-01-01 # 6. DAG全体を手動実行 airflow dags trigger simple_etl_pipeline
【実行結果】 $ airflow tasks list simple_etl_pipeline extract_data transform_data load_data cleanup $ airflow tasks test simple_etl_pipeline extract_data 2024-01-01 [2024-01-15 09:00:00] INFO – データ抽出を開始 [2024-01-15 09:00:01] INFO – データ抽出完了: 4件

🔄 5. Context Managerを使った書き方(推奨)

5-1. より簡潔な書き方

Python 3.7以降では、with文を使ってより簡潔に書けます。

# ===== Context Managerを使った書き方 ===== from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def extract(): print(“Extract”) def transform(): print(“Transform”) def load(): print(“Load”) # Context Managerを使った書き方 with DAG( dag_id=’etl_with_context’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: # タスク定義(dagパラメータ不要!) extract_task = PythonOperator( task_id=’extract’, python_callable=extract ) transform_task = PythonOperator( task_id=’transform’, python_callable=transform ) load_task = PythonOperator( task_id=’load’, python_callable=load ) # 依存関係 extract_task >> transform_task >> load_task
🎯 Context Managerのメリット
  • コードが簡潔になる
  • 各Operatorにdag=dagを書かなくて良い
  • Python らしい書き方
  • 公式ドキュメントでも推奨
⚠️ よくあるエラーと対処法
  • DAGが表示されない:構文エラーがないか確認(python ファイル名.py)
  • import error:必要なライブラリがインストールされているか確認
  • タスクが実行されない:DAGがONになっているか、スケジュールを確認

📝 STEP 20 のまとめ

✅ このステップで学んだこと
  • DAGファイルの構造:DAGとタスクの定義
  • Operator:PythonOperator、BashOperator
  • タスク依存関係:>>演算子で指定
  • 実践演習:ETLパイプラインのDAG作成
  • Context Manager:with文を使った簡潔な書き方
💡 重要ポイント
  • DAGファイルはPythonコード
  • タスクはOperatorで定義
  • 依存関係は>>演算子で指定
  • DAGはdagsフォルダに配置
  • with文を使うと簡潔に書ける
🎯 次のステップの予告

次のSTEP 21では、「スケジュール実行とトリガー」を学びます。

  • cronライクなスケジュール設定
  • start_dateとschedule_intervalの理解
  • バックフィル(過去実行)

📝 練習問題

問題 1 基礎

「Hello, World!」を出力するシンプルなDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def hello(): print(“Hello, World!”) with DAG( dag_id=’hello_world_dag’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: task = PythonOperator( task_id=’hello_task’, python_callable=hello )
問題 2 基礎

3つのタスク(A → B → C)を順番に実行するDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def task_a(): print(“タスクA実行”) def task_b(): print(“タスクB実行”) def task_c(): print(“タスクC実行”) with DAG( dag_id=’sequential_tasks’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: a = PythonOperator(task_id=’task_a’, python_callable=task_a) b = PythonOperator(task_id=’task_b’, python_callable=task_b) c = PythonOperator(task_id=’task_c’, python_callable=task_c) a >> b >> c
問題 3 基礎

BashOperatorを使って、「Hello from Bash!」と出力するDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime with DAG( dag_id=’bash_hello’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: task = BashOperator( task_id=’hello_bash’, bash_command=’echo “Hello from Bash!”‘ )
問題 4 基礎

引数(name)を受け取って「こんにちは、{name}さん!」と出力するDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def greet(name): print(f”こんにちは、{name}さん!”) with DAG( dag_id=’greet_dag’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: task = PythonOperator( task_id=’greet_task’, python_callable=greet, op_kwargs={‘name’: ‘太郎’} )
問題 5 応用

タスクAの後に、タスクBとCを並列実行し、その後タスクDを実行するDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def task_function(name): print(f”{name}実行”) with DAG( dag_id=’parallel_tasks’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: a = PythonOperator( task_id=’task_a’, python_callable=task_function, op_kwargs={‘name’: ‘タスクA’} ) b = PythonOperator( task_id=’task_b’, python_callable=task_function, op_kwargs={‘name’: ‘タスクB’} ) c = PythonOperator( task_id=’task_c’, python_callable=task_function, op_kwargs={‘name’: ‘タスクC’} ) d = PythonOperator( task_id=’task_d’, python_callable=task_function, op_kwargs={‘name’: ‘タスクD’} ) # A → [B, C] → D a >> [b, c] >> d
問題 6 応用

BashOperatorを使って、ファイルを作成・表示・削除するDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime with DAG( dag_id=’bash_file_operations’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: create = BashOperator( task_id=’create_file’, bash_command=’echo “Airflowのテスト” > /tmp/test.txt’ ) show = BashOperator( task_id=’show_file’, bash_command=’cat /tmp/test.txt’ ) delete = BashOperator( task_id=’delete_file’, bash_command=’rm /tmp/test.txt’ ) create >> show >> delete
問題 7 応用

毎週月曜日の朝9時に実行されるDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def weekly_task(): print(“週次タスク実行”) with DAG( dag_id=’weekly_monday_dag’, start_date=datetime(2024, 1, 1), schedule_interval=’0 9 * * 1′, # 毎週月曜9時(cron形式) catchup=False ) as dag: task = PythonOperator( task_id=’weekly_task’, python_callable=weekly_task )
問題 8 応用

default_argsを使って、リトライ回数3回、リトライ間隔1分を設定したDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta def my_task(): print(“タスク実行”) default_args = { ‘owner’: ‘data_engineer’, ‘retries’: 3, ‘retry_delay’: timedelta(minutes=1) } with DAG( dag_id=’retry_dag’, default_args=default_args, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False ) as dag: task = PythonOperator( task_id=’my_task’, python_callable=my_task )
問題 9 発展

ETLパイプライン(Extract → Transform → Load)で、各タスクがログを出力し、最終タスクで処理件数をログに出力するDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta import logging logger = logging.getLogger(__name__) def extract(): logger.info(“=== Extract開始 ===”) data = list(range(100)) logger.info(f”抽出件数: {len(data)}件”) return data def transform(): logger.info(“=== Transform開始 ===”) logger.info(“データ変換処理中…”) logger.info(“変換完了”) def load(): logger.info(“=== Load開始 ===”) logger.info(“データロード処理中…”) processed_count = 100 logger.info(f”処理完了: {processed_count}件”) default_args = { ‘owner’: ‘data_engineer’, ‘retries’: 2, ‘retry_delay’: timedelta(minutes=5) } with DAG( dag_id=’etl_with_logging’, default_args=default_args, description=’ログ出力付きETLパイプライン’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False, tags=[‘etl’, ‘logging’] ) as dag: extract_task = PythonOperator( task_id=’extract’, python_callable=extract ) transform_task = PythonOperator( task_id=’transform’, python_callable=transform ) load_task = PythonOperator( task_id=’load’, python_callable=load ) extract_task >> transform_task >> load_task
問題 10 発展

複数のデータソース(DB1、DB2、API)から並列でデータを抽出し、統合してからロードするDAGを作成してください。

【解答】
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta def extract_db1(): print(“DB1からデータ抽出”) def extract_db2(): print(“DB2からデータ抽出”) def extract_api(): print(“APIからデータ抽出”) def merge_data(): print(“3つのソースを統合”) def transform(): print(“データ変換”) def load(): print(“データロード”) default_args = { ‘owner’: ‘data_engineer’, ‘retries’: 2, ‘retry_delay’: timedelta(minutes=5) } with DAG( dag_id=’multi_source_etl’, default_args=default_args, description=’複数ソースからのETLパイプライン’, start_date=datetime(2024, 1, 1), schedule_interval=’@daily’, catchup=False, tags=[‘etl’, ‘multi-source’] ) as dag: # 抽出タスク(並列) extract_db1_task = PythonOperator( task_id=’extract_db1′, python_callable=extract_db1 ) extract_db2_task = PythonOperator( task_id=’extract_db2′, python_callable=extract_db2 ) extract_api_task = PythonOperator( task_id=’extract_api’, python_callable=extract_api ) # 統合タスク merge_task = PythonOperator( task_id=’merge_data’, python_callable=merge_data ) # 変換タスク transform_task = PythonOperator( task_id=’transform’, python_callable=transform ) # ロードタスク load_task = PythonOperator( task_id=’load’, python_callable=load ) # 依存関係 # [DB1, DB2, API] → Merge → Transform → Load [extract_db1_task, extract_db2_task, extract_api_task] >> merge_task >> transform_task >> load_task

📝

学習メモ

ETL・データパイプライン構築 - Step 20

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE