💾 STEP 20: キャッシングと永続化
cache()とpersist()を使いこなして、反復計算を高速化しよう
📋 このステップで学ぶこと
- cache()とpersist()の違い
- ストレージレベルの選択
- いつキャッシュすべきか
- キャッシュの解放方法
📁 0. サンプルデータの準備
このステップでは、キャッシングを活用して反復計算を高速化する方法を学びます。
まず、SparkSessionを初期化し、サンプルデータを準備しましょう。
0-1. SparkSessionの初期化
# SparkSessionの初期化 from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark import StorageLevel import time # SparkSessionを作成 spark = SparkSession.builder \ .appName("Caching") \ .getOrCreate() print("SparkSession準備完了")
SparkSession準備完了
0-2. サンプルデータの作成
# サンプルデータを作成(100万件) data = [(i, f"name_{i}", i % 100, i * 10) for i in range(1000000)] df = spark.createDataFrame(data, ["id", "name", "category", "value"]) print("サンプルデータ(最初の5件):") df.show(5) print(f"データ件数: {df.count()}")
サンプルデータ(最初の5件): +---+------+--------+-----+ | id| name|category|value| +---+------+--------+-----+ | 0|name_0| 0| 0| | 1|name_1| 1| 10| | 2|name_2| 2| 20| | 3|name_3| 3| 30| | 4|name_4| 4| 40| +---+------+--------+-----+ データ件数: 1000000
0-3. 全サンプルデータを一括作成するコード
# ======================================== # STEP 20 サンプルデータ一括作成スクリプト # ======================================== from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark import StorageLevel import time spark = SparkSession.builder.appName("Caching").getOrCreate() # サンプルデータ(100万件) data = [(i, f"name_{i}", i % 100, i * 10) for i in range(1000000)] df = spark.createDataFrame(data, ["id", "name", "category", "value"]) print("✅ サンプルデータを作成しました") print(" - df: 100万件のデータ(id, name, category, value)")
✅ サンプルデータを作成しました - df: 100万件のデータ(id, name, category, value)
💾 1. cache()とpersist()の基本
1-1. キャッシュの仕組み
cache()とpersist()は、計算結果をメモリに保存して、再利用を高速化する機能です。
【キャッシュなし】
df.count() → データを読んで計算(遅い)
df.count() → また最初から計算(遅い)
【キャッシュあり】
df.cache()
df.count() → 計算 + メモリに保存
df.count() → メモリから読む(速い!)
1-2. キャッシュの効果を確認
# フィルター処理 df_filtered = df.filter(F.col("value") > 5000000) print("【キャッシュなし】") start = time.time() count1 = df_filtered.count() print(f"1回目: {count1}件, {time.time() - start:.2f}秒") start = time.time() count2 = df_filtered.count() print(f"2回目: {count2}件, {time.time() - start:.2f}秒")
【キャッシュなし】 1回目: 499999件, 1.23秒 2回目: 499999件, 1.18秒 ← また計算している
print("【キャッシュあり】") # キャッシュを有効化 df_cached = df_filtered.cache() start = time.time() count1 = df_cached.count() print(f"1回目: {count1}件, {time.time() - start:.2f}秒") start = time.time() count2 = df_cached.count() print(f"2回目: {count2}件, {time.time() - start:.2f}秒") # キャッシュ解放 df_cached.unpersist()
【キャッシュあり】 1回目: 499999件, 1.35秒 ← 計算 + キャッシュ保存 2回目: 499999件, 0.08秒 ← キャッシュから読む(超速い!)
1-3. cache() vs persist()
| メソッド | 説明 | ストレージレベル |
|---|---|---|
| cache() | シンプル、メモリのみ | MEMORY_ONLY(固定) |
| persist() | ストレージレベル指定可能 | 選択可能 |
# cache()はpersist()のショートカット df.cache() # = df.persist(StorageLevel.MEMORY_ONLY) # persist()ではストレージレベルを指定可能 df.persist(StorageLevel.MEMORY_ONLY) # メモリのみ df.persist(StorageLevel.MEMORY_AND_DISK) # メモリ優先、溢れたらディスク df.persist(StorageLevel.DISK_ONLY) # ディスクのみ
📊 2. ストレージレベルの選択
2-1. 主要なストレージレベル
| レベル | 特徴 | 使用場面 |
|---|---|---|
| MEMORY_ONLY | メモリのみ、最速 | データがメモリに収まる場合 |
| MEMORY_AND_DISK | メモリ優先、安全 | 一般的な用途(推奨) |
| DISK_ONLY | ディスクのみ | メモリが足りない場合 |
2-2. MEMORY_AND_DISKの使用例
# MEMORY_AND_DISK(推奨) # メモリに収まらなければディスクに保存(安全) df_persisted = df.filter(F.col("value") > 5000000) \ .persist(StorageLevel.MEMORY_AND_DISK) print("【MEMORY_AND_DISK】") start = time.time() count1 = df_persisted.count() print(f"1回目: {count1}件, {time.time() - start:.2f}秒") start = time.time() count2 = df_persisted.count() print(f"2回目: {count2}件, {time.time() - start:.2f}秒") # 解放 df_persisted.unpersist()
【MEMORY_AND_DISK】 1回目: 499999件, 1.42秒 2回目: 499999件, 0.09秒
・通常:cache()で十分
・大きなデータ:persist(MEMORY_AND_DISK)が安全
・メモリ不足:persist(DISK_ONLY)
🎯 3. いつキャッシュすべきか
3-1. キャッシュが有効な場面
・同じDataFrameを2回以上使う
・反復処理(機械学習など)
・重い前処理結果の再利用
・1回しか使わない
・データ量が極めて大きい(メモリ不足)
・シンプルなETL(読む→変換→書く)
3-2. 実践例:複数の集計でキャッシュ活用
# 前処理結果をキャッシュ(何度も使う) df_preprocessed = df \ .filter(F.col("value") > 1000000) \ .withColumn("value_scaled", F.col("value") / 1000) \ .cache() # 複数の集計をキャッシュから実行(全て高速) print("【複数の集計】") # 集計1:件数 start = time.time() count = df_preprocessed.count() print(f"件数: {count}, {time.time() - start:.2f}秒") # 集計2:合計 start = time.time() total = df_preprocessed.agg(F.sum("value")).collect()[0][0] print(f"合計: {total:,.0f}, {time.time() - start:.2f}秒") # 集計3:カテゴリ別 start = time.time() df_preprocessed.groupBy("category").count().show(5) print(f"カテゴリ集計: {time.time() - start:.2f}秒") # 解放 df_preprocessed.unpersist()
【複数の集計】 件数: 899999, 1.45秒 ← 初回(キャッシュ保存) 合計: 4,949,985,000,000, 0.12秒 ← キャッシュから +--------+-----+ |category|count| +--------+-----+ | 31| 9000| | 85| 9000| | 65| 9000| | 53| 9000| | 78| 9000| +--------+-----+ カテゴリ集計: 0.15秒 ← キャッシュから
🧹 4. キャッシュの解放
4-1. unpersist()でメモリ解放
使い終わったキャッシュは必ず解放しましょう。メモリを無駄に消費し続けます。
# キャッシュの解放 df_cached = df.cache() df_cached.count() # キャッシュに保存 # キャッシュ状態を確認 print(f"キャッシュされている: {df_cached.is_cached}") # 解放 df_cached.unpersist() print(f"キャッシュされている: {df_cached.is_cached}")
キャッシュされている: True キャッシュされている: False
・メモリ消費:大量データは注意
・使用後は解放:unpersist()で明示的に
・初回は遅い:保存時間が必要
📝 練習問題
基本的なキャッシュ
DataFrameをキャッシュして、2回countを実行してください。
df_cached = df.filter(F.col("value") > 5000000).cache() # 1回目 count1 = df_cached.count() print(f"1回目: {count1}") # 2回目(高速) count2 = df_cached.count() print(f"2回目: {count2}") # 解放 df_cached.unpersist()
MEMORY_AND_DISKでpersist
DataFrameをMEMORY_AND_DISKレベルで永続化してください。
from pyspark import StorageLevel df_persisted = df.persist(StorageLevel.MEMORY_AND_DISK) # 使用 df_persisted.count() df_persisted.agg(F.sum("value")).show() # 解放 df_persisted.unpersist()
キャッシュ状態の確認
DataFrameがキャッシュされているか確認してください。
df_cached = df.cache() df_cached.count() # キャッシュに保存 # 状態確認 print(f"キャッシュされている: {df_cached.is_cached}") # 解放後 df_cached.unpersist() print(f"キャッシュされている: {df_cached.is_cached}")
複数の集計でキャッシュ活用
前処理結果をキャッシュして、件数・合計・平均の3つを計算してください。
# 前処理をキャッシュ df_preprocessed = df \ .filter(F.col("value") > 1000000) \ .cache() # 複数の集計(全てキャッシュから高速) count = df_preprocessed.count() total = df_preprocessed.agg(F.sum("value")).collect()[0][0] avg = df_preprocessed.agg(F.avg("value")).collect()[0][0] print(f"件数: {count}") print(f"合計: {total:,.0f}") print(f"平均: {avg:,.0f}") # 解放 df_preprocessed.unpersist()
❓ よくある質問
cache()で十分です。大きなデータでメモリ不足が心配な場合はpersist(MEMORY_AND_DISK)を使います。
unpersist()で解放します。メモリを無駄に消費し続けます。
📝 STEP 20 のまとめ
・cache():メモリに保存(シンプル)
・persist():ストレージレベル指定可能
・MEMORY_AND_DISK:安全な選択
・unpersist():使用後は必ず解放
・2回以上使う場合にキャッシュ
・1回だけなら不要
・使用後は解放を忘れずに
次のSTEP 21では、「シャッフル最適化」を学びます。
シャッフルを削減してパフォーマンスを大幅に改善しましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 20