⚡ STEP 6: RDDの概念と基本操作
Sparkの基礎となるRDDを理解して、分散処理の本質を掴む
📋 このステップで学ぶこと
- RDDとは何か
- Transformations vs Actions
- Lazy Evaluationの理解
- 基本的なTransformations(map、filter)
- 基本的なActions(collect、count)
- 練習問題8問
学習時間の目安: 2時間
🎯 1. RDDとは何か
1-1. RDDの定義
RDD(Resilient Distributed Dataset)は、Sparkの最も基本的なデータ構造です。分散処理を実現するための土台となっています。
RDD = Resilient Distributed Dataset
Resilient(レジリエント)
→ 復元力がある、障害に強い
→ 一部が壊れても自動で復旧できる
Distributed(ディストリビューテッド)
→ 分散された
→ 複数のマシンに分散して保存・処理される
Dataset(データセット)
→ データの集まり
→ プログラムで扱えるデータの集合
1-2. RDDの特徴
一度作成したら変更できない。操作すると新しいRDDを作成する。
データは複数のノードに分散して保存される。並列処理が可能。
ノードが故障しても、自動で復旧。データの系譜(Lineage)を記録。
変換処理はすぐに実行されない。Actionが呼ばれた時に初めて実行。
1-3. RDDの作成方法
from pyspark import SparkContext # SparkContextの作成 sc = SparkContext("local[*]", "RDD Example") # 方法1: Pythonのリストから作成 rdd1 = sc.parallelize([1, 2, 3, 4, 5]) # 方法2: テキストファイルから作成 rdd2 = sc.textFile("data.txt") # 方法3: range関数 rdd3 = sc.parallelize(range(1, 101)) # 1〜100 # RDDの内容を確認 print(rdd1.collect()) # [1, 2, 3, 4, 5] # SparkContextを停止 sc.stop()
SparkContext:RDDを使う時の古いAPI
SparkSession:DataFrame/Datasetを使う時の新しいAPI
現在はSparkSessionが推奨ですが、RDDを理解するためにSparkContextを使います。
1-4. RDDのパーティション分割
元データ:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
↓ 4つのパーティションに分割
Partition 0:[1, 2, 3] → Worker 1で処理
Partition 1:[4, 5] → Worker 2で処理
Partition 2:[6, 7, 8] → Worker 3で処理
Partition 3:[9, 10] → Worker 4で処理
→ 各Workerが並列で処理
→ 処理時間が短縮される!
🔄 2. Transformations vs Actions
2-1. 2種類の操作
RDDの操作は、Transformations(変換)とActions(アクション)の2種類に分類されます。
| 項目 | Transformations | Actions |
|---|---|---|
| 定義 | RDDを変換して新しいRDDを返す | RDDから結果を取得する |
| 実行タイミング | 遅延評価 すぐには実行されない |
即座に実行 ここで初めて処理が実行される |
| 戻り値 | 新しいRDD | Pythonオブジェクト (リスト、数値など) |
| 例 | map, filter, flatMap union, distinct, groupBy |
collect, count, first take, reduce, save |
2-2. Transformationsの例
# Transformationsの動作 sc = SparkContext("local[*]", "Transformations") # RDDを作成 rdd = sc.parallelize([1, 2, 3, 4, 5]) # Transformation 1: map(各要素を2倍に) rdd2 = rdd.map(lambda x: x * 2) # → まだ実行されない!計画だけ立てる # Transformation 2: filter(偶数だけ残す) rdd3 = rdd2.filter(lambda x: x % 2 == 0) # → まだ実行されない! # ここまで、実際の処理は何も実行されていない print("Transformations defined, but not executed yet") sc.stop()
2-3. Actionsの例
# Actionsの動作 sc = SparkContext("local[*]", "Actions") rdd = sc.parallelize([1, 2, 3, 4, 5]) rdd2 = rdd.map(lambda x: x * 2) rdd3 = rdd2.filter(lambda x: x % 2 == 0) # Action: collect()を呼ぶ result = rdd3.collect() # → ここで初めて実行される! # → Transformationsがまとめて実行される print(result)
[2, 4, 6, 8, 10]
遅延評価により、Sparkは処理全体を最適化できます。
例:フィルター処理を先に実行すれば、map処理するデータ量が減る
→ 無駄な計算を省いて高速化!
⏱️ 3. Lazy Evaluationの理解
3-1. Lazy Evaluationとは?
Lazy Evaluation(遅延評価)とは、必要になるまで計算を実行しない戦略です。
通常の評価(Eager Evaluation):
・お客さんが「前菜」と言った瞬間、料理開始
・「メイン」と言った瞬間、料理開始
・「デザート」と言った瞬間、料理開始
→ 非効率(一度に作れば早いのに)
遅延評価(Lazy Evaluation):
・お客さんが全ての注文を言い終わるまで待つ
・全ての注文を聞いてから、効率的に調理計画を立てる
・「お会計お願いします」と言われた時点で料理開始
→ 効率的!
3-2. Lazy Evaluationの動作フロー
# 動作フローの例 sc = SparkContext("local[*]", "Lazy") # ステップ1: RDD作成(即座に実行) rdd = sc.parallelize([1, 2, 3, 4, 5]) print("RDD created") # ステップ2: Transformation(まだ実行されない) rdd2 = rdd.map(lambda x: x * 2) print("Transformation 1 defined (NOT executed)") # ステップ3: Transformation(まだ実行されない) rdd3 = rdd2.filter(lambda x: x > 5) print("Transformation 2 defined (NOT executed)") # ステップ4: Transformation(まだ実行されない) rdd4 = rdd3.map(lambda x: x + 1) print("Transformation 3 defined (NOT executed)") # ステップ5: Action(ここで初めて実行される!) result = rdd4.collect() print("Action called - NOW everything is executed!") print(result) sc.stop()
RDD created Transformation 1 defined (NOT executed) Transformation 2 defined (NOT executed) Transformation 3 defined (NOT executed) Action called - NOW everything is executed! [7, 9, 11]
3-3. Lazy Evaluationのメリット
全体の処理を見てから、最も効率的な実行計画を立てられる。
最終的に使わないデータは計算しない。
中間結果を全てメモリに保持しない。必要な分だけ計算。
複数の処理を1つのステージにまとめて実行できる。
非最適化(もし即座に実行されたら):
rdd = 100万件
rdd2 = map(100万回計算)
rdd3 = filter(100万回チェック)
→ 合計200万回の操作
最適化後(Sparkが実際に行う):
1. filterを先に実行して絞り込む
2. 残ったデータだけmapを実行
→ 大幅に計算量削減!
🗺️ 4. 基本的なTransformations
4-1. map() – 各要素を変換
map()は、RDDの各要素に関数を適用して、新しいRDDを作成します。
# map()の基本 sc = SparkContext("local[*]", "Map") # 元のRDD rdd = sc.parallelize([1, 2, 3, 4, 5]) # 各要素を2倍にする rdd2 = rdd.map(lambda x: x * 2) print(rdd2.collect()) # [2, 4, 6, 8, 10] # 各要素を2乗する rdd3 = rdd.map(lambda x: x ** 2) print(rdd3.collect()) # [1, 4, 9, 16, 25] # 文字列に変換 rdd4 = rdd.map(lambda x: f"Number: {x}") print(rdd4.collect()) # ['Number: 1', 'Number: 2', 'Number: 3', 'Number: 4', 'Number: 5'] sc.stop()
4-2. filter() – 条件に合う要素だけ残す
filter()は、条件を満たす要素だけを残します。
# filter()の基本 sc = SparkContext("local[*]", "Filter") rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) # 偶数だけ残す rdd2 = rdd.filter(lambda x: x % 2 == 0) print(rdd2.collect()) # [2, 4, 6, 8, 10] # 5より大きい数だけ残す rdd3 = rdd.filter(lambda x: x > 5) print(rdd3.collect()) # [6, 7, 8, 9, 10] # 複数の条件 rdd4 = rdd.filter(lambda x: x > 3 and x < 8) print(rdd4.collect()) # [4, 5, 6, 7] sc.stop()
4-3. flatMap() – 平坦化しながら変換
flatMap()は、各要素を複数の要素に展開します。
# flatMap()の基本 sc = SparkContext("local[*]", "FlatMap") # 文章を単語に分割 rdd = sc.parallelize(["Hello World", "Spark is awesome"]) # map()を使った場合 rdd_map = rdd.map(lambda x: x.split(" ")) print(rdd_map.collect()) # [['Hello', 'World'], ['Spark', 'is', 'awesome']] # → リストのリストになる # flatMap()を使った場合 rdd_flatmap = rdd.flatMap(lambda x: x.split(" ")) print(rdd_flatmap.collect()) # ['Hello', 'World', 'Spark', 'is', 'awesome'] # → 平坦化される sc.stop()
map():1つの入力 → 1つの出力
flatMap():1つの入力 → 複数の出力(0個以上)
4-4. distinct() – 重複を削除
# distinct()の基本 sc = SparkContext("local[*]", "Distinct") rdd = sc.parallelize([1, 2, 2, 3, 3, 3, 4, 5, 5]) # 重複を削除 rdd2 = rdd.distinct() print(rdd2.collect()) # [1, 2, 3, 4, 5] sc.stop()
4-5. その他の主要Transformations
| Transformation | 説明と例 |
|---|---|
union() |
2つのRDDを結合rdd1.union(rdd2) |
intersection() |
2つのRDDの共通要素rdd1.intersection(rdd2) |
subtract() |
差集合(rdd1にあってrdd2にない要素)rdd1.subtract(rdd2) |
sortBy() |
並び替えrdd.sortBy(lambda x: x) |
sample() |
ランダムサンプリングrdd.sample(False, 0.1) |
▶️ 5. 基本的なActions
5-1. collect() – 全要素を取得
collect()は、RDDの全要素をPythonリストとして取得します。
# collect()の使い方 sc = SparkContext("local[*]", "Collect") rdd = sc.parallelize([1, 2, 3, 4, 5]) # 全要素を取得 result = rdd.collect() print(result) # [1, 2, 3, 4, 5] print(type(result)) # <class 'list'> sc.stop()
・全データをDriverのメモリに集める
・大量データではメモリエラーのリスク
・本番環境では慎重に使う(集計後の小さなデータのみ)
5-2. count() – 要素数をカウント
# count()の使い方 sc = SparkContext("local[*]", "Count") rdd = sc.parallelize([1, 2, 3, 4, 5]) # 要素数をカウント count = rdd.count() print(count) # 5 sc.stop()
5-3. first() – 最初の要素を取得
# first()の使い方 sc = SparkContext("local[*]", "First") rdd = sc.parallelize([1, 2, 3, 4, 5]) # 最初の要素 first_element = rdd.first() print(first_element) # 1 sc.stop()
5-4. take() – 最初のn個を取得
# take()の使い方 sc = SparkContext("local[*]", "Take") rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) # 最初の3個を取得 result = rdd.take(3) print(result) # [1, 2, 3] sc.stop()
5-5. reduce() – 要素を集約
reduce()は、要素を1つずつ集約して、最終的に1つの値にします。
# reduce()の使い方 sc = SparkContext("local[*]", "Reduce") rdd = sc.parallelize([1, 2, 3, 4, 5]) # 合計を計算 total = rdd.reduce(lambda x, y: x + y) print(total) # 15 # 最大値を計算 max_value = rdd.reduce(lambda x, y: x if x > y else y) print(max_value) # 5 sc.stop()
5-6. その他の主要Actions
| Action | 説明と例 |
|---|---|
countByValue() |
各値の出現回数をカウントrdd.countByValue() |
foreach() |
各要素に関数を適用(副作用のみ)rdd.foreach(lambda x: print(x)) |
saveAsTextFile() |
テキストファイルとして保存rdd.saveAsTextFile("output") |
max() |
最大値を取得rdd.max() |
min() |
最小値を取得rdd.min() |
📝 練習問題
1から10までの数値を持つRDDを作成してください。
from pyspark import SparkContext sc = SparkContext("local[*]", "Problem1") # 方法1 rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) # 方法2(rangeを使用) rdd = sc.parallelize(range(1, 11)) print(rdd.collect()) sc.stop()
問題1で作成したRDDの各要素を3倍にして、結果を表示してください。
sc = SparkContext("local[*]", "Problem2") rdd = sc.parallelize(range(1, 11)) # map()で3倍にする rdd2 = rdd.map(lambda x: x * 3) print(rdd2.collect()) # [3, 6, 9, 12, 15, 18, 21, 24, 27, 30] sc.stop()
1から20までの数値から、5の倍数だけを抽出してください。
sc = SparkContext("local[*]", "Problem3") rdd = sc.parallelize(range(1, 21)) # filter()で5の倍数だけ残す rdd2 = rdd.filter(lambda x: x % 5 == 0) print(rdd2.collect()) # [5, 10, 15, 20] sc.stop()
1から10までの数値の合計を、reduce()を使って計算してください。
sc = SparkContext("local[*]", "Problem4") rdd = sc.parallelize(range(1, 11)) # reduce()で合計を計算 total = rdd.reduce(lambda x, y: x + y) print(total) # 55 sc.stop()
文章”Hello World Spark is awesome”を単語に分割し、各単語を大文字にしてください。
sc = SparkContext("local[*]", "Problem5") text = "Hello World Spark is awesome" rdd = sc.parallelize([text]) # flatMap()で単語に分割、map()で大文字に rdd2 = rdd.flatMap(lambda x: x.split(" ")) \ .map(lambda x: x.upper()) print(rdd2.collect()) # ['HELLO', 'WORLD', 'SPARK', 'IS', 'AWESOME'] sc.stop()
[1, 2, 2, 3, 3, 3, 4, 4, 4, 4]のRDDから重複を削除し、各値の出現回数をカウントしてください。
sc = SparkContext("local[*]", "Problem6") rdd = sc.parallelize([1, 2, 2, 3, 3, 3, 4, 4, 4, 4]) # 方法1: distinct()で重複削除 rdd_distinct = rdd.distinct() print("Distinct:", rdd_distinct.collect()) # [1, 2, 3, 4] # 方法2: countByValue()で各値の出現回数 count_dict = rdd.countByValue() print("Count:", count_dict) # {1: 1, 2: 2, 3: 3, 4: 4} sc.stop()
1から100までの数値から、素数だけを抽出してください。
sc = SparkContext("local[*]", "Problem7") def is_prime(n): if n < 2: return False for i in range(2, int(n ** 0.5) + 1): if n % i == 0: return False return True rdd = sc.parallelize(range(1, 101)) # filter()で素数だけ残す rdd_primes = rdd.filter(is_prime) print(rdd_primes.collect()) # [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97] sc.stop()
2つのRDD [1, 2, 3, 4, 5] と [4, 5, 6, 7, 8] の共通要素と、1つ目のRDDにだけ存在する要素を求めてください。
sc = SparkContext("local[*]", "Problem8") rdd1 = sc.parallelize([1, 2, 3, 4, 5]) rdd2 = sc.parallelize([4, 5, 6, 7, 8]) # 共通要素 common = rdd1.intersection(rdd2) print("Common:", common.collect()) # [4, 5] # rdd1にだけ存在する要素 only_in_rdd1 = rdd1.subtract(rdd2) print("Only in rdd1:", only_in_rdd1.collect()) # [1, 2, 3] sc.stop()
📝 STEP 6 のまとめ
・RDDはSparkの基本データ構造(不変、分散、耐障害性)
・Transformationsは新しいRDDを返す(遅延評価)
・Actionsは結果を返す(即座に実行)
・Lazy Evaluationにより処理全体を最適化
・map()で各要素を変換、filter()で絞り込み
・collect()は慎重に使う(大量データは危険)
RDDはSparkの基礎ですが、現在はDataFrameが推奨されています。
RDDの概念を理解することで、Sparkの内部動作が理解でき、効率的なコードが書けるようになります。
次のステップからは、DataFrameに集中して学習します!
次のSTEP 7では、「RDDの実践演習」を行います。
ログファイル分析を通じて、RDDの使い方を実践的に学びましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 6