⚡ STEP 18: Lazy Evaluationの理解
Sparkの遅延評価を理解して、効率的なデータ処理をマスターしよう
📋 このステップで学ぶこと
- Lazy Evaluation(遅延評価)とは何か
- TransformationとActionの違い
- 実行計画の確認方法(explain())
- キャッシュの重要性と使い方
📁 0. サンプルデータの準備
このステップでは、Lazy Evaluation(遅延評価)の仕組みを理解し、パフォーマンス最適化の基礎を学びます。
まず、SparkSessionを初期化し、サンプルデータを準備しましょう。
0-1. SparkSessionの初期化
# SparkSessionの初期化 from pyspark.sql import SparkSession from pyspark.sql import functions as F import time # SparkSessionを作成 spark = SparkSession.builder \ .appName("Lazy Evaluation") \ .getOrCreate() print("SparkSession準備完了")
SparkSession準備完了
0-2. サンプルデータの作成
# サンプルデータを作成(10万件) data = [(i, f"name_{i}", i % 100, i * 10) for i in range(100000)] df = spark.createDataFrame(data, ["id", "name", "category", "value"]) print("サンプルデータ(最初の5件):") df.show(5)
サンプルデータ(最初の5件): +---+------+--------+-----+ | id| name|category|value| +---+------+--------+-----+ | 0|name_0| 0| 0| | 1|name_1| 1| 10| | 2|name_2| 2| 20| | 3|name_3| 3| 30| | 4|name_4| 4| 40| +---+------+--------+-----+
0-3. 全サンプルデータを一括作成するコード
# ======================================== # STEP 18 サンプルデータ一括作成スクリプト # ======================================== from pyspark.sql import SparkSession from pyspark.sql import functions as F import time spark = SparkSession.builder.appName("Lazy Evaluation").getOrCreate() # サンプルデータ(10万件) data = [(i, f"name_{i}", i % 100, i * 10) for i in range(100000)] df = spark.createDataFrame(data, ["id", "name", "category", "value"]) print("✅ サンプルデータを作成しました") print(" - df: 10万件のデータ(id, name, category, value)")
✅ サンプルデータを作成しました - df: 10万件のデータ(id, name, category, value)
🔄 1. Lazy Evaluationとは?
1-1. 遅延評価の概念
Lazy Evaluation(遅延評価)とは、処理を実行するタイミングを遅らせる仕組みです。
Sparkでは、必要になるまで実際の計算を行いません。
df = spark.read.csv(“data.csv”) # 実行されない(準備)
df = df.filter(F.col(“age”) > 20) # 実行されない(準備)
df = df.select(“name”, “age”) # 実行されない(準備)
df.show() # ここで初めて実行!
show()が呼ばれるまで、実際のデータ処理は行われません。
Sparkは全体の処理を見てから最適化します。
1-2. Lazy Evaluationの実演
# Transformationは即座に実行されない print("【Transformationのみ(実行されない)】") start = time.time() # これらは全てTransformation → 実行されない df_filtered = df.filter(F.col("value") > 500000) df_selected = df_filtered.select("id", "name") df_sorted = df_selected.orderBy("id") end = time.time() print(f"処理時間: {end - start:.4f}秒") print("→ ほぼ0秒!実際の処理は行われていない")
【Transformationのみ(実行されない)】 処理時間: 0.0012秒 → ほぼ0秒!実際の処理は行われていない
# Actionを実行すると処理が走る print("【Actionを実行(ここで初めて実行)】") start = time.time() # show()はAction → ここで初めて実際に処理される df_sorted.show(5) end = time.time() print(f"処理時間: {end - start:.2f}秒") print("→ 時間がかかる!実際にデータ処理が行われた")
【Actionを実行(ここで初めて実行)】 +-----+----------+ | id| name| +-----+----------+ |50001|name_50001| |50002|name_50002| |50003|name_50003| |50004|name_50004| |50005|name_50005| +-----+----------+ 処理時間: 1.23秒 → 時間がかかる!実際にデータ処理が行われた
🔀 2. TransformationとAction
2-1. 2種類の操作
Sparkの操作はTransformation(変換)とAction(アクション)の2種類に分かれます。
| 種類 | 特徴 | 主な操作 |
|---|---|---|
| Transformation | 新しいDataFrameを返す 遅延評価(すぐには実行されない) |
filter(), select(), withColumn(), groupBy(), join(), orderBy() |
| Action | 結果を返す 即座に実行される |
show(), count(), collect(), take(), write.csv() |
2-2. 主なTransformation一覧
# よく使うTransformation(遅延評価される) select() # カラム選択 filter() / where() # 行のフィルタリング withColumn() # カラム追加 drop() # カラム削除 distinct() # 重複削除 groupBy() # グループ化 join() # 結合 orderBy() / sort() # ソート
2-3. 主なAction一覧
# よく使うAction(即座に実行される) show() # 表示 count() # 件数カウント collect() # 全データ取得 take(n) # 最初のn件取得 first() # 最初の1件 write.csv() # ファイル書き込み write.parquet() # Parquet書き込み
・Transformation:戻り値がDataFrame
・Action:戻り値がint、list、Noneなど
・簡単な見分け方:「結果をすぐ見たい」ならAction
🌳 3. 実行計画の確認(explain)
3-1. explain()で実行計画を見る
explain()を使うと、Sparkがどのように処理を実行するかを確認できます。
# 複数のTransformationを組み合わせる df_result = df \ .filter(F.col("category") < 10) \ .filter(F.col("value") > 50000) \ .select("name", "value") \ .orderBy("value", ascending=False) # 実行計画を表示 print("【実行計画】") df_result.explain()
【実行計画】
== Physical Plan ==
*(1) Sort [value#3 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(value#3 DESC NULLS LAST, 200)
+- *(1) Project [name#1, value#3]
+- *(1) Filter ((category#2 < 10) AND (value#3 > 50000))
+- *(1) Scan ExistingRDD[id#0,name#1,category#2,value#3]3-2. 実行計画の読み方
1. Scan:データを読み込む
2. Filter:2つの条件でフィルター(1つにまとめられている!)
3. Project:name, valueのみ選択
4. Exchange:データの再配置(シャッフル)
5. Sort:valueで降順ソート
・フィルター結合:複数のフィルターを1つにまとめる
・述語プッシュダウン:フィルターを早い段階で適用
・列プルーニング:不要なカラムは読まない
💾 4. キャッシュの重要性
4-1. キャッシュとは?
キャッシュは、計算結果をメモリに保存して、次回以降の処理を高速化する機能です。
同じDataFrameを複数回使う場合に有効です。
【キャッシュなし】
df.count() → データを読んで計算(遅い)
df.count() → また同じ計算を実行(遅い)
【キャッシュあり】
df.cache()
df.count() → データを読んで計算 + メモリに保存
df.count() → メモリから読む(速い!)
4-2. キャッシュなし vs キャッシュあり
# フィルター処理 df_filtered = df.filter(F.col("value") > 500000) print("【キャッシュなし】") start = time.time() # 1回目 count1 = df_filtered.count() print(f"1回目 count: {count1}") # 2回目(また最初から計算) count2 = df_filtered.count() print(f"2回目 count: {count2}") end = time.time() print(f"総処理時間: {end - start:.2f}秒")
【キャッシュなし】 1回目 count: 49999 2回目 count: 49999 総処理時間: 1.85秒
print("【キャッシュあり】") # キャッシュを有効化 df_filtered_cached = df_filtered.cache() start = time.time() # 1回目(計算 + キャッシュに保存) count1 = df_filtered_cached.count() print(f"1回目 count: {count1}") # 2回目(キャッシュから読む、速い!) count2 = df_filtered_cached.count() print(f"2回目 count: {count2}") end = time.time() print(f"総処理時間: {end - start:.2f}秒") # キャッシュ解放 df_filtered_cached.unpersist()
【キャッシュあり】 1回目 count: 49999 2回目 count: 49999 総処理時間: 1.05秒
・同じDataFrameを複数回使う
・反復処理(機械学習など)
・重い前処理の結果を再利用
・メモリ消費:大量データのキャッシュはメモリ不足の原因
・使い終わったら解放:unpersist()でメモリ開放
・1回しか使わないなら不要:キャッシュ自体にコストがかかる
📝 練習問題
TransformationとActionの判別
次の操作の中から、Actionを全て選んでください。
1. filter() 2. show() 3. withColumn() 4. count() 5. groupBy() 6. collect()
Actionは:2. show()、4. count()、6. collect()
残り(filter, withColumn, groupBy)はTransformationです。
キャッシュの基本
DataFrameをキャッシュして、2回countを実行してください。
# キャッシュ df_cached = df.filter(F.col("value") > 500000).cache() # 1回目 count1 = df_cached.count() print(f"1回目: {count1}") # 2回目(キャッシュから読む) count2 = df_cached.count() print(f"2回目: {count2}") # キャッシュ解放 df_cached.unpersist()
実行計画の確認
次の処理の実行計画を表示してください。
result = df.filter(F.col("id") > 500).select("id", "name").orderBy("id")# 実行計画を表示
result.explain()== Physical Plan ==
*(1) Sort [id#0 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(id#0 ASC NULLS FIRST, 200)
+- *(1) Project [id#0, name#1]
+- *(1) Filter (id#0 > 500)
+- *(1) Scan ExistingRDD[...]処理順序の最適化
次のコードを最適化してください(フィルターを先に実行)。
result = df.withColumn("doubled", F.col("value") * 2).filter(F.col("category") < 10)# 最適化:フィルターを先に result = df \ .filter(F.col("category") < 10) \ .withColumn("doubled", F.col("value") * 2) # withColumn()はフィルター後の少ない行だけ処理
❓ よくある質問
cache()はメモリのみに保存。persist()ではストレージレベルを指定できます(例:MEMORY_AND_DISK)。
📝 STEP 18 のまとめ
・Lazy Evaluation:必要になるまで処理を遅らせる
・Transformation:遅延評価される(filter, select等)
・Action:即座に実行される(show, count等)
・explain():実行計画を確認
・cache():同じデータを複数回使う時に高速化
・Actionが呼ばれるまで実行されない
・同じDataFrameを複数回使うならキャッシュ
・フィルターは早めに適用
・explain()で実行計画を確認
次のSTEP 19では、「パーティショニング戦略」を学びます。
データの分割方法を最適化して、並列処理を効率化しましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 18