⚡ STEP 7: RDDの実践演習
Word CountとログファイルをRDDで分析して実践力を身につける
📋 このステップで学ぶこと
- サンプルデータの作成方法
- テキストファイル処理の基礎
- Word Countの実装(MapReduceの基本パターン)
- なぜDataFrameが推奨されるか
- RDDとDataFrameの使い分け
- 実践演習:ログファイル分析
📁 0. サンプルデータの作成
このステップでは、テキストファイルやログファイルを使ってRDDの実践的な使い方を学びます。
まず、演習で使用するサンプルデータを作成しましょう。
0-1. サンプルテキストファイル(sample.txt)の作成
Word Countやテキスト処理の基本を学ぶためのシンプルなテキストファイルを作成します。
Pythonのopen()関数を使って、ファイルに文字列を書き込みます。
# サンプルテキストファイルを作成 # sample.txt の内容を定義 sample_text = """Hello World Apache Spark is awesome Spark is fast Big Data processing with Spark""" # ファイルに書き込み("w"モードで新規作成) with open("sample.txt", "w") as f: f.write(sample_text) print("sample.txt を作成しました")
sample.txt を作成しました
0-2. サンプルログファイル(access.log)の作成
Webサーバーのアクセスログを模したサンプルファイルを作成します。
実際のログ分析で使用される形式に近いデータです。
# サンプルログファイルを作成 # アクセスログの内容を定義 # 形式: IPアドレス - - [日時] "メソッド URL プロトコル" ステータスコード バイト数 access_log = """192.168.1.1 - - [10/Oct/2023:13:55:36] "GET /index.html HTTP/1.1" 200 2326 192.168.1.2 - - [10/Oct/2023:13:56:12] "GET /about.html HTTP/1.1" 200 1523 192.168.1.1 - - [10/Oct/2023:13:57:28] "GET /contact.html HTTP/1.1" 404 0 192.168.1.3 - - [10/Oct/2023:13:58:45] "POST /api/login HTTP/1.1" 200 89 192.168.1.2 - - [10/Oct/2023:13:59:01] "GET /index.html HTTP/1.1" 200 2326 192.168.1.4 - - [10/Oct/2023:14:00:15] "GET /products.html HTTP/1.1" 200 4512 192.168.1.1 - - [10/Oct/2023:14:01:32] "GET /notfound.html HTTP/1.1" 404 0""" # ファイルに書き込み with open("access.log", "w") as f: f.write(access_log) print("access.log を作成しました")
access.log を作成しました
0-3. 全サンプルデータを一括作成するコード
上記の2つのファイルを一括で作成できるコードです。
このコードを実行すれば、演習に必要な全ファイルが準備できます。
# ======================================== # STEP 7 サンプルデータ一括作成スクリプト # ======================================== # 1. sample.txt(テキスト処理用) sample_text = """Hello World Apache Spark is awesome Spark is fast Big Data processing with Spark""" with open("sample.txt", "w") as f: f.write(sample_text) # 2. access.log(ログ分析用) access_log = """192.168.1.1 - - [10/Oct/2023:13:55:36] "GET /index.html HTTP/1.1" 200 2326 192.168.1.2 - - [10/Oct/2023:13:56:12] "GET /about.html HTTP/1.1" 200 1523 192.168.1.1 - - [10/Oct/2023:13:57:28] "GET /contact.html HTTP/1.1" 404 0 192.168.1.3 - - [10/Oct/2023:13:58:45] "POST /api/login HTTP/1.1" 200 89 192.168.1.2 - - [10/Oct/2023:13:59:01] "GET /index.html HTTP/1.1" 200 2326 192.168.1.4 - - [10/Oct/2023:14:00:15] "GET /products.html HTTP/1.1" 200 4512 192.168.1.1 - - [10/Oct/2023:14:01:32] "GET /notfound.html HTTP/1.1" 404 0""" with open("access.log", "w") as f: f.write(access_log) print("✅ サンプルデータを作成しました") print(" - sample.txt") print(" - access.log")
✅ サンプルデータを作成しました - sample.txt - access.log
上記のコードを実行すると、Jupyter Notebookを起動したディレクトリにファイルが作成されます。
SparkでtextFile()を使う際は、同じディレクトリでSparkを起動してください。
📄 1. テキストファイル処理
1-1. textFile()の基本:ファイルを行単位で読み込む
SparkでテキストファイルをRDDとして読み込むには、textFile()メソッドを使います。
このメソッドは、ファイルの各行を1つの要素として持つRDDを作成します。
・ファイルの各行がRDDの1要素になる
・改行文字(\n)で区切られた文字列のリストとして読み込まれる
・大きなファイルでも分散処理で効率的に読み込める
コードを段階的に見ていきましょう。まず、SparkContextをインポートして初期化します。
# SparkContextをインポート from pyspark import SparkContext # SparkContextを作成 # "local[*]" は全CPUコアを使用するローカルモード # "TextFile" はアプリケーション名(Spark UIで表示される) sc = SparkContext("local[*]", "TextFile")
次に、textFile()でファイルを読み込みます。
# テキストファイルをRDDとして読み込み # 各行が1つの要素になる rdd = sc.textFile("sample.txt")
collect()はRDDの全要素をリストとして取得するActionです。
count()は要素数をカウントするActionです。
# collect():全要素をリストとして取得 for line in rdd.collect(): print(line) # count():要素数(行数)をカウント print(f"Total lines: {rdd.count()}")
以下が完成コードです。入力して実行してみましょう。
# テキストファイルの読み込み - 完成コード from pyspark import SparkContext # SparkContext作成("local[*]"は全CPUコアを使用) sc = SparkContext("local[*]", "TextFile") # テキストファイルを読み込む(各行がRDDの1要素) rdd = sc.textFile("sample.txt") # 各行を表示 for line in rdd.collect(): print(line) # 行数をカウント print(f"Total lines: {rdd.count()}") # SparkContextを終了(リソース解放) sc.stop()
Hello World Apache Spark is awesome Spark is fast Big Data processing with Spark Total lines: 4
1-2. filter():特定の条件に合う行を抽出
filter()は、条件に合致する要素だけを残すTransformationです。
lambda関数で条件を指定します。条件に合致する要素のみが新しいRDDに含まれます。
filter(lambda x: 条件)
・条件がTrueの要素だけを残す
・元のRDDは変更されない(新しいRDDが作成される)
・Transformationなので、Actionが呼ばれるまで実行されない
# 特定の単語を含む行を抽出する from pyspark import SparkContext sc = SparkContext("local[*]", "Filter") # テキストファイルを読み込む rdd = sc.textFile("sample.txt") # filter():条件に合う要素だけを残す # lambda line: "Spark" in line は「行に"Spark"が含まれているか」を判定 spark_lines = rdd.filter(lambda line: "Spark" in line) # collect()でリストとして取得して表示 print(spark_lines.collect()) sc.stop()
['Apache Spark is awesome', 'Spark is fast', 'Big Data processing with Spark']
1-3. map()とreduce():各行の単語数をカウント
map()は各要素に関数を適用し、reduce()は全要素を1つの値に集約します。
これらを組み合わせて、各行の単語数と平均単語数を計算してみましょう。
・map(lambda x: f(x)):各要素xに関数fを適用して変換
・reduce(lambda x, y: x + y):全要素を順番に結合して1つの値にする
・split():文字列を空白で分割してリストにする
# 各行の単語数をカウントする from pyspark import SparkContext sc = SparkContext("local[*]", "WordCount") rdd = sc.textFile("sample.txt") # map():各行を単語数(整数)に変換 # line.split() は行を単語のリストに分割 # len() でリストの長さ(単語数)を取得 word_counts = rdd.map(lambda line: len(line.split())) # 各行の単語数を表示 print("各行の単語数:", word_counts.collect()) # reduce():全要素を足し合わせて合計を計算 # lambda x, y: x + y は「2つの値を足す」という意味 total_words = word_counts.reduce(lambda x, y: x + y) # count()で行数を取得し、平均を計算 avg = total_words / word_counts.count() print(f"平均単語数: {avg}") sc.stop()
各行の単語数: [2, 4, 3, 5] 平均単語数: 3.5
📊 2. Word Countの実装
2-1. Word Countとは?
Word Count(単語カウント)は、テキスト内の各単語が何回出現するかをカウントする処理です。
分散処理の「Hello World」とも呼ばれ、MapReduceパターンの基本を学ぶのに最適です。
① 読み込み:テキストファイルを行単位で読み込む
② 分割:各行を単語に分割する
③ ペア作成:各単語を (単語, 1) のペアにする
④ グループ化:同じ単語のペアをまとめる
⑤ 集計:各グループの「1」を合計する
2-2. flatMap()の理解:行を単語に分割
Word Countを実装する前に、flatMap()の動作を理解しましょう。
map()とflatMap()の違いを比較します。
| メソッド | 動作 | 入力例 | 出力例 |
|---|---|---|---|
| map() | 各要素を1対1で変換 | [“Hello World”, “Hi”] | [[“Hello”, “World”], [“Hi”]] |
| flatMap() | 変換後に1つのリストに平坦化 | [“Hello World”, “Hi”] | [“Hello”, “World”, “Hi”] |
# map() と flatMap() の違いを確認 from pyspark import SparkContext sc = SparkContext("local[*]", "FlatMap") # サンプルデータ rdd = sc.parallelize(["Hello World", "Hi Spark"]) # map():各行をリストに変換(入れ子になる) map_result = rdd.map(lambda line: line.split()) print("map()の結果:", map_result.collect()) # flatMap():各行をリストに変換して平坦化 flatmap_result = rdd.flatMap(lambda line: line.split()) print("flatMap()の結果:", flatmap_result.collect()) sc.stop()
map()の結果: [['Hello', 'World'], ['Hi', 'Spark']] flatMap()の結果: ['Hello', 'World', 'Hi', 'Spark']
Word Countでは、各単語を個別に処理する必要があります。
flatMap()を使うことで、入れ子のリストを避け、すべての単語をフラットなリストとして取得できます。
2-3. reduceByKey()の理解:キー別に集約
reduceByKey()は、同じキーを持つ要素の値を集約するTransformationです。
(キー, 値) のペアを持つRDDに対して使用します。
# reduceByKey() の動作を確認 from pyspark import SparkContext sc = SparkContext("local[*]", "ReduceByKey") # (キー, 値) のペアを持つRDD pairs = sc.parallelize([ ("hello", 1), ("world", 1), ("hello", 1), ("hello", 1) ]) # reduceByKey():同じキーの値を足し合わせる # ("hello", 1) + ("hello", 1) + ("hello", 1) → ("hello", 3) result = pairs.reduceByKey(lambda x, y: x + y) print(result.collect()) sc.stop()
[('hello', 3), ('world', 1)]2-4. Word Count 基本版の実装
ここまで学んだflatMap()、map()、reduceByKey()を組み合わせて、Word Countを実装します。
# Word Count 基本版 from pyspark import SparkContext sc = SparkContext("local[*]", "WordCount") # サンプルテキスト text = [ "Hello World", "Hello Spark", "Spark is fast", "Spark is awesome" ] rdd = sc.parallelize(text) # ステップ1: flatMap() で各行を単語に分割 words = rdd.flatMap(lambda line: line.split()) print("①単語リスト:", words.collect()) # ステップ2: map() で各単語を (単語, 1) のペアに変換 word_pairs = words.map(lambda word: (word, 1)) print("②ペアリスト:", word_pairs.collect()) # ステップ3: reduceByKey() で同じ単語の値を合計 word_counts = word_pairs.reduceByKey(lambda x, y: x + y) print("③カウント結果:", word_counts.collect()) sc.stop()
①単語リスト: ['Hello', 'World', 'Hello', 'Spark', 'Spark', 'is', 'fast', 'Spark', 'is', 'awesome']
②ペアリスト: [('Hello', 1), ('World', 1), ('Hello', 1), ('Spark', 1), ...]
③カウント結果: [('Hello', 2), ('World', 1), ('Spark', 3), ('is', 2), ('fast', 1), ('awesome', 1)]2-5. Word Count 改良版:大文字小文字を統一
基本版では「Hello」と「hello」が別々にカウントされます。
lower()を使って小文字に統一し、さらにsortBy()で降順にソートします。
# Word Count 改良版(大文字小文字を統一) from pyspark import SparkContext sc = SparkContext("local[*]", "WordCountImproved") text = [ "Hello World", "hello Spark", "SPARK is fast", "Spark is awesome" ] rdd = sc.parallelize(text) # メソッドチェーンで一連の処理を連結 # lower():小文字に変換 # sortBy():指定した値でソート(ascending=Falseで降順) word_counts = rdd.flatMap(lambda line: line.lower().split()) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda x, y: x + y) \ .sortBy(lambda x: x[1], ascending=False) print("降順でソートした結果:") print(word_counts.collect()) sc.stop()
降順でソートした結果:
[('spark', 3), ('hello', 2), ('is', 2), ('world', 1), ('fast', 1), ('awesome', 1)]2-6. Word Count 完全版:句読点を除去してTOP 5を表示
実際のテキストには句読点(!、.、,など)が含まれています。
正規表現を使って句読点を除去し、出現頻度の高い単語TOP 5を表示します。
# Word Count 完全版(句読点除去 + TOP 5表示) import re from pyspark import SparkContext sc = SparkContext("local[*]", "WordCountComplete") text = [ "Hello, World!", "Hello Spark.", "Spark is fast!", "Spark is awesome." ] rdd = sc.parallelize(text) def clean_word(word): """単語から句読点を除去して小文字に変換する関数""" # re.sub():正規表現でマッチした部分を置換 # r'[^\w]' は「アルファベット・数字・アンダースコア以外」を意味 # それを空文字''に置換することで削除 return re.sub(r'[^\w]', '', word).lower() # 処理の流れ: # 1. flatMap: 行→単語に分割 # 2. map: 各単語から句読点を除去 # 3. filter: 空文字を除外 # 4. map: (単語, 1)のペアを作成 # 5. reduceByKey: 同じ単語の値を合計 # 6. sortBy: カウント数で降順ソート word_counts = rdd.flatMap(lambda line: line.split()) \ .map(clean_word) \ .filter(lambda word: word != '') \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda x, y: x + y) \ .sortBy(lambda x: x[1], ascending=False) # take(5): 上位5件を取得 top5 = word_counts.take(5) print("TOP 5 Words:") for word, count in top5: print(f" {word}: {count}") sc.stop()
TOP 5 Words: spark: 3 is: 2 hello: 2 world: 1 fast: 1
🔍 3. なぜDataFrameが推奨されるか
3-1. RDDの課題
ここまでRDDを使ってきましたが、実は現在のSparkではDataFrameの使用が推奨されています。
RDDにはいくつかの課題があるためです。
1. 低レベルAPI:細かい処理を自分で書く必要がある
2. 最適化されない:SparkがRDDの処理を最適化できない
3. 型がない:実行時までエラーがわからない
4. コードが冗長:同じ処理でもDataFrameより長くなる
3-2. 同じ処理をRDDとDataFrameで比較
「30歳以上の人を都市別にカウントする」という同じ処理を、RDDとDataFrameで実装して比較してみましょう。
RDDでの実装
# RDDでの実装 - コードが長く、直感的でない from pyspark import SparkContext sc = SparkContext("local[*]", "RDD") # データ作成(名前, 年齢, 都市) data = [ ("Alice", 30, "Tokyo"), ("Bob", 25, "Osaka"), ("Charlie", 35, "Tokyo"), ("David", 28, "Tokyo") ] rdd = sc.parallelize(data) # 30歳以上をフィルター(インデックス[1]が年齢) filtered = rdd.filter(lambda x: x[1] >= 30) # 都市別にグループ化してカウント # x[2]は都市、(都市, 1)のペアを作成してreduceByKey city_counts = filtered.map(lambda x: (x[2], 1)) \ .reduceByKey(lambda x, y: x + y) print(city_counts.collect()) # 結果: [('Tokyo', 2)] sc.stop()
DataFrameでの実装
# DataFrameでの実装 - シンプルで直感的! from pyspark.sql import SparkSession spark = SparkSession.builder.appName("DataFrame").getOrCreate() # データ作成(カラム名を指定できる) data = [ ("Alice", 30, "Tokyo"), ("Bob", 25, "Osaka"), ("Charlie", 35, "Tokyo"), ("David", 28, "Tokyo") ] df = spark.createDataFrame(data, ["name", "age", "city"]) # 30歳以上をフィルター、都市別にカウント # カラム名で直接アクセスできる! result = df.filter(df.age >= 30) \ .groupBy("city") \ .count() result.show() spark.stop()
+-----+-----+ | city|count| +-----+-----+ |Tokyo| 2| +-----+-----+
3-3. RDDとDataFrameの比較表
| 項目 | RDD | DataFrame |
|---|---|---|
| コード量 | 多い(冗長) | 少ない(簡潔) |
| カラムアクセス | インデックス(x[1]) | 名前(df.age) |
| 最適化 | なし | Catalyst Optimizer |
| 実行速度 | 遅い | 速い(2〜10倍) |
| SQL互換 | なし | あり |
Catalyst Optimizer:クエリを分析して最適な実行計画を自動生成
Tungsten実行エンジン:メモリ効率を最大化する低レベル最適化
列指向ストレージ:必要な列だけを読み込み、I/Oを削減
⚖️ 4. RDDとDataFrameの使い分け
4-1. 使い分けの基準
| ケース | RDDを使う | DataFrameを使う |
|---|---|---|
| データ構造 | 非構造化データ (テキスト、バイナリ) |
構造化データ (CSV、JSON、DB) |
| 処理内容 | カスタム処理 複雑なロジック |
標準的な集計・変換 SQL的な処理 |
| パフォーマンス | 最適化されない | 自動最適化 |
| 推奨度 | 特殊な用途のみ | ほとんどの用途 |
4-2. RDDを使うべきケース(稀)
・テキスト処理:行単位の複雑なパース処理が必要な場合
・バイナリデータ:画像、音声などの非構造化データ
・カスタムパーティショニング:独自の分散戦略が必要な場合
・レガシーコード:既存のRDDコードを保守する場合
99%のケースでDataFrameを使ってください。
RDDは、DataFrameでは実現できない特殊な処理が必要な場合のみ使います。
次のSTEP 8からはDataFrameに集中して学習していきます。
📝 5. 実践演習:ログファイル分析
ここからは、セクション0で作成したaccess.logファイルを使って、実践的なログ分析を行います。
RDDが適しているテキスト処理の例として、Webサーバーのアクセスログを分析してみましょう。
5-1. ログファイルの構造を理解する
まず、ログファイルの各フィールドがどのインデックスに対応するかを確認します。
# ログの1行をスペースで分割した場合のインデックス対応
"192.168.1.1 - - [10/Oct/2023:13:55:36] "GET /index.html HTTP/1.1" 200 2326"
インデックス:
[0] → "192.168.1.1" # IPアドレス
[1] → "-" # ユーザー識別子
[2] → "-" # ユーザーID
[3] → "[10/Oct/2023:13:55:36]" # タイムスタンプ
[4] → "" # 空白(ダブルクォートが分割される)
[5] → "GET # HTTPメソッド(クォート付き)
[6] → /index.html" # URL
[7] → "HTTP/1.1" # プロトコル
[8] → "200" # ステータスコード
[9] → "2326" # バイト数5-2. 演習1: アクセス回数の多いIPアドレスTOP 3
各IPアドレスからのアクセス回数をカウントし、上位3つを表示します。
split()[0]でログの最初のフィールド(IPアドレス)を取得します。
# IPアドレス別アクセス回数TOP 3 from pyspark import SparkContext sc = SparkContext("local[*]", "LogAnalysis") # ログファイルを読み込み logs = sc.textFile("access.log") # 処理の流れ: # 1. 各行からIPアドレス(インデックス0)を抽出 # 2. (IP, 1)のペアを作成 # 3. 同じIPの値を合計 # 4. カウント数で降順ソート ip_counts = logs.map(lambda line: line.split()[0]) \ .map(lambda ip: (ip, 1)) \ .reduceByKey(lambda x, y: x + y) \ .sortBy(lambda x: x[1], ascending=False) # TOP 3を表示 print("TOP 3 IPアドレス:") for ip, count in ip_counts.take(3): print(f" {ip}: {count}回") sc.stop()
TOP 3 IPアドレス: 192.168.1.1: 3回 192.168.1.2: 2回 192.168.1.3: 1回
5-3. 演習2: HTTPステータスコード別の集計
ステータスコード(200, 404など)ごとの件数をカウントします。
split()[8]でステータスコードを取得します。
# ステータスコード別の集計 from pyspark import SparkContext sc = SparkContext("local[*]", "StatusCode") logs = sc.textFile("access.log") # ステータスコード(インデックス8)を抽出してカウント status_counts = logs.map(lambda line: line.split()[8]) \ .map(lambda status: (status, 1)) \ .reduceByKey(lambda x, y: x + y) print("ステータスコード別件数:") for status, count in status_counts.collect(): print(f" {status}: {count}件") sc.stop()
ステータスコード別件数: 200: 5件 404: 2件
5-4. 演習3: 404エラーのURLを抽出
404エラー(ページが見つからない)が発生したURLを抽出します。
filter()で404を含む行だけを残し、split()[6]でURLを取得します。
# 404エラーのURL抽出 from pyspark import SparkContext sc = SparkContext("local[*]", "404Errors") logs = sc.textFile("access.log") # 処理の流れ: # 1. filter(): 行に"404"が含まれるものだけを残す # 2. map(): 各行からURL(インデックス6)を抽出 error_404 = logs.filter(lambda line: "404" in line) error_urls = error_404.map(lambda line: line.split()[6]) print("404エラーのURL:") for url in error_urls.collect(): print(f" {url}") sc.stop()
404エラーのURL: /contact.html /notfound.html
5-5. 演習4: GETとPOSTリクエストの比率
HTTPメソッド(GET, POST)の使用比率を計算します。
strip('"')でメソッドについているダブルクォートを除去します。
# HTTPメソッド別の集計と比率計算 from pyspark import SparkContext sc = SparkContext("local[*]", "RequestMethod") logs = sc.textFile("access.log") # HTTPメソッド(インデックス5)を抽出 # strip('"')でダブルクォートを除去 method_counts = logs.map(lambda line: line.split()[5].strip('"')) \ .map(lambda method: (method, 1)) \ .reduceByKey(lambda x, y: x + y) # 合計リクエスト数を取得 total = logs.count() print("HTTPメソッド別分布:") for method, count in method_counts.collect(): # 比率を計算(小数点1桁まで) percentage = (count / total) * 100 print(f" {method}: {count}件 ({percentage:.1f}%)") sc.stop()
HTTPメソッド別分布: GET: 6件 (85.7%) POST: 1件 (14.3%)
5-6. 参考:DataFrameでの同じ処理
参考として、DataFrameを使った場合のコードも紹介します。
ログを構造化データとして扱えるため、より直感的に書けます。
# DataFrameを使ったログ分析(参考) from pyspark.sql import SparkSession from pyspark.sql.functions import split, col spark = SparkSession.builder.appName("LogDF").getOrCreate() # テキストファイルを1カラムのDataFrameとして読み込み logs_df = spark.read.text("access.log") # split()で分割し、最初の要素をIPアドレスとして抽出 logs_df = logs_df.withColumn("ip", split(col("value"), " ")[0]) # IPアドレス別にカウントして降順ソート ip_counts = logs_df.groupBy("ip").count() \ .orderBy(col("count").desc()) ip_counts.show(3) spark.stop()
+-------------+-----+ | ip|count| +-------------+-----+ |192.168.1.1 | 3| |192.168.1.2 | 2| |192.168.1.3 | 1| +-------------+-----+
ログファイル分析のようなテキスト処理は、RDDでも実装できます。
しかし、構造化されたデータ(CSV、JSONなど)の分析は、DataFrameが圧倒的に効率的です。
次のSTEP 8からDataFrameを本格的に学習していきます。
📝 STEP 7 のまとめ
・textFile()でテキストファイルを行単位で読み込める
・flatMap()で入れ子リストを平坦化できる
・Word CountはMapReduceの基本パターン
・reduceByKey()で同じキーの値を集約
・DataFrameの方がシンプルで高速(推奨)
・RDDは非構造化データの処理に適している
・ログファイル分析などのテキスト処理でRDDを活用
RDDの基礎を理解したことで、Sparkの内部動作が理解できるようになりました。
ここからはDataFrameに集中して学習します。
DataFrameは、RDDの上に構築された高レベルAPIです。
RDDの概念を理解していることで、DataFrameをより深く理解できます!
次のSTEP 8では、「DataFrameの作成と基本操作」を学びます。
いよいよDataFrameの世界へ!Pandasに似た直感的なAPIで、大量データを高速処理しましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 7