📑 STEP 16: サブクエリとCTE
複雑なクエリをわかりやすく!WITH句とサブクエリで読みやすいSQLを書こう
📋 このステップで学ぶこと
- サンプルデータの準備
- サブクエリの基本(WHERE句、FROM句)
- CTE(Common Table Expression / WITH句)の使い方
- 複数のCTEを組み合わせた階層的な集計
📁 0. サンプルデータの準備
このステップでは、サブクエリとCTE(WITH句)を使って複雑なクエリを読みやすく整理する方法を学びます。
まず、SparkSessionを初期化し、サンプルデータを準備しましょう。
0-1. SparkSessionの初期化
# SparkSessionの初期化 from pyspark.sql import SparkSession from pyspark.sql import functions as F # SparkSessionを作成 spark = SparkSession.builder \ .appName("Subquery and CTE") \ .getOrCreate() print("SparkSession準備完了")
SparkSession準備完了
0-2. 社員データの作成
# 社員データを作成 # データ:(部門, 名前, 給与) employees_data = [ ("営業部", "田中", 300000), ("営業部", "鈴木", 350000), ("営業部", "佐藤", 320000), ("技術部", "山田", 400000), ("技術部", "伊藤", 450000), ("管理部", "渡辺", 380000) ] df_employees = spark.createDataFrame(employees_data, ["department", "name", "salary"]) df_employees.createOrReplaceTempView("employees") print("社員データ:") df_employees.show()
社員データ: +----------+----+------+ |department|name|salary| +----------+----+------+ | 営業部|田中|300000| | 営業部|鈴木|350000| | 営業部|佐藤|320000| | 技術部|山田|400000| | 技術部|伊藤|450000| | 管理部|渡辺|380000| +----------+----+------+
0-3. 顧客・注文データの作成
# 顧客データ customers_data = [ ("C001", "田中太郎", "東京"), ("C002", "鈴木花子", "大阪"), ("C003", "佐藤次郎", "名古屋"), ("C004", "山田美咲", "福岡") # 注文なし ] df_customers = spark.createDataFrame(customers_data, ["customer_id", "name", "region"]) df_customers.createOrReplaceTempView("customers") # 注文データ orders_data = [ ("O001", "C001", 10000), ("O002", "C002", 15000), ("O003", "C001", 20000) ] df_orders = spark.createDataFrame(orders_data, ["order_id", "customer_id", "amount"]) df_orders.createOrReplaceTempView("orders") print("顧客データ:") df_customers.show() print("注文データ:") df_orders.show()
顧客データ: +-----------+--------+------+ |customer_id| name|region| +-----------+--------+------+ | C001|田中太郎| 東京| | C002|鈴木花子| 大阪| | C003|佐藤次郎|名古屋| | C004|山田美咲| 福岡| +-----------+--------+------+ 注文データ: +--------+-----------+------+ |order_id|customer_id|amount| +--------+-----------+------+ | O001| C001| 10000| | O002| C002| 15000| | O003| C001| 20000| +--------+-----------+------+
0-4. 売上データの作成
# 月別店舗売上データ sales_data = [ ("2024-01", "店舗A", 1000000), ("2024-01", "店舗B", 1500000), ("2024-02", "店舗A", 1200000), ("2024-02", "店舗B", 1400000), ("2024-03", "店舗A", 1100000), ("2024-03", "店舗B", 1600000) ] df_sales = spark.createDataFrame(sales_data, ["month", "store", "sales"]) df_sales.createOrReplaceTempView("sales") print("売上データ:") df_sales.show()
売上データ: +-------+-----+-------+ | month|store| sales| +-------+-----+-------+ |2024-01|店舗A|1000000| |2024-01|店舗B|1500000| |2024-02|店舗A|1200000| |2024-02|店舗B|1400000| |2024-03|店舗A|1100000| |2024-03|店舗B|1600000| +-------+-----+-------+
0-5. 全サンプルデータを一括作成するコード
# ======================================== # STEP 16 サンプルデータ一括作成スクリプト # ======================================== from pyspark.sql import SparkSession from pyspark.sql import functions as F spark = SparkSession.builder.appName("Subquery and CTE").getOrCreate() # 1. 社員データ employees_data = [ ("営業部", "田中", 300000), ("営業部", "鈴木", 350000), ("営業部", "佐藤", 320000), ("技術部", "山田", 400000), ("技術部", "伊藤", 450000), ("管理部", "渡辺", 380000) ] df_employees = spark.createDataFrame(employees_data, ["department", "name", "salary"]) df_employees.createOrReplaceTempView("employees") # 2. 顧客データ customers_data = [ ("C001", "田中太郎", "東京"), ("C002", "鈴木花子", "大阪"), ("C003", "佐藤次郎", "名古屋"), ("C004", "山田美咲", "福岡") ] df_customers = spark.createDataFrame(customers_data, ["customer_id", "name", "region"]) df_customers.createOrReplaceTempView("customers") # 3. 注文データ orders_data = [("O001", "C001", 10000), ("O002", "C002", 15000), ("O003", "C001", 20000)] df_orders = spark.createDataFrame(orders_data, ["order_id", "customer_id", "amount"]) df_orders.createOrReplaceTempView("orders") # 4. 売上データ sales_data = [ ("2024-01", "店舗A", 1000000), ("2024-01", "店舗B", 1500000), ("2024-02", "店舗A", 1200000), ("2024-02", "店舗B", 1400000), ("2024-03", "店舗A", 1100000), ("2024-03", "店舗B", 1600000) ] df_sales = spark.createDataFrame(sales_data, ["month", "store", "sales"]) df_sales.createOrReplaceTempView("sales") print("✅ サンプルデータを作成しました") print(" - employees: 社員データ") print(" - customers: 顧客データ") print(" - orders: 注文データ") print(" - sales: 売上データ")
✅ サンプルデータを作成しました - employees: 社員データ - customers: 顧客データ - orders: 注文データ - sales: 売上データ
🔍 1. サブクエリの基本
1-1. サブクエリとは?
サブクエリとは、SQL文の中に入れ子になったSELECT文のことです。
複雑な条件や計算を段階的に処理できます。
「平均給与より高い社員を探す」場合:
SELECT name, salary
FROM employees
WHERE salary > (SELECT AVG(salary) FROM employees)
↑ これがサブクエリ
1-2. WHERE句でのサブクエリ(平均との比較)
# 平均給与より高い社員を抽出 print("【平均給与より高い社員】") spark.sql(""" SELECT name, salary FROM employees WHERE salary > (SELECT AVG(salary) FROM employees) ORDER BY salary DESC """).show()
【平均給与より高い社員】 +----+------+ |name|salary| +----+------+ |伊藤|450000| |山田|400000| |渡辺|380000| +----+------+
平均給与は (300000+350000+320000+400000+450000+380000) / 6 = 366,667円
この金額より高い伊藤、山田、渡辺の3名が抽出されます。
1-3. IN句でのサブクエリ
# 注文履歴がある顧客のみ抽出 print("【注文履歴がある顧客】") spark.sql(""" SELECT customer_id, name, region FROM customers WHERE customer_id IN ( SELECT DISTINCT customer_id FROM orders ) """).show()
【注文履歴がある顧客】 +-----------+--------+------+ |customer_id| name|region| +-----------+--------+------+ | C001|田中太郎| 東京| | C002|鈴木花子| 大阪| +-----------+--------+------+
1-4. NOT INで注文がない顧客を抽出
# 注文履歴がない顧客を抽出 print("【注文履歴がない顧客】") spark.sql(""" SELECT customer_id, name, region FROM customers WHERE customer_id NOT IN ( SELECT DISTINCT customer_id FROM orders ) """).show()
【注文履歴がない顧客】 +-----------+--------+------+ |customer_id| name|region| +-----------+--------+------+ | C003|佐藤次郎|名古屋| | C004|山田美咲| 福岡| +-----------+--------+------+
📊 2. FROM句でのサブクエリ
2-1. FROM句でのサブクエリとは?
FROM句でサブクエリを使うと、集計結果を1つのテーブルのように扱えるため、
さらに集計やフィルタリングができます。
2-2. 部門別平均を計算してフィルタリング
# 部門別平均給与を計算し、35万円以上の部門を抽出 print("【部門別平均給与(35万円以上)】") spark.sql(""" SELECT department, avg_salary FROM ( SELECT department, AVG(salary) AS avg_salary FROM employees GROUP BY department ) dept_avg WHERE avg_salary >= 350000 ORDER BY avg_salary DESC """).show()
【部門別平均給与(35万円以上)】 +----------+----------+ |department|avg_salary| +----------+----------+ | 技術部| 425000.0| | 管理部| 380000.0| +----------+----------+
・サブクエリには必ず別名(alias)を付ける(例:dept_avg)
・集計結果をさらにフィルタリングできる
・2段階の処理が必要な場合に便利
📋 3. CTE(WITH句)の基本
3-1. CTEとは?
CTE(Common Table Expression)は、WITH句を使って名前付きの一時結果を定義する機能です。
サブクエリより読みやすく、再利用しやすいのが特徴です。
❌ サブクエリ(読みにくい):
SELECT * FROM (
SELECT department, AVG(salary) AS avg_sal
FROM employees
GROUP BY department
) dept_avg WHERE avg_sal >= 350000
✅ CTE(読みやすい):
WITH dept_avg AS (
SELECT department, AVG(salary) AS avg_sal
FROM employees
GROUP BY department
)
SELECT * FROM dept_avg WHERE avg_sal >= 350000
3-2. 基本的なCTEの使い方
# CTEで部門別平均給与を計算 print("【CTEで部門別平均給与】") spark.sql(""" WITH dept_avg AS ( SELECT department, AVG(salary) AS avg_salary, COUNT(*) AS emp_count FROM employees GROUP BY department ) SELECT department, avg_salary, emp_count FROM dept_avg WHERE avg_salary >= 350000 ORDER BY avg_salary DESC """).show()
【CTEで部門別平均給与】 +----------+----------+---------+ |department|avg_salary|emp_count| +----------+----------+---------+ | 技術部| 425000.0| 2| | 管理部| 380000.0| 1| +----------+----------+---------+
・可読性向上:複雑なクエリを段階的に理解できる
・再利用可能:同じ集計を複数回参照できる
・デバッグしやすい:各CTEを個別に確認できる
🔢 4. 複数CTEの組み合わせ
4-1. 複数のCTEを定義する
CTEはカンマ区切りで複数定義できます。
段階的な処理を読みやすく構造化できます。
# 複数CTEで月別・店舗別の集計 print("【複数CTEの組み合わせ】") spark.sql(""" WITH -- 月別合計 monthly_total AS ( SELECT month, SUM(sales) AS total_sales FROM sales GROUP BY month ), -- 店舗別合計 store_total AS ( SELECT store, SUM(sales) AS total_sales FROM sales GROUP BY store ), -- 全体合計 grand_total AS ( SELECT SUM(sales) AS overall_total FROM sales ) SELECT 'Monthly' AS type, month AS name, total_sales, ROUND(total_sales / overall_total * 100, 1) AS share_pct FROM monthly_total CROSS JOIN grand_total UNION ALL SELECT 'Store' AS type, store AS name, total_sales, ROUND(total_sales / overall_total * 100, 1) AS share_pct FROM store_total CROSS JOIN grand_total ORDER BY type, name """).show()
【複数CTEの組み合わせ】 +-------+-------+-----------+---------+ | type| name|total_sales|share_pct| +-------+-------+-----------+---------+ |Monthly|2024-01| 2500000| 31.3| |Monthly|2024-02| 2600000| 32.5| |Monthly|2024-03| 2700000| 33.8| | Store| 店舗A| 3300000| 41.3| | Store| 店舗B| 4500000| 56.3| +-------+-------+-----------+---------+
4-2. CTEで階層的な集計
# 段階的にセグメント分類 print("【顧客セグメンテーション】") spark.sql(""" WITH -- ステップ1:顧客別の集計 customer_summary AS ( SELECT customer_id, COUNT(*) AS order_count, SUM(amount) AS total_amount FROM orders GROUP BY customer_id ), -- ステップ2:全体統計 overall_stats AS ( SELECT AVG(total_amount) AS avg_customer_value FROM customer_summary ), -- ステップ3:セグメント分類 customer_segments AS ( SELECT cs.*, os.avg_customer_value, CASE WHEN cs.total_amount >= os.avg_customer_value * 1.5 THEN 'VIP' WHEN cs.total_amount >= os.avg_customer_value THEN '優良' ELSE '一般' END AS segment FROM customer_summary cs CROSS JOIN overall_stats os ) SELECT customer_id, order_count, total_amount, segment FROM customer_segments ORDER BY total_amount DESC """).show()
【顧客セグメンテーション】 +-----------+-----------+------------+-------+ |customer_id|order_count|total_amount|segment| +-----------+-----------+------------+-------+ | C001| 2| 30000| VIP| | C002| 1| 15000| 一般| +-----------+-----------+------------+-------+
・CTEはカンマで区切って複数定義
・後のCTEから前のCTEを参照できる
・コメントで各CTEの役割を明記すると読みやすい
📝 練習問題
WHERE句のサブクエリ
平均給与より高い社員の名前と給与を表示してください。
spark.sql("""
SELECT name, salary
FROM employees
WHERE salary > (SELECT AVG(salary) FROM employees)
""").show()+----+------+ |name|salary| +----+------+ |山田|400000| |伊藤|450000| |渡辺|380000| +----+------+
IN句のサブクエリ
注文があった顧客のみを表示してください(IN句使用)。
spark.sql("""
SELECT *
FROM customers
WHERE customer_id IN (
SELECT DISTINCT customer_id FROM orders
)
""").show()+-----------+--------+------+ |customer_id| name|region| +-----------+--------+------+ | C001|田中太郎| 東京| | C002|鈴木花子| 大阪| +-----------+--------+------+
FROM句のサブクエリ
部門別平均給与を計算し、35万円以上の部門のみ表示してください。
spark.sql("""
SELECT department, avg_salary
FROM (
SELECT
department,
AVG(salary) AS avg_salary
FROM employees
GROUP BY department
) dept_avg
WHERE avg_salary >= 350000
""").show()+----------+----------+ |department|avg_salary| +----------+----------+ | 技術部| 425000.0| | 管理部| 380000.0| +----------+----------+
CTEで同じ問題を解く
CTEを使って、問題3と同じ結果を得てください。
spark.sql("""
WITH dept_avg AS (
SELECT
department,
AVG(salary) AS avg_salary
FROM employees
GROUP BY department
)
SELECT department, avg_salary
FROM dept_avg
WHERE avg_salary >= 350000
""").show()+----------+----------+ |department|avg_salary| +----------+----------+ | 技術部| 425000.0| | 管理部| 380000.0| +----------+----------+
❓ よくある質問
📝 STEP 16 のまとめ
・WHERE句サブクエリ:平均との比較、IN/NOT INで存在チェック
・FROM句サブクエリ:集計結果をさらにフィルタリング
・CTE(WITH句):読みやすい複雑なクエリの書き方
・複数CTE:段階的な処理の組み立て
・サブクエリよりCTEが読みやすい
・複雑なクエリは段階的に構築する
・FROM句サブクエリには必ず別名を付ける
・CTEはパフォーマンスに影響しない
次のSTEP 17では、「UDF(ユーザー定義関数)」を学びます。
独自の関数を作って、Sparkの機能を拡張しましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 16