STEP 16:サブクエリとCTE

📑 STEP 16: サブクエリとCTE

複雑なクエリをわかりやすく!WITH句とサブクエリで読みやすいSQLを書こう

📋 このステップで学ぶこと

  • サンプルデータの準備
  • サブクエリの基本(WHERE句、FROM句)
  • CTE(Common Table Expression / WITH句)の使い方
  • 複数のCTEを組み合わせた階層的な集計

📁 0. サンプルデータの準備

このステップでは、サブクエリとCTE(WITH句)を使って複雑なクエリを読みやすく整理する方法を学びます。
まず、SparkSessionを初期化し、サンプルデータを準備しましょう。

0-1. SparkSessionの初期化

# SparkSessionの初期化

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# SparkSessionを作成
spark = SparkSession.builder \
    .appName("Subquery and CTE") \
    .getOrCreate()

print("SparkSession準備完了")
SparkSession準備完了

0-2. 社員データの作成

# 社員データを作成

# データ:(部門, 名前, 給与)
employees_data = [
    ("営業部", "田中", 300000),
    ("営業部", "鈴木", 350000),
    ("営業部", "佐藤", 320000),
    ("技術部", "山田", 400000),
    ("技術部", "伊藤", 450000),
    ("管理部", "渡辺", 380000)
]

df_employees = spark.createDataFrame(employees_data, ["department", "name", "salary"])
df_employees.createOrReplaceTempView("employees")

print("社員データ:")
df_employees.show()
社員データ:
+----------+----+------+
|department|name|salary|
+----------+----+------+
|    営業部|田中|300000|
|    営業部|鈴木|350000|
|    営業部|佐藤|320000|
|    技術部|山田|400000|
|    技術部|伊藤|450000|
|    管理部|渡辺|380000|
+----------+----+------+

0-3. 顧客・注文データの作成

# 顧客データ
customers_data = [
    ("C001", "田中太郎", "東京"),
    ("C002", "鈴木花子", "大阪"),
    ("C003", "佐藤次郎", "名古屋"),
    ("C004", "山田美咲", "福岡")  # 注文なし
]
df_customers = spark.createDataFrame(customers_data, ["customer_id", "name", "region"])
df_customers.createOrReplaceTempView("customers")

# 注文データ
orders_data = [
    ("O001", "C001", 10000),
    ("O002", "C002", 15000),
    ("O003", "C001", 20000)
]
df_orders = spark.createDataFrame(orders_data, ["order_id", "customer_id", "amount"])
df_orders.createOrReplaceTempView("orders")

print("顧客データ:")
df_customers.show()
print("注文データ:")
df_orders.show()
顧客データ:
+-----------+--------+------+
|customer_id|    name|region|
+-----------+--------+------+
|       C001|田中太郎|  東京|
|       C002|鈴木花子|  大阪|
|       C003|佐藤次郎|名古屋|
|       C004|山田美咲|  福岡|
+-----------+--------+------+

注文データ:
+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|    O001|       C001| 10000|
|    O002|       C002| 15000|
|    O003|       C001| 20000|
+--------+-----------+------+

0-4. 売上データの作成

# 月別店舗売上データ
sales_data = [
    ("2024-01", "店舗A", 1000000),
    ("2024-01", "店舗B", 1500000),
    ("2024-02", "店舗A", 1200000),
    ("2024-02", "店舗B", 1400000),
    ("2024-03", "店舗A", 1100000),
    ("2024-03", "店舗B", 1600000)
]
df_sales = spark.createDataFrame(sales_data, ["month", "store", "sales"])
df_sales.createOrReplaceTempView("sales")

print("売上データ:")
df_sales.show()
売上データ:
+-------+-----+-------+
|  month|store|  sales|
+-------+-----+-------+
|2024-01|店舗A|1000000|
|2024-01|店舗B|1500000|
|2024-02|店舗A|1200000|
|2024-02|店舗B|1400000|
|2024-03|店舗A|1100000|
|2024-03|店舗B|1600000|
+-------+-----+-------+

0-5. 全サンプルデータを一括作成するコード

# ========================================
# STEP 16 サンプルデータ一括作成スクリプト
# ========================================

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("Subquery and CTE").getOrCreate()

# 1. 社員データ
employees_data = [
    ("営業部", "田中", 300000), ("営業部", "鈴木", 350000), ("営業部", "佐藤", 320000),
    ("技術部", "山田", 400000), ("技術部", "伊藤", 450000), ("管理部", "渡辺", 380000)
]
df_employees = spark.createDataFrame(employees_data, ["department", "name", "salary"])
df_employees.createOrReplaceTempView("employees")

# 2. 顧客データ
customers_data = [
    ("C001", "田中太郎", "東京"), ("C002", "鈴木花子", "大阪"),
    ("C003", "佐藤次郎", "名古屋"), ("C004", "山田美咲", "福岡")
]
df_customers = spark.createDataFrame(customers_data, ["customer_id", "name", "region"])
df_customers.createOrReplaceTempView("customers")

# 3. 注文データ
orders_data = [("O001", "C001", 10000), ("O002", "C002", 15000), ("O003", "C001", 20000)]
df_orders = spark.createDataFrame(orders_data, ["order_id", "customer_id", "amount"])
df_orders.createOrReplaceTempView("orders")

# 4. 売上データ
sales_data = [
    ("2024-01", "店舗A", 1000000), ("2024-01", "店舗B", 1500000),
    ("2024-02", "店舗A", 1200000), ("2024-02", "店舗B", 1400000),
    ("2024-03", "店舗A", 1100000), ("2024-03", "店舗B", 1600000)
]
df_sales = spark.createDataFrame(sales_data, ["month", "store", "sales"])
df_sales.createOrReplaceTempView("sales")

print("✅ サンプルデータを作成しました")
print("  - employees: 社員データ")
print("  - customers: 顧客データ")
print("  - orders: 注文データ")
print("  - sales: 売上データ")
✅ サンプルデータを作成しました
  - employees: 社員データ
  - customers: 顧客データ
  - orders: 注文データ
  - sales: 売上データ

🔍 1. サブクエリの基本

1-1. サブクエリとは?

サブクエリとは、SQL文の中に入れ子になったSELECT文のことです。
複雑な条件や計算を段階的に処理できます。

🎯 サブクエリの例

「平均給与より高い社員を探す」場合:

SELECT name, salary
FROM employees
WHERE salary > (SELECT AVG(salary) FROM employees)
         ↑ これがサブクエリ

1-2. WHERE句でのサブクエリ(平均との比較)

# 平均給与より高い社員を抽出

print("【平均給与より高い社員】")
spark.sql("""
    SELECT name, salary
    FROM employees
    WHERE salary > (SELECT AVG(salary) FROM employees)
    ORDER BY salary DESC
""").show()
【平均給与より高い社員】
+----+------+
|name|salary|
+----+------+
|伊藤|450000|
|山田|400000|
|渡辺|380000|
+----+------+
💡 解説

平均給与は (300000+350000+320000+400000+450000+380000) / 6 = 366,667円
この金額より高い伊藤、山田、渡辺の3名が抽出されます。

1-3. IN句でのサブクエリ

# 注文履歴がある顧客のみ抽出

print("【注文履歴がある顧客】")
spark.sql("""
    SELECT customer_id, name, region
    FROM customers
    WHERE customer_id IN (
        SELECT DISTINCT customer_id FROM orders
    )
""").show()
【注文履歴がある顧客】
+-----------+--------+------+
|customer_id|    name|region|
+-----------+--------+------+
|       C001|田中太郎|  東京|
|       C002|鈴木花子|  大阪|
+-----------+--------+------+

1-4. NOT INで注文がない顧客を抽出

# 注文履歴がない顧客を抽出

print("【注文履歴がない顧客】")
spark.sql("""
    SELECT customer_id, name, region
    FROM customers
    WHERE customer_id NOT IN (
        SELECT DISTINCT customer_id FROM orders
    )
""").show()
【注文履歴がない顧客】
+-----------+--------+------+
|customer_id|    name|region|
+-----------+--------+------+
|       C003|佐藤次郎|名古屋|
|       C004|山田美咲|  福岡|
+-----------+--------+------+

📊 2. FROM句でのサブクエリ

2-1. FROM句でのサブクエリとは?

FROM句でサブクエリを使うと、集計結果を1つのテーブルのように扱えるため、
さらに集計やフィルタリングができます。

2-2. 部門別平均を計算してフィルタリング

# 部門別平均給与を計算し、35万円以上の部門を抽出

print("【部門別平均給与(35万円以上)】")
spark.sql("""
    SELECT 
        department,
        avg_salary
    FROM (
        SELECT 
            department,
            AVG(salary) AS avg_salary
        FROM employees
        GROUP BY department
    ) dept_avg
    WHERE avg_salary >= 350000
    ORDER BY avg_salary DESC
""").show()
【部門別平均給与(35万円以上)】
+----------+----------+
|department|avg_salary|
+----------+----------+
|    技術部|  425000.0|
|    管理部|  380000.0|
+----------+----------+
🎯 FROM句サブクエリのポイント

・サブクエリには必ず別名(alias)を付ける(例:dept_avg
・集計結果をさらにフィルタリングできる
2段階の処理が必要な場合に便利

📋 3. CTE(WITH句)の基本

3-1. CTEとは?

CTE(Common Table Expression)は、WITH句を使って名前付きの一時結果を定義する機能です。
サブクエリより読みやすく、再利用しやすいのが特徴です。

🎯 サブクエリ vs CTE

❌ サブクエリ(読みにくい):

SELECT * FROM (
  SELECT department, AVG(salary) AS avg_sal
  FROM employees
  GROUP BY department
) dept_avg WHERE avg_sal >= 350000

✅ CTE(読みやすい):

WITH dept_avg AS (
  SELECT department, AVG(salary) AS avg_sal
  FROM employees
  GROUP BY department
)
SELECT * FROM dept_avg WHERE avg_sal >= 350000

3-2. 基本的なCTEの使い方

# CTEで部門別平均給与を計算

print("【CTEで部門別平均給与】")
spark.sql("""
    WITH dept_avg AS (
        SELECT 
            department,
            AVG(salary) AS avg_salary,
            COUNT(*) AS emp_count
        FROM employees
        GROUP BY department
    )
    SELECT 
        department,
        avg_salary,
        emp_count
    FROM dept_avg
    WHERE avg_salary >= 350000
    ORDER BY avg_salary DESC
""").show()
【CTEで部門別平均給与】
+----------+----------+---------+
|department|avg_salary|emp_count|
+----------+----------+---------+
|    技術部|  425000.0|        2|
|    管理部|  380000.0|        1|
+----------+----------+---------+
💡 CTEのメリット

可読性向上:複雑なクエリを段階的に理解できる
再利用可能:同じ集計を複数回参照できる
デバッグしやすい:各CTEを個別に確認できる

🔢 4. 複数CTEの組み合わせ

4-1. 複数のCTEを定義する

CTEはカンマ区切りで複数定義できます。
段階的な処理を読みやすく構造化できます。

# 複数CTEで月別・店舗別の集計

print("【複数CTEの組み合わせ】")
spark.sql("""
    WITH 
        -- 月別合計
        monthly_total AS (
            SELECT 
                month,
                SUM(sales) AS total_sales
            FROM sales
            GROUP BY month
        ),
        -- 店舗別合計
        store_total AS (
            SELECT 
                store,
                SUM(sales) AS total_sales
            FROM sales
            GROUP BY store
        ),
        -- 全体合計
        grand_total AS (
            SELECT SUM(sales) AS overall_total
            FROM sales
        )
    SELECT 
        'Monthly' AS type,
        month AS name,
        total_sales,
        ROUND(total_sales / overall_total * 100, 1) AS share_pct
    FROM monthly_total
    CROSS JOIN grand_total
    
    UNION ALL
    
    SELECT 
        'Store' AS type,
        store AS name,
        total_sales,
        ROUND(total_sales / overall_total * 100, 1) AS share_pct
    FROM store_total
    CROSS JOIN grand_total
    
    ORDER BY type, name
""").show()
【複数CTEの組み合わせ】
+-------+-------+-----------+---------+
|   type|   name|total_sales|share_pct|
+-------+-------+-----------+---------+
|Monthly|2024-01|    2500000|     31.3|
|Monthly|2024-02|    2600000|     32.5|
|Monthly|2024-03|    2700000|     33.8|
|  Store|  店舗A|    3300000|     41.3|
|  Store|  店舗B|    4500000|     56.3|
+-------+-------+-----------+---------+

4-2. CTEで階層的な集計

# 段階的にセグメント分類

print("【顧客セグメンテーション】")
spark.sql("""
    WITH 
        -- ステップ1:顧客別の集計
        customer_summary AS (
            SELECT 
                customer_id,
                COUNT(*) AS order_count,
                SUM(amount) AS total_amount
            FROM orders
            GROUP BY customer_id
        ),
        -- ステップ2:全体統計
        overall_stats AS (
            SELECT 
                AVG(total_amount) AS avg_customer_value
            FROM customer_summary
        ),
        -- ステップ3:セグメント分類
        customer_segments AS (
            SELECT 
                cs.*,
                os.avg_customer_value,
                CASE 
                    WHEN cs.total_amount >= os.avg_customer_value * 1.5 THEN 'VIP'
                    WHEN cs.total_amount >= os.avg_customer_value THEN '優良'
                    ELSE '一般'
                END AS segment
            FROM customer_summary cs
            CROSS JOIN overall_stats os
        )
    SELECT 
        customer_id,
        order_count,
        total_amount,
        segment
    FROM customer_segments
    ORDER BY total_amount DESC
""").show()
【顧客セグメンテーション】
+-----------+-----------+------------+-------+
|customer_id|order_count|total_amount|segment|
+-----------+-----------+------------+-------+
|       C001|          2|       30000|    VIP|
|       C002|          1|       15000|   一般|
+-----------+-----------+------------+-------+
🎯 複数CTEのポイント

・CTEはカンマで区切って複数定義
後のCTEから前のCTEを参照できる
コメントで各CTEの役割を明記すると読みやすい

📝 練習問題

問題 1 基礎

WHERE句のサブクエリ

平均給与より高い社員の名前と給与を表示してください。

【解答】
spark.sql("""
    SELECT name, salary
    FROM employees
    WHERE salary > (SELECT AVG(salary) FROM employees)
""").show()
+----+------+
|name|salary|
+----+------+
|山田|400000|
|伊藤|450000|
|渡辺|380000|
+----+------+
問題 2 基礎

IN句のサブクエリ

注文があった顧客のみを表示してください(IN句使用)。

【解答】
spark.sql("""
    SELECT *
    FROM customers
    WHERE customer_id IN (
        SELECT DISTINCT customer_id FROM orders
    )
""").show()
+-----------+--------+------+
|customer_id|    name|region|
+-----------+--------+------+
|       C001|田中太郎|  東京|
|       C002|鈴木花子|  大阪|
+-----------+--------+------+
問題 3 応用

FROM句のサブクエリ

部門別平均給与を計算し、35万円以上の部門のみ表示してください。

【解答】
spark.sql("""
    SELECT department, avg_salary
    FROM (
        SELECT 
            department,
            AVG(salary) AS avg_salary
        FROM employees
        GROUP BY department
    ) dept_avg
    WHERE avg_salary >= 350000
""").show()
+----------+----------+
|department|avg_salary|
+----------+----------+
|    技術部|  425000.0|
|    管理部|  380000.0|
+----------+----------+
問題 4 応用

CTEで同じ問題を解く

CTEを使って、問題3と同じ結果を得てください。

【解答】
spark.sql("""
    WITH dept_avg AS (
        SELECT 
            department,
            AVG(salary) AS avg_salary
        FROM employees
        GROUP BY department
    )
    SELECT department, avg_salary
    FROM dept_avg
    WHERE avg_salary >= 350000
""").show()
+----------+----------+
|department|avg_salary|
+----------+----------+
|    技術部|  425000.0|
|    管理部|  380000.0|
+----------+----------+

❓ よくある質問

Q1: サブクエリとCTEはどちらを使うべき?
基本的にCTEを推奨します。読みやすく、メンテナンスしやすいためです。ただし、単純な1回限りの条件ならサブクエリでもOKです。
Q2: CTEはパフォーマンスに影響しますか?
いいえ。CTEはサブクエリと同じように最適化されるため、パフォーマンスは同等です。可読性のためにCTEを使っても問題ありません。
Q3: CTEは複数回参照できますか?
はい、できます。1つのCTEを複数回参照することで、同じ集計を繰り返さずに済みます。
Q4: FROM句のサブクエリに別名は必要?
はい、必須です。FROM句でサブクエリを使う場合、必ず別名(alias)を付ける必要があります。

📝 STEP 16 のまとめ

✅ このステップで学んだこと

WHERE句サブクエリ:平均との比較、IN/NOT INで存在チェック
FROM句サブクエリ:集計結果をさらにフィルタリング
CTE(WITH句):読みやすい複雑なクエリの書き方
複数CTE:段階的な処理の組み立て

💡 重要ポイント

・サブクエリよりCTEが読みやすい
・複雑なクエリは段階的に構築する
・FROM句サブクエリには必ず別名を付ける
・CTEはパフォーマンスに影響しない

🎯 次のステップの予告

次のSTEP 17では、「UDF(ユーザー定義関数)」を学びます。
独自の関数を作って、Sparkの機能を拡張しましょう!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 16

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE