📊 STEP 19: パーティショニング戦略
データの分割方法を最適化して、並列処理を効率化しよう
📋 このステップで学ぶこと
- パーティションとは何か
- パーティション数の確認と変更
- repartition()とcoalesce()の使い分け
- 最適なパーティション数の決め方
📁 0. サンプルデータの準備
このステップでは、パーティションを理解し、並列処理を最適化する方法を学びます。
まず、SparkSessionを初期化し、サンプルデータを準備しましょう。
0-1. SparkSessionの初期化
# SparkSessionの初期化 from pyspark.sql import SparkSession from pyspark.sql import functions as F # SparkSessionを作成 spark = SparkSession.builder \ .appName("Partitioning") \ .getOrCreate() print("SparkSession準備完了")
SparkSession準備完了
0-2. サンプルデータの作成
# サンプルデータを作成(10万件) data = [(i, f"name_{i}", i % 100, i * 10) for i in range(100000)] df = spark.createDataFrame(data, ["id", "name", "category", "value"]) print("サンプルデータ(最初の5件):") df.show(5) print(f"パーティション数: {df.rdd.getNumPartitions()}")
サンプルデータ(最初の5件): +---+------+--------+-----+ | id| name|category|value| +---+------+--------+-----+ | 0|name_0| 0| 0| | 1|name_1| 1| 10| | 2|name_2| 2| 20| | 3|name_3| 3| 30| | 4|name_4| 4| 40| +---+------+--------+-----+ パーティション数: 8
0-3. 全サンプルデータを一括作成するコード
# ======================================== # STEP 19 サンプルデータ一括作成スクリプト # ======================================== from pyspark.sql import SparkSession from pyspark.sql import functions as F spark = SparkSession.builder.appName("Partitioning").getOrCreate() # サンプルデータ(10万件) data = [(i, f"name_{i}", i % 100, i * 10) for i in range(100000)] df = spark.createDataFrame(data, ["id", "name", "category", "value"]) print("✅ サンプルデータを作成しました") print(" - df: 10万件のデータ(id, name, category, value)")
✅ サンプルデータを作成しました - df: 10万件のデータ(id, name, category, value)
📦 1. パーティションとは?
1-1. パーティションの概念
パーティションとは、データを分割した単位のことです。
Sparkは各パーティションを並列で処理します。
【100万件のデータを4パーティションに分割】
パーティション1: 25万件 → Executor 1で処理
パーティション2: 25万件 → Executor 2で処理
パーティション3: 25万件 → Executor 3で処理
パーティション4: 25万件 → Executor 4で処理
→ 4つ同時に処理できる!
1-2. パーティション数の確認
# パーティション数を確認 print(f"パーティション数: {df.rdd.getNumPartitions()}") # 各パーティションのデータ件数を確認 partition_counts = df.rdd.mapPartitions(lambda x: [sum(1 for _ in x)]).collect() print(f"各パーティションの件数: {partition_counts}")
パーティション数: 8 各パーティションの件数: [12500, 12500, 12500, 12500, 12500, 12500, 12500, 12500]
・少なすぎる:並列処理が活かせない(遅い)
・多すぎる:オーバーヘッドが増える(遅い)
・適切な数:CPU数の2〜4倍が目安
🔄 2. repartition()とcoalesce()
2-1. 2つの違い
| メソッド | 用途 | シャッフル | 使用場面 |
|---|---|---|---|
| repartition(n) | パーティション数を増減 | あり(重い) | 増やす時、均等にしたい時 |
| coalesce(n) | パーティション数を減らす | なし(軽い) | 減らす時のみ |
2-2. repartition()の使い方
# パーティション数を変更(増やす or 減らす) print(f"元のパーティション数: {df.rdd.getNumPartitions()}") # repartition: 増やす df_repartitioned = df.repartition(16) print(f"repartition(16)後: {df_repartitioned.rdd.getNumPartitions()}") # repartition: 減らす df_repartitioned2 = df.repartition(4) print(f"repartition(4)後: {df_repartitioned2.rdd.getNumPartitions()}")
元のパーティション数: 8 repartition(16)後: 16 repartition(4)後: 4
2-3. coalesce()の使い方
# パーティション数を減らす(シャッフルなし、効率的) print(f"元のパーティション数: {df.rdd.getNumPartitions()}") # coalesce: 減らす(効率的) df_coalesced = df.coalesce(4) print(f"coalesce(4)後: {df_coalesced.rdd.getNumPartitions()}") # 注意:coalesceで増やすことはできない df_coalesced2 = df.coalesce(16) print(f"coalesce(16)後: {df_coalesced2.rdd.getNumPartitions()}") # 増えない!
元のパーティション数: 8 coalesce(4)後: 4 coalesce(16)後: 8 ← 増えない!元のまま
・パーティションを減らす時 → coalesce()(軽い)
・パーティションを増やす時 → repartition()
・データを均等に分けたい時 → repartition()
⚙️ 3. 最適なパーティション数の決め方
3-1. 目安となる計算式
パーティション数 = 総CPU数 × 2〜4
例:4コアのマシン
→ 8〜16パーティションが目安
例:クラスター全体で100コア
→ 200〜400パーティションが目安
3-2. パーティション数の影響を確認
import time # 大きめのデータ large_data = [(i, i * 10) for i in range(1000000)] df_large = spark.createDataFrame(large_data, ["id", "value"]) # パーティション数を変えて処理時間を比較 for num_partitions in [1, 4, 8, 16]: df_test = df_large.repartition(num_partitions) start = time.time() df_test.groupBy(F.col("id") % 100).count().count() end = time.time() print(f"パーティション数 {num_partitions:2d}: {end - start:.2f}秒")
パーティション数 1: 2.85秒 ← 並列化されない パーティション数 4: 1.12秒 パーティション数 8: 0.89秒 ← 最適 パーティション数 16: 0.95秒 ← オーバーヘッド増
・1パーティション:並列化できず非常に遅い
・多すぎ:タスク管理のオーバーヘッドが増える
・ファイル出力時:パーティション数 = 出力ファイル数
🎯 4. 実践的なパーティション管理
4-1. ファイル出力時のパーティション
# ファイル出力時、パーティション数 = 出力ファイル数 # 多すぎるパーティションのまま出力すると、小さいファイルが大量にできる print(f"現在のパーティション数: {df.rdd.getNumPartitions()}") # 出力前にcoalesceで減らす(ベストプラクティス) df_output = df.coalesce(4) # 4ファイルに出力 print(f"出力用パーティション数: {df_output.rdd.getNumPartitions()}") # Parquetで出力(4ファイルになる) # df_output.write.parquet("/output/data")
現在のパーティション数: 8 出力用パーティション数: 4
4-2. フィルター後のパーティション調整
# フィルター後、データが減ってパーティションがスカスカになることがある # 1%のデータだけ残るフィルター df_filtered = df.filter(F.col("category") == 0) print(f"フィルター前: {df.count()}件, {df.rdd.getNumPartitions()}パーティション") print(f"フィルター後: {df_filtered.count()}件, {df_filtered.rdd.getNumPartitions()}パーティション") # パーティションを減らす(効率化) df_optimized = df_filtered.coalesce(2) print(f"最適化後: {df_optimized.count()}件, {df_optimized.rdd.getNumPartitions()}パーティション")
フィルター前: 100000件, 8パーティション フィルター後: 1000件, 8パーティション ← パーティションが空いている 最適化後: 1000件, 2パーティション ← 適切なサイズに
・大きなフィルター後はcoalesce()で減らす
・ファイル出力前は適切な数に調整
・JOINの前は同じキーでパーティション
📝 練習問題
パーティション数の確認
DataFrameのパーティション数を確認してください。
num_partitions = df.rdd.getNumPartitions() print(f"パーティション数: {num_partitions}")
coalesceでパーティションを減らす
パーティション数を4に減らしてください。
# パーティションを減らす時はcoalesce(軽い) df_coalesced = df.coalesce(4) print(f"パーティション数: {df_coalesced.rdd.getNumPartitions()}")
repartitionでパーティションを増やす
パーティション数を16に増やしてください。
# パーティションを増やす時はrepartition df_repartitioned = df.repartition(16) print(f"パーティション数: {df_repartitioned.rdd.getNumPartitions()}")
フィルター後のパーティション最適化
データを1%にフィルターした後、適切なパーティション数に調整してください。
# 1%にフィルター df_filtered = df.filter(F.col("category") == 0) print(f"フィルター後: {df_filtered.count()}件") print(f"パーティション数: {df_filtered.rdd.getNumPartitions()}") # データ量に応じてパーティションを減らす df_optimized = df_filtered.coalesce(2) print(f"最適化後パーティション数: {df_optimized.rdd.getNumPartitions()}")
❓ よくある質問
📝 STEP 19 のまとめ
・パーティション:データの分割単位、並列処理の基本
・repartition():パーティション数を増減(シャッフルあり)
・coalesce():パーティション数を減らす(シャッフルなし、軽い)
・最適な数:CPU数の2〜4倍が目安
・減らす時はcoalesce(軽い)
・増やす時はrepartition
・フィルター後は調整を検討
・ファイル出力前は適切な数に
次のSTEP 20では、「キャッシングと永続化」を学びます。
反復計算を高速化するキャッシング戦略をマスターしましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 19