STEP 22:データフォーマットとI/O最適化

📄 STEP 22: データフォーマットとI/O最適化

Parquetなどの列指向フォーマットで、I/Oを劇的に高速化しよう

📋 このステップで学ぶこと

  • 行指向 vs 列指向フォーマット
  • Parquetの特徴と使い方
  • 圧縮形式の選択
  • パーティション分割されたファイル

📁 0. サンプルデータの準備

このステップでは、データフォーマットの選択でI/Oを最適化する方法を学びます。
まず、SparkSessionを初期化し、サンプルデータを準備しましょう。

0-1. SparkSessionの初期化

# SparkSessionの初期化

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import time
import os

# SparkSessionを作成
spark = SparkSession.builder \
    .appName("Data Format") \
    .getOrCreate()

print("SparkSession準備完了")
SparkSession準備完了

0-2. サンプルデータの作成

# サンプルデータを作成(100万件)

data = [(i, f"name_{i}", i % 100, i * 10, f"2024-{(i % 12) + 1:02d}-01") 
        for i in range(1000000)]
df = spark.createDataFrame(data, ["id", "name", "category", "value", "date"])

print("サンプルデータ(最初の5件):")
df.show(5)
サンプルデータ(最初の5件):
+---+------+--------+-----+----------+
| id|  name|category|value|      date|
+---+------+--------+-----+----------+
|  0|name_0|       0|    0|2024-01-01|
|  1|name_1|       1|   10|2024-02-01|
|  2|name_2|       2|   20|2024-03-01|
|  3|name_3|       3|   30|2024-04-01|
|  4|name_4|       4|   40|2024-05-01|
+---+------+--------+-----+----------+

0-3. 全サンプルデータを一括作成するコード

# ========================================
# STEP 22 サンプルデータ一括作成スクリプト
# ========================================

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import time

spark = SparkSession.builder.appName("Data Format").getOrCreate()

# サンプルデータ(100万件)
data = [(i, f"name_{i}", i % 100, i * 10, f"2024-{(i % 12) + 1:02d}-01") 
        for i in range(1000000)]
df = spark.createDataFrame(data, ["id", "name", "category", "value", "date"])

print("✅ サンプルデータを作成しました")
print("  - df: 100万件のデータ(id, name, category, value, date)")
✅ サンプルデータを作成しました
  - df: 100万件のデータ(id, name, category, value, date)

📊 1. 行指向 vs 列指向フォーマット

1-1. 2つのフォーマットの違い

フォーマット 代表例 特徴 向いている処理
行指向 CSV, JSON 1行ずつ格納 全カラム取得、INSERT
列指向 Parquet, ORC カラムごとに格納 特定カラムの集計(高速!)
📊 データの格納イメージ

【行指向(CSV)】
id=1, name=A, value=100
id=2, name=B, value=200
id=3, name=C, value=300
→ 1行ずつ読む

【列指向(Parquet)】
id: [1, 2, 3]
name: [A, B, C]
value: [100, 200, 300]
必要なカラムだけ読む(高速!)

1-2. Parquetが高速な理由

✅ Parquetの利点

必要なカラムだけ読む:I/O削減
圧縮効率が高い:同じ型のデータが連続
スキーマ内蔵:型情報を自動保持
述語プッシュダウン:フィルターが高速

📄 2. Parquetの使い方

2-1. Parquetで保存

# Parquet形式で保存

df.write.parquet("data/sample.parquet", mode="overwrite")

print("Parquetで保存しました")
Parquetで保存しました

2-2. Parquetを読み込み

# Parquet形式で読み込み

df_parquet = spark.read.parquet("data/sample.parquet")

print("Parquetを読み込み:")
df_parquet.show(3)
df_parquet.printSchema()
Parquetを読み込み:
+---+------+--------+-----+----------+
| id|  name|category|value|      date|
+---+------+--------+-----+----------+
|  0|name_0|       0|    0|2024-01-01|
|  1|name_1|       1|   10|2024-02-01|
|  2|name_2|       2|   20|2024-03-01|
+---+------+--------+-----+----------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- category: long (nullable = true)
 |-- value: long (nullable = true)
 |-- date: string (nullable = true)

→ スキーマ(型情報)が自動で保持されている!

2-3. CSV vs Parquet パフォーマンス比較

# CSVとParquetで同じデータを保存

# CSV保存
df.write.csv("data/sample.csv", header=True, mode="overwrite")

# Parquet保存
df.write.parquet("data/sample.parquet", mode="overwrite")

# 読み込み速度を比較
print("【CSV】")
start = time.time()
df_csv = spark.read.csv("data/sample.csv", header=True, inferSchema=True)
df_csv.select("value").agg(F.sum("value")).show()
print(f"処理時間: {time.time() - start:.2f}秒")

print("【Parquet】")
start = time.time()
df_parquet = spark.read.parquet("data/sample.parquet")
df_parquet.select("value").agg(F.sum("value")).show()
print(f"処理時間: {time.time() - start:.2f}秒")
【CSV】
+-------------+
|   sum(value)|
+-------------+
|4999995000000|
+-------------+

処理時間: 3.25秒

【Parquet】
+-------------+
|   sum(value)|
+-------------+
|4999995000000|
+-------------+

処理時間: 0.85秒  ← 約4倍速い!
🎯 Parquetが速い理由

valueカラムだけ読んでいる
・CSVは全カラムを読む必要がある
・スキーマ推論(inferSchema)が不要

🗜️ 3. 圧縮形式の選択

3-1. 主な圧縮形式

圧縮形式 圧縮率 速度 使用場面
snappy 高速(デフォルト) 一般的な用途
gzip やや遅い ストレージ節約
none なし 最速 圧縮コストを避ける

3-2. 圧縮形式を指定して保存

# 圧縮形式を指定

# snappy(デフォルト、高速)
df.write.parquet("data/sample_snappy.parquet", 
                 compression="snappy", mode="overwrite")

# gzip(高圧縮)
df.write.parquet("data/sample_gzip.parquet", 
                 compression="gzip", mode="overwrite")

print("圧縮形式を指定して保存しました")
圧縮形式を指定して保存しました
💡 圧縮形式の選び方

通常:snappy(デフォルト)でOK
ストレージ節約:gzip
高速処理優先:snappy or none

📂 4. パーティション分割されたファイル

4-1. パーティション分割とは

パーティション分割は、特定のカラムの値ごとにディレクトリを分けて保存する方法です。
フィルター時に不要なディレクトリをスキップできるため高速です。

📊 パーティション分割のイメージ

data/
├── category=0/
│  └── part-0001.parquet
├── category=1/
│  └── part-0001.parquet
├── category=2/
│  └── part-0001.parquet


→ category=0のデータが欲しい時、そのディレクトリだけ読む

4-2. パーティション分割して保存

# categoryでパーティション分割して保存

df.write.partitionBy("category") \
    .parquet("data/sample_partitioned.parquet", mode="overwrite")

print("パーティション分割して保存しました")
パーティション分割して保存しました

4-3. パーティション分割の効果

# パーティション分割されたデータを読み込み

df_partitioned = spark.read.parquet("data/sample_partitioned.parquet")

# category=0のデータだけ取得(高速)
print("【パーティション分割の効果】")
start = time.time()
result = df_partitioned.filter(F.col("category") == 0).count()
print(f"category=0の件数: {result}")
print(f"処理時間: {time.time() - start:.2f}秒")
print("→ category=0のディレクトリだけ読んでいる(高速)")
【パーティション分割の効果】
category=0の件数: 10000
処理時間: 0.35秒
→ category=0のディレクトリだけ読んでいる(高速)
⚠️ パーティション分割の注意点

カーディナリティが高すぎるとディレクトリが大量に
日付カテゴリなど適度な粒度で分割
よく使うフィルター条件でパーティション

📝 練習問題

問題 1 基礎

Parquetで保存

DataFrameをParquet形式で保存してください。

【解答】
df.write.parquet("output/data.parquet", mode="overwrite")
問題 2 基礎

Parquetを読み込み

Parquet形式のファイルを読み込んでください。

【解答】
df = spark.read.parquet("output/data.parquet")
df.show(5)
問題 3 応用

gzip圧縮で保存

gzip圧縮でParquetファイルを保存してください。

【解答】
df.write.parquet("output/data_gzip.parquet", 
                 compression="gzip", mode="overwrite")
問題 4 実践

パーティション分割して保存

categoryでパーティション分割してParquetを保存してください。

【解答】
df.write.partitionBy("category") \
    .parquet("output/data_partitioned.parquet", mode="overwrite")

❓ よくある質問

Q1: ParquetとORCどちらがいい?
ParquetがSparkでは一般的です。Hiveを多用する場合はORCも選択肢になります。
Q2: CSVはもう使わない方がいい?
データ交換人が読む用途ならCSVも有用です。ただし、大量データの処理にはParquetを推奨します。
Q3: 圧縮形式はどれがいい?
snappy(デフォルト)で問題ありません。ストレージを節約したい場合はgzipを検討してください。
Q4: パーティションキーは何がいい?
よく使うフィルター条件(日付、地域、カテゴリなど)をパーティションキーにします。

📝 STEP 22 のまとめ

✅ このステップで学んだこと

行指向(CSV)vs 列指向(Parquet)
Parquet:列指向で高速、圧縮効率が良い
圧縮形式:snappy(デフォルト)、gzip(高圧縮)
パーティション分割:フィルターが高速

💡 重要ポイント

・大量データはParquetを使う
snappy圧縮がデフォルト
・よく使うフィルター条件でパーティション分割

🎯 Part 5(パフォーマンス最適化)の完了!

STEP 18〜22で、パフォーマンス最適化の基礎を学びました。
次のPart 6では、「Spark on Cloud」を学びます!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 22

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE