STEP 4:SparkSessionの基礎

⚡ STEP 4: SparkSessionの基礎

SparkSessionを自在に操り、様々なデータを読み込めるようになる

📋 このステップで学ぶこと

  • SparkSessionとは何か
  • 設定パラメータの理解と使い方
  • データの読み込み(CSV、JSON、Parquet)
  • show()、printSchema()の使い方
  • 練習問題8問

学習時間の目安: 2時間

🎯 1. SparkSessionとは

1-1. SparkSessionの役割

SparkSessionは、Spark 2.0以降の統一されたエントリーポイントです。Sparkのあらゆる機能にアクセスするための入口です。

📝 SparkSessionの歴史

Spark 1.x時代:
・SparkContext(RDD用)
・SQLContext(SQL用)
・HiveContext(Hive用)
→ 複数のContextを使い分ける必要があった

Spark 2.0以降:
SparkSessionに統一!
・すべての機能に1つのエントリーポイントでアクセス
・シンプルで使いやすい

1-2. SparkSessionでできること

1️⃣ データの読み込み

CSV、JSON、Parquet、ORC、Avro、JDBC(SQL Database)など、様々な形式のデータを読み込める。

2️⃣ DataFrame操作

DataFrameの作成、変換、集計などの操作を実行できる。

3️⃣ SQL実行

SQLクエリを直接実行して、データを操作できる。

4️⃣ 設定管理

Sparkの動作を制御する各種設定を管理できる。

1-3. 基本的なSparkSession作成

from pyspark.sql import SparkSession

# SparkSessionの作成
spark = SparkSession.builder \
    .appName("MyApplication") \
    .master("local[*]") \
    .getOrCreate()

# Sparkのバージョン確認
print(f"Spark Version: {spark.version}")

# SparkSessionの停止(使い終わったら)
spark.stop()
💡 getOrCreate()の意味

getOrCreate()は、既存のSparkSessionがあればそれを返し、なければ新規作成します。
これにより、同じノートブック内で複数回実行しても問題ありません。

⚙️ 2. 設定パラメータの理解

2-1. config()メソッドの使い方

.config()メソッドで、Sparkの動作を細かく制御できます。

# configメソッドの基本構文

spark = SparkSession.builder \
    .appName("MyApp") \
    .master("local[*]") \
    .config("spark.設定項目", "値") \
    .config("spark.別の設定", "値") \
    .getOrCreate()

2-2. よく使う設定パラメータ

設定項目 説明
spark.driver.memory Driverのメモリ量(例:4g、8g)
spark.executor.memory Executorのメモリ量(例:4g、8g)
spark.executor.cores Executor1つあたりのコア数
spark.sql.shuffle.partitions シャッフル時のパーティション数(デフォルト200)
spark.default.parallelism 並列処理のデフォルト並列度
spark.sql.adaptive.enabled 適応的クエリ実行の有効化(true/false)

2-3. 実践的な設定例

# ローカル環境での推奨設定

spark = SparkSession.builder \
    .appName("MyApp") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()
設定の解説

driver.memory:Driverに4GBのメモリを割り当て
executor.memory:Executorに4GBのメモリを割り当て
shuffle.partitions:シャッフル時のパーティション数を8に削減(ローカルでは200は多すぎる)
adaptive.enabled:実行時の統計情報を元に最適化を行う

2-4. 設定の確認方法

# すべての設定を表示
spark.sparkContext.getConf().getAll()

# 特定の設定を確認
print(spark.conf.get("spark.driver.memory"))
print(spark.conf.get("spark.sql.shuffle.partitions"))
⚠️ 設定変更の注意点

・一部の設定はSparkSession作成時にしか変更できない
・例:driver.memoryexecutor.memoryは起動後変更不可
・設定を変更したら、SparkSessionを再起動する必要がある

📂 3. データの読み込み

3-1. spark.read の基本

spark.readは、様々な形式のデータを読み込むためのDataFrameReaderです。

# 基本構文

# 基本的な読み込み
df = spark.read.形式("ファイルパス")

# オプション付き読み込み
df = spark.read \
    .option("オプション名", "値") \
    .option("別のオプション", "値") \
    .形式("ファイルパス")

3-2. CSV形式の読み込み

# シンプルな読み込み
df = spark.read.csv("data.csv")

# ヘッダー付き、スキーマ推論
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# オプションを個別に指定
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", ",") \
    .csv("data.csv")

# データを確認
df.show()
CSVオプションの説明

header:1行目をヘッダーとして扱う(true/false)
inferSchema:データ型を自動推論(true/false)
sep:区切り文字(デフォルトは,
encoding:文字エンコーディング(例:UTF-8、Shift_JIS)
quote:引用符文字(デフォルトは"
nullValue:NULL値の表現(例:”NA”、”null”)

# 日本語CSVファイルの読み込み
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("encoding", "UTF-8") \
    .csv("japanese_data.csv")

# TSV(タブ区切り)の読み込み
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", "\t") \
    .csv("data.tsv")

# NULL値を扱う
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("nullValue", "NA") \
    .csv("data_with_na.csv")

3-3. JSON形式の読み込み

# シンプルな読み込み
df = spark.read.json("data.json")

# 複数ファイルを一度に読み込み
df = spark.read.json("data/*.json")

# 複数行JSONの読み込み
df = spark.read \
    .option("multiLine", "true") \
    .json("data.json")

df.show()
JSONオプションの説明

multiLine:複数行にまたがるJSONを読み込む(true/false)
primitivesAsString:全てを文字列として読み込む(true/false)
dateFormat:日付フォーマット(例:”yyyy-MM-dd”)

JSONの形式の違い

通常のJSON(1行1レコード):
{"name": "Alice", "age": 30}
{"name": "Bob", "age": 25}
spark.read.json("data.json")

複数行JSON(配列形式):
[{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
spark.read.option("multiLine", "true").json("data.json")

3-4. Parquet形式の読み込み

# Parquetの読み込み(最もシンプル)
df = spark.read.parquet("data.parquet")

# 複数ファイル
df = spark.read.parquet("data/*.parquet")

# パーティション分割されたParquet
df = spark.read.parquet("data/year=2024/month=11/")

df.show()
💡 Parquetとは?

Parquetは、列指向フォーマットです。

メリット:
・圧縮率が高い(CSVの1/10のサイズ)
・読み込みが速い(必要な列だけ読める)
・スキーマ情報を保持

Sparkの標準フォーマットとして推奨されています。
Part 5(パフォーマンス最適化)で詳しく学びます。

3-5. その他の形式

# ORC形式
df = spark.read.orc("data.orc")

# Avro形式
df = spark.read.format("avro").load("data.avro")

# テキストファイル
df = spark.read.text("data.txt")

# データベース(JDBC)
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "users") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

👀 4. show()とprintSchema()の使い方

4-1. show()メソッド

show()は、DataFrameの内容を表形式で表示します。

# 最初の20行を表示(デフォルト)
df.show()

# 表示行数を指定
df.show(5)  # 最初の5行

# 列の文字数制限を解除
df.show(truncate=False)

# 垂直表示(列数が多い時に便利)
df.show(vertical=True)

# 組み合わせ
df.show(10, truncate=False)
+-------+---+-------+
|   name|age|   city|
+-------+---+-------+
|  Alice| 30|  Tokyo|
|    Bob| 25|  Osaka|
|Charlie| 35|  Tokyo|
+-------+---+-------+

4-2. printSchema()メソッド

printSchema()は、DataFrameのスキーマ(構造)を表示します。

# スキーマを表示
df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)
スキーマ情報の見方

カラム名:nameagecity
データ型:stringinteger
nullable:NULL値を許容するか(true/false)

4-3. その他の確認メソッド

# 列名を取得
df.columns
# 出力: ['name', 'age', 'city']

# 行数をカウント
df.count()
# 出力: 3

# 最初のn行を取得(Pythonのリストとして)
df.take(2)
# 出力: [Row(name='Alice', age=30, city='Tokyo'), 
#        Row(name='Bob', age=25, city='Osaka')]

# 最初の1行を取得
df.first()
# 出力: Row(name='Alice', age=30, city='Tokyo')

# 統計情報を表示
df.describe().show()
# 出力: count, mean, stddev, min, max
# describe()の出力例
df.describe().show()
+-------+-----+------------------+
|summary| name|               age|
+-------+-----+------------------+
|  count|    3|                 3|
|   mean| null|              30.0|
| stddev| null|               5.0|
|    min|Alice|                25|
|    max|  Eve|                35|
+-------+-----+------------------+

📝 練習問題

問題 1 基礎

SparkSessionを作成し、アプリケーション名を”Practice1″、ローカルモードで全CPUコアを使用するように設定してください。

【解答】
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Practice1") \
    .master("local[*]") \
    .getOrCreate()

print(f"Spark Version: {spark.version}")
spark.stop()
問題 2 基礎

以下のデータからDataFrameを作成し、show()で表示してください。

データ: [(“Apple”, 100), (“Banana”, 50), (“Orange”, 80)]
カラム名: [“product”, “price”]

【解答】
data = [("Apple", 100), ("Banana", 50), ("Orange", 80)]
columns = ["product", "price"]

df = spark.createDataFrame(data, columns)
df.show()
+-------+-----+
|product|price|
+-------+-----+
|  Apple|  100|
| Banana|   50|
| Orange|   80|
+-------+-----+
問題 3 基礎

問題2で作成したDataFrameのスキーマを表示してください。

【解答】
df.printSchema()
root
 |-- product: string (nullable = true)
 |-- price: long (nullable = true)
問題 4 応用

CSVファイルを読み込む際に、ヘッダーを使用し、スキーマを自動推論するコードを書いてください。

【解答】
# 方法1:引数で指定
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# 方法2:optionで指定
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("data.csv")

df.show()
df.printSchema()
問題 5 応用

SparkSessionを作成する際に、Driverメモリを2GB、Executorメモリを2GBに設定してください。

【解答】
spark = SparkSession.builder \
    .appName("MemoryConfig") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# 設定確認
print(spark.conf.get("spark.driver.memory"))
print(spark.conf.get("spark.executor.memory"))

spark.stop()
問題 6 応用

TSVファイル(タブ区切り)を読み込むコードを書いてください。

【解答】
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", "\t") \
    .csv("data.tsv")

df.show()

ポイント:TSVはCSVの一種で、sepオプションで区切り文字を\t(タブ)に指定します。

問題 7 発展

JSON形式の複数行データを読み込むコードを書いてください。

【解答】
df = spark.read \
    .option("multiLine", "true") \
    .json("data.json")

df.show()
df.printSchema()

ポイント:multiLineオプションをtrueにすることで、配列形式やオブジェクト形式の複数行JSONを読み込めます。

問題 8 発展

DataFrameの最初の3行だけを表示し、列の文字数制限を解除するコードを書いてください。

【解答】
df.show(3, truncate=False)

# または
df.show(n=3, truncate=False)

ポイント:truncate=Falseにより、長い文字列も省略されずに全て表示されます。

📝 STEP 4 のまとめ

✅ このステップで学んだこと

SparkSessionはSparkの統一されたエントリーポイント
.config()メモリやパーティション数などを設定
CSVheaderinferSchemaオプションが重要
JSONmultiLineオプションで複数行対応
Parquetは列指向フォーマットで高速・省スペース
show()でデータを表示、printSchema()でスキーマを確認

💡 重要ポイント

データの読み込みは、Sparkの第一歩です。
様々なフォーマットに対応できることが、Sparkの大きな強みです。
特にParquet形式は、ビッグデータ処理で頻繁に使われるので、しっかり覚えましょう!

🎯 次のステップの予告

次のSTEP 5では、「PandasからSparkへの移行」を学びます。
Pandasユーザーが最速でSparkをマスターできるよう、対応表と実践演習を用意しています!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 4

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE