STEP 19:パーティショニング戦略

📊 STEP 19: パーティショニング戦略

データの分割方法を最適化して、並列処理を効率化しよう

📋 このステップで学ぶこと

  • パーティションとは何か
  • パーティション数の確認と変更
  • repartition()とcoalesce()の使い分け
  • 最適なパーティション数の決め方

📁 0. サンプルデータの準備

このステップでは、パーティションを理解し、並列処理を最適化する方法を学びます。
まず、SparkSessionを初期化し、サンプルデータを準備しましょう。

0-1. SparkSessionの初期化

# SparkSessionの初期化

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# SparkSessionを作成
spark = SparkSession.builder \
    .appName("Partitioning") \
    .getOrCreate()

print("SparkSession準備完了")
SparkSession準備完了

0-2. サンプルデータの作成

# サンプルデータを作成(10万件)

data = [(i, f"name_{i}", i % 100, i * 10) for i in range(100000)]
df = spark.createDataFrame(data, ["id", "name", "category", "value"])

print("サンプルデータ(最初の5件):")
df.show(5)
print(f"パーティション数: {df.rdd.getNumPartitions()}")
サンプルデータ(最初の5件):
+---+------+--------+-----+
| id|  name|category|value|
+---+------+--------+-----+
|  0|name_0|       0|    0|
|  1|name_1|       1|   10|
|  2|name_2|       2|   20|
|  3|name_3|       3|   30|
|  4|name_4|       4|   40|
+---+------+--------+-----+

パーティション数: 8

0-3. 全サンプルデータを一括作成するコード

# ========================================
# STEP 19 サンプルデータ一括作成スクリプト
# ========================================

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("Partitioning").getOrCreate()

# サンプルデータ(10万件)
data = [(i, f"name_{i}", i % 100, i * 10) for i in range(100000)]
df = spark.createDataFrame(data, ["id", "name", "category", "value"])

print("✅ サンプルデータを作成しました")
print("  - df: 10万件のデータ(id, name, category, value)")
✅ サンプルデータを作成しました
  - df: 10万件のデータ(id, name, category, value)

📦 1. パーティションとは?

1-1. パーティションの概念

パーティションとは、データを分割した単位のことです。
Sparkは各パーティションを並列で処理します。

🎯 パーティションのイメージ

【100万件のデータを4パーティションに分割】

パーティション1: 25万件 → Executor 1で処理
パーティション2: 25万件 → Executor 2で処理
パーティション3: 25万件 → Executor 3で処理
パーティション4: 25万件 → Executor 4で処理

4つ同時に処理できる!

1-2. パーティション数の確認

# パーティション数を確認

print(f"パーティション数: {df.rdd.getNumPartitions()}")

# 各パーティションのデータ件数を確認
partition_counts = df.rdd.mapPartitions(lambda x: [sum(1 for _ in x)]).collect()
print(f"各パーティションの件数: {partition_counts}")
パーティション数: 8
各パーティションの件数: [12500, 12500, 12500, 12500, 12500, 12500, 12500, 12500]
💡 パーティション数が重要な理由

少なすぎる:並列処理が活かせない(遅い)
多すぎる:オーバーヘッドが増える(遅い)
適切な数:CPU数の2〜4倍が目安

🔄 2. repartition()とcoalesce()

2-1. 2つの違い

メソッド 用途 シャッフル 使用場面
repartition(n) パーティション数を増減 あり(重い) 増やす時、均等にしたい時
coalesce(n) パーティション数を減らす なし(軽い) 減らす時のみ

2-2. repartition()の使い方

# パーティション数を変更(増やす or 減らす)

print(f"元のパーティション数: {df.rdd.getNumPartitions()}")

# repartition: 増やす
df_repartitioned = df.repartition(16)
print(f"repartition(16)後: {df_repartitioned.rdd.getNumPartitions()}")

# repartition: 減らす
df_repartitioned2 = df.repartition(4)
print(f"repartition(4)後: {df_repartitioned2.rdd.getNumPartitions()}")
元のパーティション数: 8
repartition(16)後: 16
repartition(4)後: 4

2-3. coalesce()の使い方

# パーティション数を減らす(シャッフルなし、効率的)

print(f"元のパーティション数: {df.rdd.getNumPartitions()}")

# coalesce: 減らす(効率的)
df_coalesced = df.coalesce(4)
print(f"coalesce(4)後: {df_coalesced.rdd.getNumPartitions()}")

# 注意:coalesceで増やすことはできない
df_coalesced2 = df.coalesce(16)
print(f"coalesce(16)後: {df_coalesced2.rdd.getNumPartitions()}")  # 増えない!
元のパーティション数: 8
coalesce(4)後: 4
coalesce(16)後: 8  ← 増えない!元のまま
🎯 使い分けのポイント

・パーティションを減らす時 → coalesce()(軽い)
・パーティションを増やす時 → repartition()
・データを均等に分けたい時 → repartition()

⚙️ 3. 最適なパーティション数の決め方

3-1. 目安となる計算式

📊 パーティション数の目安

パーティション数 = 総CPU数 × 2〜4

例:4コアのマシン
→ 8〜16パーティションが目安

例:クラスター全体で100コア
→ 200〜400パーティションが目安

3-2. パーティション数の影響を確認

import time

# 大きめのデータ
large_data = [(i, i * 10) for i in range(1000000)]
df_large = spark.createDataFrame(large_data, ["id", "value"])

# パーティション数を変えて処理時間を比較
for num_partitions in [1, 4, 8, 16]:
    df_test = df_large.repartition(num_partitions)
    
    start = time.time()
    df_test.groupBy(F.col("id") % 100).count().count()
    end = time.time()
    
    print(f"パーティション数 {num_partitions:2d}: {end - start:.2f}秒")
パーティション数  1: 2.85秒  ← 並列化されない
パーティション数  4: 1.12秒
パーティション数  8: 0.89秒  ← 最適
パーティション数 16: 0.95秒  ← オーバーヘッド増
⚠️ パーティション数の注意点

1パーティション:並列化できず非常に遅い
多すぎ:タスク管理のオーバーヘッドが増える
ファイル出力時:パーティション数 = 出力ファイル数

🎯 4. 実践的なパーティション管理

4-1. ファイル出力時のパーティション

# ファイル出力時、パーティション数 = 出力ファイル数

# 多すぎるパーティションのまま出力すると、小さいファイルが大量にできる
print(f"現在のパーティション数: {df.rdd.getNumPartitions()}")

# 出力前にcoalesceで減らす(ベストプラクティス)
df_output = df.coalesce(4)  # 4ファイルに出力
print(f"出力用パーティション数: {df_output.rdd.getNumPartitions()}")

# Parquetで出力(4ファイルになる)
# df_output.write.parquet("/output/data")
現在のパーティション数: 8
出力用パーティション数: 4

4-2. フィルター後のパーティション調整

# フィルター後、データが減ってパーティションがスカスカになることがある

# 1%のデータだけ残るフィルター
df_filtered = df.filter(F.col("category") == 0)

print(f"フィルター前: {df.count()}件, {df.rdd.getNumPartitions()}パーティション")
print(f"フィルター後: {df_filtered.count()}件, {df_filtered.rdd.getNumPartitions()}パーティション")

# パーティションを減らす(効率化)
df_optimized = df_filtered.coalesce(2)
print(f"最適化後: {df_optimized.count()}件, {df_optimized.rdd.getNumPartitions()}パーティション")
フィルター前: 100000件, 8パーティション
フィルター後: 1000件, 8パーティション  ← パーティションが空いている
最適化後: 1000件, 2パーティション  ← 適切なサイズに
✅ パーティション管理のベストプラクティス

大きなフィルター後coalesce()で減らす
ファイル出力前は適切な数に調整
JOINの前は同じキーでパーティション

📝 練習問題

問題 1 基礎

パーティション数の確認

DataFrameのパーティション数を確認してください。

【解答】
num_partitions = df.rdd.getNumPartitions()
print(f"パーティション数: {num_partitions}")
問題 2 基礎

coalesceでパーティションを減らす

パーティション数を4に減らしてください。

【解答】
# パーティションを減らす時はcoalesce(軽い)
df_coalesced = df.coalesce(4)
print(f"パーティション数: {df_coalesced.rdd.getNumPartitions()}")
問題 3 応用

repartitionでパーティションを増やす

パーティション数を16に増やしてください。

【解答】
# パーティションを増やす時はrepartition
df_repartitioned = df.repartition(16)
print(f"パーティション数: {df_repartitioned.rdd.getNumPartitions()}")
問題 4 実践

フィルター後のパーティション最適化

データを1%にフィルターした後、適切なパーティション数に調整してください。

【解答】
# 1%にフィルター
df_filtered = df.filter(F.col("category") == 0)
print(f"フィルター後: {df_filtered.count()}件")
print(f"パーティション数: {df_filtered.rdd.getNumPartitions()}")

# データ量に応じてパーティションを減らす
df_optimized = df_filtered.coalesce(2)
print(f"最適化後パーティション数: {df_optimized.rdd.getNumPartitions()}")

❓ よくある質問

Q1: repartitionとcoalesceはどちらを使うべき?
減らす時はcoalesce(シャッフルなしで軽い)、増やす時はrepartitionを使います。
Q2: パーティション数の目安は?
CPU数の2〜4倍が目安です。例えば4コアなら8〜16パーティション。
Q3: パーティション数が多すぎるとどうなる?
タスク管理のオーバーヘッドが増えて遅くなります。また、ファイル出力時に小さいファイルが大量にできます。
Q4: パーティション数が少なすぎるとどうなる?
並列処理が活かせず遅くなります。特に1パーティションだと全く並列化されません。

📝 STEP 19 のまとめ

✅ このステップで学んだこと

パーティション:データの分割単位、並列処理の基本
repartition():パーティション数を増減(シャッフルあり)
coalesce():パーティション数を減らす(シャッフルなし、軽い)
最適な数:CPU数の2〜4倍が目安

💡 重要ポイント

減らす時はcoalesce(軽い)
増やす時はrepartition
フィルター後は調整を検討
ファイル出力前は適切な数に

🎯 次のステップの予告

次のSTEP 20では、「キャッシングと永続化」を学びます。
反復計算を高速化するキャッシング戦略をマスターしましょう!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 19

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE