STEP 14:Spark SQLの基礎

🗄️ STEP 14: Spark SQLの基礎

SQLでSparkを操作!テンポラリビューの作成とSQLクエリの実行をマスターしよう

📋 このステップで学ぶこと

  • サンプルデータの準備
  • Spark SQLとは何か、なぜ便利なのか
  • テンポラリビュー(一時テーブル)の作成
  • SQLクエリの実行方法(SELECT、WHERE、ORDER BY、GROUP BY)
  • DataFrameとSQLの相互変換と使い分け

📁 0. サンプルデータの準備

このステップでは、Spark SQLを使って標準的なSQLでSparkを操作する方法を学びます。
まず、SparkSessionを初期化し、演習用のサンプルデータを準備しましょう。

0-1. SparkSessionの初期化

# SparkSessionの初期化

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# SparkSessionを作成
spark = SparkSession.builder \
    .appName("Spark SQL Basics") \
    .getOrCreate()

print("SparkSession準備完了")
SparkSession準備完了

0-2. 売上サンプルデータの作成

# 売上データを作成

# データ:(商品, カテゴリ, 価格, 数量)
sales_data = [
    ("商品A", "食品", 1000, 10),
    ("商品B", "食品", 1500, 15),
    ("商品C", "電化製品", 50000, 2),
    ("商品D", "電化製品", 80000, 1),
    ("商品E", "衣料品", 5000, 5),
    ("商品F", "衣料品", 8000, 3)
]

df_sales = spark.createDataFrame(sales_data, ["product", "category", "price", "quantity"])

print("売上データ:")
df_sales.show()
売上データ:
+-------+--------+-----+--------+
|product|category|price|quantity|
+-------+--------+-----+--------+
|  商品A|    食品| 1000|      10|
|  商品B|    食品| 1500|      15|
|  商品C|電化製品|50000|       2|
|  商品D|電化製品|80000|       1|
|  商品E|  衣料品| 5000|       5|
|  商品F|  衣料品| 8000|       3|
+-------+--------+-----+--------+

0-3. 社員サンプルデータの作成

# 社員データを作成

# データ:(部門, 名前, 給与)
employees_data = [
    ("営業部", "田中", 300000),
    ("営業部", "鈴木", 350000),
    ("営業部", "佐藤", 320000),
    ("技術部", "山田", 400000),
    ("技術部", "伊藤", 450000),
    ("管理部", "渡辺", 380000)
]

df_employees = spark.createDataFrame(employees_data, ["department", "name", "salary"])

print("社員データ:")
df_employees.show()
社員データ:
+----------+----+------+
|department|name|salary|
+----------+----+------+
|    営業部|田中|300000|
|    営業部|鈴木|350000|
|    営業部|佐藤|320000|
|    技術部|山田|400000|
|    技術部|伊藤|450000|
|    管理部|渡辺|380000|
+----------+----+------+

0-4. 全サンプルデータを一括作成するコード

# ========================================
# STEP 14 サンプルデータ一括作成スクリプト
# ========================================

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("Spark SQL Basics").getOrCreate()

# 1. 売上データ
sales_data = [
    ("商品A", "食品", 1000, 10), ("商品B", "食品", 1500, 15),
    ("商品C", "電化製品", 50000, 2), ("商品D", "電化製品", 80000, 1),
    ("商品E", "衣料品", 5000, 5), ("商品F", "衣料品", 8000, 3)
]
df_sales = spark.createDataFrame(sales_data, ["product", "category", "price", "quantity"])

# 2. 社員データ
employees_data = [
    ("営業部", "田中", 300000), ("営業部", "鈴木", 350000), ("営業部", "佐藤", 320000),
    ("技術部", "山田", 400000), ("技術部", "伊藤", 450000), ("管理部", "渡辺", 380000)
]
df_employees = spark.createDataFrame(employees_data, ["department", "name", "salary"])

# 3. 月別売上データ
monthly_data = [
    ("2024-01", 1000000), ("2024-02", 1200000),
    ("2024-03", 1100000), ("2024-04", 1300000)
]
df_monthly = spark.createDataFrame(monthly_data, ["month", "sales"])

print("✅ サンプルデータを作成しました")
print("  - df_sales: 売上データ")
print("  - df_employees: 社員データ")
print("  - df_monthly: 月別売上データ")
✅ サンプルデータを作成しました
  - df_sales: 売上データ
  - df_employees: 社員データ
  - df_monthly: 月別売上データ

💡 1. Spark SQLとは?

1-1. Spark SQLの概要

Spark SQLは、Sparkで標準的なSQLを使ってデータを操作できる機能です。
DataFrameをSQLのテーブルのように扱えます。

🎯 Spark SQLのメリット

SQLの知識が活かせる:既存のSQLスキルをそのまま使える
可読性が高い:複雑な処理もSQLなら理解しやすい
チーム協力しやすい:SQLは多くの人が知っている
パフォーマンス同等:DataFrameと同じエンジンで実行される

📊 DataFrameとSpark SQLの関係

DataFrameとSpark SQLは内部的には同じものです:

DataFrame API → Spark SQL エンジン ← SQL
    ↓               ↓
   同じ実行計画を生成
    ↓
   同じパフォーマンス

つまり、どちらで書いても速度は同じ。好みや状況に応じて使い分けられます!

📝 2. テンポラリビューの作成

2-1. テンポラリビュー(一時テーブル)とは?

テンポラリビューは、DataFrameに一時的な名前を付けてSQLから参照できるようにする機能です。
実際のテーブルではなく、メモリ上の一時的なビューです。

# テンポラリビューの基本構文

# 1. DataFrameにビュー名を付ける
df.createOrReplaceTempView("テーブル名")

# 2. SQLクエリを実行(結果はDataFrameとして返る)
result = spark.sql("SELECT * FROM テーブル名")

# 3. 結果を表示
result.show()

2-2. 実践:テンポラリビューを作成してSQLで操作

# テンポラリビューを作成してSQLでクエリ実行

# 1. DataFrameにビュー名「sales」を付ける
df_sales.createOrReplaceTempView("sales")

print("テンポラリビュー 'sales' を作成しました")

# 2. SQLでクエリ実行
result = spark.sql("""
    SELECT * FROM sales
""")

print("\nSQLクエリの結果:")
result.show()
テンポラリビュー 'sales' を作成しました

SQLクエリの結果:
+-------+--------+-----+--------+
|product|category|price|quantity|
+-------+--------+-----+--------+
|  商品A|    食品| 1000|      10|
|  商品B|    食品| 1500|      15|
|  商品C|電化製品|50000|       2|
|  商品D|電化製品|80000|       1|
|  商品E|  衣料品| 5000|       5|
|  商品F|  衣料品| 8000|       3|
+-------+--------+-----+--------+

2-3. 複数のテンポラリビューを作成

# 複数のDataFrameにビューを作成

# 売上データにビュー名を付ける
df_sales.createOrReplaceTempView("sales")

# 社員データにビュー名を付ける
df_employees.createOrReplaceTempView("employees")

print("テンポラリビューを作成しました:")
print("- sales(売上データ)")
print("- employees(社員データ)")
テンポラリビューを作成しました:
- sales(売上データ)
- employees(社員データ)
💡 テンポラリビューのポイント

createOrReplaceTempView:同名のビューがあれば上書き
一時的:SparkSessionが終了すると消える
メモリ上:実際のテーブルではなくビュー(参照)
複数作成可能:複数のビューを作成してJOINなどに活用

🔍 3. SQLクエリの実行

3-1. 基本的なSELECT

# 基本的なSELECT(社員データを使用)

# 1. 全カラム選択
print("【1】全カラム選択:")
spark.sql("SELECT * FROM employees").show()

# 2. 特定カラムのみ選択
print("【2】特定カラムのみ:")
spark.sql("""
    SELECT name, salary 
    FROM employees
""").show()

# 3. カラムに別名を付ける
print("【3】カラムに別名:")
spark.sql("""
    SELECT 
        name AS 氏名,
        salary AS 給与
    FROM employees
""").show()
【1】全カラム選択:
+----------+----+------+
|department|name|salary|
+----------+----+------+
|    営業部|田中|300000|
|    営業部|鈴木|350000|
|    営業部|佐藤|320000|
|    技術部|山田|400000|
|    技術部|伊藤|450000|
|    管理部|渡辺|380000|
+----------+----+------+

【2】特定カラムのみ:
+----+------+
|name|salary|
+----+------+
|田中|300000|
|鈴木|350000|
|佐藤|320000|
|山田|400000|
|伊藤|450000|
|渡辺|380000|
+----+------+

【3】カラムに別名:
+----+------+
|氏名|  給与|
+----+------+
|田中|300000|
|鈴木|350000|
|佐藤|320000|
|山田|400000|
|伊藤|450000|
|渡辺|380000|
+----+------+

3-2. WHERE句でフィルタリング

# WHERE句でフィルタリング

# 1. 単純な条件
print("【1】給与が35万円以上:")
spark.sql("""
    SELECT * FROM employees
    WHERE salary >= 350000
""").show()

# 2. 複数条件(AND)
print("【2】営業部かつ給与32万円以上:")
spark.sql("""
    SELECT * FROM employees
    WHERE department = '営業部' 
      AND salary >= 320000
""").show()

# 3. IN句(複数の値のいずれかに一致)
print("【3】営業部または技術部:")
spark.sql("""
    SELECT * FROM employees
    WHERE department IN ('営業部', '技術部')
""").show()
【1】給与が35万円以上:
+----------+----+------+
|department|name|salary|
+----------+----+------+
|    営業部|鈴木|350000|
|    技術部|山田|400000|
|    技術部|伊藤|450000|
|    管理部|渡辺|380000|
+----------+----+------+

【2】営業部かつ給与32万円以上:
+----------+----+------+
|department|name|salary|
+----------+----+------+
|    営業部|鈴木|350000|
|    営業部|佐藤|320000|
+----------+----+------+

【3】営業部または技術部:
+----------+----+------+
|department|name|salary|
+----------+----+------+
|    営業部|田中|300000|
|    営業部|鈴木|350000|
|    営業部|佐藤|320000|
|    技術部|山田|400000|
|    技術部|伊藤|450000|
+----------+----+------+

3-3. ORDER BYで並び替え

# ORDER BYで並び替え

# 1. 降順(DESC)
print("【1】給与の降順:")
spark.sql("""
    SELECT * FROM employees
    ORDER BY salary DESC
""").show()

# 2. LIMITで件数制限(TOP3)
print("【2】給与TOP3:")
spark.sql("""
    SELECT * FROM employees
    ORDER BY salary DESC
    LIMIT 3
""").show()
【1】給与の降順:
+----------+----+------+
|department|name|salary|
+----------+----+------+
|    技術部|伊藤|450000|
|    技術部|山田|400000|
|    管理部|渡辺|380000|
|    営業部|鈴木|350000|
|    営業部|佐藤|320000|
|    営業部|田中|300000|
+----------+----+------+

【2】給与TOP3:
+----------+----+------+
|department|name|salary|
+----------+----+------+
|    技術部|伊藤|450000|
|    技術部|山田|400000|
|    管理部|渡辺|380000|
+----------+----+------+

3-4. GROUP BYで集計

# GROUP BYで集計

# 1. 部門別の集計
print("【1】部門別の人数と平均給与:")
spark.sql("""
    SELECT 
        department,
        COUNT(*) AS 人数,
        AVG(salary) AS 平均給与,
        MAX(salary) AS 最高給与,
        MIN(salary) AS 最低給与
    FROM employees
    GROUP BY department
    ORDER BY 平均給与 DESC
""").show()

# 2. HAVINGで集計後のフィルター
print("【2】平均給与が35万円以上の部門:")
spark.sql("""
    SELECT 
        department,
        AVG(salary) AS 平均給与
    FROM employees
    GROUP BY department
    HAVING AVG(salary) >= 350000
""").show()
【1】部門別の人数と平均給与:
+----------+----+------------------+--------+--------+
|department|人数|          平均給与|最高給与|最低給与|
+----------+----+------------------+--------+--------+
|    技術部|   2|          425000.0|  450000|  400000|
|    管理部|   1|          380000.0|  380000|  380000|
|    営業部|   3|323333.33333333331|  350000|  300000|
+----------+----+------------------+--------+--------+

【2】平均給与が35万円以上の部門:
+----------+--------+
|department|平均給与|
+----------+--------+
|    技術部|425000.0|
|    管理部|380000.0|
+----------+--------+
🎯 WHEREとHAVINGの違い

WHEREグループ化前に行をフィルター(個々の行に対する条件)
HAVINGグループ化後にグループをフィルター(集計結果に対する条件)

🔄 4. DataFrameとSQLの相互変換

4-1. SQLの結果をDataFrameで処理

spark.sql()の結果はDataFrameとして返されるので、
DataFrameのメソッドで更に処理を続けることができます。

# SQLの結果をDataFrameで更に処理

# SQLで集計
result_sql = spark.sql("""
    SELECT 
        department,
        SUM(salary) AS total_salary
    FROM employees
    GROUP BY department
""")

print("【1】SQLの結果:")
result_sql.show()

# DataFrameのメソッドで更に処理(百万円単位に変換)
result_processed = result_sql.withColumn("total_millions",
    F.round(F.col("total_salary") / 10000, 1)
)

print("【2】DataFrameで更に処理:")
result_processed.show()
【1】SQLの結果:
+----------+------------+
|department|total_salary|
+----------+------------+
|    技術部|      850000|
|    管理部|      380000|
|    営業部|      970000|
+----------+------------+

【2】DataFrameで更に処理:
+----------+------------+--------------+
|department|total_salary|total_millions|
+----------+------------+--------------+
|    技術部|      850000|          85.0|
|    管理部|      380000|          38.0|
|    営業部|      970000|          97.0|
+----------+------------+--------------+

4-2. DataFrameとSQLの使い分け

状況 おすすめ 理由
複雑な集計・JOIN SQL 可読性が高い
データクリーニング DataFrame 柔軟な処理が得意
段階的な処理 DataFrame デバッグしやすい
レポート作成 SQL ビジネス担当者も理解しやすい
既存SQLの移行 SQL そのまま使える

4-3. 同じ処理をDataFrameとSQLで比較

# 同じ処理をDataFrameとSQLで比較

# ===== DataFrameで処理 =====
print("【DataFrame】部門別の平均給与(35万以上):")
result_df = df_employees \
    .groupBy("department") \
    .agg(F.avg("salary").alias("avg_salary")) \
    .filter(F.col("avg_salary") >= 350000)
result_df.show()

# ===== SQLで処理 =====
print("【SQL】部門別の平均給与(35万以上):")
result_sql = spark.sql("""
    SELECT department, AVG(salary) AS avg_salary
    FROM employees
    GROUP BY department
    HAVING AVG(salary) >= 350000
""")
result_sql.show()

print("→ 結果は同じ!書き方が違うだけ")
【DataFrame】部門別の平均給与(35万以上):
+----------+----------+
|department|avg_salary|
+----------+----------+
|    技術部|  425000.0|
|    管理部|  380000.0|
+----------+----------+

【SQL】部門別の平均給与(35万以上):
+----------+----------+
|department|avg_salary|
+----------+----------+
|    技術部|  425000.0|
|    管理部|  380000.0|
+----------+----------+

→ 結果は同じ!書き方が違うだけ
💡 パフォーマンスは同じ

DataFrameとSQLは内部的に同じエンジンで実行されるため、
パフォーマンスは同等です。読みやすさや好みで使い分けましょう

📊 5. 実践的なSQLパターン

5-1. ランキング(Window関数)

# ランキングをSQLで計算

print("給与ランキング(TOP3):")
spark.sql("""
    SELECT 
        name,
        salary,
        rank
    FROM (
        SELECT 
            name,
            salary,
            ROW_NUMBER() OVER (ORDER BY salary DESC) AS rank
        FROM employees
    ) ranked
    WHERE rank <= 3
""").show()
給与ランキング(TOP3):
+----+------+----+
|name|salary|rank|
+----+------+----+
|伊藤|450000|   1|
|山田|400000|   2|
|渡辺|380000|   3|
+----+------+----+

5-2. 前月比(LAG関数)

# 前月比をSQLで計算

# 月別売上データのビューを作成
df_monthly.createOrReplaceTempView("monthly_sales")

print("前月比:")
spark.sql("""
    SELECT 
        month,
        sales,
        LAG(sales, 1) OVER (ORDER BY month) AS prev_month,
        sales - LAG(sales, 1) OVER (ORDER BY month) AS diff,
        ROUND(((sales - LAG(sales, 1) OVER (ORDER BY month)) / 
               LAG(sales, 1) OVER (ORDER BY month)) * 100, 1) AS growth_rate
    FROM monthly_sales
    ORDER BY month
""").show()
前月比:
+-------+-------+----------+-------+-----------+
|  month|  sales|prev_month|   diff|growth_rate|
+-------+-------+----------+-------+-----------+
|2024-01|1000000|      null|   null|       null|
|2024-02|1200000|   1000000| 200000|       20.0|
|2024-03|1100000|   1200000|-100000|       -8.3|
|2024-04|1300000|   1100000| 200000|       18.2|
+-------+-------+----------+-------+-----------+

5-3. ピボット集計(CASE WHEN)

# ピボット集計をSQLで実現

print("カテゴリ別売上(ピボット形式):")
spark.sql("""
    SELECT 
        SUM(CASE WHEN category = '食品' THEN price * quantity ELSE 0 END) AS 食品,
        SUM(CASE WHEN category = '電化製品' THEN price * quantity ELSE 0 END) AS 電化製品,
        SUM(CASE WHEN category = '衣料品' THEN price * quantity ELSE 0 END) AS 衣料品,
        SUM(price * quantity) AS 合計
    FROM sales
""").show()
カテゴリ別売上(ピボット形式):
+-----+--------+------+------+
| 食品|電化製品|衣料品|  合計|
+-----+--------+------+------+
|32500|  180000| 49000|261500|
+-----+--------+------+------+

📝 練習問題

問題 1 基礎

基本的なSELECT

employeesテーブルから、給与が350000以上の社員の名前と給与を表示してください。

【解答】
spark.sql("""
    SELECT name, salary
    FROM employees
    WHERE salary >= 350000
""").show()
+----+------+
|name|salary|
+----+------+
|鈴木|350000|
|山田|400000|
|伊藤|450000|
|渡辺|380000|
+----+------+
問題 2 基礎

GROUP BYで集計

employeesテーブルで、部門ごとの社員数と平均給与を計算してください。

【解答】
spark.sql("""
    SELECT 
        department,
        COUNT(*) AS 社員数,
        AVG(salary) AS 平均給与
    FROM employees
    GROUP BY department
""").show()
+----------+------+------------------+
|department|社員数|          平均給与|
+----------+------+------------------+
|    技術部|     2|          425000.0|
|    管理部|     1|          380000.0|
|    営業部|     3|323333.33333333331|
+----------+------+------------------+
問題 3 応用

HAVINGでフィルター

employeesテーブルで、平均給与が350000以上の部門のみを表示してください。

【解答】
spark.sql("""
    SELECT 
        department,
        AVG(salary) AS 平均給与
    FROM employees
    GROUP BY department
    HAVING AVG(salary) >= 350000
""").show()
+----------+--------+
|department|平均給与|
+----------+--------+
|    技術部|425000.0|
|    管理部|380000.0|
+----------+--------+
問題 4 実践

Window関数でランキング

employeesテーブルで、給与の高い順に順位を付けて、TOP3を表示してください。
(ヒント:ROW_NUMBER() OVER (ORDER BY salary DESC) を使用)

【解答】
spark.sql("""
    SELECT name, salary, rank
    FROM (
        SELECT 
            name, 
            salary,
            ROW_NUMBER() OVER (ORDER BY salary DESC) AS rank
        FROM employees
    ) ranked
    WHERE rank <= 3
""").show()
+----+------+----+
|name|salary|rank|
+----+------+----+
|伊藤|450000|   1|
|山田|400000|   2|
|渡辺|380000|   3|
+----+------+----+

❓ よくある質問

Q1: DataFrameとSQLのパフォーマンスは違いますか?
いいえ、同じです。どちらも内部的に同じSpark SQLエンジンで実行されるため、パフォーマンスは同等です。書きやすい方、読みやすい方を選んでOKです。
Q2: テンポラリビューは永続化されますか?
いいえ、一時的です。SparkSessionが終了すると消えます。永続化したい場合は、df.write.saveAsTable("table_name")を使って実際のテーブルとして保存する必要があります。
Q3: 複数行にわたるSQLを書く方法は?
Pythonの三重引用符(”””)を使います:spark.sql("""SELECT ... FROM ... WHERE ...""")。これにより、改行を含む長いSQLも読みやすく書けます。
Q4: SQLの結果をDataFrameとして更に処理できますか?
はい、できます。spark.sql()の戻り値はDataFrameなので、そのまま.filter().withColumn()などのDataFrameメソッドを使えます。

📝 STEP 14 のまとめ

✅ このステップで学んだこと

Spark SQL:標準SQLでSparkを操作できる
テンポラリビュー:createOrReplaceTempViewでビュー作成
SQLクエリ実行:spark.sql()で実行、結果はDataFrame
相互変換:DataFrameとSQLを自由に行き来できる
実践パターン:ランキング、前月比、ピボット集計

💡 重要ポイント

・DataFrameとSQLはパフォーマンス同等
・テンポラリビューは一時的(セッション終了で消える)
可読性を優先して使い分ける
・SQLの結果はDataFrameとして返る

🎯 次のステップの予告

次のSTEP 15では、「結合(JOIN)操作」を学びます。
複数のテーブルを結合する方法を、SQLとDataFrameの両方でマスターしましょう!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 14

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE