STEP 15:結合(JOIN)操作

🔗 STEP 15: 結合(JOIN)操作

複数のDataFrameを結合!inner、left、right、outer joinとブロードキャスト結合をマスターしよう

📋 このステップで学ぶこと

  • サンプルデータの準備(顧客、注文、商品データ)
  • 4種類のJOIN(inner、left、right、outer)
  • 複数テーブルの結合
  • ブロードキャスト結合によるパフォーマンス最適化

📁 0. サンプルデータの準備

このステップでは、JOIN(結合)を使って複数のDataFrameを結合する方法を学びます。
まず、SparkSessionを初期化し、顧客・注文・商品のサンプルデータを準備しましょう。

0-1. SparkSessionの初期化

# SparkSessionの初期化

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# SparkSessionを作成
spark = SparkSession.builder \
    .appName("JOIN Example") \
    .getOrCreate()

print("SparkSession準備完了")
SparkSession準備完了

0-2. 顧客データの作成

# 顧客データを作成

# データ:(顧客ID, 名前, 地域)
customers_data = [
    ("C001", "田中太郎", "東京"),
    ("C002", "鈴木花子", "大阪"),
    ("C003", "佐藤次郎", "名古屋"),
    ("C004", "山田美咲", "福岡")  # 注文なしの顧客
]

df_customers = spark.createDataFrame(customers_data, ["customer_id", "name", "region"])

print("顧客データ:")
df_customers.show()
顧客データ:
+-----------+--------+------+
|customer_id|    name|region|
+-----------+--------+------+
|       C001|田中太郎|  東京|
|       C002|鈴木花子|  大阪|
|       C003|佐藤次郎|名古屋|
|       C004|山田美咲|  福岡|
+-----------+--------+------+

0-3. 注文データの作成

# 注文データを作成

# データ:(注文ID, 顧客ID, 商品ID, 数量, 注文日)
orders_data = [
    ("O001", "C001", "P001", 2, "2024-01-15"),
    ("O002", "C002", "P002", 1, "2024-01-16"),
    ("O003", "C001", "P003", 3, "2024-01-17"),
    ("O004", "C003", "P001", 1, "2024-01-18")
    # C004(山田美咲)の注文はない
]

df_orders = spark.createDataFrame(orders_data, ["order_id", "customer_id", "product_id", "quantity", "order_date"])

print("注文データ:")
df_orders.show()
注文データ:
+--------+-----------+----------+--------+----------+
|order_id|customer_id|product_id|quantity|order_date|
+--------+-----------+----------+--------+----------+
|    O001|       C001|      P001|       2|2024-01-15|
|    O002|       C002|      P002|       1|2024-01-16|
|    O003|       C001|      P003|       3|2024-01-17|
|    O004|       C003|      P001|       1|2024-01-18|
+--------+-----------+----------+--------+----------+

0-4. 商品データの作成

# 商品データを作成

# データ:(商品ID, 商品名, カテゴリ, 価格)
products_data = [
    ("P001", "商品A", "食品", 1000),
    ("P002", "商品B", "電化製品", 50000),
    ("P003", "商品C", "衣料品", 5000)
]

df_products = spark.createDataFrame(products_data, ["product_id", "product_name", "category", "price"])

print("商品データ:")
df_products.show()
商品データ:
+----------+------------+--------+-----+
|product_id|product_name|category|price|
+----------+------------+--------+-----+
|      P001|       商品A|    食品| 1000|
|      P002|       商品B|電化製品|50000|
|      P003|       商品C|  衣料品| 5000|
+----------+------------+--------+-----+

0-5. 全サンプルデータを一括作成するコード

# ========================================
# STEP 15 サンプルデータ一括作成スクリプト
# ========================================

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("JOIN Example").getOrCreate()

# 1. 顧客データ
customers_data = [
    ("C001", "田中太郎", "東京"), ("C002", "鈴木花子", "大阪"),
    ("C003", "佐藤次郎", "名古屋"), ("C004", "山田美咲", "福岡")
]
df_customers = spark.createDataFrame(customers_data, ["customer_id", "name", "region"])

# 2. 注文データ
orders_data = [
    ("O001", "C001", "P001", 2, "2024-01-15"), ("O002", "C002", "P002", 1, "2024-01-16"),
    ("O003", "C001", "P003", 3, "2024-01-17"), ("O004", "C003", "P001", 1, "2024-01-18")
]
df_orders = spark.createDataFrame(orders_data, ["order_id", "customer_id", "product_id", "quantity", "order_date"])

# 3. 商品データ
products_data = [
    ("P001", "商品A", "食品", 1000), ("P002", "商品B", "電化製品", 50000),
    ("P003", "商品C", "衣料品", 5000)
]
df_products = spark.createDataFrame(products_data, ["product_id", "product_name", "category", "price"])

print("✅ サンプルデータを作成しました")
print("  - df_customers: 顧客データ(4名)")
print("  - df_orders: 注文データ(4件)")
print("  - df_products: 商品データ(3件)")
✅ サンプルデータを作成しました
  - df_customers: 顧客データ(4名)
  - df_orders: 注文データ(4件)
  - df_products: 商品データ(3件)

🔗 1. JOINの基本概念

1-1. JOINとは?

JOIN(結合)とは、複数のDataFrameを共通のキーで結合して1つにする操作です。
リレーショナルデータベースのJOINと同じ概念です。

# JOINの基本構文

# DataFrameでのJOIN
result = df1.join(df2, 結合条件, 結合タイプ)

# SQLでのJOIN
spark.sql("""
    SELECT * FROM table1
    JOIN table2 ON table1.key = table2.key
""")

1-2. 4種類のJOIN

JOINタイプ 説明 使用場面
INNER JOIN 両方に存在するデータのみ返す 最も一般的
LEFT JOIN 左側のテーブルを全て返す マスターデータを保持
RIGHT JOIN 右側のテーブルを全て返す あまり使わない
FULL OUTER JOIN 両方のテーブルを全て返す 差分チェック

1-3. 視覚的な理解

📊 JOINの動作イメージ

テーブルA       テーブルB
 ID | 名前       ID | 商品
 —+—-       —+—-
 1 | 田中       1 | りんご
 2 | 鈴木       2 | みかん
 3 | 佐藤       4 | ぶどう

【INNER JOIN】両方に存在するIDのみ → ID 1, 2
【LEFT JOIN】左側(A)を全て保持 → ID 1, 2, 3
【RIGHT JOIN】右側(B)を全て保持 → ID 1, 2, 4
【FULL OUTER JOIN】両方を全て保持 → ID 1, 2, 3, 4

🔀 2. INNER JOIN(内部結合)

2-1. INNER JOINの特徴

両方のテーブルに存在するレコードのみを返します。
最も一般的に使われるJOINです。

2-2. DataFrameでINNER JOIN

# DataFrameでINNER JOIN

# 顧客と注文をcustomer_idで結合
result = df_customers.join(
    df_orders,
    "customer_id",  # 同名カラムは文字列で指定(推奨)
    "inner"
)

print("【INNER JOIN - DataFrame】")
result.show()
【INNER JOIN - DataFrame】
+-----------+--------+------+--------+----------+--------+----------+
|customer_id|    name|region|order_id|product_id|quantity|order_date|
+-----------+--------+------+--------+----------+--------+----------+
|       C001|田中太郎|  東京|    O001|      P001|       2|2024-01-15|
|       C001|田中太郎|  東京|    O003|      P003|       3|2024-01-17|
|       C002|鈴木花子|  大阪|    O002|      P002|       1|2024-01-16|
|       C003|佐藤次郎|名古屋|    O004|      P001|       1|2024-01-18|
+-----------+--------+------+--------+----------+--------+----------+
⚠️ 注文がない顧客(C004)は結果に含まれない!

INNER JOINでは、両方に存在するデータのみ返されます。
山田美咲(C004)は注文がないため、結果から除外されます。

2-3. SQLでINNER JOIN

# SQLでINNER JOIN

# テンポラリビューを作成
df_customers.createOrReplaceTempView("customers")
df_orders.createOrReplaceTempView("orders")

print("【INNER JOIN - SQL】")
spark.sql("""
    SELECT 
        c.customer_id,
        c.name,
        c.region,
        o.order_id,
        o.product_id,
        o.quantity
    FROM customers c
    INNER JOIN orders o ON c.customer_id = o.customer_id
""").show()
【INNER JOIN - SQL】
+-----------+--------+------+--------+----------+--------+
|customer_id|    name|region|order_id|product_id|quantity|
+-----------+--------+------+--------+----------+--------+
|       C001|田中太郎|  東京|    O001|      P001|       2|
|       C001|田中太郎|  東京|    O003|      P003|       3|
|       C002|鈴木花子|  大阪|    O002|      P002|       1|
|       C003|佐藤次郎|名古屋|    O004|      P001|       1|
+-----------+--------+------+--------+----------+--------+

⬅️ 3. LEFT JOIN(左外部結合)

3-1. LEFT JOINの特徴

左側のテーブルの全てのレコードを保持し、右側にマッチするデータがない場合はNULLで埋めます。

3-2. DataFrameでLEFT JOIN

# DataFrameでLEFT JOIN

result = df_customers.join(
    df_orders,
    "customer_id",
    "left"  # または "left_outer"
)

print("【LEFT JOIN - DataFrame】")
result.show()
【LEFT JOIN - DataFrame】
+-----------+--------+------+--------+----------+--------+----------+
|customer_id|    name|region|order_id|product_id|quantity|order_date|
+-----------+--------+------+--------+----------+--------+----------+
|       C001|田中太郎|  東京|    O001|      P001|       2|2024-01-15|
|       C001|田中太郎|  東京|    O003|      P003|       3|2024-01-17|
|       C002|鈴木花子|  大阪|    O002|      P002|       1|2024-01-16|
|       C003|佐藤次郎|名古屋|    O004|      P001|       1|2024-01-18|
|       C004|山田美咲|  福岡|    null|      null|    null|      null|
+-----------+--------+------+--------+----------+--------+----------+
💡 注文がない顧客(C004)もNULLとして表示される!

LEFT JOINでは、左側(顧客)を全て保持します。
山田美咲(C004)は注文がないため、注文関連カラムがNULLになります。

3-3. 注文がない顧客の抽出

# LEFT JOINで注文がない顧客を抽出

result = df_customers.join(
    df_orders,
    "customer_id",
    "left"
).filter(F.col("order_id").isNull())  # order_idがNULL = 注文なし

print("【注文がない顧客】")
result.select("customer_id", "name", "region").show()
【注文がない顧客】
+-----------+--------+------+
|customer_id|    name|region|
+-----------+--------+------+
|       C004|山田美咲|  福岡|
+-----------+--------+------+
🎯 LEFT JOINの使いどころ

マスターデータを保持:全顧客、全商品を確実に表示したい
欠損データの検出:注文がない顧客、売上がない商品を発見
0件集計:NULL値を0に変換して集計

🔗 4. 複数テーブルのJOIN

4-1. 3つのテーブルを結合

実務では3つ以上のテーブルを結合することがよくあります。
順番にJOINしていきます。

# 3つのテーブルを結合(顧客 → 注文 → 商品)

result = df_orders \
    .join(df_customers, "customer_id", "inner") \
    .join(df_products, "product_id", "inner") \
    .select(
        "order_id",
        "order_date",
        "name",
        "region",
        "product_name",
        "category",
        "quantity",
        "price",
        (F.col("quantity") * F.col("price")).alias("amount")
    )

print("【3テーブル結合】")
result.show()
【3テーブル結合】
+--------+----------+--------+------+------------+--------+--------+-----+------+
|order_id|order_date|    name|region|product_name|category|quantity|price|amount|
+--------+----------+--------+------+------------+--------+--------+-----+------+
|    O001|2024-01-15|田中太郎|  東京|       商品A|    食品|       2| 1000|  2000|
|    O002|2024-01-16|鈴木花子|  大阪|       商品B|電化製品|       1|50000| 50000|
|    O003|2024-01-17|田中太郎|  東京|       商品C|  衣料品|       3| 5000| 15000|
|    O004|2024-01-18|佐藤次郎|名古屋|       商品A|    食品|       1| 1000|  1000|
+--------+----------+--------+------+------------+--------+--------+-----+------+

4-2. SQLで3テーブル結合

# SQLで3テーブル結合

# テンポラリビューを作成
df_products.createOrReplaceTempView("products")

print("【3テーブル結合 - SQL】")
spark.sql("""
    SELECT 
        o.order_id,
        o.order_date,
        c.name,
        c.region,
        p.product_name,
        p.category,
        o.quantity,
        p.price,
        o.quantity * p.price AS amount
    FROM orders o
    INNER JOIN customers c ON o.customer_id = c.customer_id
    INNER JOIN products p ON o.product_id = p.product_id
    ORDER BY o.order_date
""").show()
【3テーブル結合 - SQL】
+--------+----------+--------+------+------------+--------+--------+-----+------+
|order_id|order_date|    name|region|product_name|category|quantity|price|amount|
+--------+----------+--------+------+------------+--------+--------+-----+------+
|    O001|2024-01-15|田中太郎|  東京|       商品A|    食品|       2| 1000|  2000|
|    O002|2024-01-16|鈴木花子|  大阪|       商品B|電化製品|       1|50000| 50000|
|    O003|2024-01-17|田中太郎|  東京|       商品C|  衣料品|       3| 5000| 15000|
|    O004|2024-01-18|佐藤次郎|名古屋|       商品A|    食品|       1| 1000|  1000|
+--------+----------+--------+------+------------+--------+--------+-----+------+

4-3. 結合後の集計

# 結合後に顧客別集計

customer_summary = result.groupBy("name", "region").agg(
    F.count("order_id").alias("注文数"),
    F.sum("amount").alias("購入金額")
).orderBy(F.desc("購入金額"))

print("【顧客別購入集計】")
customer_summary.show()
【顧客別購入集計】
+--------+------+------+--------+
|    name|region|注文数|購入金額|
+--------+------+------+--------+
|鈴木花子|  大阪|     1|   50000|
|田中太郎|  東京|     2|   17000|
|佐藤次郎|名古屋|     1|    1000|
+--------+------+------+--------+

⚡ 5. ブロードキャスト結合

5-1. ブロードキャスト結合とは?

小さいテーブルを全ノードに配布してJOINすることで、
シャッフルを回避してパフォーマンスを大幅に向上させる手法です。

📊 通常のJOIN vs ブロードキャストJOIN

通常のJOIN:大きいテーブル同士 → シャッフル発生(重い!)
ブロードキャストJOIN:小さいテーブルを全ノードにコピー → シャッフルなし(速い!)

小さいテーブル(マスターデータなど)を使う場合、10〜100倍高速化することも!

5-2. ブロードキャストJOINの実装

# ブロードキャストJOINの実装

from pyspark.sql.functions import broadcast

# 小さいテーブル(商品マスター)をブロードキャスト
result = df_orders.join(
    broadcast(df_products),  # 小さいテーブルをbroadcast()で囲む
    "product_id",
    "inner"
)

print("【ブロードキャストJOIN】")
result.show()
【ブロードキャストJOIN】
+----------+--------+-----------+--------+----------+------------+--------+-----+
|product_id|order_id|customer_id|quantity|order_date|product_name|category|price|
+----------+--------+-----------+--------+----------+------------+--------+-----+
|      P001|    O001|       C001|       2|2024-01-15|       商品A|    食品| 1000|
|      P001|    O004|       C003|       1|2024-01-18|       商品A|    食品| 1000|
|      P002|    O002|       C002|       1|2024-01-16|       商品B|電化製品|50000|
|      P003|    O003|       C001|       3|2024-01-17|       商品C|  衣料品| 5000|
+----------+--------+-----------+--------+----------+------------+--------+-----+

5-3. 実行計画で確認

# 実行計画を確認(BroadcastHashJoinが使われているか)

print("【実行計画】")
result.explain()
【実行計画】
== Physical Plan ==
*(2) Project [product_id#..., order_id#..., ...]
+- *(2) BroadcastHashJoin [product_id#...], [product_id#...], Inner, BuildRight, false
   :- *(2) Filter isnotnull(product_id#...)
   :  +- *(2) Scan ...
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      +- *(1) Scan ...
💡 ブロードキャストJOINの条件

小さいテーブル:通常は10MB〜1GB以下
マスターデータ:商品マスター、顧客マスターなど
明示的指定を推奨broadcast()で明示する

⚠️ ブロードキャストJOINの注意点

大きすぎるテーブルは×:メモリ不足でエラーになる
両方大きい場合は×:通常のJOINを使う

📝 練習問題

問題 1 基礎

INNER JOIN

顧客データと注文データをINNER JOINして、顧客名と注文金額を表示してください。

【解答】
result = df_customers.join(df_orders, "customer_id", "inner") \
    .select("name", "order_id", "quantity")

result.show()
+--------+--------+--------+
|    name|order_id|quantity|
+--------+--------+--------+
|田中太郎|    O001|       2|
|田中太郎|    O003|       3|
|鈴木花子|    O002|       1|
|佐藤次郎|    O004|       1|
+--------+--------+--------+
問題 2 応用

LEFT JOINで欠損検出

LEFT JOINを使って、注文がない顧客を抽出してください。

【解答】
result = df_customers.join(df_orders, "customer_id", "left") \
    .filter(F.col("order_id").isNull()) \
    .select("customer_id", "name", "region")

print("注文がない顧客:")
result.show()
注文がない顧客:
+-----------+--------+------+
|customer_id|    name|region|
+-----------+--------+------+
|       C004|山田美咲|  福岡|
+-----------+--------+------+
問題 3 応用

3テーブル結合

顧客、注文、商品の3テーブルを結合し、注文金額(quantity × price)を計算してください。

【解答】
result = df_orders \
    .join(df_customers, "customer_id", "inner") \
    .join(df_products, "product_id", "inner") \
    .withColumn("amount", F.col("quantity") * F.col("price")) \
    .select("name", "product_name", "quantity", "price", "amount")

result.show()
+--------+------------+--------+-----+------+
|    name|product_name|quantity|price|amount|
+--------+------------+--------+-----+------+
|田中太郎|       商品A|       2| 1000|  2000|
|鈴木花子|       商品B|       1|50000| 50000|
|田中太郎|       商品C|       3| 5000| 15000|
|佐藤次郎|       商品A|       1| 1000|  1000|
+--------+------------+--------+-----+------+
問題 4 実践

ブロードキャストJOIN

注文データと商品データを、ブロードキャストJOINで結合してください。

【解答】
from pyspark.sql.functions import broadcast

result = df_orders.join(
    broadcast(df_products),  # 小さいテーブルをbroadcast
    "product_id",
    "inner"
)

print("ブロードキャストJOIN:")
result.show()

# 実行計画を確認
result.explain()
ブロードキャストJOIN:
+----------+--------+-----------+--------+----------+------------+--------+-----+
|product_id|order_id|customer_id|quantity|order_date|product_name|category|price|
+----------+--------+-----------+--------+----------+------------+--------+-----+
|      P001|    O001|       C001|       2|2024-01-15|       商品A|    食品| 1000|
|      P001|    O004|       C003|       1|2024-01-18|       商品A|    食品| 1000|
|      P002|    O002|       C002|       1|2024-01-16|       商品B|電化製品|50000|
|      P003|    O003|       C001|       3|2024-01-17|       商品C|  衣料品| 5000|
+----------+--------+-----------+--------+----------+------------+--------+-----+

❓ よくある質問

Q1: カラム名が重複してエラーになる場合は?
JOIN条件を文字列で指定する("key")か、片方をリネームするか、JOIN後にdrop()で削除してください。最も簡単なのは文字列指定です。
Q2: LEFT JOINとRIGHT JOINの使い分けは?
実務ではLEFT JOINがほとんどです。「基準となるテーブル(マスター)を左に置く」という慣習があり、これに従うとコードが読みやすくなります。
Q3: ブロードキャストするテーブルの大きさの目安は?
一般的に10MB〜1GB以下が目安です。デフォルトでは10MBまで自動ブロードキャストされます。
Q4: NULL値がある場合、JOINはどうなりますか?
NULLはマッチしません。片方のキーがNULLの場合、INNER JOINの結果に含まれません。

📝 STEP 15 のまとめ

✅ このステップで学んだこと

4種類のJOIN:INNER、LEFT、RIGHT、FULL OUTER
複数テーブルJOIN:3つ以上のテーブルの結合
ブロードキャストJOIN:パフォーマンス最適化の切り札
欠損データ検出:LEFT JOINでNULLを検出

💡 重要ポイント

・INNER JOINは両方に存在するデータのみ
・LEFT JOINは左側を全て保持
・小さいテーブルはブロードキャストで高速化
・カラム名重複は文字列指定で回避

🎯 次のステップの予告

次のSTEP 16では、「サブクエリとCTE」を学びます。
複雑なクエリをわかりやすく整理する方法をマスターしましょう!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 15

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE