📋 このステップで学ぶこと
- PythonOperatorの詳細な使い方
- BashOperatorでのシェルコマンド実行
- EmailOperatorによるメール通知
- SlackOperatorによるSlack通知
- カスタムOperatorの作成方法
- Operatorの選び方と使い分け
⏱️ 学習時間の目安:2.5時間
📝 練習問題:10問(基礎4問・応用4問・発展2問)
🎯 1. Operatorとは?
1-1. Operatorの基本概念
Operator(オペレーター)とは、Airflowで「1つのタスクを実行するための部品」のことです。
📚 例え話:道具箱
Operatorは「道具箱の中の道具」のようなものです。
・PythonOperator = ドライバー(プログラムを動かす)
・BashOperator = ハンマー(コマンドを叩く)
・EmailOperator = 封筒(メールを送る)
・SlackOperator = メガホン(チームに通知)
大工さんが作業内容に合わせて道具を選ぶように、
私たちも処理内容に合わせて適切なOperatorを選びます!
1-2. 主要なOperator一覧
よく使うOperator
| Operator |
用途 |
使用頻度 |
カテゴリ |
| PythonOperator |
Pythonコードを実行 |
⭐⭐⭐ |
基本 |
| BashOperator |
シェルコマンドを実行 |
⭐⭐⭐ |
基本 |
| EmailOperator |
メールを送信 |
⭐⭐ |
通知 |
| SlackWebhookOperator |
Slackにメッセージ送信 |
⭐⭐ |
通知 |
| PostgresOperator |
PostgreSQLでSQL実行 |
⭐⭐ |
データベース |
| DummyOperator |
何もしない(構造作り) |
⭐⭐ |
ユーティリティ |
| BranchPythonOperator |
条件分岐 |
⭐ |
制御フロー |
Operatorの選び方フローチャート
┌─────────────────────────────────────────────────────────────┐
│ 何をしたいですか? │
└─────────────────────────────────────────────────────────────┘
│
┌────────────────────┼────────────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ コード実行 │ │ 通知送信 │ │ DB操作 │
└──────────┘ └──────────┘ └──────────┘
│ │ │
┌────┴────┐ ┌────┴────┐ │
▼ ▼ ▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────────────┐
│Python│ │ Bash │ │Email │ │Slack │ │PostgresOperator│
│ ✓ │ │ ✓ │ │ ✓ │ │ ✓ │ │ MySqlOperator │
└──────┘ └──────┘ └──────┘ └──────┘ └──────────────┘
判断基準:
・Pythonでデータ処理 → PythonOperator
・シェルコマンド/スクリプト → BashOperator
・メール通知 → EmailOperator
・Slack通知 → SlackWebhookOperator
・SQL実行 → PostgresOperator/MySqlOperator
🐍 2. PythonOperatorの詳細
2-1. 基本的な使い方
PythonOperatorは、Pythonの関数を実行するOperatorです。
# ===== PythonOperatorの基本 =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# 実行したい関数を定義
def hello_world():
print(“Hello, Airflow!”)
return “タスク完了”
# DAGの定義
with DAG(
dag_id=’python_operator_example’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
# PythonOperatorでタスクを作成
task = PythonOperator(
task_id=’hello_task’,
python_callable=hello_world # 実行する関数
)
【実行ログ】
[hello_task] Hello, Airflow!
[hello_task] Done. Returned value was: タスク完了
💡 重要ポイント
python_callableには、関数名を渡す(()は付けない)
❌ 間違い: python_callable=hello_world()
✅ 正解: python_callable=hello_world
2-2. 引数を渡す方法
# ===== 引数を渡す =====
def greet(name, age):
print(f”こんにちは、{name}さん!{age}歳ですね。”)
return f”{name}さんの処理完了”
# op_kwargsで引数を渡す
task = PythonOperator(
task_id=’greet_task’,
python_callable=greet,
op_kwargs={
‘name’: ‘太郎’,
‘age’: 25
}
)
【実行ログ】
[greet_task] こんにちは、太郎さん!25歳ですね。
[greet_task] Done. Returned value was: 太郎さんの処理完了
2-3. contextを使う
# ===== contextを使って実行情報を取得 =====
def process_with_context(**context):
“””contextから実行情報を取得”””
# 実行日(execution_date)
execution_date = context[‘ds’]
print(f”実行日: {execution_date}”)
# DAG ID
dag_id = context[‘dag’].dag_id
print(f”DAG ID: {dag_id}”)
# タスクID
task_id = context[‘task_instance’].task_id
print(f”タスク ID: {task_id}”)
# XComにデータを保存
context[‘ti’].xcom_push(key=’status’, value=’success’)
return “処理完了”
task = PythonOperator(
task_id=’context_task’,
python_callable=process_with_context,
provide_context=True # Airflow 2.0以降は自動でTrueになる
)
【実行ログ】
[context_task] 実行日: 2024-01-15
[context_task] DAG ID: python_operator_example
[context_task] タスク ID: context_task
2-4. 実践例:データベースからデータ抽出
# ===== データベースからデータ抽出 =====
import pandas as pd
from sqlalchemy import create_engine
def extract_data(**context):
“””データベースからデータを抽出”””
engine = create_engine(‘postgresql://user:pass@localhost/mydb’)
# 実行日を使ってクエリ
execution_date = context[‘ds’]
query = f”””
SELECT * FROM sales
WHERE date = ‘{execution_date}’
“””
df = pd.read_sql(query, engine)
print(f”抽出件数: {len(df)}”)
# CSVに保存
file_path = f’/tmp/sales_{execution_date}.csv’
df.to_csv(file_path, index=False)
return file_path
extract_task = PythonOperator(
task_id=’extract_sales_data’,
python_callable=extract_data
)
【実行ログ】
[extract_sales_data] 抽出件数: 150
[extract_sales_data] Done. Returned value was: /tmp/sales_2024-01-15.csv
💻 3. BashOperatorの活用
3-1. 基本的な使い方
BashOperatorは、シェルコマンドを実行するOperatorです。
# ===== BashOperatorの基本 =====
from airflow.operators.bash import BashOperator
# 単純なコマンド
task1 = BashOperator(
task_id=’print_date’,
bash_command=’date’
)
# 複数のコマンドを実行
task2 = BashOperator(
task_id=’create_and_list’,
bash_command=’mkdir -p /tmp/airflow_test && ls -la /tmp/airflow_test’
)
【実行ログ】
[print_date] Mon Jan 15 09:00:00 JST 2024
[create_and_list] total 0
drwxr-xr-x 2 airflow airflow 40 Jan 15 09:00 .
3-2. Airflowマクロを使う
# ===== Airflowマクロを使う =====
task = BashOperator(
task_id=’use_macros’,
bash_command=”’
echo “実行日: {{ ds }}”
echo “実行日(フォーマット): {{ ds_nodash }}”
echo “前日: {{ yesterday_ds }}”
echo “翌日: {{ tomorrow_ds }}”
”’
)
【実行ログ】
[use_macros] 実行日: 2024-01-15
[use_macros] 実行日(フォーマット): 20240115
[use_macros] 前日: 2024-01-14
[use_macros] 翌日: 2024-01-16
3-3. 環境変数を使う
# ===== 環境変数を渡す =====
task = BashOperator(
task_id=’use_env_var’,
bash_command=’echo “DB: $DB_NAME, ENV: $ENVIRONMENT”‘,
env={
‘DB_NAME’: ‘production’,
‘ENVIRONMENT’: ‘prod’
}
)
【実行ログ】
[use_env_var] DB: production, ENV: prod
3-4. 実践例:ファイルのバックアップ
# ===== ファイルバックアップ =====
backup_task = BashOperator(
task_id=’backup_files’,
bash_command=”’
# バックアップディレクトリ作成
BACKUP_DIR=”/backup/{{ ds }}”
mkdir -p $BACKUP_DIR
# ファイルをコピー
cp -r /data/important_files $BACKUP_DIR/
# 圧縮
tar -czf $BACKUP_DIR.tar.gz $BACKUP_DIR
# 古いバックアップを削除(30日以前)
find /backup -name “*.tar.gz” -mtime +30 -delete
echo “✅ バックアップ完了: $BACKUP_DIR.tar.gz”
”’
)
【実行ログ】
[backup_files] ✅ バックアップ完了: /backup/2024-01-15.tar.gz
🎯 BashOperatorを使う場面
- ファイル操作(コピー、移動、削除、圧縮)
- 既存のシェルスクリプトを実行
- システムコマンドを実行
- Pythonより簡単に書ける処理
📧 4. EmailOperatorによるメール通知
4-1. メール設定
EmailOperatorを使う前に、airflow.cfgにメールサーバーの設定が必要です。
# airflow.cfg の設定例
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = your_email@gmail.com
smtp_password = your_app_password
smtp_port = 587
smtp_mail_from = your_email@gmail.com
4-2. 基本的な使い方
# ===== EmailOperatorの基本 =====
from airflow.operators.email import EmailOperator
send_email = EmailOperator(
task_id=’send_email’,
to=’recipient@example.com’,
subject=’Airflowからのお知らせ’,
html_content=”’
タスクが完了しました!
処理が正常に終了しました。
”’
)
4-3. 動的な内容を送信
# ===== 動的な内容 =====
send_email = EmailOperator(
task_id=’send_report’,
to=’team@example.com’,
subject=’日次レポート – {{ ds }}’,
html_content=”’
日次レポート
実行日: {{ ds }}
DAG: {{ dag.dag_id }}
ステータス: 正常終了
”’
)
4-4. 添付ファイル付きメール
# ===== 添付ファイル付きメール =====
send_email = EmailOperator(
task_id=’send_with_attachment’,
to=’manager@example.com’,
subject=’レポート添付 – {{ ds }}’,
html_content=’
レポートを添付しました。ご確認ください。
‘,
files=[‘/tmp/report.csv’, ‘/tmp/summary.pdf’]
)
⚠️ Gmailを使う場合の注意
Gmailでは通常のパスワードは使えません。
アプリパスワードを生成して使用してください。
1. Googleアカウントで2段階認証を有効化
2. 「アプリパスワード」を生成
3. airflow.cfgのsmtp_passwordに設定
💬 5. SlackOperatorによるSlack通知
5-1. Slack Webhook URLの取得
📝 Webhook URLの取得手順
- Slackワークスペースにログイン
- 「App管理」→「Incoming Webhooks」を検索
- 「Add to Slack」をクリック
- 通知先のチャンネルを選択
- Webhook URLをコピー
5-2. Airflowへの設定
# Airflow UI → Admin → Connections → +
Conn Id: slack_webhook
Conn Type: HTTP
Host: https://hooks.slack.com/services
Password: T00000000/B00000000/XXXXXXXXXXXX
5-3. 基本的な使い方
# ===== SlackWebhookOperatorの基本 =====
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
send_slack = SlackWebhookOperator(
task_id=’send_slack’,
slack_webhook_conn_id=’slack_webhook’,
message=’タスクが完了しました!’,
channel=’#data-pipeline’
)
5-4. リッチメッセージを送信
# ===== リッチメッセージ =====
send_slack = SlackWebhookOperator(
task_id=’send_rich_message’,
slack_webhook_conn_id=’slack_webhook’,
message=”’
:white_check_mark: *データパイプライン完了*
*DAG:* `sales_daily_pipeline`
*実行日:* {{ ds }}
*ステータス:* 成功
処理件数: 10,000件
処理時間: 5分30秒
”’,
channel=’#data-pipeline’
)
🎯 Slack通知のベストプラクティス
- 絵文字を活用:✅ ❌ ⚠️ で視覚的にわかりやすく
- チャンネルを分ける:重要度でチャンネル分け
- メンション機能:エラー時は@channelで全員に通知
- リンクを含める:Airflow UIへのリンクを追加
🔨 6. カスタムOperatorの作成
6-1. カスタムOperatorとは?
既存のOperatorで対応できない処理がある場合、自分でOperatorを作成できます。
6-2. 基本的な作り方
# ===== カスタムOperatorの基本 =====
from airflow.models import BaseOperator
class MyCustomOperator(BaseOperator):
“””カスタムOperatorの例”””
def __init__(self, my_param, *args, **kwargs):
super().__init__(*args, **kwargs)
self.my_param = my_param
def execute(self, context):
“””実際の処理を書く”””
self.log.info(f”パラメータ: {self.my_param}”)
self.log.info(“カスタム処理を実行中…”)
# 処理を実装
result = self.my_param * 2
self.log.info(f”結果: {result}”)
return result
# 使い方
task = MyCustomOperator(
task_id=’custom_task’,
my_param=10
)
【実行ログ】
[custom_task] パラメータ: 10
[custom_task] カスタム処理を実行中…
[custom_task] 結果: 20
6-3. 実践例:CSVファイル検証Operator
# ===== CSVファイル検証Operator =====
import pandas as pd
from airflow.models import BaseOperator
class CsvValidationOperator(BaseOperator):
“””CSVファイルを検証するカスタムOperator”””
def __init__(
self,
file_path,
required_columns=None,
max_null_ratio=0.1,
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.file_path = file_path
self.required_columns = required_columns or []
self.max_null_ratio = max_null_ratio
def execute(self, context):
“””検証処理”””
self.log.info(f”ファイル検証開始: {self.file_path}”)
# CSVを読み込み
df = pd.read_csv(self.file_path)
# 1. 必須カラムの存在確認
missing = set(self.required_columns) – set(df.columns)
if missing:
raise ValueError(f”必須カラムが不足: {missing}”)
# 2. NULL値の割合チェック
for col in df.columns:
null_ratio = df[col].isnull().sum() / len(df)
if null_ratio > self.max_null_ratio:
self.log.warning(f”⚠️ ‘{col}’ のNULL値が多い: {null_ratio:.2%}”)
self.log.info(f”✅ 検証完了: {len(df)}行”)
return {
‘row_count’: len(df),
‘column_count’: len(df.columns),
‘validation_passed’: True
}
# 使い方
validate_csv = CsvValidationOperator(
task_id=’validate_sales_csv’,
file_path=’/data/sales.csv’,
required_columns=[‘date’, ‘product_id’, ‘amount’],
max_null_ratio=0.05
)
【実行ログ】
[validate_sales_csv] ファイル検証開始: /data/sales.csv
[validate_sales_csv] ✅ 検証完了: 1000行
[validate_sales_csv] Done. Returned value was: {‘row_count’: 1000, ‘column_count’: 5, ‘validation_passed’: True}
🎯 7. Operatorの使い分けガイド
7-1. シチュエーション別の選択
Operator選択ガイド
| やりたいこと |
おすすめOperator |
理由 |
| Pythonでデータ処理 |
PythonOperator |
Pandas、NumPyなどが使える |
| ファイル操作 |
BashOperator |
シェルコマンドが簡単 |
| SQLの実行 |
PostgresOperator |
専用Operatorが便利 |
| メール通知 |
EmailOperator |
設定が簡単 |
| Slack通知 |
SlackWebhookOperator |
リアルタイム性が高い |
| DAGの構造作り |
DummyOperator |
プレースホルダーとして便利 |
| 複雑な独自処理 |
カスタムOperator |
再利用性が高い |
💡 Operatorを選ぶ4つの基準
- シンプルさ:できるだけシンプルなOperatorを選ぶ
- 再利用性:同じ処理が多いならカスタムOperatorを検討
- 保守性:チームメンバーが理解しやすいものを選ぶ
- パフォーマンス:大量データならPythonOperatorが有利
💼 8. 実践演習:複数Operatorを組み合わせたDAG
8-1. 要件
- データを抽出(PythonOperator)
- ファイルを圧縮(BashOperator)
- Slackに通知(SlackWebhookOperator)
- メールで報告(EmailOperator)
8-2. 完全な実装
# ===== 複数Operatorを組み合わせたDAG =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime
import pandas as pd
def extract_data(**context):
“””データ抽出”””
df = pd.DataFrame({
‘日付’: [‘2024-01-15’] * 100,
‘商品’: [f’商品{i}’ for i in range(100)],
‘売上’: [i * 100 for i in range(100)]
})
file_path = f”/tmp/sales_{context[‘ds’]}.csv”
df.to_csv(file_path, index=False)
print(f”✅ 抽出完了: {len(df)}件 → {file_path}”)
# 次のタスクにパスを渡す
context[‘ti’].xcom_push(key=’file_path’, value=file_path)
context[‘ti’].xcom_push(key=’record_count’, value=len(df))
return file_path
with DAG(
dag_id=’multi_operator_dag’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False,
tags=[‘example’, ‘multi-operator’]
) as dag:
# 1. データ抽出(Python)
extract = PythonOperator(
task_id=’extract_data’,
python_callable=extract_data
)
# 2. ファイル圧縮(Bash)
compress = BashOperator(
task_id=’compress_file’,
bash_command=”’
FILE=”{{ ti.xcom_pull(task_ids=’extract_data’, key=’file_path’) }}”
gzip -f $FILE
echo “✅ 圧縮完了: ${FILE}.gz”
”’
)
# 3. Slack通知
notify_slack = SlackWebhookOperator(
task_id=’notify_slack’,
slack_webhook_conn_id=’slack_webhook’,
message=”’
:white_check_mark: *日次データ処理完了*
*実行日:* {{ ds }}
*処理件数:* {{ ti.xcom_pull(task_ids=’extract_data’, key=’record_count’) }}件
”’
)
# 4. メール送信
send_email = EmailOperator(
task_id=’send_email’,
to=’team@example.com’,
subject=’日次データ処理完了 – {{ ds }}’,
html_content=”’
処理完了レポート
日次データ処理が正常に完了しました。
処理件数: {{ ti.xcom_pull(task_ids=’extract_data’, key=’record_count’) }}件
”’
)
# 依存関係: extract → compress → [slack, email]
extract >> compress >> [notify_slack, send_email]
【実行ログ】
[extract_data] ✅ 抽出完了: 100件 → /tmp/sales_2024-01-15.csv
[compress_file] ✅ 圧縮完了: /tmp/sales_2024-01-15.csv.gz
[notify_slack] Slack通知送信完了
[send_email] メール送信完了
📝 STEP 23 のまとめ
✅ このステップで学んだこと
- PythonOperator:Pythonコードを実行、引数の渡し方
- BashOperator:シェルコマンドやスクリプトの実行
- EmailOperator:メール通知、添付ファイルの送信
- SlackWebhookOperator:Slack通知、リッチメッセージ
- カスタムOperator:独自Operatorの作成方法
- Operatorの使い分け:状況に応じた適切な選択
💡 重要ポイント
- データ処理はPythonOperatorが基本
- ファイル操作はBashOperatorが便利
- 通知はEmailOperatorとSlackOperatorを併用
- 再利用性を高めるならカスタムOperator
🎯 次のステップの予告
次のSTEP 24では、「エラーハンドリングとモニタリング」を学びます。
- リトライ設定
- タイムアウト設定
- エラー通知(メール、Slack)
- DAGの監視とログ確認
📝 練習問題
問題 1
基礎
PythonOperatorで「Hello, Airflow!」と表示するタスクを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def hello():
print(“Hello, Airflow!”)
with DAG(
dag_id=’hello_dag’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
task = PythonOperator(
task_id=’hello_task’,
python_callable=hello
)
問題 2
基礎
BashOperatorで現在の日時を表示するタスクを作成してください。
【解答】
from airflow.operators.bash import BashOperator
task = BashOperator(
task_id=’print_date’,
bash_command=’date’
)
問題 3
基礎
PythonOperatorでop_kwargsを使って、名前と年齢を引数として受け取り表示するタスクを作成してください。
【解答】
def greet(name, age):
print(f”こんにちは、{name}さん!{age}歳ですね。”)
task = PythonOperator(
task_id=’greet_task’,
python_callable=greet,
op_kwargs={
‘name’: ‘太郎’,
‘age’: 25
}
)
問題 4
基礎
BashOperatorでAirflowマクロ {{ ds }} を使って実行日を表示するタスクを作成してください。
【解答】
task = BashOperator(
task_id=’show_execution_date’,
bash_command=’echo “実行日: {{ ds }}”‘
)
問題 5
応用
PythonOperatorでcontextを使って実行日を取得し、その日付をファイル名に含めてCSVファイルを作成するタスクを作成してください。
【解答】
import pandas as pd
def create_csv(**context):
execution_date = context[‘ds’]
df = pd.DataFrame({
‘商品’: [‘A’, ‘B’, ‘C’],
‘価格’: [100, 200, 300]
})
file_path = f’/tmp/data_{execution_date}.csv’
df.to_csv(file_path, index=False)
print(f”ファイル作成: {file_path}”)
return file_path
task = PythonOperator(
task_id=’create_csv’,
python_callable=create_csv
)
問題 6
応用
BashOperatorで環境変数を使って、データベース名とテーブル名を表示するタスクを作成してください。
【解答】
task = BashOperator(
task_id=’show_db_info’,
bash_command=’echo “DB: $DB_NAME, Table: $TABLE_NAME”‘,
env={
‘DB_NAME’: ‘production’,
‘TABLE_NAME’: ‘sales’
}
)
問題 7
応用
EmailOperatorで動的な件名(実行日を含む)と本文を持つメールを送信するタスクを作成してください。
【解答】
from airflow.operators.email import EmailOperator
send_email = EmailOperator(
task_id=’send_report’,
to=’team@example.com’,
subject=’日次レポート – {{ ds }}’,
html_content=”’
日次レポート
実行日: {{ ds }}
DAG: {{ dag.dag_id }}
処理が正常に完了しました。
”’
)
問題 8
応用
SlackWebhookOperatorで絵文字を使ったリッチメッセージを送信するタスクを作成してください。
【解答】
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
send_slack = SlackWebhookOperator(
task_id=’send_slack’,
slack_webhook_conn_id=’slack_webhook’,
message=”’
:white_check_mark: *処理完了*
*DAG:* `{{ dag.dag_id }}`
*実行日:* {{ ds }}
*ステータス:* 成功 :tada:
”’,
channel=’#data-pipeline’
)
問題 9
発展
ファイルの行数をカウントするカスタムOperatorを作成してください。ファイルパスを引数として受け取り、行数を返すようにしてください。
【解答】
from airflow.models import BaseOperator
class FileLineCountOperator(BaseOperator):
“””ファイルの行数をカウントするOperator”””
def __init__(self, file_path, *args, **kwargs):
super().__init__(*args, **kwargs)
self.file_path = file_path
def execute(self, context):
self.log.info(f”ファイル: {self.file_path}”)
with open(self.file_path, ‘r’) as f:
line_count = sum(1 for line in f)
self.log.info(f”行数: {line_count}”)
return line_count
# 使い方
count_task = FileLineCountOperator(
task_id=’count_lines’,
file_path=’/tmp/data.csv’
)
問題 10
発展
PythonOperator、BashOperator、SlackWebhookOperatorを組み合わせて、データ抽出→ファイル圧縮→Slack通知の一連のDAGを作成してください。
【解答】
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime
import pandas as pd
def extract_data(**context):
df = pd.DataFrame({
‘商品’: [‘A’, ‘B’, ‘C’],
‘売上’: [100, 200, 300]
})
file_path = f”/tmp/data_{context[‘ds’]}.csv”
df.to_csv(file_path, index=False)
context[‘ti’].xcom_push(key=’file_path’, value=file_path)
context[‘ti’].xcom_push(key=’count’, value=len(df))
return file_path
with DAG(
dag_id=’etl_with_notification’,
start_date=datetime(2024, 1, 1),
schedule_interval=’@daily’,
catchup=False
) as dag:
extract = PythonOperator(
task_id=’extract’,
python_callable=extract_data
)
compress = BashOperator(
task_id=’compress’,
bash_command=”’
FILE=”{{ ti.xcom_pull(task_ids=’extract’, key=’file_path’) }}”
gzip -f $FILE
echo “圧縮完了: ${FILE}.gz”
”’
)
notify = SlackWebhookOperator(
task_id=’notify’,
slack_webhook_conn_id=’slack_webhook’,
message=”’
:white_check_mark: *ETL完了*
*実行日:* {{ ds }}
*処理件数:* {{ ti.xcom_pull(task_ids=’extract’, key=’count’) }}件
”’
)
extract >> compress >> notify