🗄️ STEP 14: Spark SQLの基礎
SQLでSparkを操作!テンポラリビューの作成とSQLクエリの実行をマスターしよう
📋 このステップで学ぶこと
- サンプルデータの準備
- Spark SQLとは何か、なぜ便利なのか
- テンポラリビュー(一時テーブル)の作成
- SQLクエリの実行方法(SELECT、WHERE、ORDER BY、GROUP BY)
- DataFrameとSQLの相互変換と使い分け
📁 0. サンプルデータの準備
このステップでは、Spark SQLを使って標準的なSQLでSparkを操作する方法を学びます。
まず、SparkSessionを初期化し、演習用のサンプルデータを準備しましょう。
0-1. SparkSessionの初期化
# SparkSessionの初期化 from pyspark.sql import SparkSession from pyspark.sql import functions as F # SparkSessionを作成 spark = SparkSession.builder \ .appName("Spark SQL Basics") \ .getOrCreate() print("SparkSession準備完了")
SparkSession準備完了
0-2. 売上サンプルデータの作成
# 売上データを作成 # データ:(商品, カテゴリ, 価格, 数量) sales_data = [ ("商品A", "食品", 1000, 10), ("商品B", "食品", 1500, 15), ("商品C", "電化製品", 50000, 2), ("商品D", "電化製品", 80000, 1), ("商品E", "衣料品", 5000, 5), ("商品F", "衣料品", 8000, 3) ] df_sales = spark.createDataFrame(sales_data, ["product", "category", "price", "quantity"]) print("売上データ:") df_sales.show()
売上データ: +-------+--------+-----+--------+ |product|category|price|quantity| +-------+--------+-----+--------+ | 商品A| 食品| 1000| 10| | 商品B| 食品| 1500| 15| | 商品C|電化製品|50000| 2| | 商品D|電化製品|80000| 1| | 商品E| 衣料品| 5000| 5| | 商品F| 衣料品| 8000| 3| +-------+--------+-----+--------+
0-3. 社員サンプルデータの作成
# 社員データを作成 # データ:(部門, 名前, 給与) employees_data = [ ("営業部", "田中", 300000), ("営業部", "鈴木", 350000), ("営業部", "佐藤", 320000), ("技術部", "山田", 400000), ("技術部", "伊藤", 450000), ("管理部", "渡辺", 380000) ] df_employees = spark.createDataFrame(employees_data, ["department", "name", "salary"]) print("社員データ:") df_employees.show()
社員データ: +----------+----+------+ |department|name|salary| +----------+----+------+ | 営業部|田中|300000| | 営業部|鈴木|350000| | 営業部|佐藤|320000| | 技術部|山田|400000| | 技術部|伊藤|450000| | 管理部|渡辺|380000| +----------+----+------+
0-4. 全サンプルデータを一括作成するコード
# ======================================== # STEP 14 サンプルデータ一括作成スクリプト # ======================================== from pyspark.sql import SparkSession from pyspark.sql import functions as F spark = SparkSession.builder.appName("Spark SQL Basics").getOrCreate() # 1. 売上データ sales_data = [ ("商品A", "食品", 1000, 10), ("商品B", "食品", 1500, 15), ("商品C", "電化製品", 50000, 2), ("商品D", "電化製品", 80000, 1), ("商品E", "衣料品", 5000, 5), ("商品F", "衣料品", 8000, 3) ] df_sales = spark.createDataFrame(sales_data, ["product", "category", "price", "quantity"]) # 2. 社員データ employees_data = [ ("営業部", "田中", 300000), ("営業部", "鈴木", 350000), ("営業部", "佐藤", 320000), ("技術部", "山田", 400000), ("技術部", "伊藤", 450000), ("管理部", "渡辺", 380000) ] df_employees = spark.createDataFrame(employees_data, ["department", "name", "salary"]) # 3. 月別売上データ monthly_data = [ ("2024-01", 1000000), ("2024-02", 1200000), ("2024-03", 1100000), ("2024-04", 1300000) ] df_monthly = spark.createDataFrame(monthly_data, ["month", "sales"]) print("✅ サンプルデータを作成しました") print(" - df_sales: 売上データ") print(" - df_employees: 社員データ") print(" - df_monthly: 月別売上データ")
✅ サンプルデータを作成しました - df_sales: 売上データ - df_employees: 社員データ - df_monthly: 月別売上データ
💡 1. Spark SQLとは?
1-1. Spark SQLの概要
Spark SQLは、Sparkで標準的なSQLを使ってデータを操作できる機能です。
DataFrameをSQLのテーブルのように扱えます。
・SQLの知識が活かせる:既存のSQLスキルをそのまま使える
・可読性が高い:複雑な処理もSQLなら理解しやすい
・チーム協力しやすい:SQLは多くの人が知っている
・パフォーマンス同等:DataFrameと同じエンジンで実行される
DataFrameとSpark SQLは内部的には同じものです:
DataFrame API → Spark SQL エンジン ← SQL
↓ ↓
同じ実行計画を生成
↓
同じパフォーマンス
つまり、どちらで書いても速度は同じ。好みや状況に応じて使い分けられます!
📝 2. テンポラリビューの作成
2-1. テンポラリビュー(一時テーブル)とは?
テンポラリビューは、DataFrameに一時的な名前を付けてSQLから参照できるようにする機能です。
実際のテーブルではなく、メモリ上の一時的なビューです。
# テンポラリビューの基本構文 # 1. DataFrameにビュー名を付ける df.createOrReplaceTempView("テーブル名") # 2. SQLクエリを実行(結果はDataFrameとして返る) result = spark.sql("SELECT * FROM テーブル名") # 3. 結果を表示 result.show()
2-2. 実践:テンポラリビューを作成してSQLで操作
# テンポラリビューを作成してSQLでクエリ実行 # 1. DataFrameにビュー名「sales」を付ける df_sales.createOrReplaceTempView("sales") print("テンポラリビュー 'sales' を作成しました") # 2. SQLでクエリ実行 result = spark.sql(""" SELECT * FROM sales """) print("\nSQLクエリの結果:") result.show()
テンポラリビュー 'sales' を作成しました SQLクエリの結果: +-------+--------+-----+--------+ |product|category|price|quantity| +-------+--------+-----+--------+ | 商品A| 食品| 1000| 10| | 商品B| 食品| 1500| 15| | 商品C|電化製品|50000| 2| | 商品D|電化製品|80000| 1| | 商品E| 衣料品| 5000| 5| | 商品F| 衣料品| 8000| 3| +-------+--------+-----+--------+
2-3. 複数のテンポラリビューを作成
# 複数のDataFrameにビューを作成 # 売上データにビュー名を付ける df_sales.createOrReplaceTempView("sales") # 社員データにビュー名を付ける df_employees.createOrReplaceTempView("employees") print("テンポラリビューを作成しました:") print("- sales(売上データ)") print("- employees(社員データ)")
テンポラリビューを作成しました: - sales(売上データ) - employees(社員データ)
・createOrReplaceTempView:同名のビューがあれば上書き
・一時的:SparkSessionが終了すると消える
・メモリ上:実際のテーブルではなくビュー(参照)
・複数作成可能:複数のビューを作成してJOINなどに活用
🔍 3. SQLクエリの実行
3-1. 基本的なSELECT
# 基本的なSELECT(社員データを使用) # 1. 全カラム選択 print("【1】全カラム選択:") spark.sql("SELECT * FROM employees").show() # 2. 特定カラムのみ選択 print("【2】特定カラムのみ:") spark.sql(""" SELECT name, salary FROM employees """).show() # 3. カラムに別名を付ける print("【3】カラムに別名:") spark.sql(""" SELECT name AS 氏名, salary AS 給与 FROM employees """).show()
【1】全カラム選択: +----------+----+------+ |department|name|salary| +----------+----+------+ | 営業部|田中|300000| | 営業部|鈴木|350000| | 営業部|佐藤|320000| | 技術部|山田|400000| | 技術部|伊藤|450000| | 管理部|渡辺|380000| +----------+----+------+ 【2】特定カラムのみ: +----+------+ |name|salary| +----+------+ |田中|300000| |鈴木|350000| |佐藤|320000| |山田|400000| |伊藤|450000| |渡辺|380000| +----+------+ 【3】カラムに別名: +----+------+ |氏名| 給与| +----+------+ |田中|300000| |鈴木|350000| |佐藤|320000| |山田|400000| |伊藤|450000| |渡辺|380000| +----+------+
3-2. WHERE句でフィルタリング
# WHERE句でフィルタリング # 1. 単純な条件 print("【1】給与が35万円以上:") spark.sql(""" SELECT * FROM employees WHERE salary >= 350000 """).show() # 2. 複数条件(AND) print("【2】営業部かつ給与32万円以上:") spark.sql(""" SELECT * FROM employees WHERE department = '営業部' AND salary >= 320000 """).show() # 3. IN句(複数の値のいずれかに一致) print("【3】営業部または技術部:") spark.sql(""" SELECT * FROM employees WHERE department IN ('営業部', '技術部') """).show()
【1】給与が35万円以上: +----------+----+------+ |department|name|salary| +----------+----+------+ | 営業部|鈴木|350000| | 技術部|山田|400000| | 技術部|伊藤|450000| | 管理部|渡辺|380000| +----------+----+------+ 【2】営業部かつ給与32万円以上: +----------+----+------+ |department|name|salary| +----------+----+------+ | 営業部|鈴木|350000| | 営業部|佐藤|320000| +----------+----+------+ 【3】営業部または技術部: +----------+----+------+ |department|name|salary| +----------+----+------+ | 営業部|田中|300000| | 営業部|鈴木|350000| | 営業部|佐藤|320000| | 技術部|山田|400000| | 技術部|伊藤|450000| +----------+----+------+
3-3. ORDER BYで並び替え
# ORDER BYで並び替え # 1. 降順(DESC) print("【1】給与の降順:") spark.sql(""" SELECT * FROM employees ORDER BY salary DESC """).show() # 2. LIMITで件数制限(TOP3) print("【2】給与TOP3:") spark.sql(""" SELECT * FROM employees ORDER BY salary DESC LIMIT 3 """).show()
【1】給与の降順: +----------+----+------+ |department|name|salary| +----------+----+------+ | 技術部|伊藤|450000| | 技術部|山田|400000| | 管理部|渡辺|380000| | 営業部|鈴木|350000| | 営業部|佐藤|320000| | 営業部|田中|300000| +----------+----+------+ 【2】給与TOP3: +----------+----+------+ |department|name|salary| +----------+----+------+ | 技術部|伊藤|450000| | 技術部|山田|400000| | 管理部|渡辺|380000| +----------+----+------+
3-4. GROUP BYで集計
# GROUP BYで集計 # 1. 部門別の集計 print("【1】部門別の人数と平均給与:") spark.sql(""" SELECT department, COUNT(*) AS 人数, AVG(salary) AS 平均給与, MAX(salary) AS 最高給与, MIN(salary) AS 最低給与 FROM employees GROUP BY department ORDER BY 平均給与 DESC """).show() # 2. HAVINGで集計後のフィルター print("【2】平均給与が35万円以上の部門:") spark.sql(""" SELECT department, AVG(salary) AS 平均給与 FROM employees GROUP BY department HAVING AVG(salary) >= 350000 """).show()
【1】部門別の人数と平均給与: +----------+----+------------------+--------+--------+ |department|人数| 平均給与|最高給与|最低給与| +----------+----+------------------+--------+--------+ | 技術部| 2| 425000.0| 450000| 400000| | 管理部| 1| 380000.0| 380000| 380000| | 営業部| 3|323333.33333333331| 350000| 300000| +----------+----+------------------+--------+--------+ 【2】平均給与が35万円以上の部門: +----------+--------+ |department|平均給与| +----------+--------+ | 技術部|425000.0| | 管理部|380000.0| +----------+--------+
・WHERE:グループ化前に行をフィルター(個々の行に対する条件)
・HAVING:グループ化後にグループをフィルター(集計結果に対する条件)
🔄 4. DataFrameとSQLの相互変換
4-1. SQLの結果をDataFrameで処理
spark.sql()の結果はDataFrameとして返されるので、
DataFrameのメソッドで更に処理を続けることができます。
# SQLの結果をDataFrameで更に処理 # SQLで集計 result_sql = spark.sql(""" SELECT department, SUM(salary) AS total_salary FROM employees GROUP BY department """) print("【1】SQLの結果:") result_sql.show() # DataFrameのメソッドで更に処理(百万円単位に変換) result_processed = result_sql.withColumn("total_millions", F.round(F.col("total_salary") / 10000, 1) ) print("【2】DataFrameで更に処理:") result_processed.show()
【1】SQLの結果: +----------+------------+ |department|total_salary| +----------+------------+ | 技術部| 850000| | 管理部| 380000| | 営業部| 970000| +----------+------------+ 【2】DataFrameで更に処理: +----------+------------+--------------+ |department|total_salary|total_millions| +----------+------------+--------------+ | 技術部| 850000| 85.0| | 管理部| 380000| 38.0| | 営業部| 970000| 97.0| +----------+------------+--------------+
4-2. DataFrameとSQLの使い分け
| 状況 | おすすめ | 理由 |
|---|---|---|
| 複雑な集計・JOIN | SQL | 可読性が高い |
| データクリーニング | DataFrame | 柔軟な処理が得意 |
| 段階的な処理 | DataFrame | デバッグしやすい |
| レポート作成 | SQL | ビジネス担当者も理解しやすい |
| 既存SQLの移行 | SQL | そのまま使える |
4-3. 同じ処理をDataFrameとSQLで比較
# 同じ処理をDataFrameとSQLで比較 # ===== DataFrameで処理 ===== print("【DataFrame】部門別の平均給与(35万以上):") result_df = df_employees \ .groupBy("department") \ .agg(F.avg("salary").alias("avg_salary")) \ .filter(F.col("avg_salary") >= 350000) result_df.show() # ===== SQLで処理 ===== print("【SQL】部門別の平均給与(35万以上):") result_sql = spark.sql(""" SELECT department, AVG(salary) AS avg_salary FROM employees GROUP BY department HAVING AVG(salary) >= 350000 """) result_sql.show() print("→ 結果は同じ!書き方が違うだけ")
【DataFrame】部門別の平均給与(35万以上): +----------+----------+ |department|avg_salary| +----------+----------+ | 技術部| 425000.0| | 管理部| 380000.0| +----------+----------+ 【SQL】部門別の平均給与(35万以上): +----------+----------+ |department|avg_salary| +----------+----------+ | 技術部| 425000.0| | 管理部| 380000.0| +----------+----------+ → 結果は同じ!書き方が違うだけ
DataFrameとSQLは内部的に同じエンジンで実行されるため、
パフォーマンスは同等です。読みやすさや好みで使い分けましょう。
📊 5. 実践的なSQLパターン
5-1. ランキング(Window関数)
# ランキングをSQLで計算 print("給与ランキング(TOP3):") spark.sql(""" SELECT name, salary, rank FROM ( SELECT name, salary, ROW_NUMBER() OVER (ORDER BY salary DESC) AS rank FROM employees ) ranked WHERE rank <= 3 """).show()
給与ランキング(TOP3): +----+------+----+ |name|salary|rank| +----+------+----+ |伊藤|450000| 1| |山田|400000| 2| |渡辺|380000| 3| +----+------+----+
5-2. 前月比(LAG関数)
# 前月比をSQLで計算 # 月別売上データのビューを作成 df_monthly.createOrReplaceTempView("monthly_sales") print("前月比:") spark.sql(""" SELECT month, sales, LAG(sales, 1) OVER (ORDER BY month) AS prev_month, sales - LAG(sales, 1) OVER (ORDER BY month) AS diff, ROUND(((sales - LAG(sales, 1) OVER (ORDER BY month)) / LAG(sales, 1) OVER (ORDER BY month)) * 100, 1) AS growth_rate FROM monthly_sales ORDER BY month """).show()
前月比: +-------+-------+----------+-------+-----------+ | month| sales|prev_month| diff|growth_rate| +-------+-------+----------+-------+-----------+ |2024-01|1000000| null| null| null| |2024-02|1200000| 1000000| 200000| 20.0| |2024-03|1100000| 1200000|-100000| -8.3| |2024-04|1300000| 1100000| 200000| 18.2| +-------+-------+----------+-------+-----------+
5-3. ピボット集計(CASE WHEN)
# ピボット集計をSQLで実現 print("カテゴリ別売上(ピボット形式):") spark.sql(""" SELECT SUM(CASE WHEN category = '食品' THEN price * quantity ELSE 0 END) AS 食品, SUM(CASE WHEN category = '電化製品' THEN price * quantity ELSE 0 END) AS 電化製品, SUM(CASE WHEN category = '衣料品' THEN price * quantity ELSE 0 END) AS 衣料品, SUM(price * quantity) AS 合計 FROM sales """).show()
カテゴリ別売上(ピボット形式): +-----+--------+------+------+ | 食品|電化製品|衣料品| 合計| +-----+--------+------+------+ |32500| 180000| 49000|261500| +-----+--------+------+------+
📝 練習問題
基本的なSELECT
employeesテーブルから、給与が350000以上の社員の名前と給与を表示してください。
spark.sql("""
SELECT name, salary
FROM employees
WHERE salary >= 350000
""").show()+----+------+ |name|salary| +----+------+ |鈴木|350000| |山田|400000| |伊藤|450000| |渡辺|380000| +----+------+
GROUP BYで集計
employeesテーブルで、部門ごとの社員数と平均給与を計算してください。
spark.sql("""
SELECT
department,
COUNT(*) AS 社員数,
AVG(salary) AS 平均給与
FROM employees
GROUP BY department
""").show()+----------+------+------------------+ |department|社員数| 平均給与| +----------+------+------------------+ | 技術部| 2| 425000.0| | 管理部| 1| 380000.0| | 営業部| 3|323333.33333333331| +----------+------+------------------+
HAVINGでフィルター
employeesテーブルで、平均給与が350000以上の部門のみを表示してください。
spark.sql("""
SELECT
department,
AVG(salary) AS 平均給与
FROM employees
GROUP BY department
HAVING AVG(salary) >= 350000
""").show()+----------+--------+ |department|平均給与| +----------+--------+ | 技術部|425000.0| | 管理部|380000.0| +----------+--------+
Window関数でランキング
employeesテーブルで、給与の高い順に順位を付けて、TOP3を表示してください。
(ヒント:ROW_NUMBER() OVER (ORDER BY salary DESC) を使用)
spark.sql("""
SELECT name, salary, rank
FROM (
SELECT
name,
salary,
ROW_NUMBER() OVER (ORDER BY salary DESC) AS rank
FROM employees
) ranked
WHERE rank <= 3
""").show()+----+------+----+ |name|salary|rank| +----+------+----+ |伊藤|450000| 1| |山田|400000| 2| |渡辺|380000| 3| +----+------+----+
❓ よくある質問
df.write.saveAsTable("table_name")を使って実際のテーブルとして保存する必要があります。
spark.sql("""SELECT ... FROM ... WHERE ...""")。これにより、改行を含む長いSQLも読みやすく書けます。
spark.sql()の戻り値はDataFrameなので、そのまま.filter()や.withColumn()などのDataFrameメソッドを使えます。
📝 STEP 14 のまとめ
・Spark SQL:標準SQLでSparkを操作できる
・テンポラリビュー:createOrReplaceTempViewでビュー作成
・SQLクエリ実行:spark.sql()で実行、結果はDataFrame
・相互変換:DataFrameとSQLを自由に行き来できる
・実践パターン:ランキング、前月比、ピボット集計
・DataFrameとSQLはパフォーマンス同等
・テンポラリビューは一時的(セッション終了で消える)
・可読性を優先して使い分ける
・SQLの結果はDataFrameとして返る
次のSTEP 15では、「結合(JOIN)操作」を学びます。
複数のテーブルを結合する方法を、SQLとDataFrameの両方でマスターしましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 14