STEP 20:キャッシングと永続化

💾 STEP 20: キャッシングと永続化

cache()とpersist()を使いこなして、反復計算を高速化しよう

📋 このステップで学ぶこと

  • cache()とpersist()の違い
  • ストレージレベルの選択
  • いつキャッシュすべきか
  • キャッシュの解放方法

📁 0. サンプルデータの準備

このステップでは、キャッシングを活用して反復計算を高速化する方法を学びます。
まず、SparkSessionを初期化し、サンプルデータを準備しましょう。

0-1. SparkSessionの初期化

# SparkSessionの初期化

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark import StorageLevel
import time

# SparkSessionを作成
spark = SparkSession.builder \
    .appName("Caching") \
    .getOrCreate()

print("SparkSession準備完了")
SparkSession準備完了

0-2. サンプルデータの作成

# サンプルデータを作成(100万件)

data = [(i, f"name_{i}", i % 100, i * 10) for i in range(1000000)]
df = spark.createDataFrame(data, ["id", "name", "category", "value"])

print("サンプルデータ(最初の5件):")
df.show(5)
print(f"データ件数: {df.count()}")
サンプルデータ(最初の5件):
+---+------+--------+-----+
| id|  name|category|value|
+---+------+--------+-----+
|  0|name_0|       0|    0|
|  1|name_1|       1|   10|
|  2|name_2|       2|   20|
|  3|name_3|       3|   30|
|  4|name_4|       4|   40|
+---+------+--------+-----+

データ件数: 1000000

0-3. 全サンプルデータを一括作成するコード

# ========================================
# STEP 20 サンプルデータ一括作成スクリプト
# ========================================

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark import StorageLevel
import time

spark = SparkSession.builder.appName("Caching").getOrCreate()

# サンプルデータ(100万件)
data = [(i, f"name_{i}", i % 100, i * 10) for i in range(1000000)]
df = spark.createDataFrame(data, ["id", "name", "category", "value"])

print("✅ サンプルデータを作成しました")
print("  - df: 100万件のデータ(id, name, category, value)")
✅ サンプルデータを作成しました
  - df: 100万件のデータ(id, name, category, value)

💾 1. cache()とpersist()の基本

1-1. キャッシュの仕組み

cache()persist()は、計算結果をメモリに保存して、再利用を高速化する機能です。

📊 キャッシュの効果

【キャッシュなし】
df.count() → データを読んで計算(遅い)
df.count() → また最初から計算(遅い)

【キャッシュあり】
df.cache()
df.count() → 計算 + メモリに保存
df.count() → メモリから読む(速い!)

1-2. キャッシュの効果を確認

# フィルター処理
df_filtered = df.filter(F.col("value") > 5000000)

print("【キャッシュなし】")
start = time.time()
count1 = df_filtered.count()
print(f"1回目: {count1}件, {time.time() - start:.2f}秒")

start = time.time()
count2 = df_filtered.count()
print(f"2回目: {count2}件, {time.time() - start:.2f}秒")
【キャッシュなし】
1回目: 499999件, 1.23秒
2回目: 499999件, 1.18秒  ← また計算している
print("【キャッシュあり】")

# キャッシュを有効化
df_cached = df_filtered.cache()

start = time.time()
count1 = df_cached.count()
print(f"1回目: {count1}件, {time.time() - start:.2f}秒")

start = time.time()
count2 = df_cached.count()
print(f"2回目: {count2}件, {time.time() - start:.2f}秒")

# キャッシュ解放
df_cached.unpersist()
【キャッシュあり】
1回目: 499999件, 1.35秒  ← 計算 + キャッシュ保存
2回目: 499999件, 0.08秒  ← キャッシュから読む(超速い!)

1-3. cache() vs persist()

メソッド 説明 ストレージレベル
cache() シンプル、メモリのみ MEMORY_ONLY(固定)
persist() ストレージレベル指定可能 選択可能
# cache()はpersist()のショートカット
df.cache()  # = df.persist(StorageLevel.MEMORY_ONLY)

# persist()ではストレージレベルを指定可能
df.persist(StorageLevel.MEMORY_ONLY)       # メモリのみ
df.persist(StorageLevel.MEMORY_AND_DISK)   # メモリ優先、溢れたらディスク
df.persist(StorageLevel.DISK_ONLY)         # ディスクのみ

📊 2. ストレージレベルの選択

2-1. 主要なストレージレベル

レベル 特徴 使用場面
MEMORY_ONLY メモリのみ、最速 データがメモリに収まる場合
MEMORY_AND_DISK メモリ優先、安全 一般的な用途(推奨)
DISK_ONLY ディスクのみ メモリが足りない場合

2-2. MEMORY_AND_DISKの使用例

# MEMORY_AND_DISK(推奨)
# メモリに収まらなければディスクに保存(安全)

df_persisted = df.filter(F.col("value") > 5000000) \
    .persist(StorageLevel.MEMORY_AND_DISK)

print("【MEMORY_AND_DISK】")
start = time.time()
count1 = df_persisted.count()
print(f"1回目: {count1}件, {time.time() - start:.2f}秒")

start = time.time()
count2 = df_persisted.count()
print(f"2回目: {count2}件, {time.time() - start:.2f}秒")

# 解放
df_persisted.unpersist()
【MEMORY_AND_DISK】
1回目: 499999件, 1.42秒
2回目: 499999件, 0.09秒
🎯 ストレージレベルの選び方

通常cache()で十分
大きなデータpersist(MEMORY_AND_DISK)が安全
メモリ不足persist(DISK_ONLY)

🎯 3. いつキャッシュすべきか

3-1. キャッシュが有効な場面

✅ キャッシュすべき場面

・同じDataFrameを2回以上使う
反復処理(機械学習など)
・重い前処理結果の再利用

❌ キャッシュ不要な場面

・1回しか使わない
・データ量が極めて大きい(メモリ不足)
・シンプルなETL(読む→変換→書く)

3-2. 実践例:複数の集計でキャッシュ活用

# 前処理結果をキャッシュ(何度も使う)

df_preprocessed = df \
    .filter(F.col("value") > 1000000) \
    .withColumn("value_scaled", F.col("value") / 1000) \
    .cache()

# 複数の集計をキャッシュから実行(全て高速)
print("【複数の集計】")

# 集計1:件数
start = time.time()
count = df_preprocessed.count()
print(f"件数: {count}, {time.time() - start:.2f}秒")

# 集計2:合計
start = time.time()
total = df_preprocessed.agg(F.sum("value")).collect()[0][0]
print(f"合計: {total:,.0f}, {time.time() - start:.2f}秒")

# 集計3:カテゴリ別
start = time.time()
df_preprocessed.groupBy("category").count().show(5)
print(f"カテゴリ集計: {time.time() - start:.2f}秒")

# 解放
df_preprocessed.unpersist()
【複数の集計】
件数: 899999, 1.45秒  ← 初回(キャッシュ保存)
合計: 4,949,985,000,000, 0.12秒  ← キャッシュから
+--------+-----+
|category|count|
+--------+-----+
|      31| 9000|
|      85| 9000|
|      65| 9000|
|      53| 9000|
|      78| 9000|
+--------+-----+

カテゴリ集計: 0.15秒  ← キャッシュから

🧹 4. キャッシュの解放

4-1. unpersist()でメモリ解放

使い終わったキャッシュは必ず解放しましょう。メモリを無駄に消費し続けます。

# キャッシュの解放

df_cached = df.cache()
df_cached.count()  # キャッシュに保存

# キャッシュ状態を確認
print(f"キャッシュされている: {df_cached.is_cached}")

# 解放
df_cached.unpersist()
print(f"キャッシュされている: {df_cached.is_cached}")
キャッシュされている: True
キャッシュされている: False
⚠️ キャッシュの注意点

メモリ消費:大量データは注意
使用後は解放:unpersist()で明示的に
初回は遅い:保存時間が必要

📝 練習問題

問題 1 基礎

基本的なキャッシュ

DataFrameをキャッシュして、2回countを実行してください。

【解答】
df_cached = df.filter(F.col("value") > 5000000).cache()

# 1回目
count1 = df_cached.count()
print(f"1回目: {count1}")

# 2回目(高速)
count2 = df_cached.count()
print(f"2回目: {count2}")

# 解放
df_cached.unpersist()
問題 2 応用

MEMORY_AND_DISKでpersist

DataFrameをMEMORY_AND_DISKレベルで永続化してください。

【解答】
from pyspark import StorageLevel

df_persisted = df.persist(StorageLevel.MEMORY_AND_DISK)

# 使用
df_persisted.count()
df_persisted.agg(F.sum("value")).show()

# 解放
df_persisted.unpersist()
問題 3 応用

キャッシュ状態の確認

DataFrameがキャッシュされているか確認してください。

【解答】
df_cached = df.cache()
df_cached.count()  # キャッシュに保存

# 状態確認
print(f"キャッシュされている: {df_cached.is_cached}")

# 解放後
df_cached.unpersist()
print(f"キャッシュされている: {df_cached.is_cached}")
問題 4 実践

複数の集計でキャッシュ活用

前処理結果をキャッシュして、件数・合計・平均の3つを計算してください。

【解答】
# 前処理をキャッシュ
df_preprocessed = df \
    .filter(F.col("value") > 1000000) \
    .cache()

# 複数の集計(全てキャッシュから高速)
count = df_preprocessed.count()
total = df_preprocessed.agg(F.sum("value")).collect()[0][0]
avg = df_preprocessed.agg(F.avg("value")).collect()[0][0]

print(f"件数: {count}")
print(f"合計: {total:,.0f}")
print(f"平均: {avg:,.0f}")

# 解放
df_preprocessed.unpersist()

❓ よくある質問

Q1: cache()とpersist()どちらを使うべき?
通常はcache()で十分です。大きなデータでメモリ不足が心配な場合はpersist(MEMORY_AND_DISK)を使います。
Q2: キャッシュはいつ解放すべき?
使い終わったらすぐunpersist()で解放します。メモリを無駄に消費し続けます。
Q3: キャッシュするとなぜ速くなる?
1回目の計算結果をメモリに保存するため、2回目以降は計算なしでメモリから読むだけになります。
Q4: キャッシュしすぎるとどうなる?
メモリ不足になります。OutOfMemoryErrorが発生したり、処理が遅くなったりします。

📝 STEP 20 のまとめ

✅ このステップで学んだこと

cache():メモリに保存(シンプル)
persist():ストレージレベル指定可能
MEMORY_AND_DISK:安全な選択
unpersist():使用後は必ず解放

💡 重要ポイント

2回以上使う場合にキャッシュ
1回だけなら不要
使用後は解放を忘れずに

🎯 次のステップの予告

次のSTEP 21では、「シャッフル最適化」を学びます。
シャッフルを削減してパフォーマンスを大幅に改善しましょう!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 20

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE