⚡ STEP 4: SparkSessionの基礎
SparkSessionを自在に操り、様々なデータを読み込めるようになる
📋 このステップで学ぶこと
- SparkSessionとは何か
- 設定パラメータの理解と使い方
- データの読み込み(CSV、JSON、Parquet)
- show()、printSchema()の使い方
- 練習問題8問
学習時間の目安: 2時間
🎯 1. SparkSessionとは
1-1. SparkSessionの役割
SparkSessionは、Spark 2.0以降の統一されたエントリーポイントです。Sparkのあらゆる機能にアクセスするための入口です。
Spark 1.x時代:
・SparkContext(RDD用)
・SQLContext(SQL用)
・HiveContext(Hive用)
→ 複数のContextを使い分ける必要があった
Spark 2.0以降:
・SparkSessionに統一!
・すべての機能に1つのエントリーポイントでアクセス
・シンプルで使いやすい
1-2. SparkSessionでできること
CSV、JSON、Parquet、ORC、Avro、JDBC(SQL Database)など、様々な形式のデータを読み込める。
DataFrameの作成、変換、集計などの操作を実行できる。
SQLクエリを直接実行して、データを操作できる。
Sparkの動作を制御する各種設定を管理できる。
1-3. 基本的なSparkSession作成
from pyspark.sql import SparkSession # SparkSessionの作成 spark = SparkSession.builder \ .appName("MyApplication") \ .master("local[*]") \ .getOrCreate() # Sparkのバージョン確認 print(f"Spark Version: {spark.version}") # SparkSessionの停止(使い終わったら) spark.stop()
getOrCreate()は、既存のSparkSessionがあればそれを返し、なければ新規作成します。
これにより、同じノートブック内で複数回実行しても問題ありません。
⚙️ 2. 設定パラメータの理解
2-1. config()メソッドの使い方
.config()メソッドで、Sparkの動作を細かく制御できます。
# configメソッドの基本構文 spark = SparkSession.builder \ .appName("MyApp") \ .master("local[*]") \ .config("spark.設定項目", "値") \ .config("spark.別の設定", "値") \ .getOrCreate()
2-2. よく使う設定パラメータ
| 設定項目 | 説明 |
|---|---|
spark.driver.memory |
Driverのメモリ量(例:4g、8g) |
spark.executor.memory |
Executorのメモリ量(例:4g、8g) |
spark.executor.cores |
Executor1つあたりのコア数 |
spark.sql.shuffle.partitions |
シャッフル時のパーティション数(デフォルト200) |
spark.default.parallelism |
並列処理のデフォルト並列度 |
spark.sql.adaptive.enabled |
適応的クエリ実行の有効化(true/false) |
2-3. 実践的な設定例
# ローカル環境での推奨設定 spark = SparkSession.builder \ .appName("MyApp") \ .master("local[*]") \ .config("spark.driver.memory", "4g") \ .config("spark.executor.memory", "4g") \ .config("spark.sql.shuffle.partitions", "8") \ .config("spark.sql.adaptive.enabled", "true") \ .getOrCreate()
driver.memory:Driverに4GBのメモリを割り当て
executor.memory:Executorに4GBのメモリを割り当て
shuffle.partitions:シャッフル時のパーティション数を8に削減(ローカルでは200は多すぎる)
adaptive.enabled:実行時の統計情報を元に最適化を行う
2-4. 設定の確認方法
# すべての設定を表示 spark.sparkContext.getConf().getAll() # 特定の設定を確認 print(spark.conf.get("spark.driver.memory")) print(spark.conf.get("spark.sql.shuffle.partitions"))
・一部の設定はSparkSession作成時にしか変更できない
・例:driver.memory、executor.memoryは起動後変更不可
・設定を変更したら、SparkSessionを再起動する必要がある
📂 3. データの読み込み
3-1. spark.read の基本
spark.readは、様々な形式のデータを読み込むためのDataFrameReaderです。
# 基本構文 # 基本的な読み込み df = spark.read.形式("ファイルパス") # オプション付き読み込み df = spark.read \ .option("オプション名", "値") \ .option("別のオプション", "値") \ .形式("ファイルパス")
3-2. CSV形式の読み込み
# シンプルな読み込み df = spark.read.csv("data.csv") # ヘッダー付き、スキーマ推論 df = spark.read.csv("data.csv", header=True, inferSchema=True) # オプションを個別に指定 df = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .option("sep", ",") \ .csv("data.csv") # データを確認 df.show()
header:1行目をヘッダーとして扱う(true/false)
inferSchema:データ型を自動推論(true/false)
sep:区切り文字(デフォルトは,)
encoding:文字エンコーディング(例:UTF-8、Shift_JIS)
quote:引用符文字(デフォルトは")
nullValue:NULL値の表現(例:”NA”、”null”)
# 日本語CSVファイルの読み込み df = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .option("encoding", "UTF-8") \ .csv("japanese_data.csv") # TSV(タブ区切り)の読み込み df = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .option("sep", "\t") \ .csv("data.tsv") # NULL値を扱う df = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .option("nullValue", "NA") \ .csv("data_with_na.csv")
3-3. JSON形式の読み込み
# シンプルな読み込み df = spark.read.json("data.json") # 複数ファイルを一度に読み込み df = spark.read.json("data/*.json") # 複数行JSONの読み込み df = spark.read \ .option("multiLine", "true") \ .json("data.json") df.show()
multiLine:複数行にまたがるJSONを読み込む(true/false)
primitivesAsString:全てを文字列として読み込む(true/false)
dateFormat:日付フォーマット(例:”yyyy-MM-dd”)
通常のJSON(1行1レコード):
{"name": "Alice", "age": 30}
{"name": "Bob", "age": 25}
→ spark.read.json("data.json")
複数行JSON(配列形式):
[{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
→ spark.read.option("multiLine", "true").json("data.json")
3-4. Parquet形式の読み込み
# Parquetの読み込み(最もシンプル) df = spark.read.parquet("data.parquet") # 複数ファイル df = spark.read.parquet("data/*.parquet") # パーティション分割されたParquet df = spark.read.parquet("data/year=2024/month=11/") df.show()
Parquetは、列指向フォーマットです。
メリット:
・圧縮率が高い(CSVの1/10のサイズ)
・読み込みが速い(必要な列だけ読める)
・スキーマ情報を保持
Sparkの標準フォーマットとして推奨されています。
Part 5(パフォーマンス最適化)で詳しく学びます。
3-5. その他の形式
# ORC形式 df = spark.read.orc("data.orc") # Avro形式 df = spark.read.format("avro").load("data.avro") # テキストファイル df = spark.read.text("data.txt") # データベース(JDBC) df = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql://localhost:5432/mydb") \ .option("dbtable", "users") \ .option("user", "username") \ .option("password", "password") \ .load()
👀 4. show()とprintSchema()の使い方
4-1. show()メソッド
show()は、DataFrameの内容を表形式で表示します。
# 最初の20行を表示(デフォルト) df.show() # 表示行数を指定 df.show(5) # 最初の5行 # 列の文字数制限を解除 df.show(truncate=False) # 垂直表示(列数が多い時に便利) df.show(vertical=True) # 組み合わせ df.show(10, truncate=False)
+-------+---+-------+ | name|age| city| +-------+---+-------+ | Alice| 30| Tokyo| | Bob| 25| Osaka| |Charlie| 35| Tokyo| +-------+---+-------+
4-2. printSchema()メソッド
printSchema()は、DataFrameのスキーマ(構造)を表示します。
# スキーマを表示
df.printSchema()root |-- name: string (nullable = true) |-- age: integer (nullable = true) |-- city: string (nullable = true)
カラム名:name、age、city
データ型:string、integer
nullable:NULL値を許容するか(true/false)
4-3. その他の確認メソッド
# 列名を取得 df.columns # 出力: ['name', 'age', 'city'] # 行数をカウント df.count() # 出力: 3 # 最初のn行を取得(Pythonのリストとして) df.take(2) # 出力: [Row(name='Alice', age=30, city='Tokyo'), # Row(name='Bob', age=25, city='Osaka')] # 最初の1行を取得 df.first() # 出力: Row(name='Alice', age=30, city='Tokyo') # 統計情報を表示 df.describe().show() # 出力: count, mean, stddev, min, max
# describe()の出力例
df.describe().show()+-------+-----+------------------+ |summary| name| age| +-------+-----+------------------+ | count| 3| 3| | mean| null| 30.0| | stddev| null| 5.0| | min|Alice| 25| | max| Eve| 35| +-------+-----+------------------+
📝 練習問題
SparkSessionを作成し、アプリケーション名を”Practice1″、ローカルモードで全CPUコアを使用するように設定してください。
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("Practice1") \ .master("local[*]") \ .getOrCreate() print(f"Spark Version: {spark.version}") spark.stop()
以下のデータからDataFrameを作成し、show()で表示してください。
データ: [(“Apple”, 100), (“Banana”, 50), (“Orange”, 80)]
カラム名: [“product”, “price”]
data = [("Apple", 100), ("Banana", 50), ("Orange", 80)] columns = ["product", "price"] df = spark.createDataFrame(data, columns) df.show()
+-------+-----+ |product|price| +-------+-----+ | Apple| 100| | Banana| 50| | Orange| 80| +-------+-----+
問題2で作成したDataFrameのスキーマを表示してください。
df.printSchema()
root |-- product: string (nullable = true) |-- price: long (nullable = true)
CSVファイルを読み込む際に、ヘッダーを使用し、スキーマを自動推論するコードを書いてください。
# 方法1:引数で指定 df = spark.read.csv("data.csv", header=True, inferSchema=True) # 方法2:optionで指定 df = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .csv("data.csv") df.show() df.printSchema()
SparkSessionを作成する際に、Driverメモリを2GB、Executorメモリを2GBに設定してください。
spark = SparkSession.builder \
.appName("MemoryConfig") \
.master("local[*]") \
.config("spark.driver.memory", "2g") \
.config("spark.executor.memory", "2g") \
.getOrCreate()
# 設定確認
print(spark.conf.get("spark.driver.memory"))
print(spark.conf.get("spark.executor.memory"))
spark.stop()TSVファイル(タブ区切り)を読み込むコードを書いてください。
df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.option("sep", "\t") \
.csv("data.tsv")
df.show()
ポイント:TSVはCSVの一種で、sepオプションで区切り文字を\t(タブ)に指定します。
JSON形式の複数行データを読み込むコードを書いてください。
df = spark.read \
.option("multiLine", "true") \
.json("data.json")
df.show()
df.printSchema()
ポイント:multiLineオプションをtrueにすることで、配列形式やオブジェクト形式の複数行JSONを読み込めます。
DataFrameの最初の3行だけを表示し、列の文字数制限を解除するコードを書いてください。
df.show(3, truncate=False) # または df.show(n=3, truncate=False)
ポイント:truncate=Falseにより、長い文字列も省略されずに全て表示されます。
📝 STEP 4 のまとめ
・SparkSessionはSparkの統一されたエントリーポイント
・.config()でメモリやパーティション数などを設定
・CSVはheaderとinferSchemaオプションが重要
・JSONはmultiLineオプションで複数行対応
・Parquetは列指向フォーマットで高速・省スペース
・show()でデータを表示、printSchema()でスキーマを確認
データの読み込みは、Sparkの第一歩です。
様々なフォーマットに対応できることが、Sparkの大きな強みです。
特にParquet形式は、ビッグデータ処理で頻繁に使われるので、しっかり覚えましょう!
次のSTEP 5では、「PandasからSparkへの移行」を学びます。
Pandasユーザーが最速でSparkをマスターできるよう、対応表と実践演習を用意しています!
学習メモ
ビッグデータ処理(Apache Spark) - Step 4