STEP 18:Lazy Evaluationの理解

⚡ STEP 18: Lazy Evaluationの理解

Sparkの遅延評価を理解して、効率的なデータ処理をマスターしよう

📋 このステップで学ぶこと

  • Lazy Evaluation(遅延評価)とは何か
  • TransformationとActionの違い
  • 実行計画の確認方法(explain())
  • キャッシュの重要性と使い方

📁 0. サンプルデータの準備

このステップでは、Lazy Evaluation(遅延評価)の仕組みを理解し、パフォーマンス最適化の基礎を学びます。
まず、SparkSessionを初期化し、サンプルデータを準備しましょう。

0-1. SparkSessionの初期化

# SparkSessionの初期化

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import time

# SparkSessionを作成
spark = SparkSession.builder \
    .appName("Lazy Evaluation") \
    .getOrCreate()

print("SparkSession準備完了")
SparkSession準備完了

0-2. サンプルデータの作成

# サンプルデータを作成(10万件)

data = [(i, f"name_{i}", i % 100, i * 10) for i in range(100000)]
df = spark.createDataFrame(data, ["id", "name", "category", "value"])

print("サンプルデータ(最初の5件):")
df.show(5)
サンプルデータ(最初の5件):
+---+------+--------+-----+
| id|  name|category|value|
+---+------+--------+-----+
|  0|name_0|       0|    0|
|  1|name_1|       1|   10|
|  2|name_2|       2|   20|
|  3|name_3|       3|   30|
|  4|name_4|       4|   40|
+---+------+--------+-----+

0-3. 全サンプルデータを一括作成するコード

# ========================================
# STEP 18 サンプルデータ一括作成スクリプト
# ========================================

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import time

spark = SparkSession.builder.appName("Lazy Evaluation").getOrCreate()

# サンプルデータ(10万件)
data = [(i, f"name_{i}", i % 100, i * 10) for i in range(100000)]
df = spark.createDataFrame(data, ["id", "name", "category", "value"])

print("✅ サンプルデータを作成しました")
print("  - df: 10万件のデータ(id, name, category, value)")
✅ サンプルデータを作成しました
  - df: 10万件のデータ(id, name, category, value)

🔄 1. Lazy Evaluationとは?

1-1. 遅延評価の概念

Lazy Evaluation(遅延評価)とは、処理を実行するタイミングを遅らせる仕組みです。
Sparkでは、必要になるまで実際の計算を行いません

🎯 Sparkの遅延評価

df = spark.read.csv(“data.csv”)   # 実行されない(準備)
df = df.filter(F.col(“age”) > 20)  # 実行されない(準備)
df = df.select(“name”, “age”)    # 実行されない(準備)
df.show()               # ここで初めて実行!

show()が呼ばれるまで、実際のデータ処理は行われません。
Sparkは全体の処理を見てから最適化します。

1-2. Lazy Evaluationの実演

# Transformationは即座に実行されない

print("【Transformationのみ(実行されない)】")
start = time.time()

# これらは全てTransformation → 実行されない
df_filtered = df.filter(F.col("value") > 500000)
df_selected = df_filtered.select("id", "name")
df_sorted = df_selected.orderBy("id")

end = time.time()
print(f"処理時間: {end - start:.4f}秒")
print("→ ほぼ0秒!実際の処理は行われていない")
【Transformationのみ(実行されない)】
処理時間: 0.0012秒
→ ほぼ0秒!実際の処理は行われていない
# Actionを実行すると処理が走る

print("【Actionを実行(ここで初めて実行)】")
start = time.time()

# show()はAction → ここで初めて実際に処理される
df_sorted.show(5)

end = time.time()
print(f"処理時間: {end - start:.2f}秒")
print("→ 時間がかかる!実際にデータ処理が行われた")
【Actionを実行(ここで初めて実行)】
+-----+----------+
|   id|      name|
+-----+----------+
|50001|name_50001|
|50002|name_50002|
|50003|name_50003|
|50004|name_50004|
|50005|name_50005|
+-----+----------+

処理時間: 1.23秒
→ 時間がかかる!実際にデータ処理が行われた

🔀 2. TransformationとAction

2-1. 2種類の操作

Sparkの操作はTransformation(変換)Action(アクション)の2種類に分かれます。

種類 特徴 主な操作
Transformation 新しいDataFrameを返す
遅延評価(すぐには実行されない)
filter(), select(), withColumn(), groupBy(), join(), orderBy()
Action 結果を返す
即座に実行される
show(), count(), collect(), take(), write.csv()

2-2. 主なTransformation一覧

# よく使うTransformation(遅延評価される)

select()          # カラム選択
filter() / where()  # 行のフィルタリング
withColumn()      # カラム追加
drop()            # カラム削除
distinct()        # 重複削除
groupBy()         # グループ化
join()            # 結合
orderBy() / sort()  # ソート

2-3. 主なAction一覧

# よく使うAction(即座に実行される)

show()            # 表示
count()           # 件数カウント
collect()         # 全データ取得
take(n)           # 最初のn件取得
first()           # 最初の1件
write.csv()       # ファイル書き込み
write.parquet()   # Parquet書き込み
💡 TransformationとActionの判別方法

Transformation:戻り値がDataFrame
Action:戻り値がintlistNoneなど
簡単な見分け方:「結果をすぐ見たい」ならAction

🌳 3. 実行計画の確認(explain)

3-1. explain()で実行計画を見る

explain()を使うと、Sparkがどのように処理を実行するかを確認できます。

# 複数のTransformationを組み合わせる

df_result = df \
    .filter(F.col("category") < 10) \
    .filter(F.col("value") > 50000) \
    .select("name", "value") \
    .orderBy("value", ascending=False)

# 実行計画を表示
print("【実行計画】")
df_result.explain()
【実行計画】
== Physical Plan ==
*(1) Sort [value#3 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(value#3 DESC NULLS LAST, 200)
   +- *(1) Project [name#1, value#3]
      +- *(1) Filter ((category#2 < 10) AND (value#3 > 50000))
         +- *(1) Scan ExistingRDD[id#0,name#1,category#2,value#3]

3-2. 実行計画の読み方

📊 実行計画の読み方(下から上に読む)

1. Scan:データを読み込む
2. Filter:2つの条件でフィルター(1つにまとめられている!
3. Project:name, valueのみ選択
4. Exchange:データの再配置(シャッフル)
5. Sort:valueで降順ソート

🎯 Sparkの最適化(Catalyst Optimizer)

フィルター結合:複数のフィルターを1つにまとめる
述語プッシュダウン:フィルターを早い段階で適用
列プルーニング:不要なカラムは読まない

💾 4. キャッシュの重要性

4-1. キャッシュとは?

キャッシュは、計算結果をメモリに保存して、次回以降の処理を高速化する機能です。
同じDataFrameを複数回使う場合に有効です。

📊 キャッシュの効果

【キャッシュなし】
df.count() → データを読んで計算(遅い)
df.count() → また同じ計算を実行(遅い)

【キャッシュあり】
df.cache()
df.count() → データを読んで計算 + メモリに保存
df.count() → メモリから読む(速い!)

4-2. キャッシュなし vs キャッシュあり

# フィルター処理
df_filtered = df.filter(F.col("value") > 500000)

print("【キャッシュなし】")
start = time.time()

# 1回目
count1 = df_filtered.count()
print(f"1回目 count: {count1}")

# 2回目(また最初から計算)
count2 = df_filtered.count()
print(f"2回目 count: {count2}")

end = time.time()
print(f"総処理時間: {end - start:.2f}秒")
【キャッシュなし】
1回目 count: 49999
2回目 count: 49999
総処理時間: 1.85秒
print("【キャッシュあり】")

# キャッシュを有効化
df_filtered_cached = df_filtered.cache()

start = time.time()

# 1回目(計算 + キャッシュに保存)
count1 = df_filtered_cached.count()
print(f"1回目 count: {count1}")

# 2回目(キャッシュから読む、速い!)
count2 = df_filtered_cached.count()
print(f"2回目 count: {count2}")

end = time.time()
print(f"総処理時間: {end - start:.2f}秒")

# キャッシュ解放
df_filtered_cached.unpersist()
【キャッシュあり】
1回目 count: 49999
2回目 count: 49999
総処理時間: 1.05秒
✅ キャッシュが有効な場面

・同じDataFrameを複数回使う
反復処理(機械学習など)
・重い前処理の結果を再利用

⚠️ キャッシュの注意点

メモリ消費:大量データのキャッシュはメモリ不足の原因
使い終わったら解放unpersist()でメモリ開放
1回しか使わないなら不要:キャッシュ自体にコストがかかる

📝 練習問題

問題 1 基礎

TransformationとActionの判別

次の操作の中から、Actionを全て選んでください。

1. filter()   2. show()   3. withColumn()   4. count()   5. groupBy()   6. collect()

【解答】

Actionは:2. show()4. count()6. collect()

残り(filter, withColumn, groupBy)はTransformationです。

問題 2 基礎

キャッシュの基本

DataFrameをキャッシュして、2回countを実行してください。

【解答】
# キャッシュ
df_cached = df.filter(F.col("value") > 500000).cache()

# 1回目
count1 = df_cached.count()
print(f"1回目: {count1}")

# 2回目(キャッシュから読む)
count2 = df_cached.count()
print(f"2回目: {count2}")

# キャッシュ解放
df_cached.unpersist()
問題 3 応用

実行計画の確認

次の処理の実行計画を表示してください。

result = df.filter(F.col("id") > 500).select("id", "name").orderBy("id")
【解答】
# 実行計画を表示
result.explain()
== Physical Plan ==
*(1) Sort [id#0 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(id#0 ASC NULLS FIRST, 200)
   +- *(1) Project [id#0, name#1]
      +- *(1) Filter (id#0 > 500)
         +- *(1) Scan ExistingRDD[...]
問題 4 実践

処理順序の最適化

次のコードを最適化してください(フィルターを先に実行)。

result = df.withColumn("doubled", F.col("value") * 2).filter(F.col("category") < 10)
【解答】
# 最適化:フィルターを先に
result = df \
    .filter(F.col("category") < 10) \
    .withColumn("doubled", F.col("value") * 2)

# withColumn()はフィルター後の少ない行だけ処理

❓ よくある質問

Q1: いつキャッシュすべきですか?
同じDataFrameを2回以上使う場合にキャッシュします。1回しか使わない場合はキャッシュ不要です。
Q2: cache()とpersist()の違いは?
cache()はメモリのみに保存。persist()ではストレージレベルを指定できます(例:MEMORY_AND_DISK)。
Q3: explain()で何がわかりますか?
Sparkの実行計画がわかります。どのような順序で処理されるか、シャッフルが発生するかなどを確認できます。
Q4: Lazy Evaluationのデメリットは?
エラーが遅れて発覚することです。Transformationでエラーがあっても、Action実行時まで気づきません。

📝 STEP 18 のまとめ

✅ このステップで学んだこと

Lazy Evaluation:必要になるまで処理を遅らせる
Transformation:遅延評価される(filter, select等)
Action:即座に実行される(show, count等)
explain():実行計画を確認
cache():同じデータを複数回使う時に高速化

💡 重要ポイント

・Actionが呼ばれるまで実行されない
・同じDataFrameを複数回使うならキャッシュ
・フィルターは早めに適用
・explain()で実行計画を確認

🎯 次のステップの予告

次のSTEP 19では、「パーティショニング戦略」を学びます。
データの分割方法を最適化して、並列処理を効率化しましょう!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 18

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE