🔗 STEP 15: 結合(JOIN)操作
複数のDataFrameを結合!inner、left、right、outer joinとブロードキャスト結合をマスターしよう
📋 このステップで学ぶこと
- サンプルデータの準備(顧客、注文、商品データ)
- 4種類のJOIN(inner、left、right、outer)
- 複数テーブルの結合
- ブロードキャスト結合によるパフォーマンス最適化
📁 0. サンプルデータの準備
このステップでは、JOIN(結合)を使って複数のDataFrameを結合する方法を学びます。
まず、SparkSessionを初期化し、顧客・注文・商品のサンプルデータを準備しましょう。
0-1. SparkSessionの初期化
# SparkSessionの初期化 from pyspark.sql import SparkSession from pyspark.sql import functions as F # SparkSessionを作成 spark = SparkSession.builder \ .appName("JOIN Example") \ .getOrCreate() print("SparkSession準備完了")
SparkSession準備完了
0-2. 顧客データの作成
# 顧客データを作成 # データ:(顧客ID, 名前, 地域) customers_data = [ ("C001", "田中太郎", "東京"), ("C002", "鈴木花子", "大阪"), ("C003", "佐藤次郎", "名古屋"), ("C004", "山田美咲", "福岡") # 注文なしの顧客 ] df_customers = spark.createDataFrame(customers_data, ["customer_id", "name", "region"]) print("顧客データ:") df_customers.show()
顧客データ: +-----------+--------+------+ |customer_id| name|region| +-----------+--------+------+ | C001|田中太郎| 東京| | C002|鈴木花子| 大阪| | C003|佐藤次郎|名古屋| | C004|山田美咲| 福岡| +-----------+--------+------+
0-3. 注文データの作成
# 注文データを作成 # データ:(注文ID, 顧客ID, 商品ID, 数量, 注文日) orders_data = [ ("O001", "C001", "P001", 2, "2024-01-15"), ("O002", "C002", "P002", 1, "2024-01-16"), ("O003", "C001", "P003", 3, "2024-01-17"), ("O004", "C003", "P001", 1, "2024-01-18") # C004(山田美咲)の注文はない ] df_orders = spark.createDataFrame(orders_data, ["order_id", "customer_id", "product_id", "quantity", "order_date"]) print("注文データ:") df_orders.show()
注文データ: +--------+-----------+----------+--------+----------+ |order_id|customer_id|product_id|quantity|order_date| +--------+-----------+----------+--------+----------+ | O001| C001| P001| 2|2024-01-15| | O002| C002| P002| 1|2024-01-16| | O003| C001| P003| 3|2024-01-17| | O004| C003| P001| 1|2024-01-18| +--------+-----------+----------+--------+----------+
0-4. 商品データの作成
# 商品データを作成 # データ:(商品ID, 商品名, カテゴリ, 価格) products_data = [ ("P001", "商品A", "食品", 1000), ("P002", "商品B", "電化製品", 50000), ("P003", "商品C", "衣料品", 5000) ] df_products = spark.createDataFrame(products_data, ["product_id", "product_name", "category", "price"]) print("商品データ:") df_products.show()
商品データ: +----------+------------+--------+-----+ |product_id|product_name|category|price| +----------+------------+--------+-----+ | P001| 商品A| 食品| 1000| | P002| 商品B|電化製品|50000| | P003| 商品C| 衣料品| 5000| +----------+------------+--------+-----+
0-5. 全サンプルデータを一括作成するコード
# ======================================== # STEP 15 サンプルデータ一括作成スクリプト # ======================================== from pyspark.sql import SparkSession from pyspark.sql import functions as F spark = SparkSession.builder.appName("JOIN Example").getOrCreate() # 1. 顧客データ customers_data = [ ("C001", "田中太郎", "東京"), ("C002", "鈴木花子", "大阪"), ("C003", "佐藤次郎", "名古屋"), ("C004", "山田美咲", "福岡") ] df_customers = spark.createDataFrame(customers_data, ["customer_id", "name", "region"]) # 2. 注文データ orders_data = [ ("O001", "C001", "P001", 2, "2024-01-15"), ("O002", "C002", "P002", 1, "2024-01-16"), ("O003", "C001", "P003", 3, "2024-01-17"), ("O004", "C003", "P001", 1, "2024-01-18") ] df_orders = spark.createDataFrame(orders_data, ["order_id", "customer_id", "product_id", "quantity", "order_date"]) # 3. 商品データ products_data = [ ("P001", "商品A", "食品", 1000), ("P002", "商品B", "電化製品", 50000), ("P003", "商品C", "衣料品", 5000) ] df_products = spark.createDataFrame(products_data, ["product_id", "product_name", "category", "price"]) print("✅ サンプルデータを作成しました") print(" - df_customers: 顧客データ(4名)") print(" - df_orders: 注文データ(4件)") print(" - df_products: 商品データ(3件)")
✅ サンプルデータを作成しました - df_customers: 顧客データ(4名) - df_orders: 注文データ(4件) - df_products: 商品データ(3件)
🔗 1. JOINの基本概念
1-1. JOINとは?
JOIN(結合)とは、複数のDataFrameを共通のキーで結合して1つにする操作です。
リレーショナルデータベースのJOINと同じ概念です。
# JOINの基本構文 # DataFrameでのJOIN result = df1.join(df2, 結合条件, 結合タイプ) # SQLでのJOIN spark.sql(""" SELECT * FROM table1 JOIN table2 ON table1.key = table2.key """)
1-2. 4種類のJOIN
| JOINタイプ | 説明 | 使用場面 |
|---|---|---|
| INNER JOIN | 両方に存在するデータのみ返す | 最も一般的 |
| LEFT JOIN | 左側のテーブルを全て返す | マスターデータを保持 |
| RIGHT JOIN | 右側のテーブルを全て返す | あまり使わない |
| FULL OUTER JOIN | 両方のテーブルを全て返す | 差分チェック |
1-3. 視覚的な理解
テーブルA テーブルB
ID | 名前 ID | 商品
—+—- —+—-
1 | 田中 1 | りんご
2 | 鈴木 2 | みかん
3 | 佐藤 4 | ぶどう
【INNER JOIN】両方に存在するIDのみ → ID 1, 2
【LEFT JOIN】左側(A)を全て保持 → ID 1, 2, 3
【RIGHT JOIN】右側(B)を全て保持 → ID 1, 2, 4
【FULL OUTER JOIN】両方を全て保持 → ID 1, 2, 3, 4
🔀 2. INNER JOIN(内部結合)
2-1. INNER JOINの特徴
両方のテーブルに存在するレコードのみを返します。
最も一般的に使われるJOINです。
2-2. DataFrameでINNER JOIN
# DataFrameでINNER JOIN # 顧客と注文をcustomer_idで結合 result = df_customers.join( df_orders, "customer_id", # 同名カラムは文字列で指定(推奨) "inner" ) print("【INNER JOIN - DataFrame】") result.show()
【INNER JOIN - DataFrame】 +-----------+--------+------+--------+----------+--------+----------+ |customer_id| name|region|order_id|product_id|quantity|order_date| +-----------+--------+------+--------+----------+--------+----------+ | C001|田中太郎| 東京| O001| P001| 2|2024-01-15| | C001|田中太郎| 東京| O003| P003| 3|2024-01-17| | C002|鈴木花子| 大阪| O002| P002| 1|2024-01-16| | C003|佐藤次郎|名古屋| O004| P001| 1|2024-01-18| +-----------+--------+------+--------+----------+--------+----------+
INNER JOINでは、両方に存在するデータのみ返されます。
山田美咲(C004)は注文がないため、結果から除外されます。
2-3. SQLでINNER JOIN
# SQLでINNER JOIN # テンポラリビューを作成 df_customers.createOrReplaceTempView("customers") df_orders.createOrReplaceTempView("orders") print("【INNER JOIN - SQL】") spark.sql(""" SELECT c.customer_id, c.name, c.region, o.order_id, o.product_id, o.quantity FROM customers c INNER JOIN orders o ON c.customer_id = o.customer_id """).show()
【INNER JOIN - SQL】 +-----------+--------+------+--------+----------+--------+ |customer_id| name|region|order_id|product_id|quantity| +-----------+--------+------+--------+----------+--------+ | C001|田中太郎| 東京| O001| P001| 2| | C001|田中太郎| 東京| O003| P003| 3| | C002|鈴木花子| 大阪| O002| P002| 1| | C003|佐藤次郎|名古屋| O004| P001| 1| +-----------+--------+------+--------+----------+--------+
⬅️ 3. LEFT JOIN(左外部結合)
3-1. LEFT JOINの特徴
左側のテーブルの全てのレコードを保持し、右側にマッチするデータがない場合はNULLで埋めます。
3-2. DataFrameでLEFT JOIN
# DataFrameでLEFT JOIN result = df_customers.join( df_orders, "customer_id", "left" # または "left_outer" ) print("【LEFT JOIN - DataFrame】") result.show()
【LEFT JOIN - DataFrame】 +-----------+--------+------+--------+----------+--------+----------+ |customer_id| name|region|order_id|product_id|quantity|order_date| +-----------+--------+------+--------+----------+--------+----------+ | C001|田中太郎| 東京| O001| P001| 2|2024-01-15| | C001|田中太郎| 東京| O003| P003| 3|2024-01-17| | C002|鈴木花子| 大阪| O002| P002| 1|2024-01-16| | C003|佐藤次郎|名古屋| O004| P001| 1|2024-01-18| | C004|山田美咲| 福岡| null| null| null| null| +-----------+--------+------+--------+----------+--------+----------+
LEFT JOINでは、左側(顧客)を全て保持します。
山田美咲(C004)は注文がないため、注文関連カラムがNULLになります。
3-3. 注文がない顧客の抽出
# LEFT JOINで注文がない顧客を抽出 result = df_customers.join( df_orders, "customer_id", "left" ).filter(F.col("order_id").isNull()) # order_idがNULL = 注文なし print("【注文がない顧客】") result.select("customer_id", "name", "region").show()
【注文がない顧客】 +-----------+--------+------+ |customer_id| name|region| +-----------+--------+------+ | C004|山田美咲| 福岡| +-----------+--------+------+
・マスターデータを保持:全顧客、全商品を確実に表示したい
・欠損データの検出:注文がない顧客、売上がない商品を発見
・0件集計:NULL値を0に変換して集計
🔗 4. 複数テーブルのJOIN
4-1. 3つのテーブルを結合
実務では3つ以上のテーブルを結合することがよくあります。
順番にJOINしていきます。
# 3つのテーブルを結合(顧客 → 注文 → 商品) result = df_orders \ .join(df_customers, "customer_id", "inner") \ .join(df_products, "product_id", "inner") \ .select( "order_id", "order_date", "name", "region", "product_name", "category", "quantity", "price", (F.col("quantity") * F.col("price")).alias("amount") ) print("【3テーブル結合】") result.show()
【3テーブル結合】 +--------+----------+--------+------+------------+--------+--------+-----+------+ |order_id|order_date| name|region|product_name|category|quantity|price|amount| +--------+----------+--------+------+------------+--------+--------+-----+------+ | O001|2024-01-15|田中太郎| 東京| 商品A| 食品| 2| 1000| 2000| | O002|2024-01-16|鈴木花子| 大阪| 商品B|電化製品| 1|50000| 50000| | O003|2024-01-17|田中太郎| 東京| 商品C| 衣料品| 3| 5000| 15000| | O004|2024-01-18|佐藤次郎|名古屋| 商品A| 食品| 1| 1000| 1000| +--------+----------+--------+------+------------+--------+--------+-----+------+
4-2. SQLで3テーブル結合
# SQLで3テーブル結合 # テンポラリビューを作成 df_products.createOrReplaceTempView("products") print("【3テーブル結合 - SQL】") spark.sql(""" SELECT o.order_id, o.order_date, c.name, c.region, p.product_name, p.category, o.quantity, p.price, o.quantity * p.price AS amount FROM orders o INNER JOIN customers c ON o.customer_id = c.customer_id INNER JOIN products p ON o.product_id = p.product_id ORDER BY o.order_date """).show()
【3テーブル結合 - SQL】 +--------+----------+--------+------+------------+--------+--------+-----+------+ |order_id|order_date| name|region|product_name|category|quantity|price|amount| +--------+----------+--------+------+------------+--------+--------+-----+------+ | O001|2024-01-15|田中太郎| 東京| 商品A| 食品| 2| 1000| 2000| | O002|2024-01-16|鈴木花子| 大阪| 商品B|電化製品| 1|50000| 50000| | O003|2024-01-17|田中太郎| 東京| 商品C| 衣料品| 3| 5000| 15000| | O004|2024-01-18|佐藤次郎|名古屋| 商品A| 食品| 1| 1000| 1000| +--------+----------+--------+------+------------+--------+--------+-----+------+
4-3. 結合後の集計
# 結合後に顧客別集計 customer_summary = result.groupBy("name", "region").agg( F.count("order_id").alias("注文数"), F.sum("amount").alias("購入金額") ).orderBy(F.desc("購入金額")) print("【顧客別購入集計】") customer_summary.show()
【顧客別購入集計】 +--------+------+------+--------+ | name|region|注文数|購入金額| +--------+------+------+--------+ |鈴木花子| 大阪| 1| 50000| |田中太郎| 東京| 2| 17000| |佐藤次郎|名古屋| 1| 1000| +--------+------+------+--------+
⚡ 5. ブロードキャスト結合
5-1. ブロードキャスト結合とは?
小さいテーブルを全ノードに配布してJOINすることで、
シャッフルを回避してパフォーマンスを大幅に向上させる手法です。
通常のJOIN:大きいテーブル同士 → シャッフル発生(重い!)
ブロードキャストJOIN:小さいテーブルを全ノードにコピー → シャッフルなし(速い!)
小さいテーブル(マスターデータなど)を使う場合、10〜100倍高速化することも!
5-2. ブロードキャストJOINの実装
# ブロードキャストJOINの実装 from pyspark.sql.functions import broadcast # 小さいテーブル(商品マスター)をブロードキャスト result = df_orders.join( broadcast(df_products), # 小さいテーブルをbroadcast()で囲む "product_id", "inner" ) print("【ブロードキャストJOIN】") result.show()
【ブロードキャストJOIN】 +----------+--------+-----------+--------+----------+------------+--------+-----+ |product_id|order_id|customer_id|quantity|order_date|product_name|category|price| +----------+--------+-----------+--------+----------+------------+--------+-----+ | P001| O001| C001| 2|2024-01-15| 商品A| 食品| 1000| | P001| O004| C003| 1|2024-01-18| 商品A| 食品| 1000| | P002| O002| C002| 1|2024-01-16| 商品B|電化製品|50000| | P003| O003| C001| 3|2024-01-17| 商品C| 衣料品| 5000| +----------+--------+-----------+--------+----------+------------+--------+-----+
5-3. 実行計画で確認
# 実行計画を確認(BroadcastHashJoinが使われているか) print("【実行計画】") result.explain()
【実行計画】
== Physical Plan ==
*(2) Project [product_id#..., order_id#..., ...]
+- *(2) BroadcastHashJoin [product_id#...], [product_id#...], Inner, BuildRight, false
:- *(2) Filter isnotnull(product_id#...)
: +- *(2) Scan ...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Scan ...
・小さいテーブル:通常は10MB〜1GB以下
・マスターデータ:商品マスター、顧客マスターなど
・明示的指定を推奨:broadcast()で明示する
・大きすぎるテーブルは×:メモリ不足でエラーになる
・両方大きい場合は×:通常のJOINを使う
📝 練習問題
INNER JOIN
顧客データと注文データをINNER JOINして、顧客名と注文金額を表示してください。
result = df_customers.join(df_orders, "customer_id", "inner") \ .select("name", "order_id", "quantity") result.show()
+--------+--------+--------+ | name|order_id|quantity| +--------+--------+--------+ |田中太郎| O001| 2| |田中太郎| O003| 3| |鈴木花子| O002| 1| |佐藤次郎| O004| 1| +--------+--------+--------+
LEFT JOINで欠損検出
LEFT JOINを使って、注文がない顧客を抽出してください。
result = df_customers.join(df_orders, "customer_id", "left") \ .filter(F.col("order_id").isNull()) \ .select("customer_id", "name", "region") print("注文がない顧客:") result.show()
注文がない顧客: +-----------+--------+------+ |customer_id| name|region| +-----------+--------+------+ | C004|山田美咲| 福岡| +-----------+--------+------+
3テーブル結合
顧客、注文、商品の3テーブルを結合し、注文金額(quantity × price)を計算してください。
result = df_orders \
.join(df_customers, "customer_id", "inner") \
.join(df_products, "product_id", "inner") \
.withColumn("amount", F.col("quantity") * F.col("price")) \
.select("name", "product_name", "quantity", "price", "amount")
result.show()+--------+------------+--------+-----+------+ | name|product_name|quantity|price|amount| +--------+------------+--------+-----+------+ |田中太郎| 商品A| 2| 1000| 2000| |鈴木花子| 商品B| 1|50000| 50000| |田中太郎| 商品C| 3| 5000| 15000| |佐藤次郎| 商品A| 1| 1000| 1000| +--------+------------+--------+-----+------+
ブロードキャストJOIN
注文データと商品データを、ブロードキャストJOINで結合してください。
from pyspark.sql.functions import broadcast result = df_orders.join( broadcast(df_products), # 小さいテーブルをbroadcast "product_id", "inner" ) print("ブロードキャストJOIN:") result.show() # 実行計画を確認 result.explain()
ブロードキャストJOIN: +----------+--------+-----------+--------+----------+------------+--------+-----+ |product_id|order_id|customer_id|quantity|order_date|product_name|category|price| +----------+--------+-----------+--------+----------+------------+--------+-----+ | P001| O001| C001| 2|2024-01-15| 商品A| 食品| 1000| | P001| O004| C003| 1|2024-01-18| 商品A| 食品| 1000| | P002| O002| C002| 1|2024-01-16| 商品B|電化製品|50000| | P003| O003| C001| 3|2024-01-17| 商品C| 衣料品| 5000| +----------+--------+-----------+--------+----------+------------+--------+-----+
❓ よくある質問
"key")か、片方をリネームするか、JOIN後にdrop()で削除してください。最も簡単なのは文字列指定です。
📝 STEP 15 のまとめ
・4種類のJOIN:INNER、LEFT、RIGHT、FULL OUTER
・複数テーブルJOIN:3つ以上のテーブルの結合
・ブロードキャストJOIN:パフォーマンス最適化の切り札
・欠損データ検出:LEFT JOINでNULLを検出
・INNER JOINは両方に存在するデータのみ
・LEFT JOINは左側を全て保持
・小さいテーブルはブロードキャストで高速化
・カラム名重複は文字列指定で回避
次のSTEP 16では、「サブクエリとCTE」を学びます。
複雑なクエリをわかりやすく整理する方法をマスターしましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 15