STEP 11:集計とグルーピング

📊 STEP 11: 集計とグルーピング

groupBy()とagg()をマスターして、ビジネスインサイトを引き出すデータ集計スキルを習得しよう

📋 このステップで学ぶこと

  • サンプルデータの準備
  • groupBy()によるグループ化の基礎
  • 基本的な集計関数(sum、avg、count、max、min)
  • agg()を使った複数集計の実行
  • pivot()による集計表(クロス集計)の作成
  • 実践的な売上データ分析

📁 0. サンプルデータの準備

このステップでは、売上データを使って集計とグルーピングを学びます。
まず、SparkSessionを初期化し、演習で使用するサンプルデータを準備しましょう。

0-1. SparkSessionの初期化

SparkSessionは、Spark DataFrameを使うための入口です。
getOrCreate()を使うと、既存のセッションがあれば再利用し、なければ新規作成します。

# SparkSessionの初期化

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# SparkSessionを作成(または既存のものを取得)
spark = SparkSession.builder \
    .appName("GroupByPractice") \
    .getOrCreate()

print("SparkSession準備完了")
SparkSession準備完了

0-2. 基本的な売上サンプルデータの作成

groupBy()の基本を学ぶためのシンプルな売上データを作成します。
createDataFrame()でPythonのリストからDataFrameを作成できます。

# 基本的な売上データを作成

# データ:(商品名, カテゴリ, 売上金額)
sales_data = [
    ("商品A", "食品", 1000),
    ("商品B", "食品", 1500),
    ("商品C", "電化製品", 30000),
    ("商品D", "電化製品", 25000),
    ("商品E", "衣料品", 5000),
    ("商品F", "衣料品", 8000)
]

# DataFrameを作成(カラム名を指定)
df_sales = spark.createDataFrame(sales_data, ["product", "category", "sales"])

print("売上データ:")
df_sales.show()
売上データ:
+-------+--------+-----+
|product|category|sales|
+-------+--------+-----+
|  商品A|    食品| 1000|
|  商品B|    食品| 1500|
|  商品C|電化製品|30000|
|  商品D|電化製品|25000|
|  商品E|  衣料品| 5000|
|  商品F|  衣料品| 8000|
+-------+--------+-----+

0-3. 月別・店舗別の売上サンプルデータの作成

より実践的な分析のために、月別・店舗別の売上データも作成します。

# 月別・店舗別の売上データを作成

# データ:(月, 店舗, 売上金額)
monthly_data = [
    ("2024-01", "店舗A", 1000000),
    ("2024-01", "店舗B", 1500000),
    ("2024-01", "店舗C", 800000),
    ("2024-02", "店舗A", 1200000),
    ("2024-02", "店舗B", 1400000),
    ("2024-02", "店舗C", 900000),
    ("2024-03", "店舗A", 1100000),
    ("2024-03", "店舗B", 1600000),
    ("2024-03", "店舗C", 850000)
]

df_monthly = spark.createDataFrame(monthly_data, ["month", "store", "sales"])

print("月別・店舗別売上データ:")
df_monthly.show()
月別・店舗別売上データ:
+-------+-----+-------+
|  month|store|  sales|
+-------+-----+-------+
|2024-01|店舗A|1000000|
|2024-01|店舗B|1500000|
|2024-01|店舗C| 800000|
|2024-02|店舗A|1200000|
|2024-02|店舗B|1400000|
|2024-02|店舗C| 900000|
|2024-03|店舗A|1100000|
|2024-03|店舗B|1600000|
|2024-03|店舗C| 850000|
+-------+-----+-------+

0-4. 全サンプルデータを一括作成するコード

このステップで使用するすべてのサンプルデータを一括で作成できるコードです。

# ========================================
# STEP 11 サンプルデータ一括作成スクリプト
# ========================================

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("GroupByPractice").getOrCreate()

# 1. 基本的な売上データ
sales_data = [
    ("商品A", "食品", 1000), ("商品B", "食品", 1500),
    ("商品C", "電化製品", 30000), ("商品D", "電化製品", 25000),
    ("商品E", "衣料品", 5000), ("商品F", "衣料品", 8000)
]
df_sales = spark.createDataFrame(sales_data, ["product", "category", "sales"])

# 2. 月別・店舗別データ
monthly_data = [
    ("2024-01", "店舗A", 1000000), ("2024-01", "店舗B", 1500000), ("2024-01", "店舗C", 800000),
    ("2024-02", "店舗A", 1200000), ("2024-02", "店舗B", 1400000), ("2024-02", "店舗C", 900000),
    ("2024-03", "店舗A", 1100000), ("2024-03", "店舗B", 1600000), ("2024-03", "店舗C", 850000)
]
df_monthly = spark.createDataFrame(monthly_data, ["month", "store", "sales"])

# 3. 地域別・カテゴリ別データ(pivot用)
region_data = [
    ("東京", "食品", 500000), ("東京", "電化製品", 2000000), ("東京", "衣料品", 800000),
    ("大阪", "食品", 400000), ("大阪", "電化製品", 1800000), ("大阪", "衣料品", 700000),
    ("名古屋", "食品", 300000), ("名古屋", "電化製品", 1500000), ("名古屋", "衣料品", 600000)
]
df_region = spark.createDataFrame(region_data, ["region", "category", "sales"])

print("✅ サンプルデータを作成しました")
print("  - df_sales: 基本売上データ")
print("  - df_monthly: 月別・店舗別データ")
print("  - df_region: 地域別・カテゴリ別データ")
✅ サンプルデータを作成しました
  - df_sales: 基本売上データ
  - df_monthly: 月別・店舗別データ
  - df_region: 地域別・カテゴリ別データ

📌 1. groupBy()の基礎

1-1. groupBy()とは?

groupBy()は、特定のカラムの値が同じレコードをグループ化して、グループごとに集計を行う関数です。
SQLのGROUP BY句と同じ役割を果たします。

🎯 身近な例で理解する

クラスのテストの点数データがあるとします:

・太郎さん(1組):80点
・花子さん(1組):70点
・次郎さん(2組):75点
・美咲さん(2組):85点

「クラスごとの平均点を知りたい」場合、groupBy(“クラス”)で1組と2組にグループ化してから、平均を計算します。これがgroupBy()の基本的な考え方です。

1-2. groupBy()の基本構文

groupBy()は、グループ化したいカラム名を指定し、その後に集計関数を呼び出します。

# groupBy()の基本構文

# 1つのカラムでグループ化
df.groupBy("カラム名").集計関数()

# 複数のカラムでグループ化
df.groupBy("カラム1", "カラム2").集計関数()

# よく使う集計関数
.count()      # レコード数をカウント
.sum("列名")  # 合計
.avg("列名")  # 平均
.max("列名")  # 最大値
.min("列名")  # 最小値

1-3. 実践:カテゴリ別の売上合計

まず、最もシンプルな例として、カテゴリごとの売上合計を計算してみましょう。
sum("sales")は、sales列の値を合計する集計関数です。

# カテゴリ別の売上合計を計算

# groupBy("category"):categoryの値でグループ化
# sum("sales"):各グループのsalesを合計
result = df_sales.groupBy("category").sum("sales")

print("カテゴリ別売上合計:")
result.show()
カテゴリ別売上合計:
+--------+----------+
|category|sum(sales)|
+--------+----------+
|    食品|      2500|
|電化製品|     55000|
|  衣料品|     13000|
+--------+----------+

1-4. カテゴリ別の商品数をカウント

count()を使うと、各グループのレコード数をカウントできます。
これにより、カテゴリごとの商品数がわかります。

# カテゴリ別の商品数をカウント

# count():各グループのレコード数をカウント
result = df_sales.groupBy("category").count()

print("カテゴリ別商品数:")
result.show()
カテゴリ別商品数:
+--------+-----+
|category|count|
+--------+-----+
|    食品|    2|
|電化製品|    2|
|  衣料品|    2|
+--------+-----+

1-5. 複数カラムでグループ化

複数のカラムを指定すると、その組み合わせでグループ化されます。
例えば、「月」と「店舗」の両方でグループ化すると、「2024-01の店舗A」「2024-01の店舗B」のように細分化されます。

# 月と店舗の両方でグループ化

# groupBy("month", "store"):月×店舗の組み合わせでグループ化
result = df_monthly.groupBy("month", "store").sum("sales")

# orderBy():結果をソート(見やすくするため)
result = result.orderBy("month", "store")

print("月別・店舗別売上:")
result.show()
月別・店舗別売上:
+-------+-----+----------+
|  month|store|sum(sales)|
+-------+-----+----------+
|2024-01|店舗A|   1000000|
|2024-01|店舗B|   1500000|
|2024-01|店舗C|    800000|
|2024-02|店舗A|   1200000|
|2024-02|店舗B|   1400000|
|2024-02|店舗C|    900000|
|2024-03|店舗A|   1100000|
|2024-03|店舗B|   1600000|
|2024-03|店舗C|    850000|
+-------+-----+----------+
🎯 groupBy()のポイント

複数カラム指定可能groupBy("col1", "col2")で階層的なグループ化
集計前にfilter可能:グループ化前にデータを絞り込める
NULLも1つのグループ:NULL値も独立したグループとして扱われる
元のDataFrameは変更されない:新しいDataFrameが返される

🔢 2. 基本的な集計関数

2-1. 5つの基本集計関数

Sparkには、データを集計するための基本的な関数が5つあります。
これらはgroupBy()の後で使用するか、agg()の中で使用します。

関数 説明 使用例 NULLの扱い
count() レコード数をカウント F.count(“*”) または F.count(“col”) “*”は含む、列名指定は除外
sum() 数値の合計を計算 F.sum(“sales”) 無視される
avg() 平均値を計算 F.avg(“price”) 無視される
max() 最大値を取得 F.max(“score”) 無視される
min() 最小値を取得 F.min(“age”) 無視される

2-2. 各集計関数の実践例

月別売上データを使って、各集計関数の動作を確認してみましょう。
まず、sum()で月別の売上合計を計算します。

# 月別の売上合計(sum)

result = df_monthly.groupBy("month").sum("sales")
print("月別売上合計:")
result.orderBy("month").show()
月別売上合計:
+-------+----------+
|  month|sum(sales)|
+-------+----------+
|2024-01|   3300000|
|2024-02|   3500000|
|2024-03|   3550000|
+-------+----------+

次に、avg()で店舗別の平均売上を計算します。

# 店舗別の平均売上(avg)

result = df_monthly.groupBy("store").avg("sales")
print("店舗別平均売上:")
result.show()
店舗別平均売上:
+-----+------------------+
|store|        avg(sales)|
+-----+------------------+
|店舗A|         1100000.0|
|店舗B|         1500000.0|
|店舗C|          850000.0|
+-----+------------------+

max()min()で、店舗別の最高・最低売上を取得します。

# 店舗別の最高・最低売上(max, min)

# F.max()とF.min()を使用するためagg()を使う
result = df_monthly.groupBy("store").agg(
    F.max("sales").alias("最高売上"),
    F.min("sales").alias("最低売上")
)
print("店舗別の最高・最低売上:")
result.show()
店舗別の最高・最低売上:
+-----+--------+--------+
|store|最高売上|最低売上|
+-----+--------+--------+
|店舗A| 1200000| 1000000|
|店舗B| 1600000| 1400000|
|店舗C|  900000|  800000|
+-----+--------+--------+

2-3. countの違い:count(“*”) vs count(“列名”)

count()には重要な違いがあります。
count("*")はNULLを含めた全レコード数、count("列名")はNULL以外のレコード数をカウントします。

# countの違いを確認するためNULLを含むデータを作成

data_with_null = [
    ("A", 100),
    ("A", None),  # NULLを含む
    ("A", 200),
    ("B", 150),
    ("B", None)   # NULLを含む
]
df_null = spark.createDataFrame(data_with_null, ["group", "value"])

# count("*"):NULLを含めた全件数
# count("value"):NULL以外の件数
result = df_null.groupBy("group").agg(
    F.count("*").alias("全件数"),
    F.count("value").alias("NULL以外")
)
result.show()
+-----+------+--------+
|group|全件数|NULL以外|
+-----+------+--------+
|    A|     3|       2|
|    B|     2|       1|
+-----+------+--------+
💡 count()の使い分け

count("*"):レコード数を正確にカウントしたい場合
count("列名"):欠損値を除いた有効なデータ数を知りたい場合
countDistinct("列名"):ユニークな値の数を知りたい場合

🎯 3. agg()を使った複数集計

3-1. agg()とは?

agg()関数を使うと、複数の集計を同時に実行できます。
また、alias()を使って集計結果にわかりやすい名前を付けることができます。

# agg()の基本構文

df.groupBy("column").agg(
    F.sum("col1").alias("合計"),      # 合計を計算
    F.avg("col2").alias("平均"),      # 平均を計算
    F.max("col3").alias("最大")       # 最大値を取得
)

3-2. 実践:月別の詳細統計

月別売上データから、複数の統計値を一度に計算してみましょう。
alias()を使って、日本語のわかりやすい列名を付けます。

# 月別の詳細統計を計算

result = df_monthly.groupBy("month").agg(
    # count("*"):レコード数(店舗数)
    F.count("*").alias("店舗数"),
    
    # sum("sales"):売上合計
    F.sum("sales").alias("売上合計"),
    
    # avg("sales"):平均売上
    F.avg("sales").alias("平均売上"),
    
    # max("sales"):最高売上
    F.max("sales").alias("最高売上"),
    
    # min("sales"):最低売上
    F.min("sales").alias("最低売上")
)

print("月別詳細統計:")
result.orderBy("month").show()
月別詳細統計:
+-------+------+--------+------------------+--------+--------+
|  month|店舗数|売上合計|          平均売上|最高売上|最低売上|
+-------+------+--------+------------------+--------+--------+
|2024-01|     3| 3300000|         1100000.0| 1500000|  800000|
|2024-02|     3| 3500000|1166666.6666666667| 1400000|  900000|
|2024-03|     3| 3550000|1183333.3333333333| 1600000|  850000|
+-------+------+--------+------------------+--------+--------+

3-3. 計算式を含む集計

agg()の中では、集計関数同士の計算式も記述できます。
例えば、「売上合計 ÷ 件数」で平均を計算することもできます。

# 計算式を含む集計

# 商品データに数量を追加
product_data = [
    ("食品", "商品A", 100, 1000),
    ("食品", "商品B", 150, 1500),
    ("電化製品", "商品C", 10, 50000),
    ("電化製品", "商品D", 5, 80000)
]
df_product = spark.createDataFrame(product_data, 
    ["category", "product", "quantity", "sales"])

# カテゴリ別の統計と単価計算
result = df_product.groupBy("category").agg(
    F.sum("quantity").alias("総数量"),
    F.sum("sales").alias("総売上"),
    
    # 計算式:総売上 ÷ 総数量 = 平均単価
    (F.sum("sales") / F.sum("quantity")).alias("平均単価")
)

print("カテゴリ別統計(単価計算付き):")
result.show()
カテゴリ別統計(単価計算付き):
+--------+------+------+--------+
|category|総数量|総売上|平均単価|
+--------+------+------+--------+
|    食品|   250|  2500|    10.0|
|電化製品|    15|130000|  8666.7|
+--------+------+------+--------+

3-4. 条件付き集計(when()との組み合わせ)

when()と組み合わせると、条件に合うものだけを集計できます。
例えば、「100万円以上の売上件数」のようなカウントが可能です。

# 条件付き集計

result = df_monthly.groupBy("store").agg(
    # 全件数
    F.count("*").alias("全月数"),
    
    # 条件付きカウント:100万円以上の月数
    # when(条件, 1):条件に合えば1、otherwise(0):合わなければ0
    # sum()で合計すると条件に合った件数になる
    F.sum(F.when(F.col("sales") >= 1000000, 1).otherwise(0)).alias("100万以上"),
    
    # 条件付き合計:150万円以上の売上合計
    F.sum(F.when(F.col("sales") >= 1500000, F.col("sales")).otherwise(0)).alias("150万以上売上"),
    
    # 売上合計
    F.sum("sales").alias("売上合計")
)

print("条件付き集計結果:")
result.show()
条件付き集計結果:
+-----+------+----------+------------+--------+
|store|全月数|100万以上|150万以上売上|売上合計|
+-----+------+----------+------------+--------+
|店舗A|     3|         3|           0| 3300000|
|店舗B|     3|         3|     4500000| 4500000|
|店舗C|     3|         0|           0| 2550000|
+-----+------+----------+------------+--------+
💡 agg()のベストプラクティス

必ずalias()を使う:わかりやすい列名を付けると、後の処理が楽になる
計算式も記述可能:集計関数同士の演算もできる
条件付き集計:when()と組み合わせて柔軟な集計
パフォーマンス:1回のagg()で複数集計する方が効率的

📋 4. pivot()による集計表作成

4-1. pivot()とは?

pivot()は、特定のカラムの値を列として展開し、クロス集計表(ピボットテーブル)を作成する関数です。
Excelのピボットテーブルと同じような表が作れます。

📊 pivot()のイメージ

元のデータ(縦長):

店舗A, 1月, 100万円
店舗A, 2月, 120万円
店舗B, 1月, 150万円
店舗B, 2月, 140万円

pivot()後(横長):

店舗 | 1月  | 2月
店舗A | 100万円 | 120万円
店舗B | 150万円 | 140万円

4-2. pivot()の基本構文

# pivot()の基本構文

# 基本形
df.groupBy("行にしたいカラム") \
  .pivot("列にしたいカラム") \
  .agg(集計関数)

# 値を指定する形(パフォーマンス向上)
df.groupBy("行カラム") \
  .pivot("列カラム", ["値1", "値2", "値3"]) \
  .agg(集計関数)

4-3. 実践:店舗×月のクロス集計

店舗を行、月を列にしたクロス集計表を作成します。
pivot("month")で月の値を列として展開します。

# 店舗×月のクロス集計

# groupBy("store"):店舗を行にする
# pivot("month"):月を列にする
# sum("sales"):各セルに売上合計を表示
pivot_result = df_monthly.groupBy("store") \
    .pivot("month") \
    .sum("sales")

print("店舗×月のクロス集計:")
pivot_result.show()
店舗×月のクロス集計:
+-----+-------+-------+-------+
|store|2024-01|2024-02|2024-03|
+-----+-------+-------+-------+
|店舗A|1000000|1200000|1100000|
|店舗B|1500000|1400000|1600000|
|店舗C| 800000| 900000| 850000|
+-----+-------+-------+-------+

4-4. 値を指定してパフォーマンス向上

pivot()に値のリストを渡すと、Sparkが事前に列数を把握できるため、パフォーマンスが大幅に向上します。
特にデータ量が多い場合は、必ず値を指定しましょう。

# 値を指定してpivot(パフォーマンス向上)

# pivot()の第2引数に値のリストを渡す
pivot_result = df_monthly.groupBy("store") \
    .pivot("month", ["2024-01", "2024-02", "2024-03"]) \
    .agg(F.sum("sales"))

print("値を指定したpivot:")
pivot_result.show()
値を指定したpivot:
+-----+-------+-------+-------+
|store|2024-01|2024-02|2024-03|
+-----+-------+-------+-------+
|店舗A|1000000|1200000|1100000|
|店舗B|1500000|1400000|1600000|
|店舗C| 800000| 900000| 850000|
+-----+-------+-------+-------+

4-5. 合計列を追加する

pivot()の結果に、各行の合計を追加することもできます。
withColumn()で新しい列を作成し、各月の値を足し合わせます。

# 合計列を追加

pivot_result = df_monthly.groupBy("store") \
    .pivot("month", ["2024-01", "2024-02", "2024-03"]) \
    .agg(F.sum("sales"))

# withColumn()で合計列を追加
# F.col("列名")でカラムを参照
pivot_with_total = pivot_result.withColumn(
    "合計",
    F.col("2024-01") + F.col("2024-02") + F.col("2024-03")
)

print("合計列付きクロス集計:")
pivot_with_total.show()
合計列付きクロス集計:
+-----+-------+-------+-------+-------+
|store|2024-01|2024-02|2024-03|   合計|
+-----+-------+-------+-------+-------+
|店舗A|1000000|1200000|1100000|3300000|
|店舗B|1500000|1400000|1600000|4500000|
|店舗C| 800000| 900000| 850000|2550000|
+-----+-------+-------+-------+-------+

4-6. 地域×カテゴリのクロス集計

セクション0で作成した地域別データを使って、地域×カテゴリのクロス集計を作成します。

# 地域×カテゴリのクロス集計

pivot_result = df_region.groupBy("region") \
    .pivot("category", ["食品", "電化製品", "衣料品"]) \
    .agg(F.sum("sales"))

# 合計列を追加
pivot_with_total = pivot_result.withColumn(
    "合計",
    F.col("食品") + F.col("電化製品") + F.col("衣料品")
)

print("地域×カテゴリ売上:")
pivot_with_total.show()
地域×カテゴリ売上:
+------+------+--------+------+-------+
|region|  食品|電化製品|衣料品|   合計|
+------+------+--------+------+-------+
|  東京|500000| 2000000|800000|3300000|
|  大阪|400000| 1800000|700000|2900000|
|名古屋|300000| 1500000|600000|2400000|
+------+------+--------+------+-------+
⚠️ pivot()使用時の注意点

パフォーマンス:ユニーク値が多いと(数千以上)メモリを大量消費
値の指定:可能な限り.pivot("col", [値リスト])で値を指定
NULL処理:NULLも1つの列として扱われる
列名:pivot後の列名は元の値がそのまま使われる

💼 5. 実践演習:ECサイトの売上分析

ここまで学んだgroupBy()、agg()、pivot()を組み合わせて、ECサイトの売上データを多角的に分析してみましょう。

5-1. ECサイトのサンプルデータを作成

# ECサイトの注文データを作成

order_data = [
    ("2024-01-15", "店舗A", "食品", 50, 5000, "東京"),
    ("2024-01-15", "店舗A", "電化製品", 2, 80000, "東京"),
    ("2024-01-16", "店舗B", "衣料品", 10, 30000, "大阪"),
    ("2024-01-16", "店舗B", "食品", 30, 3000, "大阪"),
    ("2024-01-17", "店舗C", "電化製品", 5, 200000, "名古屋"),
    ("2024-01-17", "店舗A", "衣料品", 15, 45000, "東京"),
    ("2024-01-18", "店舗B", "食品", 40, 4000, "大阪"),
    ("2024-01-18", "店舗C", "電化製品", 3, 120000, "名古屋"),
    ("2024-01-19", "店舗A", "食品", 60, 6000, "東京"),
    ("2024-01-19", "店舗B", "衣料品", 20, 60000, "大阪")
]

df_order = spark.createDataFrame(order_data, 
    ["date", "store", "category", "quantity", "sales", "region"])

# 日付を型変換
df_order = df_order.withColumn("date", F.to_date("date", "yyyy-MM-dd"))

print("ECサイト注文データ:")
df_order.show()
ECサイト注文データ:
+----------+-----+--------+--------+------+------+
|      date|store|category|quantity| sales|region|
+----------+-----+--------+--------+------+------+
|2024-01-15|店舗A|    食品|      50|  5000|  東京|
|2024-01-15|店舗A|電化製品|       2| 80000|  東京|
|2024-01-16|店舗B|  衣料品|      10| 30000|  大阪|
|2024-01-16|店舗B|    食品|      30|  3000|  大阪|
|2024-01-17|店舗C|電化製品|       5|200000|名古屋|
|2024-01-17|店舗A|  衣料品|      15| 45000|  東京|
|2024-01-18|店舗B|    食品|      40|  4000|  大阪|
|2024-01-18|店舗C|電化製品|       3|120000|名古屋|
|2024-01-19|店舗A|    食品|      60|  6000|  東京|
|2024-01-19|店舗B|  衣料品|      20| 60000|  大阪|
+----------+-----+--------+--------+------+------+

5-2. 分析1:店舗別パフォーマンス

# 店舗別の総合パフォーマンス分析

store_performance = df_order.groupBy("store", "region").agg(
    F.count("*").alias("取引件数"),
    F.sum("quantity").alias("総販売数"),
    F.sum("sales").alias("総売上"),
    F.avg("sales").alias("平均売上"),
    (F.sum("sales") / F.sum("quantity")).alias("平均単価")
).orderBy(F.desc("総売上"))

print("【分析1】店舗別パフォーマンス:")
store_performance.show(truncate=False)
【分析1】店舗別パフォーマンス:
+-----+------+--------+--------+------+------------------+------------------+
|store|region|取引件数|総販売数|総売上|          平均売上|          平均単価|
+-----+------+--------+--------+------+------------------+------------------+
|店舗C|名古屋|       2|       8|320000|          160000.0|           40000.0|
|店舗A|  東京|       4|     127|136000|           34000.0|1070.8661417322834|
|店舗B|  大阪|       4|     100| 97000|           24250.0|             970.0|
+-----+------+--------+--------+------+------------------+------------------+

5-3. 分析2:地域×カテゴリのクロス集計

# 地域×カテゴリのクロス集計

region_category = df_order.groupBy("region") \
    .pivot("category", ["食品", "電化製品", "衣料品"]) \
    .agg(F.sum("sales"))

# NULLを0に置換して合計を計算
region_category = region_category.fillna(0)
region_category = region_category.withColumn(
    "合計",
    F.col("食品") + F.col("電化製品") + F.col("衣料品")
)

print("【分析2】地域×カテゴリ売上:")
region_category.show()
【分析2】地域×カテゴリ売上:
+------+-----+--------+------+------+
|region| 食品|電化製品|衣料品|  合計|
+------+-----+--------+------+------+
|  東京|11000|   80000| 45000|136000|
|  大阪| 7000|       0| 90000| 97000|
|名古屋|    0|  320000|     0|320000|
+------+-----+--------+------+------+
💡 実務での集計分析のポイント

目的を明確に:何を知りたいのかを最初に決める
複数の角度から分析:時系列、カテゴリ、地域など多角的に
比較を入れる:絶対値だけでなく、比率や前期比も
異常値に注目:最大値・最小値から問題を発見
わかりやすく整理:ソートや列名の工夫で見やすく

📝 練習問題

問題 1 基礎

基本的なgroupBy集計

次のデータから、部門ごとの社員数と平均給与を計算してください。

data = [
    ("営業部", "田中", 300000),
    ("営業部", "鈴木", 350000),
    ("営業部", "佐藤", 320000),
    ("技術部", "山田", 400000),
    ("技術部", "伊藤", 450000),
    ("管理部", "渡辺", 380000)
]
df = spark.createDataFrame(data, ["department", "name", "salary"])
【解答】
from pyspark.sql import functions as F

result = df.groupBy("department").agg(
    F.count("*").alias("社員数"),
    F.avg("salary").alias("平均給与")
)

result.show()
+----------+------+------------------+
|department|社員数|          平均給与|
+----------+------+------------------+
|    営業部|     3|323333.33333333337|
|    技術部|     2|          425000.0|
|    管理部|     1|          380000.0|
+----------+------+------------------+
問題 2 応用

複数の統計値を同時に計算

次のテストの点数データから、科目ごとの最高点、最低点、平均点、受験者数を計算してください。

data = [
    ("数学", 85), ("数学", 92), ("数学", 78),
    ("英語", 88), ("英語", 95), ("英語", 82),
    ("国語", 75), ("国語", 80), ("国語", 90)
]
df = spark.createDataFrame(data, ["subject", "score"])
【解答】
result = df.groupBy("subject").agg(
    F.max("score").alias("最高点"),
    F.min("score").alias("最低点"),
    F.avg("score").alias("平均点"),
    F.count("*").alias("受験者数")
)

result.show()
+-------+------+------+------------------+--------+
|subject|最高点|最低点|            平均点|受験者数|
+-------+------+------+------------------+--------+
|   数学|    92|    78|              85.0|       3|
|   英語|    95|    82|88.333333333333333|       3|
|   国語|    90|    75|81.666666666666667|       3|
+-------+------+------+------------------+--------+
問題 3 応用

条件付き集計

次の売上データから、店舗ごとに「10万円以上の取引件数」と「10万円未満の取引件数」を計算してください。

data = [
    ("店舗A", 50000), ("店舗A", 150000), ("店舗A", 80000),
    ("店舗B", 120000), ("店舗B", 90000), ("店舗B", 200000)
]
df = spark.createDataFrame(data, ["store", "sales"])
【解答】
result = df.groupBy("store").agg(
    F.sum(F.when(F.col("sales") >= 100000, 1).otherwise(0)).alias("10万円以上"),
    F.sum(F.when(F.col("sales") < 100000, 1).otherwise(0)).alias("10万円未満"),
    F.sum("sales").alias("売上合計")
)

result.show()
+-----+----------+----------+--------+
|store|10万円以上|10万円未満|売上合計|
+-----+----------+----------+--------+
|店舗A|         1|         2|  280000|
|店舗B|         2|         1|  410000|
+-----+----------+----------+--------+
問題 4 実践

pivot()を使ったクロス集計

次のデータを使って、地域×カテゴリの売上クロス集計表を作成し、各地域の合計も計算してください。

data = [
    ("東京", "食品", 100000),
    ("東京", "電化製品", 500000),
    ("東京", "衣料品", 200000),
    ("大阪", "食品", 80000),
    ("大阪", "電化製品", 400000),
    ("大阪", "衣料品", 150000)
]
df = spark.createDataFrame(data, ["region", "category", "sales"])
【解答】
# pivot()でクロス集計
pivot_result = df.groupBy("region") \
    .pivot("category", ["食品", "電化製品", "衣料品"]) \
    .agg(F.sum("sales"))

# 各地域の合計を追加
pivot_with_total = pivot_result.withColumn("合計",
    F.col("食品") + F.col("電化製品") + F.col("衣料品")
)

pivot_with_total.show()
+------+------+--------+------+------+
|region|  食品|電化製品|衣料品|  合計|
+------+------+--------+------+------+
|  東京|100000|  500000|200000|800000|
|  大阪| 80000|  400000|150000|630000|
+------+------+--------+------+------+

❓ よくある質問

Q1: groupBy()後にfilter()はできますか?
はい、できます。集計後の結果に対してfilter()を使えます。
例:df.groupBy("category").agg(F.sum("sales").alias("total")).filter(F.col("total") > 100000)
SQLのHAVING句に相当します。
Q2: PandasのgroupByとの違いは?
基本的な考え方は同じですが、Sparkは分散処理を行うため、大量データでも高速です。また、Sparkでは.agg()を明示的に使う必要があります。Pandasは.groupby().sum()のように直接メソッドを呼べますが、Sparkは.groupBy().agg(F.sum())と書きます。
Q3: pivot()のパフォーマンスが悪い場合は?
pivot()する列の値を明示してください:.pivot("column", [値1, 値2, ...])。これにより、Sparkは事前に列数を把握でき、パフォーマンスが大幅に向上します。ユニーク値が数千以上ある場合は、pivot()の使用を避け、通常のgroupBy()で処理することを検討してください。
Q4: NULLを含むデータをgroupBy()するとどうなりますか?
NULLも1つのグループとして扱われます。NULLを除外したい場合は、df.filter(F.col("category").isNotNull()).groupBy("category")...のように事前にフィルタしてください。

📝 STEP 11 のまとめ

✅ このステップで学んだこと

groupBy():特定のカラムでグループ化して集計
基本集計関数:count、sum、avg、max、minの使い方
agg():複数の集計を同時に実行し、alias()で列名を付ける
条件付き集計:when()と組み合わせて柔軟な集計
pivot():クロス集計表(ピボットテーブル)の作成

💡 重要ポイント

・agg()では必ずalias()で列名を指定する
・pivot()は値を明示してパフォーマンスを向上させる
・複数の集計を1回のagg()にまとめると効率的
・集計は複数の角度から行うことでインサイトが得られる

🎯 次のステップの予告

次のSTEP 12では、「ウィンドウ関数」を学びます。
グループ内での順位付けや累積計算など、さらに高度な分析手法をマスターしましょう!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 11

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE