⚡ STEP 8: DataFrameの作成と基本操作
Spark DataFrameをマスターして、Pandasのように大量データを扱う
📋 このステップで学ぶこと
- DataFrameとは何か
- PandasとSparkの対応表(詳細版)
- リストからDataFrame作成
- Pandasからの変換(toPandas、createDataFrame)
- select、filter、whereの使い方
- カラムの追加・削除
- 実践演習:基本的なデータ抽出
- 練習問題15問
学習時間の目安: 2時間
🎯 1. DataFrameとは
1-1. DataFrameの定義
DataFrameは、Sparkの高レベルAPIで、表形式のデータ構造です。Pandasと非常に似ていますが、分散処理に対応しています。
✅ 表形式(行と列)
✅ スキーマ(列名とデータ型)を持つ
✅ RDDの上に構築された高レベルAPI
✅ Catalyst Optimizerによる自動最適化
✅ Tungsten実行エンジンによる高速化
✅ SQL互換(SQLでも操作可能)
✅ 多言語対応(Python、Scala、Java、R)
1-2. DataFrameの構造
+-------+---+-------+ | name|age| city| +-------+---+-------+ | Alice| 30| Tokyo| | Bob| 25| Osaka| |Charlie| 35| Tokyo| +-------+---+-------+ スキーマ(Schema): name: string age: integer city: string データは複数のパーティションに分散: Partition 0: [Alice, Bob] Partition 1: [Charlie]
DataFrameは、内部的にRow型のRDDとして実装されています。
DataFrame = RDD[Row] + スキーマ + 最適化
つまり、DataFrameはRDDの進化版です!
📊 2. PandasとSparkの対応表(詳細版)
2-1. データ作成・読み込み
| Pandas | PySpark |
|---|---|
pd.DataFrame(data) |
spark.createDataFrame(data) |
pd.read_csv("file.csv") |
spark.read.csv("file.csv", header=True) |
pd.read_json("file.json") |
spark.read.json("file.json") |
pd.read_parquet("file.parquet") |
spark.read.parquet("file.parquet") |
2-2. データ確認
| Pandas | PySpark |
|---|---|
df.head() |
df.show() |
df.shape |
(df.count(), len(df.columns)) |
df.info() |
df.printSchema() |
df.describe() |
df.describe().show() |
df.columns |
df.columns |
2-3. 列操作
| Pandas | PySpark |
|---|---|
df["col"] |
df.select("col") または df["col"] |
df[["col1", "col2"]] |
df.select("col1", "col2") |
df["new"] = df["old"] * 2 |
df.withColumn("new", df.old * 2) |
df.drop("col", axis=1) |
df.drop("col") |
df.rename(columns={"old": "new"}) |
df.withColumnRenamed("old", "new") |
2-4. 行操作
| Pandas | PySpark |
|---|---|
df[df["age"] > 30] |
df.filter(df.age > 30) または df.where(df.age > 30) |
df.sort_values("age") |
df.orderBy("age") または df.sort("age") |
df.drop_duplicates() |
df.dropDuplicates() または df.distinct() |
df.sample(frac=0.1) |
df.sample(fraction=0.1) |
📝 3. リストからDataFrame作成
3-1. 基本的な作成方法
# 方法1: リストのタプルから作成 from pyspark.sql import SparkSession spark = SparkSession.builder.appName("CreateDF").getOrCreate() # データ準備 data = [ ("Alice", 30, "Tokyo"), ("Bob", 25, "Osaka"), ("Charlie", 35, "Tokyo") ] # カラム名を指定 columns = ["name", "age", "city"] # DataFrameを作成 df = spark.createDataFrame(data, columns) df.show() spark.stop()
+-------+---+-----+ | name|age| city| +-------+---+-----+ | Alice| 30|Tokyo| | Bob| 25|Osaka| |Charlie| 35|Tokyo| +-------+---+-----+
# 方法2: リストの辞書から作成 spark = SparkSession.builder.appName("CreateDF2").getOrCreate() # 辞書形式のデータ data = [ {"name": "Alice", "age": 30, "city": "Tokyo"}, {"name": "Bob", "age": 25, "city": "Osaka"}, {"name": "Charlie", "age": 35, "city": "Tokyo"} ] # DataFrameを作成 df = spark.createDataFrame(data) df.show() spark.stop()
3-2. スキーマを明示的に指定
# スキーマを明示的に指定 from pyspark.sql.types import StructType, StructField, StringType, IntegerType spark = SparkSession.builder.appName("WithSchema").getOrCreate() # スキーマを定義 schema = StructType([ StructField("name", StringType(), True), StructField("age", IntegerType(), True), StructField("city", StringType(), True) ]) # データ data = [ ("Alice", 30, "Tokyo"), ("Bob", 25, "Osaka"), ("Charlie", 35, "Tokyo") ] # スキーマ付きでDataFrameを作成 df = spark.createDataFrame(data, schema) df.printSchema() spark.stop()
root |-- name: string (nullable = true) |-- age: integer (nullable = true) |-- city: string (nullable = true)
スキーマを明示的に指定すると:
✅ データ型を正確にコントロールできる
✅ 型推論のオーバーヘッドを削減
✅ エラーを早期に発見できる
🔄 4. Pandasからの変換
4-1. Pandas → Spark
# Pandas DataFrameをSpark DataFrameに変換 import pandas as pd from pyspark.sql import SparkSession spark = SparkSession.builder.appName("PandasToSpark").getOrCreate() # Pandas DataFrameを作成 pandas_df = pd.DataFrame({ "name": ["Alice", "Bob", "Charlie"], "age": [30, 25, 35], "city": ["Tokyo", "Osaka", "Tokyo"] }) # Spark DataFrameに変換 spark_df = spark.createDataFrame(pandas_df) spark_df.show() spark.stop()
4-2. Spark → Pandas
# Spark DataFrameをPandas DataFrameに変換 from pyspark.sql import SparkSession spark = SparkSession.builder.appName("SparkToPandas").getOrCreate() # Spark DataFrameを作成 data = [("Alice", 30, "Tokyo"), ("Bob", 25, "Osaka")] spark_df = spark.createDataFrame(data, ["name", "age", "city"]) # Pandas DataFrameに変換 pandas_df = spark_df.toPandas() print(type(pandas_df)) print(pandas_df) spark.stop()
<class 'pandas.core.frame.DataFrame'>
name age city
0 Alice 30 Tokyo
1 Bob 25 Osaka
・全データをDriverのメモリに集める
・大量データではメモリエラーのリスク
・集計後の小さなデータでのみ使用
・またはサンプリングしてから使用
# 安全なtoPandas()の使い方 # 悪い例:大量データをそのまま変換 # pandas_df = big_df.toPandas() # メモリエラー! # 良い例1:集計後に変換 aggregated_df = big_df.groupBy("city").count() pandas_df = aggregated_df.toPandas() # OK! # 良い例2:サンプリングしてから変換 sampled_df = big_df.sample(fraction=0.01) # 1%をサンプリング pandas_df = sampled_df.toPandas() # OK! # 良い例3:limitで制限 limited_df = big_df.limit(1000) # 最初の1000行 pandas_df = limited_df.toPandas() # OK!
🔍 5. select、filter、where
5-1. select() – 列を選択
# select()の使い方 from pyspark.sql import SparkSession spark = SparkSession.builder.appName("Select").getOrCreate() data = [ ("Alice", 30, "Tokyo", 50000), ("Bob", 25, "Osaka", 45000), ("Charlie", 35, "Tokyo", 60000) ] df = spark.createDataFrame(data, ["name", "age", "city", "salary"]) # 1列だけ選択 df.select("name").show() # 複数列を選択 df.select("name", "age").show() # 全ての列を選択 df.select("*").show() # col()を使った選択 from pyspark.sql.functions import col df.select(col("name"), col("age")).show() spark.stop()
# select()での計算 # 計算した列を追加 df.select("name", "salary", (df.salary * 1.1).alias("new_salary")).show()
+-------+------+----------+ | name|salary|new_salary| +-------+------+----------+ | Alice| 50000| 55000.0| | Bob| 45000| 49500.0| |Charlie| 60000| 66000.0| +-------+------+----------+
5-2. filter() / where() – 行をフィルター
# filter()とwhere()の使い方 spark = SparkSession.builder.appName("Filter").getOrCreate() data = [ ("Alice", 30, "Tokyo"), ("Bob", 25, "Osaka"), ("Charlie", 35, "Tokyo"), ("David", 28, "Osaka") ] df = spark.createDataFrame(data, ["name", "age", "city"]) # 単一条件 df.filter(df.age > 28).show() # whereも同じ df.where(df.age > 28).show() # 複数条件(AND) df.filter((df.age > 25) & (df.city == "Tokyo")).show() # 複数条件(OR) df.filter((df.age > 30) | (df.city == "Osaka")).show() spark.stop()
+-------+---+-----+ | name|age| city| +-------+---+-----+ | Alice| 30|Tokyo| |Charlie| 35|Tokyo| +-------+---+-----+
filter()とwhere()は完全に同じです。
好みで選んでOKですが、filter()の方が一般的です。
5-3. SQL文字列でのフィルター
# SQL文字列を使ったフィルター # SQL風の文字列でフィルター df.filter("age > 28").show() # 複数条件 df.filter("age > 25 AND city = 'Tokyo'").show() # isin df.filter(df.city.isin(["Tokyo", "Osaka"])).show()
➕ 6. カラムの追加・削除
6-1. withColumn() – カラムを追加
# withColumn()の使い方 from pyspark.sql import SparkSession from pyspark.sql.functions import col, lit spark = SparkSession.builder.appName("WithColumn").getOrCreate() data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)] df = spark.createDataFrame(data, ["name", "age"]) # 新しい列を追加(計算) df2 = df.withColumn("age_doubled", df.age * 2) df2.show() # 固定値の列を追加 df3 = df.withColumn("country", lit("Japan")) df3.show() # 既存の列を上書き df4 = df.withColumn("age", df.age + 1) df4.show() # 複数の列を追加(連続で実行) df5 = df.withColumn("age_doubled", df.age * 2) \ .withColumn("age_tripled", df.age * 3) df5.show() spark.stop()
# 条件付きカラム追加 from pyspark.sql.functions import when spark = SparkSession.builder.appName("ConditionalColumn").getOrCreate() data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)] df = spark.createDataFrame(data, ["name", "age"]) # 条件分岐で新しい列を追加 df2 = df.withColumn( "category", when(df.age < 30, "Young") .when(df.age < 35, "Middle") .otherwise("Senior") ) df2.show() spark.stop()
+-------+---+--------+ | name|age|category| +-------+---+--------+ | Alice| 30| Middle| | Bob| 25| Young| |Charlie| 35| Senior| +-------+---+--------+
6-2. drop() – カラムを削除
# drop()の使い方 spark = SparkSession.builder.appName("Drop").getOrCreate() data = [("Alice", 30, "Tokyo"), ("Bob", 25, "Osaka")] df = spark.createDataFrame(data, ["name", "age", "city"]) # 1列を削除 df2 = df.drop("age") df2.show() # 複数列を削除 df3 = df.drop("age", "city") df3.show() spark.stop()
6-3. withColumnRenamed() – カラム名を変更
# withColumnRenamed()の使い方 spark = SparkSession.builder.appName("Rename").getOrCreate() data = [("Alice", 30), ("Bob", 25)] df = spark.createDataFrame(data, ["name", "age"]) # 1列の名前を変更 df2 = df.withColumnRenamed("name", "full_name") df2.show() # 複数列の名前を変更 df3 = df.withColumnRenamed("name", "full_name") \ .withColumnRenamed("age", "years") df3.show() spark.stop()
🔧 7. 実践演習:基本的なデータ抽出
7-1. 演習データの準備
# 従業員データ from pyspark.sql import SparkSession spark = SparkSession.builder.appName("Practice").getOrCreate() data = [ ("Alice", 30, "Engineering", 80000), ("Bob", 25, "Sales", 60000), ("Charlie", 35, "Engineering", 95000), ("David", 28, "Marketing", 70000), ("Eve", 32, "Sales", 75000), ("Frank", 29, "Engineering", 85000) ] df = spark.createDataFrame(data, ["name", "age", "department", "salary"]) df.show()
7-2. 演習1: Engineeringの従業員だけ表示
result = df.filter(df.department == "Engineering")
result.show()+-------+---+-----------+------+ | name|age| department|salary| +-------+---+-----------+------+ | Alice| 30|Engineering| 80000| |Charlie| 35|Engineering| 95000| | Frank| 29|Engineering| 85000| +-------+---+-----------+------+
7-3. 演習2: 30歳以上で年収75000以上の従業員
result = df.filter((df.age >= 30) & (df.salary >= 75000)) result.show()
+-------+---+-----------+------+ | name|age| department|salary| +-------+---+-----------+------+ | Alice| 30|Engineering| 80000| |Charlie| 35|Engineering| 95000| | Eve| 32| Sales| 75000| +-------+---+-----------+------+
7-4. 演習3: 名前と年収だけ表示、年収順に並べ替え
result = df.select("name", "salary").orderBy(df.salary.desc()) result.show()
+-------+------+ | name|salary| +-------+------+ |Charlie| 95000| | Frank| 85000| | Alice| 80000| | Eve| 75000| | David| 70000| | Bob| 60000| +-------+------+
7-5. 演習4: 年収を10%アップした新しいカラムを追加
result = df.withColumn("new_salary", df.salary * 1.1) result.show()
+-------+---+-----------+------+----------+ | name|age| department|salary|new_salary| +-------+---+-----------+------+----------+ | Alice| 30|Engineering| 80000| 88000.0| | Bob| 25| Sales| 60000| 66000.0| |Charlie| 35|Engineering| 95000| 104500.0| | David| 28| Marketing| 70000| 77000.0| | Eve| 32| Sales| 75000| 82500.0| | Frank| 29|Engineering| 85000| 93500.0| +-------+---+-----------+------+----------+
7-6. 演習5: 年齢カテゴリを追加
from pyspark.sql.functions import when result = df.withColumn( "age_category", when(df.age < 30, "20代") .when(df.age < 35, "30代前半") .otherwise("30代後半以上") ) result.show()
+-------+---+-----------+------+------------+ | name|age| department|salary|age_category| +-------+---+-----------+------+------------+ | Alice| 30|Engineering| 80000| 30代前半| | Bob| 25| Sales| 60000| 20代| |Charlie| 35|Engineering| 95000| 30代後半以上| | David| 28| Marketing| 70000| 20代| | Eve| 32| Sales| 75000| 30代前半| | Frank| 29|Engineering| 85000| 20代| +-------+---+-----------+------+------------+
📝 練習問題
リストからDataFrameを作成してください。データ: [(“Tokyo”, 13), (“Osaka”, 9), (“Nagoya”, 2)]、カラム名: [“city”, “population”]
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("Q1").getOrCreate() data = [("Tokyo", 13), ("Osaka", 9), ("Nagoya", 2)] df = spark.createDataFrame(data, ["city", "population"]) df.show() spark.stop()
問題1のDataFrameから、cityカラムだけを選択して表示してください。
df.select("city").show() # または df.select(df.city).show() # または from pyspark.sql.functions import col df.select(col("city")).show()
人口が5以上の都市だけを抽出してください。
df.filter(df.population >= 5).show() # または df.where(df.population >= 5).show() # または(SQL文字列) df.filter("population >= 5").show()
人口を100万人単位に変換する新しいカラム”population_million”を追加してください。
df_new = df.withColumn("population_million", df.population) df_new.show() # 注:問題文の「100万人単位」は、既に百万単位なのでそのまま
cityカラムの名前を”city_name”に変更してください。
df_renamed = df.withColumnRenamed("city", "city_name") df_renamed.show()
問題6〜15は、以下のデータを使用します:
data = [
("Alice", 30, "Tokyo", 80000),
("Bob", 25, "Osaka", 60000),
("Charlie", 35, "Tokyo", 95000),
("David", 28, "Osaka", 70000),
("Eve", 32, "Tokyo", 75000)
]
df = spark.createDataFrame(data, ["name", "age", "city", "salary"])Tokyoに住む人だけを抽出してください。
df.filter(df.city == "Tokyo").show()30歳以上で年収が75000以上の人を抽出してください。
df.filter((df.age >= 30) & (df.salary >= 75000)).show()
名前と年収だけを選択し、年収の降順で並べ替えてください。
df.select("name", "salary").orderBy(df.salary.desc()).show() # または df.select("name", "salary").sort(df.salary.desc()).show()
年収を20%アップした新しいカラム”new_salary”を追加してください。
df.withColumn("new_salary", df.salary * 1.2).show()
年齢が30未満なら”Junior”、30以上なら”Senior”というカラム”level”を追加してください。
from pyspark.sql.functions import when df.withColumn("level", when(df.age < 30, "Junior").otherwise("Senior")).show()
cityカラムを削除してください。
df.drop("city").show()TokyoまたはOsakaに住む人で、年収が70000以上の人を抽出してください。
# 方法1 df.filter((df.city.isin(["Tokyo", "Osaka"])) & (df.salary >= 70000)).show() # 方法2 df.filter(((df.city == "Tokyo") | (df.city == "Osaka")) & (df.salary >= 70000)).show()
名前、年齢、新しいカラム”age_group”(30未満:”20代”、30以上35未満:”30代前半”、35以上:”30代後半以上”)を表示してください。
from pyspark.sql.functions import when df.select( "name", "age", when(df.age < 30, "20代") .when(df.age < 35, "30代前半") .otherwise("30代後半以上") .alias("age_group") ).show()
DataFrameの行数と列数を表示してください。
row_count = df.count() col_count = len(df.columns) print(f"Rows: {row_count}, Columns: {col_count}") # 出力:Rows: 5, Columns: 4
スキーマを表示してください。
df.printSchema() # 出力: # root # |-- name: string (nullable = true) # |-- age: long (nullable = true) # |-- city: string (nullable = true) # |-- salary: long (nullable = true)
📝 STEP 8 のまとめ
・DataFrameは表形式で、スキーマを持つ高レベルAPI
・createDataFrame()でリストからDataFrameを作成
・toPandas()でPandasに変換(小さいデータのみ)
・select()で列を選択、filter()で行を絞り込み
・withColumn()でカラムを追加、drop()で削除
・when()で条件分岐、orderBy()で並び替え
DataFrameはImmutable(不変)です。
操作は常に新しいDataFrameを返すので、変数に代入する必要があります。
Pandasと似ていますが、in-placeの変更はできません。
次のSTEP 9では、「カラム操作と関数①」を学びます。
文字列操作、数値計算など、pyspark.sql.functionsを使いこなしましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 8