STEP 8:DataFrameの作成と基本操作

⚡ STEP 8: DataFrameの作成と基本操作

Spark DataFrameをマスターして、Pandasのように大量データを扱う

📋 このステップで学ぶこと

  • DataFrameとは何か
  • PandasとSparkの対応表(詳細版)
  • リストからDataFrame作成
  • Pandasからの変換(toPandas、createDataFrame)
  • select、filter、whereの使い方
  • カラムの追加・削除
  • 実践演習:基本的なデータ抽出
  • 練習問題15問

学習時間の目安: 2時間

🎯 1. DataFrameとは

1-1. DataFrameの定義

DataFrameは、Sparkの高レベルAPIで、表形式のデータ構造です。Pandasと非常に似ていますが、分散処理に対応しています。

【DataFrameの特徴】

✅ 表形式(行と列)
✅ スキーマ(列名とデータ型)を持つ
✅ RDDの上に構築された高レベルAPI
✅ Catalyst Optimizerによる自動最適化
✅ Tungsten実行エンジンによる高速化
✅ SQL互換(SQLでも操作可能)
✅ 多言語対応(Python、Scala、Java、R)

1-2. DataFrameの構造

+-------+---+-------+
|   name|age|   city|
+-------+---+-------+
|  Alice| 30|  Tokyo|
|    Bob| 25|  Osaka|
|Charlie| 35|  Tokyo|
+-------+---+-------+

スキーマ(Schema):
  name: string
  age: integer
  city: string

データは複数のパーティションに分散:
  Partition 0: [Alice, Bob]
  Partition 1: [Charlie]
💡 DataFrameとRDDの関係

DataFrameは、内部的にRow型のRDDとして実装されています。
DataFrame = RDD[Row] + スキーマ + 最適化
つまり、DataFrameはRDDの進化版です!

📊 2. PandasとSparkの対応表(詳細版)

2-1. データ作成・読み込み

Pandas PySpark
pd.DataFrame(data) spark.createDataFrame(data)
pd.read_csv("file.csv") spark.read.csv("file.csv", header=True)
pd.read_json("file.json") spark.read.json("file.json")
pd.read_parquet("file.parquet") spark.read.parquet("file.parquet")

2-2. データ確認

Pandas PySpark
df.head() df.show()
df.shape (df.count(), len(df.columns))
df.info() df.printSchema()
df.describe() df.describe().show()
df.columns df.columns

2-3. 列操作

Pandas PySpark
df["col"] df.select("col") または df["col"]
df[["col1", "col2"]] df.select("col1", "col2")
df["new"] = df["old"] * 2 df.withColumn("new", df.old * 2)
df.drop("col", axis=1) df.drop("col")
df.rename(columns={"old": "new"}) df.withColumnRenamed("old", "new")

2-4. 行操作

Pandas PySpark
df[df["age"] > 30] df.filter(df.age > 30) または df.where(df.age > 30)
df.sort_values("age") df.orderBy("age") または df.sort("age")
df.drop_duplicates() df.dropDuplicates() または df.distinct()
df.sample(frac=0.1) df.sample(fraction=0.1)

📝 3. リストからDataFrame作成

3-1. 基本的な作成方法

# 方法1: リストのタプルから作成

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CreateDF").getOrCreate()

# データ準備
data = [
    ("Alice", 30, "Tokyo"),
    ("Bob", 25, "Osaka"),
    ("Charlie", 35, "Tokyo")
]

# カラム名を指定
columns = ["name", "age", "city"]

# DataFrameを作成
df = spark.createDataFrame(data, columns)

df.show()

spark.stop()
+-------+---+-----+
|   name|age| city|
+-------+---+-----+
|  Alice| 30|Tokyo|
|    Bob| 25|Osaka|
|Charlie| 35|Tokyo|
+-------+---+-----+
# 方法2: リストの辞書から作成

spark = SparkSession.builder.appName("CreateDF2").getOrCreate()

# 辞書形式のデータ
data = [
    {"name": "Alice", "age": 30, "city": "Tokyo"},
    {"name": "Bob", "age": 25, "city": "Osaka"},
    {"name": "Charlie", "age": 35, "city": "Tokyo"}
]

# DataFrameを作成
df = spark.createDataFrame(data)

df.show()

spark.stop()

3-2. スキーマを明示的に指定

# スキーマを明示的に指定

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("WithSchema").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

# データ
data = [
    ("Alice", 30, "Tokyo"),
    ("Bob", 25, "Osaka"),
    ("Charlie", 35, "Tokyo")
]

# スキーマ付きでDataFrameを作成
df = spark.createDataFrame(data, schema)

df.printSchema()

spark.stop()
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)
💡 スキーマ指定のメリット

スキーマを明示的に指定すると:
✅ データ型を正確にコントロールできる
✅ 型推論のオーバーヘッドを削減
✅ エラーを早期に発見できる

🔄 4. Pandasからの変換

4-1. Pandas → Spark

# Pandas DataFrameをSpark DataFrameに変換

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PandasToSpark").getOrCreate()

# Pandas DataFrameを作成
pandas_df = pd.DataFrame({
    "name": ["Alice", "Bob", "Charlie"],
    "age": [30, 25, 35],
    "city": ["Tokyo", "Osaka", "Tokyo"]
})

# Spark DataFrameに変換
spark_df = spark.createDataFrame(pandas_df)

spark_df.show()

spark.stop()

4-2. Spark → Pandas

# Spark DataFrameをPandas DataFrameに変換

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkToPandas").getOrCreate()

# Spark DataFrameを作成
data = [("Alice", 30, "Tokyo"), ("Bob", 25, "Osaka")]
spark_df = spark.createDataFrame(data, ["name", "age", "city"])

# Pandas DataFrameに変換
pandas_df = spark_df.toPandas()

print(type(pandas_df))
print(pandas_df)

spark.stop()
<class 'pandas.core.frame.DataFrame'>
     name  age   city
0   Alice   30  Tokyo
1     Bob   25  Osaka
⚠️ toPandas()の注意点(再確認)

・全データをDriverのメモリに集める
・大量データではメモリエラーのリスク
集計後の小さなデータでのみ使用
・またはサンプリングしてから使用

# 安全なtoPandas()の使い方

# 悪い例:大量データをそのまま変換
# pandas_df = big_df.toPandas()  # メモリエラー!

# 良い例1:集計後に変換
aggregated_df = big_df.groupBy("city").count()
pandas_df = aggregated_df.toPandas()  # OK!

# 良い例2:サンプリングしてから変換
sampled_df = big_df.sample(fraction=0.01)  # 1%をサンプリング
pandas_df = sampled_df.toPandas()  # OK!

# 良い例3:limitで制限
limited_df = big_df.limit(1000)  # 最初の1000行
pandas_df = limited_df.toPandas()  # OK!

🔍 5. select、filter、where

5-1. select() – 列を選択

# select()の使い方

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Select").getOrCreate()

data = [
    ("Alice", 30, "Tokyo", 50000),
    ("Bob", 25, "Osaka", 45000),
    ("Charlie", 35, "Tokyo", 60000)
]
df = spark.createDataFrame(data, ["name", "age", "city", "salary"])

# 1列だけ選択
df.select("name").show()

# 複数列を選択
df.select("name", "age").show()

# 全ての列を選択
df.select("*").show()

# col()を使った選択
from pyspark.sql.functions import col
df.select(col("name"), col("age")).show()

spark.stop()
# select()での計算

# 計算した列を追加
df.select("name", "salary", (df.salary * 1.1).alias("new_salary")).show()
+-------+------+----------+
|   name|salary|new_salary|
+-------+------+----------+
|  Alice| 50000|   55000.0|
|    Bob| 45000|   49500.0|
|Charlie| 60000|   66000.0|
+-------+------+----------+

5-2. filter() / where() – 行をフィルター

# filter()とwhere()の使い方

spark = SparkSession.builder.appName("Filter").getOrCreate()

data = [
    ("Alice", 30, "Tokyo"),
    ("Bob", 25, "Osaka"),
    ("Charlie", 35, "Tokyo"),
    ("David", 28, "Osaka")
]
df = spark.createDataFrame(data, ["name", "age", "city"])

# 単一条件
df.filter(df.age > 28).show()

# whereも同じ
df.where(df.age > 28).show()

# 複数条件(AND)
df.filter((df.age > 25) & (df.city == "Tokyo")).show()

# 複数条件(OR)
df.filter((df.age > 30) | (df.city == "Osaka")).show()

spark.stop()
+-------+---+-----+
|   name|age| city|
+-------+---+-----+
|  Alice| 30|Tokyo|
|Charlie| 35|Tokyo|
+-------+---+-----+
filter() vs where()

filter()where()完全に同じです。
好みで選んでOKですが、filter()の方が一般的です。

5-3. SQL文字列でのフィルター

# SQL文字列を使ったフィルター

# SQL風の文字列でフィルター
df.filter("age > 28").show()

# 複数条件
df.filter("age > 25 AND city = 'Tokyo'").show()

# isin
df.filter(df.city.isin(["Tokyo", "Osaka"])).show()

➕ 6. カラムの追加・削除

6-1. withColumn() – カラムを追加

# withColumn()の使い方

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

spark = SparkSession.builder.appName("WithColumn").getOrCreate()

data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])

# 新しい列を追加(計算)
df2 = df.withColumn("age_doubled", df.age * 2)
df2.show()

# 固定値の列を追加
df3 = df.withColumn("country", lit("Japan"))
df3.show()

# 既存の列を上書き
df4 = df.withColumn("age", df.age + 1)
df4.show()

# 複数の列を追加(連続で実行)
df5 = df.withColumn("age_doubled", df.age * 2) \
        .withColumn("age_tripled", df.age * 3)
df5.show()

spark.stop()
# 条件付きカラム追加

from pyspark.sql.functions import when

spark = SparkSession.builder.appName("ConditionalColumn").getOrCreate()

data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])

# 条件分岐で新しい列を追加
df2 = df.withColumn(
    "category",
    when(df.age < 30, "Young")
    .when(df.age < 35, "Middle")
    .otherwise("Senior")
)

df2.show()

spark.stop()
+-------+---+--------+
|   name|age|category|
+-------+---+--------+
|  Alice| 30|  Middle|
|    Bob| 25|   Young|
|Charlie| 35|  Senior|
+-------+---+--------+

6-2. drop() – カラムを削除

# drop()の使い方

spark = SparkSession.builder.appName("Drop").getOrCreate()

data = [("Alice", 30, "Tokyo"), ("Bob", 25, "Osaka")]
df = spark.createDataFrame(data, ["name", "age", "city"])

# 1列を削除
df2 = df.drop("age")
df2.show()

# 複数列を削除
df3 = df.drop("age", "city")
df3.show()

spark.stop()

6-3. withColumnRenamed() – カラム名を変更

# withColumnRenamed()の使い方

spark = SparkSession.builder.appName("Rename").getOrCreate()

data = [("Alice", 30), ("Bob", 25)]
df = spark.createDataFrame(data, ["name", "age"])

# 1列の名前を変更
df2 = df.withColumnRenamed("name", "full_name")
df2.show()

# 複数列の名前を変更
df3 = df.withColumnRenamed("name", "full_name") \
        .withColumnRenamed("age", "years")
df3.show()

spark.stop()

🔧 7. 実践演習:基本的なデータ抽出

7-1. 演習データの準備

# 従業員データ

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Practice").getOrCreate()

data = [
    ("Alice", 30, "Engineering", 80000),
    ("Bob", 25, "Sales", 60000),
    ("Charlie", 35, "Engineering", 95000),
    ("David", 28, "Marketing", 70000),
    ("Eve", 32, "Sales", 75000),
    ("Frank", 29, "Engineering", 85000)
]

df = spark.createDataFrame(data, ["name", "age", "department", "salary"])

df.show()

7-2. 演習1: Engineeringの従業員だけ表示

result = df.filter(df.department == "Engineering")
result.show()
+-------+---+-----------+------+
|   name|age| department|salary|
+-------+---+-----------+------+
|  Alice| 30|Engineering| 80000|
|Charlie| 35|Engineering| 95000|
|  Frank| 29|Engineering| 85000|
+-------+---+-----------+------+

7-3. 演習2: 30歳以上で年収75000以上の従業員

result = df.filter((df.age >= 30) & (df.salary >= 75000))
result.show()
+-------+---+-----------+------+
|   name|age| department|salary|
+-------+---+-----------+------+
|  Alice| 30|Engineering| 80000|
|Charlie| 35|Engineering| 95000|
|    Eve| 32|      Sales| 75000|
+-------+---+-----------+------+

7-4. 演習3: 名前と年収だけ表示、年収順に並べ替え

result = df.select("name", "salary").orderBy(df.salary.desc())
result.show()
+-------+------+
|   name|salary|
+-------+------+
|Charlie| 95000|
|  Frank| 85000|
|  Alice| 80000|
|    Eve| 75000|
|  David| 70000|
|    Bob| 60000|
+-------+------+

7-5. 演習4: 年収を10%アップした新しいカラムを追加

result = df.withColumn("new_salary", df.salary * 1.1)
result.show()
+-------+---+-----------+------+----------+
|   name|age| department|salary|new_salary|
+-------+---+-----------+------+----------+
|  Alice| 30|Engineering| 80000|   88000.0|
|    Bob| 25|      Sales| 60000|   66000.0|
|Charlie| 35|Engineering| 95000|  104500.0|
|  David| 28|  Marketing| 70000|   77000.0|
|    Eve| 32|      Sales| 75000|   82500.0|
|  Frank| 29|Engineering| 85000|   93500.0|
+-------+---+-----------+------+----------+

7-6. 演習5: 年齢カテゴリを追加

from pyspark.sql.functions import when

result = df.withColumn(
    "age_category",
    when(df.age < 30, "20代")
    .when(df.age < 35, "30代前半")
    .otherwise("30代後半以上")
)

result.show()
+-------+---+-----------+------+------------+
|   name|age| department|salary|age_category|
+-------+---+-----------+------+------------+
|  Alice| 30|Engineering| 80000|    30代前半|
|    Bob| 25|      Sales| 60000|        20代|
|Charlie| 35|Engineering| 95000|  30代後半以上|
|  David| 28|  Marketing| 70000|        20代|
|    Eve| 32|      Sales| 75000|    30代前半|
|  Frank| 29|Engineering| 85000|        20代|
+-------+---+-----------+------+------------+

📝 練習問題

問題 1 基礎

リストからDataFrameを作成してください。データ: [(“Tokyo”, 13), (“Osaka”, 9), (“Nagoya”, 2)]、カラム名: [“city”, “population”]

【解答】
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Q1").getOrCreate()

data = [("Tokyo", 13), ("Osaka", 9), ("Nagoya", 2)]
df = spark.createDataFrame(data, ["city", "population"])

df.show()

spark.stop()
問題 2 基礎

問題1のDataFrameから、cityカラムだけを選択して表示してください。

【解答】
df.select("city").show()

# または
df.select(df.city).show()

# または
from pyspark.sql.functions import col
df.select(col("city")).show()
問題 3 基礎

人口が5以上の都市だけを抽出してください。

【解答】
df.filter(df.population >= 5).show()

# または
df.where(df.population >= 5).show()

# または(SQL文字列)
df.filter("population >= 5").show()
問題 4 基礎

人口を100万人単位に変換する新しいカラム”population_million”を追加してください。

【解答】
df_new = df.withColumn("population_million", df.population)

df_new.show()

# 注:問題文の「100万人単位」は、既に百万単位なのでそのまま
問題 5 応用

cityカラムの名前を”city_name”に変更してください。

【解答】
df_renamed = df.withColumnRenamed("city", "city_name")

df_renamed.show()

問題6〜15は、以下のデータを使用します:

data = [
    ("Alice", 30, "Tokyo", 80000),
    ("Bob", 25, "Osaka", 60000),
    ("Charlie", 35, "Tokyo", 95000),
    ("David", 28, "Osaka", 70000),
    ("Eve", 32, "Tokyo", 75000)
]
df = spark.createDataFrame(data, ["name", "age", "city", "salary"])
問題 6 応用

Tokyoに住む人だけを抽出してください。

【解答】
df.filter(df.city == "Tokyo").show()
問題 7 応用

30歳以上で年収が75000以上の人を抽出してください。

【解答】
df.filter((df.age >= 30) & (df.salary >= 75000)).show()
問題 8 応用

名前と年収だけを選択し、年収の降順で並べ替えてください。

【解答】
df.select("name", "salary").orderBy(df.salary.desc()).show()

# または
df.select("name", "salary").sort(df.salary.desc()).show()
問題 9 応用

年収を20%アップした新しいカラム”new_salary”を追加してください。

【解答】
df.withColumn("new_salary", df.salary * 1.2).show()
問題 10 発展

年齢が30未満なら”Junior”、30以上なら”Senior”というカラム”level”を追加してください。

【解答】
from pyspark.sql.functions import when

df.withColumn("level", when(df.age < 30, "Junior").otherwise("Senior")).show()
問題 11 発展

cityカラムを削除してください。

【解答】
df.drop("city").show()
問題 12 発展

TokyoまたはOsakaに住む人で、年収が70000以上の人を抽出してください。

【解答】
# 方法1
df.filter((df.city.isin(["Tokyo", "Osaka"])) & (df.salary >= 70000)).show()

# 方法2
df.filter(((df.city == "Tokyo") | (df.city == "Osaka")) & (df.salary >= 70000)).show()
問題 13 発展

名前、年齢、新しいカラム”age_group”(30未満:”20代”、30以上35未満:”30代前半”、35以上:”30代後半以上”)を表示してください。

【解答】
from pyspark.sql.functions import when

df.select(
    "name",
    "age",
    when(df.age < 30, "20代")
    .when(df.age < 35, "30代前半")
    .otherwise("30代後半以上")
    .alias("age_group")
).show()
問題 14 発展

DataFrameの行数と列数を表示してください。

【解答】
row_count = df.count()
col_count = len(df.columns)

print(f"Rows: {row_count}, Columns: {col_count}")

# 出力:Rows: 5, Columns: 4
問題 15 発展

スキーマを表示してください。

【解答】
df.printSchema()

# 出力:
# root
#  |-- name: string (nullable = true)
#  |-- age: long (nullable = true)
#  |-- city: string (nullable = true)
#  |-- salary: long (nullable = true)

📝 STEP 8 のまとめ

✅ このステップで学んだこと

DataFrameは表形式で、スキーマを持つ高レベルAPI
createDataFrame()でリストからDataFrameを作成
toPandas()でPandasに変換(小さいデータのみ)
select()で列を選択、filter()で行を絞り込み
withColumn()でカラムを追加、drop()で削除
when()で条件分岐、orderBy()で並び替え

💡 重要ポイント

DataFrameはImmutable(不変)です。
操作は常に新しいDataFrameを返すので、変数に代入する必要があります。
Pandasと似ていますが、in-placeの変更はできません。

🎯 次のステップの予告

次のSTEP 9では、「カラム操作と関数①」を学びます。
文字列操作、数値計算など、pyspark.sql.functionsを使いこなしましょう!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 8

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE