STEP 6:RDDの概念と基本操作

⚡ STEP 6: RDDの概念と基本操作

Sparkの基礎となるRDDを理解して、分散処理の本質を掴む

📋 このステップで学ぶこと

  • RDDとは何か
  • Transformations vs Actions
  • Lazy Evaluationの理解
  • 基本的なTransformations(map、filter)
  • 基本的なActions(collect、count)
  • 練習問題8問

学習時間の目安: 2時間

🎯 1. RDDとは何か

1-1. RDDの定義

RDD(Resilient Distributed Dataset)は、Sparkの最も基本的なデータ構造です。分散処理を実現するための土台となっています。

【RDDの正式名称の意味】

RDD = Resilient Distributed Dataset

Resilient(レジリエント)
→ 復元力がある、障害に強い
→ 一部が壊れても自動で復旧できる

Distributed(ディストリビューテッド)
→ 分散された
→ 複数のマシンに分散して保存・処理される

Dataset(データセット)
→ データの集まり
→ プログラムで扱えるデータの集合

1-2. RDDの特徴

1️⃣ 不変(Immutable)

一度作成したら変更できない。操作すると新しいRDDを作成する。

2️⃣ 分散(Distributed)

データは複数のノードに分散して保存される。並列処理が可能。

3️⃣ 耐障害性(Resilient)

ノードが故障しても、自動で復旧。データの系譜(Lineage)を記録。

4️⃣ 遅延評価(Lazy)

変換処理はすぐに実行されない。Actionが呼ばれた時に初めて実行。

1-3. RDDの作成方法

from pyspark import SparkContext

# SparkContextの作成
sc = SparkContext("local[*]", "RDD Example")

# 方法1: Pythonのリストから作成
rdd1 = sc.parallelize([1, 2, 3, 4, 5])

# 方法2: テキストファイルから作成
rdd2 = sc.textFile("data.txt")

# 方法3: range関数
rdd3 = sc.parallelize(range(1, 101))  # 1〜100

# RDDの内容を確認
print(rdd1.collect())  # [1, 2, 3, 4, 5]

# SparkContextを停止
sc.stop()
💡 SparkContext vs SparkSession

SparkContext:RDDを使う時の古いAPI
SparkSession:DataFrame/Datasetを使う時の新しいAPI

現在はSparkSessionが推奨ですが、RDDを理解するためにSparkContextを使います。

1-4. RDDのパーティション分割

【RDDのパーティション分割イメージ】

元データ:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

↓ 4つのパーティションに分割

Partition 0:[1, 2, 3] → Worker 1で処理
Partition 1:[4, 5] → Worker 2で処理
Partition 2:[6, 7, 8] → Worker 3で処理
Partition 3:[9, 10] → Worker 4で処理

→ 各Workerが並列で処理
処理時間が短縮される!

🔄 2. Transformations vs Actions

2-1. 2種類の操作

RDDの操作は、Transformations(変換)Actions(アクション)の2種類に分類されます。

項目 Transformations Actions
定義 RDDを変換して新しいRDDを返す RDDから結果を取得する
実行タイミング 遅延評価
すぐには実行されない
即座に実行
ここで初めて処理が実行される
戻り値 新しいRDD Pythonオブジェクト
(リスト、数値など)
map, filter, flatMap
union, distinct, groupBy
collect, count, first
take, reduce, save

2-2. Transformationsの例

# Transformationsの動作

sc = SparkContext("local[*]", "Transformations")

# RDDを作成
rdd = sc.parallelize([1, 2, 3, 4, 5])

# Transformation 1: map(各要素を2倍に)
rdd2 = rdd.map(lambda x: x * 2)  
# → まだ実行されない!計画だけ立てる

# Transformation 2: filter(偶数だけ残す)
rdd3 = rdd2.filter(lambda x: x % 2 == 0)  
# → まだ実行されない!

# ここまで、実際の処理は何も実行されていない
print("Transformations defined, but not executed yet")

sc.stop()

2-3. Actionsの例

# Actionsの動作

sc = SparkContext("local[*]", "Actions")

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.map(lambda x: x * 2)
rdd3 = rdd2.filter(lambda x: x % 2 == 0)

# Action: collect()を呼ぶ
result = rdd3.collect()  
# → ここで初めて実行される!
# → Transformationsがまとめて実行される

print(result)
[2, 4, 6, 8, 10]
💡 なぜ遅延評価なのか?

遅延評価により、Sparkは処理全体を最適化できます。
例:フィルター処理を先に実行すれば、map処理するデータ量が減る
→ 無駄な計算を省いて高速化

⏱️ 3. Lazy Evaluationの理解

3-1. Lazy Evaluationとは?

Lazy Evaluation(遅延評価)とは、必要になるまで計算を実行しない戦略です。

📝 例え話:レストランの注文

通常の評価(Eager Evaluation):
・お客さんが「前菜」と言った瞬間、料理開始
・「メイン」と言った瞬間、料理開始
・「デザート」と言った瞬間、料理開始
→ 非効率(一度に作れば早いのに)

遅延評価(Lazy Evaluation):
・お客さんが全ての注文を言い終わるまで待つ
・全ての注文を聞いてから、効率的に調理計画を立てる
・「お会計お願いします」と言われた時点で料理開始
効率的!

3-2. Lazy Evaluationの動作フロー

# 動作フローの例

sc = SparkContext("local[*]", "Lazy")

# ステップ1: RDD作成(即座に実行)
rdd = sc.parallelize([1, 2, 3, 4, 5])
print("RDD created")

# ステップ2: Transformation(まだ実行されない)
rdd2 = rdd.map(lambda x: x * 2)
print("Transformation 1 defined (NOT executed)")

# ステップ3: Transformation(まだ実行されない)
rdd3 = rdd2.filter(lambda x: x > 5)
print("Transformation 2 defined (NOT executed)")

# ステップ4: Transformation(まだ実行されない)
rdd4 = rdd3.map(lambda x: x + 1)
print("Transformation 3 defined (NOT executed)")

# ステップ5: Action(ここで初めて実行される!)
result = rdd4.collect()
print("Action called - NOW everything is executed!")
print(result)

sc.stop()
RDD created
Transformation 1 defined (NOT executed)
Transformation 2 defined (NOT executed)
Transformation 3 defined (NOT executed)
Action called - NOW everything is executed!
[7, 9, 11]

3-3. Lazy Evaluationのメリット

1️⃣ 最適化

全体の処理を見てから、最も効率的な実行計画を立てられる。

2️⃣ 無駄な計算の削減

最終的に使わないデータは計算しない

3️⃣ メモリ効率

中間結果を全てメモリに保持しない。必要な分だけ計算。

4️⃣ パイプライン化

複数の処理を1つのステージにまとめて実行できる。

【最適化の例】

非最適化(もし即座に実行されたら):
rdd = 100万件
rdd2 = map(100万回計算)
rdd3 = filter(100万回チェック)
→ 合計200万回の操作

最適化後(Sparkが実際に行う):
1. filterを先に実行して絞り込む
2. 残ったデータだけmapを実行
大幅に計算量削減!

🗺️ 4. 基本的なTransformations

4-1. map() – 各要素を変換

map()は、RDDの各要素に関数を適用して、新しいRDDを作成します。

# map()の基本

sc = SparkContext("local[*]", "Map")

# 元のRDD
rdd = sc.parallelize([1, 2, 3, 4, 5])

# 各要素を2倍にする
rdd2 = rdd.map(lambda x: x * 2)
print(rdd2.collect())  # [2, 4, 6, 8, 10]

# 各要素を2乗する
rdd3 = rdd.map(lambda x: x ** 2)
print(rdd3.collect())  # [1, 4, 9, 16, 25]

# 文字列に変換
rdd4 = rdd.map(lambda x: f"Number: {x}")
print(rdd4.collect())  
# ['Number: 1', 'Number: 2', 'Number: 3', 'Number: 4', 'Number: 5']

sc.stop()

4-2. filter() – 条件に合う要素だけ残す

filter()は、条件を満たす要素だけを残します。

# filter()の基本

sc = SparkContext("local[*]", "Filter")

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# 偶数だけ残す
rdd2 = rdd.filter(lambda x: x % 2 == 0)
print(rdd2.collect())  # [2, 4, 6, 8, 10]

# 5より大きい数だけ残す
rdd3 = rdd.filter(lambda x: x > 5)
print(rdd3.collect())  # [6, 7, 8, 9, 10]

# 複数の条件
rdd4 = rdd.filter(lambda x: x > 3 and x < 8)
print(rdd4.collect())  # [4, 5, 6, 7]

sc.stop()

4-3. flatMap() – 平坦化しながら変換

flatMap()は、各要素を複数の要素に展開します。

# flatMap()の基本

sc = SparkContext("local[*]", "FlatMap")

# 文章を単語に分割
rdd = sc.parallelize(["Hello World", "Spark is awesome"])

# map()を使った場合
rdd_map = rdd.map(lambda x: x.split(" "))
print(rdd_map.collect())  
# [['Hello', 'World'], ['Spark', 'is', 'awesome']]
# → リストのリストになる

# flatMap()を使った場合
rdd_flatmap = rdd.flatMap(lambda x: x.split(" "))
print(rdd_flatmap.collect())  
# ['Hello', 'World', 'Spark', 'is', 'awesome']
# → 平坦化される

sc.stop()
map() vs flatMap()

map():1つの入力 → 1つの出力
flatMap():1つの入力 → 複数の出力(0個以上)

4-4. distinct() – 重複を削除

# distinct()の基本

sc = SparkContext("local[*]", "Distinct")

rdd = sc.parallelize([1, 2, 2, 3, 3, 3, 4, 5, 5])

# 重複を削除
rdd2 = rdd.distinct()
print(rdd2.collect())  # [1, 2, 3, 4, 5]

sc.stop()

4-5. その他の主要Transformations

Transformation 説明と例
union() 2つのRDDを結合
rdd1.union(rdd2)
intersection() 2つのRDDの共通要素
rdd1.intersection(rdd2)
subtract() 差集合(rdd1にあってrdd2にない要素)
rdd1.subtract(rdd2)
sortBy() 並び替え
rdd.sortBy(lambda x: x)
sample() ランダムサンプリング
rdd.sample(False, 0.1)

▶️ 5. 基本的なActions

5-1. collect() – 全要素を取得

collect()は、RDDの全要素をPythonリストとして取得します。

# collect()の使い方

sc = SparkContext("local[*]", "Collect")

rdd = sc.parallelize([1, 2, 3, 4, 5])

# 全要素を取得
result = rdd.collect()
print(result)       # [1, 2, 3, 4, 5]
print(type(result)) # <class 'list'>

sc.stop()
⚠️ collect()の注意点

・全データをDriverのメモリに集める
・大量データではメモリエラーのリスク
・本番環境では慎重に使う(集計後の小さなデータのみ)

5-2. count() – 要素数をカウント

# count()の使い方

sc = SparkContext("local[*]", "Count")

rdd = sc.parallelize([1, 2, 3, 4, 5])

# 要素数をカウント
count = rdd.count()
print(count)  # 5

sc.stop()

5-3. first() – 最初の要素を取得

# first()の使い方

sc = SparkContext("local[*]", "First")

rdd = sc.parallelize([1, 2, 3, 4, 5])

# 最初の要素
first_element = rdd.first()
print(first_element)  # 1

sc.stop()

5-4. take() – 最初のn個を取得

# take()の使い方

sc = SparkContext("local[*]", "Take")

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# 最初の3個を取得
result = rdd.take(3)
print(result)  # [1, 2, 3]

sc.stop()

5-5. reduce() – 要素を集約

reduce()は、要素を1つずつ集約して、最終的に1つの値にします。

# reduce()の使い方

sc = SparkContext("local[*]", "Reduce")

rdd = sc.parallelize([1, 2, 3, 4, 5])

# 合計を計算
total = rdd.reduce(lambda x, y: x + y)
print(total)  # 15

# 最大値を計算
max_value = rdd.reduce(lambda x, y: x if x > y else y)
print(max_value)  # 5

sc.stop()

5-6. その他の主要Actions

Action 説明と例
countByValue() 各値の出現回数をカウント
rdd.countByValue()
foreach() 各要素に関数を適用(副作用のみ)
rdd.foreach(lambda x: print(x))
saveAsTextFile() テキストファイルとして保存
rdd.saveAsTextFile("output")
max() 最大値を取得
rdd.max()
min() 最小値を取得
rdd.min()

📝 練習問題

問題 1 基礎

1から10までの数値を持つRDDを作成してください。

【解答】
from pyspark import SparkContext

sc = SparkContext("local[*]", "Problem1")

# 方法1
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# 方法2(rangeを使用)
rdd = sc.parallelize(range(1, 11))

print(rdd.collect())
sc.stop()
問題 2 基礎

問題1で作成したRDDの各要素を3倍にして、結果を表示してください。

【解答】
sc = SparkContext("local[*]", "Problem2")

rdd = sc.parallelize(range(1, 11))

# map()で3倍にする
rdd2 = rdd.map(lambda x: x * 3)

print(rdd2.collect())
# [3, 6, 9, 12, 15, 18, 21, 24, 27, 30]

sc.stop()
問題 3 基礎

1から20までの数値から、5の倍数だけを抽出してください。

【解答】
sc = SparkContext("local[*]", "Problem3")

rdd = sc.parallelize(range(1, 21))

# filter()で5の倍数だけ残す
rdd2 = rdd.filter(lambda x: x % 5 == 0)

print(rdd2.collect())
# [5, 10, 15, 20]

sc.stop()
問題 4 応用

1から10までの数値の合計を、reduce()を使って計算してください。

【解答】
sc = SparkContext("local[*]", "Problem4")

rdd = sc.parallelize(range(1, 11))

# reduce()で合計を計算
total = rdd.reduce(lambda x, y: x + y)

print(total)  # 55

sc.stop()
問題 5 応用

文章”Hello World Spark is awesome”を単語に分割し、各単語を大文字にしてください。

【解答】
sc = SparkContext("local[*]", "Problem5")

text = "Hello World Spark is awesome"
rdd = sc.parallelize([text])

# flatMap()で単語に分割、map()で大文字に
rdd2 = rdd.flatMap(lambda x: x.split(" ")) \
          .map(lambda x: x.upper())

print(rdd2.collect())
# ['HELLO', 'WORLD', 'SPARK', 'IS', 'AWESOME']

sc.stop()
問題 6 応用

[1, 2, 2, 3, 3, 3, 4, 4, 4, 4]のRDDから重複を削除し、各値の出現回数をカウントしてください。

【解答】
sc = SparkContext("local[*]", "Problem6")

rdd = sc.parallelize([1, 2, 2, 3, 3, 3, 4, 4, 4, 4])

# 方法1: distinct()で重複削除
rdd_distinct = rdd.distinct()
print("Distinct:", rdd_distinct.collect())  # [1, 2, 3, 4]

# 方法2: countByValue()で各値の出現回数
count_dict = rdd.countByValue()
print("Count:", count_dict)  
# {1: 1, 2: 2, 3: 3, 4: 4}

sc.stop()
問題 7 発展

1から100までの数値から、素数だけを抽出してください。

【解答】
sc = SparkContext("local[*]", "Problem7")

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True

rdd = sc.parallelize(range(1, 101))

# filter()で素数だけ残す
rdd_primes = rdd.filter(is_prime)

print(rdd_primes.collect())
# [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]

sc.stop()
問題 8 発展

2つのRDD [1, 2, 3, 4, 5] と [4, 5, 6, 7, 8] の共通要素と、1つ目のRDDにだけ存在する要素を求めてください。

【解答】
sc = SparkContext("local[*]", "Problem8")

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])

# 共通要素
common = rdd1.intersection(rdd2)
print("Common:", common.collect())  # [4, 5]

# rdd1にだけ存在する要素
only_in_rdd1 = rdd1.subtract(rdd2)
print("Only in rdd1:", only_in_rdd1.collect())  # [1, 2, 3]

sc.stop()

📝 STEP 6 のまとめ

✅ このステップで学んだこと

RDDはSparkの基本データ構造(不変、分散、耐障害性)
Transformationsは新しいRDDを返す(遅延評価)
Actionsは結果を返す(即座に実行)
Lazy Evaluationにより処理全体を最適化
map()で各要素を変換、filter()で絞り込み
collect()は慎重に使う(大量データは危険)

💡 重要ポイント

RDDはSparkの基礎ですが、現在はDataFrameが推奨されています。
RDDの概念を理解することで、Sparkの内部動作が理解でき、効率的なコードが書けるようになります。
次のステップからは、DataFrameに集中して学習します!

🎯 次のステップの予告

次のSTEP 7では、「RDDの実践演習」を行います。
ログファイル分析を通じて、RDDの使い方を実践的に学びましょう!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 6

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE