📄 STEP 22: データフォーマットとI/O最適化
Parquetなどの列指向フォーマットで、I/Oを劇的に高速化しよう
📋 このステップで学ぶこと
- 行指向 vs 列指向フォーマット
- Parquetの特徴と使い方
- 圧縮形式の選択
- パーティション分割されたファイル
📁 0. サンプルデータの準備
このステップでは、データフォーマットの選択でI/Oを最適化する方法を学びます。
まず、SparkSessionを初期化し、サンプルデータを準備しましょう。
0-1. SparkSessionの初期化
# SparkSessionの初期化 from pyspark.sql import SparkSession from pyspark.sql import functions as F import time import os # SparkSessionを作成 spark = SparkSession.builder \ .appName("Data Format") \ .getOrCreate() print("SparkSession準備完了")
SparkSession準備完了
0-2. サンプルデータの作成
# サンプルデータを作成(100万件) data = [(i, f"name_{i}", i % 100, i * 10, f"2024-{(i % 12) + 1:02d}-01") for i in range(1000000)] df = spark.createDataFrame(data, ["id", "name", "category", "value", "date"]) print("サンプルデータ(最初の5件):") df.show(5)
サンプルデータ(最初の5件): +---+------+--------+-----+----------+ | id| name|category|value| date| +---+------+--------+-----+----------+ | 0|name_0| 0| 0|2024-01-01| | 1|name_1| 1| 10|2024-02-01| | 2|name_2| 2| 20|2024-03-01| | 3|name_3| 3| 30|2024-04-01| | 4|name_4| 4| 40|2024-05-01| +---+------+--------+-----+----------+
0-3. 全サンプルデータを一括作成するコード
# ======================================== # STEP 22 サンプルデータ一括作成スクリプト # ======================================== from pyspark.sql import SparkSession from pyspark.sql import functions as F import time spark = SparkSession.builder.appName("Data Format").getOrCreate() # サンプルデータ(100万件) data = [(i, f"name_{i}", i % 100, i * 10, f"2024-{(i % 12) + 1:02d}-01") for i in range(1000000)] df = spark.createDataFrame(data, ["id", "name", "category", "value", "date"]) print("✅ サンプルデータを作成しました") print(" - df: 100万件のデータ(id, name, category, value, date)")
✅ サンプルデータを作成しました - df: 100万件のデータ(id, name, category, value, date)
📊 1. 行指向 vs 列指向フォーマット
1-1. 2つのフォーマットの違い
| フォーマット | 代表例 | 特徴 | 向いている処理 |
|---|---|---|---|
| 行指向 | CSV, JSON | 1行ずつ格納 | 全カラム取得、INSERT |
| 列指向 | Parquet, ORC | カラムごとに格納 | 特定カラムの集計(高速!) |
【行指向(CSV)】
id=1, name=A, value=100
id=2, name=B, value=200
id=3, name=C, value=300
→ 1行ずつ読む
【列指向(Parquet)】
id: [1, 2, 3]
name: [A, B, C]
value: [100, 200, 300]
→ 必要なカラムだけ読む(高速!)
1-2. Parquetが高速な理由
・必要なカラムだけ読む:I/O削減
・圧縮効率が高い:同じ型のデータが連続
・スキーマ内蔵:型情報を自動保持
・述語プッシュダウン:フィルターが高速
📄 2. Parquetの使い方
2-1. Parquetで保存
# Parquet形式で保存 df.write.parquet("data/sample.parquet", mode="overwrite") print("Parquetで保存しました")
Parquetで保存しました
2-2. Parquetを読み込み
# Parquet形式で読み込み df_parquet = spark.read.parquet("data/sample.parquet") print("Parquetを読み込み:") df_parquet.show(3) df_parquet.printSchema()
Parquetを読み込み: +---+------+--------+-----+----------+ | id| name|category|value| date| +---+------+--------+-----+----------+ | 0|name_0| 0| 0|2024-01-01| | 1|name_1| 1| 10|2024-02-01| | 2|name_2| 2| 20|2024-03-01| +---+------+--------+-----+----------+ root |-- id: long (nullable = true) |-- name: string (nullable = true) |-- category: long (nullable = true) |-- value: long (nullable = true) |-- date: string (nullable = true) → スキーマ(型情報)が自動で保持されている!
2-3. CSV vs Parquet パフォーマンス比較
# CSVとParquetで同じデータを保存 # CSV保存 df.write.csv("data/sample.csv", header=True, mode="overwrite") # Parquet保存 df.write.parquet("data/sample.parquet", mode="overwrite") # 読み込み速度を比較 print("【CSV】") start = time.time() df_csv = spark.read.csv("data/sample.csv", header=True, inferSchema=True) df_csv.select("value").agg(F.sum("value")).show() print(f"処理時間: {time.time() - start:.2f}秒") print("【Parquet】") start = time.time() df_parquet = spark.read.parquet("data/sample.parquet") df_parquet.select("value").agg(F.sum("value")).show() print(f"処理時間: {time.time() - start:.2f}秒")
【CSV】 +-------------+ | sum(value)| +-------------+ |4999995000000| +-------------+ 処理時間: 3.25秒 【Parquet】 +-------------+ | sum(value)| +-------------+ |4999995000000| +-------------+ 処理時間: 0.85秒 ← 約4倍速い!
・valueカラムだけ読んでいる
・CSVは全カラムを読む必要がある
・スキーマ推論(inferSchema)が不要
🗜️ 3. 圧縮形式の選択
3-1. 主な圧縮形式
| 圧縮形式 | 圧縮率 | 速度 | 使用場面 |
|---|---|---|---|
| snappy | 中 | 高速(デフォルト) | 一般的な用途 |
| gzip | 高 | やや遅い | ストレージ節約 |
| none | なし | 最速 | 圧縮コストを避ける |
3-2. 圧縮形式を指定して保存
# 圧縮形式を指定 # snappy(デフォルト、高速) df.write.parquet("data/sample_snappy.parquet", compression="snappy", mode="overwrite") # gzip(高圧縮) df.write.parquet("data/sample_gzip.parquet", compression="gzip", mode="overwrite") print("圧縮形式を指定して保存しました")
圧縮形式を指定して保存しました
・通常:snappy(デフォルト)でOK
・ストレージ節約:gzip
・高速処理優先:snappy or none
📂 4. パーティション分割されたファイル
4-1. パーティション分割とは
パーティション分割は、特定のカラムの値ごとにディレクトリを分けて保存する方法です。
フィルター時に不要なディレクトリをスキップできるため高速です。
data/
├── category=0/
│ └── part-0001.parquet
├── category=1/
│ └── part-0001.parquet
├── category=2/
│ └── part-0001.parquet
…
→ category=0のデータが欲しい時、そのディレクトリだけ読む
4-2. パーティション分割して保存
# categoryでパーティション分割して保存 df.write.partitionBy("category") \ .parquet("data/sample_partitioned.parquet", mode="overwrite") print("パーティション分割して保存しました")
パーティション分割して保存しました
4-3. パーティション分割の効果
# パーティション分割されたデータを読み込み df_partitioned = spark.read.parquet("data/sample_partitioned.parquet") # category=0のデータだけ取得(高速) print("【パーティション分割の効果】") start = time.time() result = df_partitioned.filter(F.col("category") == 0).count() print(f"category=0の件数: {result}") print(f"処理時間: {time.time() - start:.2f}秒") print("→ category=0のディレクトリだけ読んでいる(高速)")
【パーティション分割の効果】 category=0の件数: 10000 処理時間: 0.35秒 → category=0のディレクトリだけ読んでいる(高速)
・カーディナリティが高すぎるとディレクトリが大量に
・日付やカテゴリなど適度な粒度で分割
・よく使うフィルター条件でパーティション
📝 練習問題
Parquetで保存
DataFrameをParquet形式で保存してください。
df.write.parquet("output/data.parquet", mode="overwrite")
Parquetを読み込み
Parquet形式のファイルを読み込んでください。
df = spark.read.parquet("output/data.parquet") df.show(5)
gzip圧縮で保存
gzip圧縮でParquetファイルを保存してください。
df.write.parquet("output/data_gzip.parquet", compression="gzip", mode="overwrite")
パーティション分割して保存
categoryでパーティション分割してParquetを保存してください。
df.write.partitionBy("category") \ .parquet("output/data_partitioned.parquet", mode="overwrite")
❓ よくある質問
📝 STEP 22 のまとめ
・行指向(CSV)vs 列指向(Parquet)
・Parquet:列指向で高速、圧縮効率が良い
・圧縮形式:snappy(デフォルト)、gzip(高圧縮)
・パーティション分割:フィルターが高速
・大量データはParquetを使う
・snappy圧縮がデフォルト
・よく使うフィルター条件でパーティション分割
STEP 18〜22で、パフォーマンス最適化の基礎を学びました。
次のPart 6では、「Spark on Cloud」を学びます!
学習メモ
ビッグデータ処理(Apache Spark) - Step 22