📊 STEP 11: 集計とグルーピング
groupBy()とagg()をマスターして、ビジネスインサイトを引き出すデータ集計スキルを習得しよう
📋 このステップで学ぶこと
- サンプルデータの準備
- groupBy()によるグループ化の基礎
- 基本的な集計関数(sum、avg、count、max、min)
- agg()を使った複数集計の実行
- pivot()による集計表(クロス集計)の作成
- 実践的な売上データ分析
📁 0. サンプルデータの準備
このステップでは、売上データを使って集計とグルーピングを学びます。
まず、SparkSessionを初期化し、演習で使用するサンプルデータを準備しましょう。
0-1. SparkSessionの初期化
SparkSessionは、Spark DataFrameを使うための入口です。
getOrCreate()を使うと、既存のセッションがあれば再利用し、なければ新規作成します。
# SparkSessionの初期化 from pyspark.sql import SparkSession from pyspark.sql import functions as F # SparkSessionを作成(または既存のものを取得) spark = SparkSession.builder \ .appName("GroupByPractice") \ .getOrCreate() print("SparkSession準備完了")
SparkSession準備完了
0-2. 基本的な売上サンプルデータの作成
groupBy()の基本を学ぶためのシンプルな売上データを作成します。
createDataFrame()でPythonのリストからDataFrameを作成できます。
# 基本的な売上データを作成 # データ:(商品名, カテゴリ, 売上金額) sales_data = [ ("商品A", "食品", 1000), ("商品B", "食品", 1500), ("商品C", "電化製品", 30000), ("商品D", "電化製品", 25000), ("商品E", "衣料品", 5000), ("商品F", "衣料品", 8000) ] # DataFrameを作成(カラム名を指定) df_sales = spark.createDataFrame(sales_data, ["product", "category", "sales"]) print("売上データ:") df_sales.show()
売上データ: +-------+--------+-----+ |product|category|sales| +-------+--------+-----+ | 商品A| 食品| 1000| | 商品B| 食品| 1500| | 商品C|電化製品|30000| | 商品D|電化製品|25000| | 商品E| 衣料品| 5000| | 商品F| 衣料品| 8000| +-------+--------+-----+
0-3. 月別・店舗別の売上サンプルデータの作成
より実践的な分析のために、月別・店舗別の売上データも作成します。
# 月別・店舗別の売上データを作成 # データ:(月, 店舗, 売上金額) monthly_data = [ ("2024-01", "店舗A", 1000000), ("2024-01", "店舗B", 1500000), ("2024-01", "店舗C", 800000), ("2024-02", "店舗A", 1200000), ("2024-02", "店舗B", 1400000), ("2024-02", "店舗C", 900000), ("2024-03", "店舗A", 1100000), ("2024-03", "店舗B", 1600000), ("2024-03", "店舗C", 850000) ] df_monthly = spark.createDataFrame(monthly_data, ["month", "store", "sales"]) print("月別・店舗別売上データ:") df_monthly.show()
月別・店舗別売上データ: +-------+-----+-------+ | month|store| sales| +-------+-----+-------+ |2024-01|店舗A|1000000| |2024-01|店舗B|1500000| |2024-01|店舗C| 800000| |2024-02|店舗A|1200000| |2024-02|店舗B|1400000| |2024-02|店舗C| 900000| |2024-03|店舗A|1100000| |2024-03|店舗B|1600000| |2024-03|店舗C| 850000| +-------+-----+-------+
0-4. 全サンプルデータを一括作成するコード
このステップで使用するすべてのサンプルデータを一括で作成できるコードです。
# ======================================== # STEP 11 サンプルデータ一括作成スクリプト # ======================================== from pyspark.sql import SparkSession from pyspark.sql import functions as F spark = SparkSession.builder.appName("GroupByPractice").getOrCreate() # 1. 基本的な売上データ sales_data = [ ("商品A", "食品", 1000), ("商品B", "食品", 1500), ("商品C", "電化製品", 30000), ("商品D", "電化製品", 25000), ("商品E", "衣料品", 5000), ("商品F", "衣料品", 8000) ] df_sales = spark.createDataFrame(sales_data, ["product", "category", "sales"]) # 2. 月別・店舗別データ monthly_data = [ ("2024-01", "店舗A", 1000000), ("2024-01", "店舗B", 1500000), ("2024-01", "店舗C", 800000), ("2024-02", "店舗A", 1200000), ("2024-02", "店舗B", 1400000), ("2024-02", "店舗C", 900000), ("2024-03", "店舗A", 1100000), ("2024-03", "店舗B", 1600000), ("2024-03", "店舗C", 850000) ] df_monthly = spark.createDataFrame(monthly_data, ["month", "store", "sales"]) # 3. 地域別・カテゴリ別データ(pivot用) region_data = [ ("東京", "食品", 500000), ("東京", "電化製品", 2000000), ("東京", "衣料品", 800000), ("大阪", "食品", 400000), ("大阪", "電化製品", 1800000), ("大阪", "衣料品", 700000), ("名古屋", "食品", 300000), ("名古屋", "電化製品", 1500000), ("名古屋", "衣料品", 600000) ] df_region = spark.createDataFrame(region_data, ["region", "category", "sales"]) print("✅ サンプルデータを作成しました") print(" - df_sales: 基本売上データ") print(" - df_monthly: 月別・店舗別データ") print(" - df_region: 地域別・カテゴリ別データ")
✅ サンプルデータを作成しました - df_sales: 基本売上データ - df_monthly: 月別・店舗別データ - df_region: 地域別・カテゴリ別データ
📌 1. groupBy()の基礎
1-1. groupBy()とは?
groupBy()は、特定のカラムの値が同じレコードをグループ化して、グループごとに集計を行う関数です。
SQLのGROUP BY句と同じ役割を果たします。
クラスのテストの点数データがあるとします:
・太郎さん(1組):80点
・花子さん(1組):70点
・次郎さん(2組):75点
・美咲さん(2組):85点
「クラスごとの平均点を知りたい」場合、groupBy(“クラス”)で1組と2組にグループ化してから、平均を計算します。これがgroupBy()の基本的な考え方です。
1-2. groupBy()の基本構文
groupBy()は、グループ化したいカラム名を指定し、その後に集計関数を呼び出します。
# groupBy()の基本構文 # 1つのカラムでグループ化 df.groupBy("カラム名").集計関数() # 複数のカラムでグループ化 df.groupBy("カラム1", "カラム2").集計関数() # よく使う集計関数 .count() # レコード数をカウント .sum("列名") # 合計 .avg("列名") # 平均 .max("列名") # 最大値 .min("列名") # 最小値
1-3. 実践:カテゴリ別の売上合計
まず、最もシンプルな例として、カテゴリごとの売上合計を計算してみましょう。
sum("sales")は、sales列の値を合計する集計関数です。
# カテゴリ別の売上合計を計算 # groupBy("category"):categoryの値でグループ化 # sum("sales"):各グループのsalesを合計 result = df_sales.groupBy("category").sum("sales") print("カテゴリ別売上合計:") result.show()
カテゴリ別売上合計: +--------+----------+ |category|sum(sales)| +--------+----------+ | 食品| 2500| |電化製品| 55000| | 衣料品| 13000| +--------+----------+
1-4. カテゴリ別の商品数をカウント
count()を使うと、各グループのレコード数をカウントできます。
これにより、カテゴリごとの商品数がわかります。
# カテゴリ別の商品数をカウント # count():各グループのレコード数をカウント result = df_sales.groupBy("category").count() print("カテゴリ別商品数:") result.show()
カテゴリ別商品数: +--------+-----+ |category|count| +--------+-----+ | 食品| 2| |電化製品| 2| | 衣料品| 2| +--------+-----+
1-5. 複数カラムでグループ化
複数のカラムを指定すると、その組み合わせでグループ化されます。
例えば、「月」と「店舗」の両方でグループ化すると、「2024-01の店舗A」「2024-01の店舗B」のように細分化されます。
# 月と店舗の両方でグループ化 # groupBy("month", "store"):月×店舗の組み合わせでグループ化 result = df_monthly.groupBy("month", "store").sum("sales") # orderBy():結果をソート(見やすくするため) result = result.orderBy("month", "store") print("月別・店舗別売上:") result.show()
月別・店舗別売上: +-------+-----+----------+ | month|store|sum(sales)| +-------+-----+----------+ |2024-01|店舗A| 1000000| |2024-01|店舗B| 1500000| |2024-01|店舗C| 800000| |2024-02|店舗A| 1200000| |2024-02|店舗B| 1400000| |2024-02|店舗C| 900000| |2024-03|店舗A| 1100000| |2024-03|店舗B| 1600000| |2024-03|店舗C| 850000| +-------+-----+----------+
・複数カラム指定可能:groupBy("col1", "col2")で階層的なグループ化
・集計前にfilter可能:グループ化前にデータを絞り込める
・NULLも1つのグループ:NULL値も独立したグループとして扱われる
・元のDataFrameは変更されない:新しいDataFrameが返される
🔢 2. 基本的な集計関数
2-1. 5つの基本集計関数
Sparkには、データを集計するための基本的な関数が5つあります。
これらはgroupBy()の後で使用するか、agg()の中で使用します。
| 関数 | 説明 | 使用例 | NULLの扱い |
|---|---|---|---|
| count() | レコード数をカウント | F.count(“*”) または F.count(“col”) | “*”は含む、列名指定は除外 |
| sum() | 数値の合計を計算 | F.sum(“sales”) | 無視される |
| avg() | 平均値を計算 | F.avg(“price”) | 無視される |
| max() | 最大値を取得 | F.max(“score”) | 無視される |
| min() | 最小値を取得 | F.min(“age”) | 無視される |
2-2. 各集計関数の実践例
月別売上データを使って、各集計関数の動作を確認してみましょう。
まず、sum()で月別の売上合計を計算します。
# 月別の売上合計(sum) result = df_monthly.groupBy("month").sum("sales") print("月別売上合計:") result.orderBy("month").show()
月別売上合計: +-------+----------+ | month|sum(sales)| +-------+----------+ |2024-01| 3300000| |2024-02| 3500000| |2024-03| 3550000| +-------+----------+
次に、avg()で店舗別の平均売上を計算します。
# 店舗別の平均売上(avg) result = df_monthly.groupBy("store").avg("sales") print("店舗別平均売上:") result.show()
店舗別平均売上: +-----+------------------+ |store| avg(sales)| +-----+------------------+ |店舗A| 1100000.0| |店舗B| 1500000.0| |店舗C| 850000.0| +-----+------------------+
max()とmin()で、店舗別の最高・最低売上を取得します。
# 店舗別の最高・最低売上(max, min) # F.max()とF.min()を使用するためagg()を使う result = df_monthly.groupBy("store").agg( F.max("sales").alias("最高売上"), F.min("sales").alias("最低売上") ) print("店舗別の最高・最低売上:") result.show()
店舗別の最高・最低売上: +-----+--------+--------+ |store|最高売上|最低売上| +-----+--------+--------+ |店舗A| 1200000| 1000000| |店舗B| 1600000| 1400000| |店舗C| 900000| 800000| +-----+--------+--------+
2-3. countの違い:count(“*”) vs count(“列名”)
count()には重要な違いがあります。
count("*")はNULLを含めた全レコード数、count("列名")はNULL以外のレコード数をカウントします。
# countの違いを確認するためNULLを含むデータを作成 data_with_null = [ ("A", 100), ("A", None), # NULLを含む ("A", 200), ("B", 150), ("B", None) # NULLを含む ] df_null = spark.createDataFrame(data_with_null, ["group", "value"]) # count("*"):NULLを含めた全件数 # count("value"):NULL以外の件数 result = df_null.groupBy("group").agg( F.count("*").alias("全件数"), F.count("value").alias("NULL以外") ) result.show()
+-----+------+--------+ |group|全件数|NULL以外| +-----+------+--------+ | A| 3| 2| | B| 2| 1| +-----+------+--------+
・count("*"):レコード数を正確にカウントしたい場合
・count("列名"):欠損値を除いた有効なデータ数を知りたい場合
・countDistinct("列名"):ユニークな値の数を知りたい場合
🎯 3. agg()を使った複数集計
3-1. agg()とは?
agg()関数を使うと、複数の集計を同時に実行できます。
また、alias()を使って集計結果にわかりやすい名前を付けることができます。
# agg()の基本構文 df.groupBy("column").agg( F.sum("col1").alias("合計"), # 合計を計算 F.avg("col2").alias("平均"), # 平均を計算 F.max("col3").alias("最大") # 最大値を取得 )
3-2. 実践:月別の詳細統計
月別売上データから、複数の統計値を一度に計算してみましょう。
alias()を使って、日本語のわかりやすい列名を付けます。
# 月別の詳細統計を計算 result = df_monthly.groupBy("month").agg( # count("*"):レコード数(店舗数) F.count("*").alias("店舗数"), # sum("sales"):売上合計 F.sum("sales").alias("売上合計"), # avg("sales"):平均売上 F.avg("sales").alias("平均売上"), # max("sales"):最高売上 F.max("sales").alias("最高売上"), # min("sales"):最低売上 F.min("sales").alias("最低売上") ) print("月別詳細統計:") result.orderBy("month").show()
月別詳細統計: +-------+------+--------+------------------+--------+--------+ | month|店舗数|売上合計| 平均売上|最高売上|最低売上| +-------+------+--------+------------------+--------+--------+ |2024-01| 3| 3300000| 1100000.0| 1500000| 800000| |2024-02| 3| 3500000|1166666.6666666667| 1400000| 900000| |2024-03| 3| 3550000|1183333.3333333333| 1600000| 850000| +-------+------+--------+------------------+--------+--------+
3-3. 計算式を含む集計
agg()の中では、集計関数同士の計算式も記述できます。
例えば、「売上合計 ÷ 件数」で平均を計算することもできます。
# 計算式を含む集計 # 商品データに数量を追加 product_data = [ ("食品", "商品A", 100, 1000), ("食品", "商品B", 150, 1500), ("電化製品", "商品C", 10, 50000), ("電化製品", "商品D", 5, 80000) ] df_product = spark.createDataFrame(product_data, ["category", "product", "quantity", "sales"]) # カテゴリ別の統計と単価計算 result = df_product.groupBy("category").agg( F.sum("quantity").alias("総数量"), F.sum("sales").alias("総売上"), # 計算式:総売上 ÷ 総数量 = 平均単価 (F.sum("sales") / F.sum("quantity")).alias("平均単価") ) print("カテゴリ別統計(単価計算付き):") result.show()
カテゴリ別統計(単価計算付き): +--------+------+------+--------+ |category|総数量|総売上|平均単価| +--------+------+------+--------+ | 食品| 250| 2500| 10.0| |電化製品| 15|130000| 8666.7| +--------+------+------+--------+
3-4. 条件付き集計(when()との組み合わせ)
when()と組み合わせると、条件に合うものだけを集計できます。
例えば、「100万円以上の売上件数」のようなカウントが可能です。
# 条件付き集計 result = df_monthly.groupBy("store").agg( # 全件数 F.count("*").alias("全月数"), # 条件付きカウント:100万円以上の月数 # when(条件, 1):条件に合えば1、otherwise(0):合わなければ0 # sum()で合計すると条件に合った件数になる F.sum(F.when(F.col("sales") >= 1000000, 1).otherwise(0)).alias("100万以上"), # 条件付き合計:150万円以上の売上合計 F.sum(F.when(F.col("sales") >= 1500000, F.col("sales")).otherwise(0)).alias("150万以上売上"), # 売上合計 F.sum("sales").alias("売上合計") ) print("条件付き集計結果:") result.show()
条件付き集計結果: +-----+------+----------+------------+--------+ |store|全月数|100万以上|150万以上売上|売上合計| +-----+------+----------+------------+--------+ |店舗A| 3| 3| 0| 3300000| |店舗B| 3| 3| 4500000| 4500000| |店舗C| 3| 0| 0| 2550000| +-----+------+----------+------------+--------+
・必ずalias()を使う:わかりやすい列名を付けると、後の処理が楽になる
・計算式も記述可能:集計関数同士の演算もできる
・条件付き集計:when()と組み合わせて柔軟な集計
・パフォーマンス:1回のagg()で複数集計する方が効率的
📋 4. pivot()による集計表作成
4-1. pivot()とは?
pivot()は、特定のカラムの値を列として展開し、クロス集計表(ピボットテーブル)を作成する関数です。
Excelのピボットテーブルと同じような表が作れます。
元のデータ(縦長):
店舗A, 1月, 100万円
店舗A, 2月, 120万円
店舗B, 1月, 150万円
店舗B, 2月, 140万円
pivot()後(横長):
店舗 | 1月 | 2月
店舗A | 100万円 | 120万円
店舗B | 150万円 | 140万円
4-2. pivot()の基本構文
# pivot()の基本構文 # 基本形 df.groupBy("行にしたいカラム") \ .pivot("列にしたいカラム") \ .agg(集計関数) # 値を指定する形(パフォーマンス向上) df.groupBy("行カラム") \ .pivot("列カラム", ["値1", "値2", "値3"]) \ .agg(集計関数)
4-3. 実践:店舗×月のクロス集計
店舗を行、月を列にしたクロス集計表を作成します。
pivot("month")で月の値を列として展開します。
# 店舗×月のクロス集計 # groupBy("store"):店舗を行にする # pivot("month"):月を列にする # sum("sales"):各セルに売上合計を表示 pivot_result = df_monthly.groupBy("store") \ .pivot("month") \ .sum("sales") print("店舗×月のクロス集計:") pivot_result.show()
店舗×月のクロス集計: +-----+-------+-------+-------+ |store|2024-01|2024-02|2024-03| +-----+-------+-------+-------+ |店舗A|1000000|1200000|1100000| |店舗B|1500000|1400000|1600000| |店舗C| 800000| 900000| 850000| +-----+-------+-------+-------+
4-4. 値を指定してパフォーマンス向上
pivot()に値のリストを渡すと、Sparkが事前に列数を把握できるため、パフォーマンスが大幅に向上します。
特にデータ量が多い場合は、必ず値を指定しましょう。
# 値を指定してpivot(パフォーマンス向上) # pivot()の第2引数に値のリストを渡す pivot_result = df_monthly.groupBy("store") \ .pivot("month", ["2024-01", "2024-02", "2024-03"]) \ .agg(F.sum("sales")) print("値を指定したpivot:") pivot_result.show()
値を指定したpivot: +-----+-------+-------+-------+ |store|2024-01|2024-02|2024-03| +-----+-------+-------+-------+ |店舗A|1000000|1200000|1100000| |店舗B|1500000|1400000|1600000| |店舗C| 800000| 900000| 850000| +-----+-------+-------+-------+
4-5. 合計列を追加する
pivot()の結果に、各行の合計を追加することもできます。
withColumn()で新しい列を作成し、各月の値を足し合わせます。
# 合計列を追加 pivot_result = df_monthly.groupBy("store") \ .pivot("month", ["2024-01", "2024-02", "2024-03"]) \ .agg(F.sum("sales")) # withColumn()で合計列を追加 # F.col("列名")でカラムを参照 pivot_with_total = pivot_result.withColumn( "合計", F.col("2024-01") + F.col("2024-02") + F.col("2024-03") ) print("合計列付きクロス集計:") pivot_with_total.show()
合計列付きクロス集計: +-----+-------+-------+-------+-------+ |store|2024-01|2024-02|2024-03| 合計| +-----+-------+-------+-------+-------+ |店舗A|1000000|1200000|1100000|3300000| |店舗B|1500000|1400000|1600000|4500000| |店舗C| 800000| 900000| 850000|2550000| +-----+-------+-------+-------+-------+
4-6. 地域×カテゴリのクロス集計
セクション0で作成した地域別データを使って、地域×カテゴリのクロス集計を作成します。
# 地域×カテゴリのクロス集計 pivot_result = df_region.groupBy("region") \ .pivot("category", ["食品", "電化製品", "衣料品"]) \ .agg(F.sum("sales")) # 合計列を追加 pivot_with_total = pivot_result.withColumn( "合計", F.col("食品") + F.col("電化製品") + F.col("衣料品") ) print("地域×カテゴリ売上:") pivot_with_total.show()
地域×カテゴリ売上: +------+------+--------+------+-------+ |region| 食品|電化製品|衣料品| 合計| +------+------+--------+------+-------+ | 東京|500000| 2000000|800000|3300000| | 大阪|400000| 1800000|700000|2900000| |名古屋|300000| 1500000|600000|2400000| +------+------+--------+------+-------+
・パフォーマンス:ユニーク値が多いと(数千以上)メモリを大量消費
・値の指定:可能な限り.pivot("col", [値リスト])で値を指定
・NULL処理:NULLも1つの列として扱われる
・列名:pivot後の列名は元の値がそのまま使われる
💼 5. 実践演習:ECサイトの売上分析
ここまで学んだgroupBy()、agg()、pivot()を組み合わせて、ECサイトの売上データを多角的に分析してみましょう。
5-1. ECサイトのサンプルデータを作成
# ECサイトの注文データを作成 order_data = [ ("2024-01-15", "店舗A", "食品", 50, 5000, "東京"), ("2024-01-15", "店舗A", "電化製品", 2, 80000, "東京"), ("2024-01-16", "店舗B", "衣料品", 10, 30000, "大阪"), ("2024-01-16", "店舗B", "食品", 30, 3000, "大阪"), ("2024-01-17", "店舗C", "電化製品", 5, 200000, "名古屋"), ("2024-01-17", "店舗A", "衣料品", 15, 45000, "東京"), ("2024-01-18", "店舗B", "食品", 40, 4000, "大阪"), ("2024-01-18", "店舗C", "電化製品", 3, 120000, "名古屋"), ("2024-01-19", "店舗A", "食品", 60, 6000, "東京"), ("2024-01-19", "店舗B", "衣料品", 20, 60000, "大阪") ] df_order = spark.createDataFrame(order_data, ["date", "store", "category", "quantity", "sales", "region"]) # 日付を型変換 df_order = df_order.withColumn("date", F.to_date("date", "yyyy-MM-dd")) print("ECサイト注文データ:") df_order.show()
ECサイト注文データ: +----------+-----+--------+--------+------+------+ | date|store|category|quantity| sales|region| +----------+-----+--------+--------+------+------+ |2024-01-15|店舗A| 食品| 50| 5000| 東京| |2024-01-15|店舗A|電化製品| 2| 80000| 東京| |2024-01-16|店舗B| 衣料品| 10| 30000| 大阪| |2024-01-16|店舗B| 食品| 30| 3000| 大阪| |2024-01-17|店舗C|電化製品| 5|200000|名古屋| |2024-01-17|店舗A| 衣料品| 15| 45000| 東京| |2024-01-18|店舗B| 食品| 40| 4000| 大阪| |2024-01-18|店舗C|電化製品| 3|120000|名古屋| |2024-01-19|店舗A| 食品| 60| 6000| 東京| |2024-01-19|店舗B| 衣料品| 20| 60000| 大阪| +----------+-----+--------+--------+------+------+
5-2. 分析1:店舗別パフォーマンス
# 店舗別の総合パフォーマンス分析 store_performance = df_order.groupBy("store", "region").agg( F.count("*").alias("取引件数"), F.sum("quantity").alias("総販売数"), F.sum("sales").alias("総売上"), F.avg("sales").alias("平均売上"), (F.sum("sales") / F.sum("quantity")).alias("平均単価") ).orderBy(F.desc("総売上")) print("【分析1】店舗別パフォーマンス:") store_performance.show(truncate=False)
【分析1】店舗別パフォーマンス: +-----+------+--------+--------+------+------------------+------------------+ |store|region|取引件数|総販売数|総売上| 平均売上| 平均単価| +-----+------+--------+--------+------+------------------+------------------+ |店舗C|名古屋| 2| 8|320000| 160000.0| 40000.0| |店舗A| 東京| 4| 127|136000| 34000.0|1070.8661417322834| |店舗B| 大阪| 4| 100| 97000| 24250.0| 970.0| +-----+------+--------+--------+------+------------------+------------------+
5-3. 分析2:地域×カテゴリのクロス集計
# 地域×カテゴリのクロス集計 region_category = df_order.groupBy("region") \ .pivot("category", ["食品", "電化製品", "衣料品"]) \ .agg(F.sum("sales")) # NULLを0に置換して合計を計算 region_category = region_category.fillna(0) region_category = region_category.withColumn( "合計", F.col("食品") + F.col("電化製品") + F.col("衣料品") ) print("【分析2】地域×カテゴリ売上:") region_category.show()
【分析2】地域×カテゴリ売上: +------+-----+--------+------+------+ |region| 食品|電化製品|衣料品| 合計| +------+-----+--------+------+------+ | 東京|11000| 80000| 45000|136000| | 大阪| 7000| 0| 90000| 97000| |名古屋| 0| 320000| 0|320000| +------+-----+--------+------+------+
・目的を明確に:何を知りたいのかを最初に決める
・複数の角度から分析:時系列、カテゴリ、地域など多角的に
・比較を入れる:絶対値だけでなく、比率や前期比も
・異常値に注目:最大値・最小値から問題を発見
・わかりやすく整理:ソートや列名の工夫で見やすく
📝 練習問題
基本的なgroupBy集計
次のデータから、部門ごとの社員数と平均給与を計算してください。
data = [
("営業部", "田中", 300000),
("営業部", "鈴木", 350000),
("営業部", "佐藤", 320000),
("技術部", "山田", 400000),
("技術部", "伊藤", 450000),
("管理部", "渡辺", 380000)
]
df = spark.createDataFrame(data, ["department", "name", "salary"])from pyspark.sql import functions as F result = df.groupBy("department").agg( F.count("*").alias("社員数"), F.avg("salary").alias("平均給与") ) result.show()
+----------+------+------------------+ |department|社員数| 平均給与| +----------+------+------------------+ | 営業部| 3|323333.33333333337| | 技術部| 2| 425000.0| | 管理部| 1| 380000.0| +----------+------+------------------+
複数の統計値を同時に計算
次のテストの点数データから、科目ごとの最高点、最低点、平均点、受験者数を計算してください。
data = [
("数学", 85), ("数学", 92), ("数学", 78),
("英語", 88), ("英語", 95), ("英語", 82),
("国語", 75), ("国語", 80), ("国語", 90)
]
df = spark.createDataFrame(data, ["subject", "score"])result = df.groupBy("subject").agg( F.max("score").alias("最高点"), F.min("score").alias("最低点"), F.avg("score").alias("平均点"), F.count("*").alias("受験者数") ) result.show()
+-------+------+------+------------------+--------+ |subject|最高点|最低点| 平均点|受験者数| +-------+------+------+------------------+--------+ | 数学| 92| 78| 85.0| 3| | 英語| 95| 82|88.333333333333333| 3| | 国語| 90| 75|81.666666666666667| 3| +-------+------+------+------------------+--------+
条件付き集計
次の売上データから、店舗ごとに「10万円以上の取引件数」と「10万円未満の取引件数」を計算してください。
data = [
("店舗A", 50000), ("店舗A", 150000), ("店舗A", 80000),
("店舗B", 120000), ("店舗B", 90000), ("店舗B", 200000)
]
df = spark.createDataFrame(data, ["store", "sales"])result = df.groupBy("store").agg( F.sum(F.when(F.col("sales") >= 100000, 1).otherwise(0)).alias("10万円以上"), F.sum(F.when(F.col("sales") < 100000, 1).otherwise(0)).alias("10万円未満"), F.sum("sales").alias("売上合計") ) result.show()
+-----+----------+----------+--------+ |store|10万円以上|10万円未満|売上合計| +-----+----------+----------+--------+ |店舗A| 1| 2| 280000| |店舗B| 2| 1| 410000| +-----+----------+----------+--------+
pivot()を使ったクロス集計
次のデータを使って、地域×カテゴリの売上クロス集計表を作成し、各地域の合計も計算してください。
data = [
("東京", "食品", 100000),
("東京", "電化製品", 500000),
("東京", "衣料品", 200000),
("大阪", "食品", 80000),
("大阪", "電化製品", 400000),
("大阪", "衣料品", 150000)
]
df = spark.createDataFrame(data, ["region", "category", "sales"])# pivot()でクロス集計 pivot_result = df.groupBy("region") \ .pivot("category", ["食品", "電化製品", "衣料品"]) \ .agg(F.sum("sales")) # 各地域の合計を追加 pivot_with_total = pivot_result.withColumn("合計", F.col("食品") + F.col("電化製品") + F.col("衣料品") ) pivot_with_total.show()
+------+------+--------+------+------+ |region| 食品|電化製品|衣料品| 合計| +------+------+--------+------+------+ | 東京|100000| 500000|200000|800000| | 大阪| 80000| 400000|150000|630000| +------+------+--------+------+------+
❓ よくある質問
例:
df.groupBy("category").agg(F.sum("sales").alias("total")).filter(F.col("total") > 100000)SQLの
HAVING句に相当します。
.agg()を明示的に使う必要があります。Pandasは.groupby().sum()のように直接メソッドを呼べますが、Sparkは.groupBy().agg(F.sum())と書きます。
.pivot("column", [値1, 値2, ...])。これにより、Sparkは事前に列数を把握でき、パフォーマンスが大幅に向上します。ユニーク値が数千以上ある場合は、pivot()の使用を避け、通常のgroupBy()で処理することを検討してください。
df.filter(F.col("category").isNotNull()).groupBy("category")...のように事前にフィルタしてください。
📝 STEP 11 のまとめ
・groupBy():特定のカラムでグループ化して集計
・基本集計関数:count、sum、avg、max、minの使い方
・agg():複数の集計を同時に実行し、alias()で列名を付ける
・条件付き集計:when()と組み合わせて柔軟な集計
・pivot():クロス集計表(ピボットテーブル)の作成
・agg()では必ずalias()で列名を指定する
・pivot()は値を明示してパフォーマンスを向上させる
・複数の集計を1回のagg()にまとめると効率的
・集計は複数の角度から行うことでインサイトが得られる
次のSTEP 12では、「ウィンドウ関数」を学びます。
グループ内での順位付けや累積計算など、さらに高度な分析手法をマスターしましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 11