🎯 総合プロジェクト
📋 このステップで学ぶこと
- 外部広告APIからのデータ取得(認証、ページネーション、レート制限対策)
- CRMデータベースからの顧客データ抽出
- 営業部門のCSVレポートの読み込みと正規化
- Great Expectationsを使った高度なデータ品質チェック(STEP 12の実践)
- マーケティングKPIの計算(CAC、LTV、ROAS等)
- 分析用データマートの構築
- 複数形式でのレポート出力(S3、メール添付)
- Slack通知による完全な監視
⏱️ 学習時間の目安:3.5時間
📝 難易度:⭐⭐⭐⭐⭐(最高難度)
📌 STEP 25との違い
STEP 25では社内DBからの売上データ集計を行いました。
STEP 26では、より複雑なマーケティング分析シナリオに挑戦します:
|
STEP 25 |
STEP 26(今回) |
| テーマ |
EC売上集計 |
マーケティング分析 |
| データソース |
社内DB(PostgreSQL、MySQL) |
外部API + CRM + CSV |
| 品質チェック |
手動実装 |
Great Expectations |
| 分析指標 |
売上、利益率 |
CAC、LTV、ROAS、CVR |
🎯 1. プロジェクト概要
📚 例え話:マーケティング部門の「データ参謀」
あなたは軍師のように、様々な情報源からデータを集めて分析します。
・広告API = 偵察隊からの報告(どこにいくら使い、何人獲得できたか)
・CRM = 本陣の記録(顧客の詳細情報、過去の購入履歴)
・営業CSV = 前線からの報告(実際の売上、商談結果)
これらを統合・分析して、「次にどこに投資すべきか」を経営陣に提言します!🎯
1-1. プロジェクトの背景
あなたはデータエンジニアとして、マーケティング効果分析システムを構築するプロジェクトに参加しています。
マーケティング部門は、広告投資の効果を正確に把握し、データドリブンな意思決定を行いたいと考えています。
📊 ビジネス要件
- 毎日午前7時に自動実行(広告データは前日分を取得)
- 3種類のデータソースを統合:
- 広告API(Google Ads風):広告費用、クリック数、コンバージョン
- CRM(PostgreSQL):顧客情報、購入履歴
- 営業CSV:月次売上レポート
- Great Expectationsでデータ品質を検証
- マーケティングKPIを計算(CAC、LTV、ROAS、CVR)
- 分析用データマートを構築
- 経営陣向けサマリーレポートを自動生成
- 完了/エラーをSlack/メールで通知
1-2. データソースと主要KPI
データソース一覧
| データソース |
種類 |
取得方法 |
主なカラム |
| 広告データ |
REST API |
OAuth認証 |
campaign_id, date, impressions, clicks, cost, conversions |
| 顧客データ |
PostgreSQL |
SQLクエリ |
customer_id, acquisition_date, source, lifetime_value |
| 売上データ |
CSVファイル |
ファイル読み込み |
customer_id, order_date, revenue, product_category |
計算するマーケティングKPI
| KPI |
正式名称 |
計算式 |
意味 |
| CAC |
Customer Acquisition Cost |
広告費 ÷ 新規顧客数 |
1人の顧客獲得にかかるコスト |
| LTV |
Lifetime Value |
顧客あたり累計売上 |
1人の顧客がもたらす総売上 |
| ROAS |
Return On Ad Spend |
売上 ÷ 広告費 × 100 |
広告費1円あたりの売上 |
| CVR |
Conversion Rate |
コンバージョン ÷ クリック × 100 |
クリックからの成約率 |
1-3. パイプライン構成
パイプライン全体図
┌─────────────────────────────────────────────────────────────────────────────────┐
│ マーケティングデータ統合パイプライン │
└─────────────────────────────────────────────────────────────────────────────────┘
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 広告API │ │ CRM (DB) │ │ 営業CSV │
│ (Google Ads)│ │ (PostgreSQL)│ │ (売上) │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
│ Task 1 │ Task 2 │ Task 3
▼ ▼ ▼
┌──────────────────────────────────────────────────────┐
│ Extract(並列実行) │
└──────────────────────────┬───────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────┐
│ Great Expectations 品質チェック(Task 4) │
│ ※ STEP 12で学んだ技術を活用! │
└──────────────────────────┬───────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────┐
│ Transform & KPI計算(Task 5-6) │
│ ・データ統合 ・CAC/LTV/ROAS計算 │
└──────────────────────────┬───────────────────────────┘
│
┌────────────┴────────────┐
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ Load to DWH │ │ Generate Report │
│ (Task 7) │ │ (Task 8) │
└────────┬─────────┘ └────────┬─────────┘
│ │
└─────────────┬───────────────┘
▼
┌──────────────────┐
│ Notify (Slack) │
│ (Task 9) │
└──────────────────┘
📥 2. Task 1: 広告APIからデータ取得
2-1. API仕様
Google Ads風の広告APIからキャンペーン別のパフォーマンスデータを取得します。
広告API仕様
| 項目 | 内容 |
| エンドポイント | https://api.ads.example.com/v1/reports |
| メソッド | GET |
| 認証 | OAuth 2.0(Bearer Token) |
| レート制限 | 100リクエスト/分 |
2-2. 実装コード
# ===== Task 1: 広告APIからデータ取得 =====
import requests
import pandas as pd
from airflow.models import Variable
import time
from datetime import datetime, timedelta
def extract_ads_from_api(**context):
“””広告APIからキャンペーンデータを取得”””
execution_date = context[‘ds’]
# 前日のデータを取得(広告データは1日遅れで確定)
report_date = (datetime.strptime(execution_date, ‘%Y-%m-%d’) – timedelta(days=1)).strftime(‘%Y-%m-%d’)
# API設定
base_url = Variable.get(‘ads_api_url’)
access_token = Variable.get(‘ads_api_token’)
headers = {
‘Authorization’: f’Bearer {access_token}’,
‘Content-Type’: ‘application/json’
}
all_campaigns = []
page = 1
max_retries = 3
while True:
params = {
‘date’: report_date,
‘page’: page,
‘per_page’: 50,
‘metrics’: ‘impressions,clicks,cost,conversions’
}
for attempt in range(max_retries):
try:
print(f”[Task1] 広告API: ページ {page} (試行 {attempt + 1})”)
response = requests.get(
f'{base_url}/reports’,
headers=headers,
params=params,
timeout=30
)
# レート制限チェック
if response.status_code == 429:
retry_after = int(response.headers.get(‘Retry-After’, 60))
print(f”レート制限: {retry_after}秒待機”)
time.sleep(retry_after)
continue
response.raise_for_status()
data = response.json()
campaigns = data.get(‘campaigns’, [])
all_campaigns.extend(campaigns)
print(f”取得: {len(campaigns)}件 (累計: {len(all_campaigns)}件)”)
# ページネーション
if not data.get(‘has_next_page’, False):
break
page += 1
time.sleep(0.6) # レート制限対策
break
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt
print(f"APIエラー: {e}、{wait_time}秒後リトライ")
time.sleep(wait_time)
else:
raise Exception(f"広告API取得失敗: {e}")
else:
continue
break
if not all_campaigns:
raise ValueError(f"広告データが取得できませんでした: {report_date}")
df = pd.DataFrame(all_campaigns)
# データ型変換
df['date'] = pd.to_datetime(df['date'])
df['impressions'] = df['impressions'].astype(int)
df['clicks'] = df['clicks'].astype(int)
df['cost'] = df['cost'].astype(float)
df['conversions'] = df['conversions'].astype(int)
# CTR(クリック率)を計算
df['ctr'] = (df['clicks'] / df['impressions'] * 100).round(2)
output_path = f'/tmp/ads_data_{execution_date}.csv'
df.to_csv(output_path, index=False)
print(f"✅ 広告API抽出完了: {len(df)}キャンペーン、総広告費: ¥{df['cost'].sum():,.0f}")
context['ti'].xcom_push(key='ads_file', value=output_path)
context['ti'].xcom_push(key='ads_count', value=len(df))
context['ti'].xcom_push(key='total_ad_cost', value=df['cost'].sum())
return output_path
【実行ログ】
[2024-01-15 07:00:05] [Task1] 広告API: ページ 1 (試行 1)
[2024-01-15 07:00:06] 取得: 50件 (累計: 50件)
[2024-01-15 07:00:07] [Task1] 広告API: ページ 2 (試行 1)
[2024-01-15 07:00:08] 取得: 35件 (累計: 85件)
[2024-01-15 07:00:09] ✅ 広告API抽出完了: 85キャンペーン、総広告費: ¥2,450,000
🗄️ 3. Task 2: CRMから顧客データ抽出
# ===== Task 2: CRMから顧客データ抽出 =====
from airflow.providers.postgres.hooks.postgres import PostgresHook
def extract_customers_from_crm(**context):
“””CRM(PostgreSQL)から顧客データを抽出”””
execution_date = context[‘ds’]
hook = PostgresHook(postgres_conn_id=’crm_db’)
engine = hook.get_sqlalchemy_engine()
# 顧客とその獲得チャネル、累計売上を取得
query = “””
SELECT
c.customer_id,
c.customer_name,
c.email,
c.acquisition_date,
c.acquisition_source, — 広告キャンペーンID
c.acquisition_channel, — Google, Facebook, Organic等
COALESCE(SUM(o.revenue), 0) as lifetime_value,
COUNT(o.order_id) as order_count,
MAX(o.order_date) as last_order_date
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
WHERE c.acquisition_date >= CURRENT_DATE – INTERVAL ’90 days’
GROUP BY c.customer_id, c.customer_name, c.email,
c.acquisition_date, c.acquisition_source, c.acquisition_channel
“””
print(“[Task2] CRMから顧客データを抽出中…”)
df = pd.read_sql(query, engine)
if len(df) == 0:
raise ValueError(“顧客データが0件です”)
# 顧客ステータスを追加
df[‘customer_status’] = df.apply(
lambda row: ‘Active’ if row[‘order_count’] > 0 else ‘Inactive’,
axis=1
)
output_path = f’/tmp/crm_customers_{execution_date}.csv’
df.to_csv(output_path, index=False)
print(f”✅ CRM抽出完了: {len(df)}顧客、平均LTV: ¥{df[‘lifetime_value’].mean():,.0f}”)
context[‘ti’].xcom_push(key=’crm_file’, value=output_path)
context[‘ti’].xcom_push(key=’customer_count’, value=len(df))
return output_path
📄 4. Task 3: 営業CSVから売上データ読み込み
# ===== Task 3: 営業CSVから売上データ読み込み =====
def extract_sales_from_csv(**context):
“””営業部門のCSVレポートを読み込み”””
execution_date = context[‘ds’]
# 月次レポートのパス
year_month = execution_date[:7] # 2024-01
csv_path = f’/data/sales_reports/sales_{year_month}.csv’
print(f”[Task3] 営業CSVを読み込み中: {csv_path}”)
df = pd.read_csv(
csv_path,
encoding=’utf-8′,
dtype={
‘customer_id’: str,
‘product_category’: str
},
parse_dates=[‘order_date’]
)
print(f”読み込み完了: {len(df)}件”)
# データクリーニング
original_count = len(df)
# 重複削除
df = df.drop_duplicates(subset=[‘order_id’])
# NULL値チェック
df = df[df[‘customer_id’].notna()]
df = df[df[‘revenue’].notna()]
# 異常値除去(マイナス売上)
df = df[df[‘revenue’] >= 0]
if len(df) < original_count:
print(f"警告: {original_count - len(df)}件のデータを除外しました")
output_path = f'/tmp/sales_data_{execution_date}.csv'
df.to_csv(output_path, index=False)
print(f"✅ 営業CSV処理完了: {len(df)}件、総売上: ¥{df['revenue'].sum():,.0f}")
context['ti'].xcom_push(key='sales_file', value=output_path)
context['ti'].xcom_push(key='sales_count', value=len(df))
context['ti'].xcom_push(key='total_revenue', value=df['revenue'].sum())
return output_path
✅ 5. Task 4: Great Expectationsによる品質チェック
🔗 STEP 12で学んだGreat Expectationsを実践!
STEP 12で学んだGreat Expectationsを使って、
プロダクションレベルのデータ品質チェックを実装します。
# ===== Task 4: Great Expectationsによる品質チェック =====
import great_expectations as gx
from great_expectations.dataset import PandasDataset
def validate_with_great_expectations(**context):
“””Great Expectationsでデータ品質を検証”””
ti = context[‘ti’]
execution_date = context[‘ds’]
# 各ファイルを取得
ads_file = ti.xcom_pull(task_ids=’extract_ads_from_api’, key=’ads_file’)
crm_file = ti.xcom_pull(task_ids=’extract_customers_from_crm’, key=’crm_file’)
sales_file = ti.xcom_pull(task_ids=’extract_sales_from_csv’, key=’sales_file’)
validation_results = {}
all_passed = True
# ===== 広告データの検証 =====
print(“[Task4] 広告データを検証中…”)
ads_df = PandasDataset(pd.read_csv(ads_file))
# Expectationsを定義
ads_expectations = [
ads_df.expect_column_to_exist(‘campaign_id’),
ads_df.expect_column_to_exist(‘impressions’),
ads_df.expect_column_to_exist(‘clicks’),
ads_df.expect_column_to_exist(‘cost’),
ads_df.expect_column_values_to_be_between(‘impressions’, min_value=0),
ads_df.expect_column_values_to_be_between(‘clicks’, min_value=0),
ads_df.expect_column_values_to_be_between(‘cost’, min_value=0),
ads_df.expect_column_values_to_not_be_null(‘campaign_id’),
# clicksはimpressionsを超えない
ads_df.expect_column_pair_values_A_to_be_greater_than_B(
‘impressions’, ‘clicks’, or_equal=True
)
]
ads_passed = all(e[‘success’] for e in ads_expectations)
validation_results[‘ads’] = {
‘passed’: ads_passed,
‘expectations’: len(ads_expectations),
‘failed’: sum(1 for e in ads_expectations if not e[‘success’])
}
if not ads_passed:
all_passed = False
print(“⚠️ 広告データに品質問題があります”)
# ===== 顧客データの検証 =====
print(“[Task4] 顧客データを検証中…”)
crm_df = PandasDataset(pd.read_csv(crm_file))
crm_expectations = [
crm_df.expect_column_to_exist(‘customer_id’),
crm_df.expect_column_values_to_be_unique(‘customer_id’),
crm_df.expect_column_values_to_not_be_null(‘customer_id’),
crm_df.expect_column_values_to_be_between(‘lifetime_value’, min_value=0),
crm_df.expect_column_values_to_be_in_set(
‘acquisition_channel’,
[‘Google’, ‘Facebook’, ‘Instagram’, ‘Organic’, ‘Referral’, ‘Other’]
)
]
crm_passed = all(e[‘success’] for e in crm_expectations)
validation_results[‘crm’] = {
‘passed’: crm_passed,
‘expectations’: len(crm_expectations),
‘failed’: sum(1 for e in crm_expectations if not e[‘success’])
}
if not crm_passed:
all_passed = False
print(“⚠️ 顧客データに品質問題があります”)
# ===== 売上データの検証 =====
print(“[Task4] 売上データを検証中…”)
sales_df = PandasDataset(pd.read_csv(sales_file))
sales_expectations = [
sales_df.expect_column_to_exist(‘order_id’),
sales_df.expect_column_to_exist(‘customer_id’),
sales_df.expect_column_to_exist(‘revenue’),
sales_df.expect_column_values_to_be_unique(‘order_id’),
sales_df.expect_column_values_to_be_between(‘revenue’, min_value=0, max_value=10000000),
sales_df.expect_column_values_to_not_be_null(‘customer_id’),
# 売上の95%が100万円以下
sales_df.expect_column_quantile_values_to_be_between(
‘revenue’,
quantile_ranges={‘quantiles’: [0.95], ‘value_ranges’: [[0, 1000000]]}
)
]
sales_passed = all(e[‘success’] for e in sales_expectations)
validation_results[‘sales’] = {
‘passed’: sales_passed,
‘expectations’: len(sales_expectations),
‘failed’: sum(1 for e in sales_expectations if not e[‘success’])
}
if not sales_passed:
all_passed = False
print(“⚠️ 売上データに品質問題があります”)
# 結果をXComに保存
context[‘ti’].xcom_push(key=’validation_results’, value=validation_results)
context[‘ti’].xcom_push(key=’all_passed’, value=all_passed)
# サマリー出力
print(“\n” + “=”*50)
print(“【Great Expectations 検証結果】”)
print(“=”*50)
for source, result in validation_results.items():
status = “✅ PASS” if result[‘passed’] else “❌ FAIL”
print(f”{source}: {status} ({result[‘expectations’] – result[‘failed’]}/{result[‘expectations’]})”)
print(“=”*50)
if not all_passed:
# 警告を送信するが、処理は続行
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’)
hook.send_text(f”:warning: データ品質チェックで問題が検出されました\n{validation_results}”)
print(“✅ Great Expectations検証完了”)
return validation_results
【Great Expectations 検証結果】
==================================================
ads: ✅ PASS (9/9)
crm: ✅ PASS (5/5)
sales: ✅ PASS (7/7)
==================================================
✅ Great Expectations検証完了
🎯 Great Expectationsを使うメリット
- 宣言的:「何を検証するか」を明確に記述
- 再利用可能:同じExpectationsを複数のパイプラインで使える
- ドキュメント生成:検証ルールが自動でドキュメント化
- 本番レベル:多くの企業で採用されている標準ツール
🔄 6. Task 5-6: データ変換とKPI計算
6-1. Task 5: データ統合
# ===== Task 5: データ統合 =====
def merge_marketing_data(**context):
“””3つのデータソースを統合”””
ti = context[‘ti’]
execution_date = context[‘ds’]
ads_file = ti.xcom_pull(task_ids=’extract_ads_from_api’, key=’ads_file’)
crm_file = ti.xcom_pull(task_ids=’extract_customers_from_crm’, key=’crm_file’)
sales_file = ti.xcom_pull(task_ids=’extract_sales_from_csv’, key=’sales_file’)
ads_df = pd.read_csv(ads_file)
crm_df = pd.read_csv(crm_file)
sales_df = pd.read_csv(sales_file)
print(f”[Task5] データ統合: 広告={len(ads_df)}, 顧客={len(crm_df)}, 売上={len(sales_df)}”)
# 顧客と売上を結合
customer_sales = sales_df.groupby(‘customer_id’).agg({
‘revenue’: ‘sum’,
‘order_id’: ‘count’
}).reset_index()
customer_sales.columns = [‘customer_id’, ‘period_revenue’, ‘period_orders’]
crm_enriched = crm_df.merge(customer_sales, on=’customer_id’, how=’left’)
crm_enriched[‘period_revenue’] = crm_enriched[‘period_revenue’].fillna(0)
crm_enriched[‘period_orders’] = crm_enriched[‘period_orders’].fillna(0)
# 獲得チャネル別に顧客を集計
channel_customers = crm_enriched.groupby(‘acquisition_channel’).agg({
‘customer_id’: ‘count’,
‘lifetime_value’: ‘sum’,
‘period_revenue’: ‘sum’
}).reset_index()
channel_customers.columns = [‘channel’, ‘new_customers’, ‘total_ltv’, ‘period_revenue’]
# 広告データをチャネル別に集計
ads_by_channel = ads_df.groupby(‘acquisition_channel’).agg({
‘impressions’: ‘sum’,
‘clicks’: ‘sum’,
‘cost’: ‘sum’,
‘conversions’: ‘sum’
}).reset_index()
ads_by_channel.columns = [‘channel’, ‘impressions’, ‘clicks’, ‘ad_cost’, ‘conversions’]
# チャネル別統合データを作成
merged = channel_customers.merge(ads_by_channel, on=’channel’, how=’outer’)
merged = merged.fillna(0)
output_path = f’/tmp/merged_marketing_{execution_date}.csv’
merged.to_csv(output_path, index=False)
# 詳細データも保存
crm_enriched.to_csv(f’/tmp/customer_detail_{execution_date}.csv’, index=False)
print(f”✅ データ統合完了: {len(merged)}チャネル”)
context[‘ti’].xcom_push(key=’merged_file’, value=output_path)
context[‘ti’].xcom_push(key=’customer_detail_file’, value=f’/tmp/customer_detail_{execution_date}.csv’)
return output_path
6-2. Task 6: マーケティングKPI計算
# ===== Task 6: マーケティングKPI計算 =====
def calculate_marketing_kpis(**context):
“””マーケティングKPIを計算”””
ti = context[‘ti’]
execution_date = context[‘ds’]
merged_file = ti.xcom_pull(task_ids=’merge_marketing_data’, key=’merged_file’)
df = pd.read_csv(merged_file)
print(“[Task6] マーケティングKPIを計算中…”)
# KPI計算
# CAC(顧客獲得コスト)= 広告費 ÷ 新規顧客数
df[‘cac’] = df.apply(
lambda row: row[‘ad_cost’] / row[‘new_customers’] if row[‘new_customers’] > 0 else 0,
axis=1
)
# 平均LTV = 累計LTV ÷ 顧客数
df[‘avg_ltv’] = df.apply(
lambda row: row[‘total_ltv’] / row[‘new_customers’] if row[‘new_customers’] > 0 else 0,
axis=1
)
# ROAS(広告費用対効果)= 売上 ÷ 広告費 × 100
df[‘roas’] = df.apply(
lambda row: (row[‘period_revenue’] / row[‘ad_cost’] * 100) if row[‘ad_cost’] > 0 else 0,
axis=1
)
# CVR(コンバージョン率)= コンバージョン ÷ クリック × 100
df[‘cvr’] = df.apply(
lambda row: (row[‘conversions’] / row[‘clicks’] * 100) if row[‘clicks’] > 0 else 0,
axis=1
)
# CTR(クリック率)= クリック ÷ インプレッション × 100
df[‘ctr’] = df.apply(
lambda row: (row[‘clicks’] / row[‘impressions’] * 100) if row[‘impressions’] > 0 else 0,
axis=1
)
# LTV/CAC比率(3以上が健全)
df[‘ltv_cac_ratio’] = df.apply(
lambda row: row[‘avg_ltv’] / row[‘cac’] if row[‘cac’] > 0 else 0,
axis=1
)
# 全体サマリー
overall_summary = {
‘date’: execution_date,
‘total_ad_cost’: df[‘ad_cost’].sum(),
‘total_revenue’: df[‘period_revenue’].sum(),
‘total_customers’: df[‘new_customers’].sum(),
‘overall_cac’: df[‘ad_cost’].sum() / df[‘new_customers’].sum() if df[‘new_customers’].sum() > 0 else 0,
‘overall_roas’: df[‘period_revenue’].sum() / df[‘ad_cost’].sum() * 100 if df[‘ad_cost’].sum() > 0 else 0,
‘avg_cvr’: df[‘cvr’].mean(),
‘best_channel’: df.loc[df[‘roas’].idxmax(), ‘channel’] if len(df) > 0 else ‘N/A’,
‘best_channel_roas’: df[‘roas’].max()
}
# 結果を保存
kpi_path = f’/tmp/marketing_kpi_{execution_date}.csv’
df.to_csv(kpi_path, index=False)
print(“✅ KPI計算完了”)
print(f” 総広告費: ¥{overall_summary[‘total_ad_cost’]:,.0f}”)
print(f” 総売上: ¥{overall_summary[‘total_revenue’]:,.0f}”)
print(f” ROAS: {overall_summary[‘overall_roas’]:.1f}%”)
print(f” CAC: ¥{overall_summary[‘overall_cac’]:,.0f}”)
print(f” 最高ROASチャネル: {overall_summary[‘best_channel’]} ({overall_summary[‘best_channel_roas’]:.1f}%)”)
context[‘ti’].xcom_push(key=’kpi_file’, value=kpi_path)
context[‘ti’].xcom_push(key=’overall_summary’, value=overall_summary)
return overall_summary
【KPI計算結果】
✅ KPI計算完了
総広告費: ¥2,450,000
総売上: ¥8,230,000
ROAS: 336.0%
CAC: ¥4,900
最高ROASチャネル: Google (425.3%)
💾 7. Task 7: DWHへのロード
# ===== Task 7: DWHへのロード =====
def load_to_marketing_dwh(**context):
“””マーケティングデータマートにロード”””
ti = context[‘ti’]
execution_date = context[‘ds’]
kpi_file = ti.xcom_pull(task_ids=’calculate_marketing_kpis’, key=’kpi_file’)
customer_detail_file = ti.xcom_pull(task_ids=’merge_marketing_data’, key=’customer_detail_file’)
overall_summary = ti.xcom_pull(task_ids=’calculate_marketing_kpis’, key=’overall_summary’)
kpi_df = pd.read_csv(kpi_file)
customer_df = pd.read_csv(customer_detail_file)
hook = PostgresHook(postgres_conn_id=’marketing_dwh’)
engine = hook.get_sqlalchemy_engine()
print(“[Task7] DWHへのロード開始…”)
with engine.begin() as conn:
# べき等性確保(既存データ削除)
conn.execute(f”DELETE FROM mart_channel_kpi WHERE date = ‘{execution_date}'”)
conn.execute(f”DELETE FROM mart_daily_summary WHERE date = ‘{execution_date}'”)
# チャネル別KPIをロード
kpi_df[‘date’] = execution_date
kpi_df.to_sql(‘mart_channel_kpi’, conn, if_exists=’append’, index=False)
# 日次サマリーをロード
summary_df = pd.DataFrame([overall_summary])
summary_df.to_sql(‘mart_daily_summary’, conn, if_exists=’append’, index=False)
print(f”✅ DWHロード完了: {len(kpi_df)}チャネル + サマリー”)
context[‘ti’].xcom_push(key=’loaded_channels’, value=len(kpi_df))
return len(kpi_df)
📊 8. Task 8: レポート生成とS3アップロード
# ===== Task 8: レポート生成とS3アップロード =====
import boto3
from io import BytesIO
def generate_and_upload_reports(**context):
“””マーケティングレポートを生成してS3にアップロード”””
ti = context[‘ti’]
execution_date = context[‘ds’]
kpi_file = ti.xcom_pull(task_ids=’calculate_marketing_kpis’, key=’kpi_file’)
overall_summary = ti.xcom_pull(task_ids=’calculate_marketing_kpis’, key=’overall_summary’)
kpi_df = pd.read_csv(kpi_file)
print(“[Task8] レポート生成中…”)
# テキストレポート生成
report_text = f”””
====================================================
マーケティング効果分析レポート
日付: {execution_date}
====================================================
【全体サマリー】
・総広告費: ¥{overall_summary[‘total_ad_cost’]:,.0f}
・総売上: ¥{overall_summary[‘total_revenue’]:,.0f}
・ROAS: {overall_summary[‘overall_roas’]:.1f}%
・CAC: ¥{overall_summary[‘overall_cac’]:,.0f}
・新規顧客数: {overall_summary[‘total_customers’]:,.0f}人
【チャネル別パフォーマンス】
“””
# チャネル別サマリーを追加
for _, row in kpi_df.sort_values(‘roas’, ascending=False).iterrows():
report_text += f”””
{row[‘channel’]}:
広告費: ¥{row[‘ad_cost’]:,.0f}
売上: ¥{row[‘period_revenue’]:,.0f}
ROAS: {row[‘roas’]:.1f}%
CAC: ¥{row[‘cac’]:,.0f}
CVR: {row[‘cvr’]:.2f}%
LTV/CAC: {row[‘ltv_cac_ratio’]:.2f}
“””
report_text += “””
====================================================
このレポートは自動生成されました。
====================================================
“””
# ローカルに保存
report_path = f’/tmp/marketing_report_{execution_date}.txt’
with open(report_path, ‘w’, encoding=’utf-8′) as f:
f.write(report_text)
# S3にアップロード
s3_client = boto3.client(‘s3’)
bucket_name = Variable.get(‘marketing_s3_bucket’, ‘marketing-data-lake’)
year, month, day = execution_date.split(‘-‘)
uploaded_files = []
# テキストレポート
s3_key_txt = f’reports/daily/{year}/{month}/marketing_report_{execution_date}.txt’
s3_client.upload_file(report_path, bucket_name, s3_key_txt)
uploaded_files.append(s3_key_txt)
# CSVレポート
s3_key_csv = f’data/kpi/{year}/{month}/channel_kpi_{execution_date}.csv’
s3_client.upload_file(kpi_file, bucket_name, s3_key_csv)
uploaded_files.append(s3_key_csv)
print(f”✅ S3アップロード完了: {len(uploaded_files)}ファイル”)
context[‘ti’].xcom_push(key=’report_path’, value=report_path)
context[‘ti’].xcom_push(key=’s3_files’, value=uploaded_files)
context[‘ti’].xcom_push(key=’s3_bucket’, value=bucket_name)
return uploaded_files
📢 9. Task 9: Slack通知
# ===== Task 9: Slack通知 =====
def send_marketing_notification(**context):
“””マーケティングチームへ完了通知”””
ti = context[‘ti’]
execution_date = context[‘ds’]
overall_summary = ti.xcom_pull(task_ids=’calculate_marketing_kpis’, key=’overall_summary’)
validation_results = ti.xcom_pull(task_ids=’validate_with_great_expectations’, key=’validation_results’)
s3_files = ti.xcom_pull(task_ids=’generate_and_upload_reports’, key=’s3_files’)
# 品質チェック結果
quality_status = “:white_check_mark: 全データ品質OK” if all(
r[‘passed’] for r in validation_results.values()
) else “:warning: 一部のデータに品質問題あり”
# ROASの評価
roas = overall_summary[‘overall_roas’]
if roas >= 300:
roas_emoji = “:rocket:”
roas_comment = “素晴らしい!”
elif roas >= 200:
roas_emoji = “:chart_with_upwards_trend:”
roas_comment = “好調です”
elif roas >= 100:
roas_emoji = “:ok_hand:”
roas_comment = “採算ライン”
else:
roas_emoji = “:warning:”
roas_comment = “要改善”
message = f”””
:bar_chart: *マーケティングデータ統合パイプライン完了*
*実行日:* {execution_date}
━━━━━━━━━━━━━━━━━━━━━━━━
*📊 マーケティングKPI*
• 総広告費: ¥{overall_summary[‘total_ad_cost’]:,.0f}
• 総売上: ¥{overall_summary[‘total_revenue’]:,.0f}
• ROAS: {roas:.1f}% {roas_emoji} {roas_comment}
• CAC: ¥{overall_summary[‘overall_cac’]:,.0f}
• 新規顧客: {overall_summary[‘total_customers’]:,.0f}人
• 最高ROASチャネル: *{overall_summary[‘best_channel’]}* ({overall_summary[‘best_channel_roas’]:.1f}%)
*✅ データ品質*
{quality_status}
*📁 レポート*
{len(s3_files)}ファイルをS3にアップロード済み
━━━━━━━━━━━━━━━━━━━━━━━━
“””
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’)
hook.send_text(message)
print(“✅ Slack通知送信完了”)
return message
🔗 10. 完全なDAG定義
# ===== 完全なDAGファイル: marketing_data_pipeline.py =====
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# エラー通知関数
def send_error_notification(context):
“””重大エラー時の通知”””
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
ti = context[‘task_instance’]
exception = context.get(‘exception’)
error_message = f”””
:rotating_light: *マーケティングパイプラインエラー*
*実行日:* {context[‘ds’]}
*失敗タスク:* `{ti.task_id}`
*エラー内容:* {exception}
<!channel> 対応をお願いします。
“””
hook = SlackWebhookHook(slack_webhook_conn_id=’slack_webhook’)
hook.send_text(error_message)
# デフォルト設定
default_args = {
‘owner’: ‘marketing_analytics_team’,
‘retries’: 3,
‘retry_delay’: timedelta(minutes=5),
‘retry_exponential_backoff’: True,
‘execution_timeout’: timedelta(hours=2),
‘on_failure_callback’: send_error_notification
}
# DAG定義
with DAG(
dag_id=’marketing_data_integration_pipeline’,
default_args=default_args,
description=’マーケティングデータ統合パイプライン(最終プロジェクト)’,
schedule_interval=’0 7 * * *’, # 毎日朝7時
start_date=datetime(2024, 1, 1),
catchup=False,
max_active_runs=1,
tags=[‘production’, ‘marketing’, ‘kpi’, ‘final-project’]
) as dag:
# Extract Tasks (並列)
task1 = PythonOperator(
task_id=’extract_ads_from_api’,
python_callable=extract_ads_from_api,
execution_timeout=timedelta(minutes=15)
)
task2 = PythonOperator(
task_id=’extract_customers_from_crm’,
python_callable=extract_customers_from_crm,
execution_timeout=timedelta(minutes=10)
)
task3 = PythonOperator(
task_id=’extract_sales_from_csv’,
python_callable=extract_sales_from_csv,
execution_timeout=timedelta(minutes=10)
)
# Great Expectations検証
task4 = PythonOperator(
task_id=’validate_with_great_expectations’,
python_callable=validate_with_great_expectations,
execution_timeout=timedelta(minutes=15)
)
# Transform Tasks
task5 = PythonOperator(
task_id=’merge_marketing_data’,
python_callable=merge_marketing_data,
execution_timeout=timedelta(minutes=20)
)
task6 = PythonOperator(
task_id=’calculate_marketing_kpis’,
python_callable=calculate_marketing_kpis,
execution_timeout=timedelta(minutes=10)
)
# Load Tasks (並列)
task7 = PythonOperator(
task_id=’load_to_marketing_dwh’,
python_callable=load_to_marketing_dwh,
execution_timeout=timedelta(minutes=15)
)
task8 = PythonOperator(
task_id=’generate_and_upload_reports’,
python_callable=generate_and_upload_reports,
execution_timeout=timedelta(minutes=15)
)
# Notification Task
task9 = PythonOperator(
task_id=’send_marketing_notification’,
python_callable=send_marketing_notification
)
# タスク依存関係
[task1, task2, task3] >> task4 >> task5 >> task6 >> [task7, task8] >> task9
タスク依存関係図
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Task 1 │ │ Task 2 │ │ Task 3 │
│ 広告API抽出 │ │ CRM抽出 │ │ 営業CSV読込 │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
└──────────────────┼──────────────────┘
│
▼
┌───────────────┐
│ Task 4 │
│ Great Expectations │
│ 品質チェック │
└───────┬───────┘
│
▼
┌───────────────┐
│ Task 5 │
│ データ統合 │
└───────┬───────┘
│
▼
┌───────────────┐
│ Task 6 │
│ KPI計算 │
└───────┬───────┘
│
┌───────────┴───────────┐
│ │
▼ ▼
┌───────────────┐ ┌───────────────┐
│ Task 7 │ │ Task 8 │
│ DWHロード │ │ レポート生成 │
└───────┬───────┘ └───────┬───────┘
│ │
└───────────┬───────────┘
│
▼
┌───────────────┐
│ Task 9 │
│ Slack通知 │
└───────────────┘
🎓 11. 学習のポイントと振り返り
11-1. STEP 25との違いのまとめ
STEP 25 vs STEP 26 比較
| 観点 |
STEP 25(売上集計) |
STEP 26(マーケティング分析) |
| 業務領域 |
EC運営(売上管理) |
マーケティング(効果測定) |
| データソース |
社内DB中心 |
外部API + 社内DB + CSV |
| 品質チェック |
手動実装 |
Great Expectations(STEP 12活用) |
| 分析指標 |
売上、利益、利益率 |
CAC、LTV、ROAS、CVR |
| 学習ポイント |
基本的なETLパイプライン |
高度な分析 + 品質管理 |
11-2. このプロジェクトで習得したスキル
1️⃣ API連携
- OAuth認証
- レート制限対策
- ページネーション処理
2️⃣ Great Expectations
- 宣言的な品質チェック
- 複数データソースの検証
- 本番レベルの品質管理
3️⃣ マーケティング分析
- CAC、LTV、ROAS計算
- チャネル別分析
- KPIダッシュボード用データ
4️⃣ データマート構築
- 分析用テーブル設計
- 集計データの格納
- BIツール連携
5️⃣ レポート自動化
- テキストレポート生成
- S3へのアップロード
- Slack通知(KPI付き)
🎉 コース完走おめでとうございます!
🏆 ETL・データパイプライン構築コース修了
🎊 おめでとうございます!
あなたはETL・データパイプライン構築コースの全26ステップを完了しました!
これで、あなたは以下のスキルを習得しました:
- ✅ Part 1:ETLの基礎、DB接続、API連携、ファイル処理
- ✅ Part 2:データクリーニング、変換、結合、集計、検証
- ✅ Part 3:DBロード、ファイル出力、クラウド連携、エラー処理
- ✅ Part 4:Apache Airflow、DAG、スケジュール、監視
- ✅ Part 5:実践プロジェクト(EC売上 + マーケティング分析)
🚀 次のステップ
1. 実務で活用する
学んだ知識を実際の業務に適用してみましょう。小さなパイプラインから始めて、徐々に複雑化していきます。
2. さらなる学習
- dbt:データ変換をSQLで管理
- Apache Spark:大規模データ処理
- Snowflake / BigQuery:クラウドDWH
- AWS Glue / Google Cloud Dataflow:マネージドETL
💪 自信を持ってください!
あなたは今、データエンジニアとして市場価値の高いスキルセットを持っています。
このコースで学んだことは、実務で必ず役立ちます。
データパイプライン構築の旅は、ここから始まります!
頑張ってください!🚀