STEP 17:UDF(ユーザー定義関数)

🔧 STEP 17: UDF(ユーザー定義関数)

独自の関数を作ってSparkを拡張!Pythonで自由に処理をカスタマイズしよう

📋 このステップで学ぶこと

  • UDF(ユーザー定義関数)とは何か
  • UDFの作成と登録方法
  • DataFrameでのUDF使用
  • SQLでのUDF使用
  • UDFのパフォーマンス注意点

📁 0. サンプルデータの準備

このステップでは、UDF(ユーザー定義関数)を使って独自の関数を作成する方法を学びます。
まず、SparkSessionを初期化し、サンプルデータを準備しましょう。

0-1. SparkSessionの初期化

# SparkSessionの初期化とUDFに必要なインポート

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType, FloatType, BooleanType

# SparkSessionを作成
spark = SparkSession.builder \
    .appName("UDF Example") \
    .getOrCreate()

print("SparkSession準備完了")
SparkSession準備完了

0-2. 人物データの作成

# 人物データを作成

# データ:(名前, 年齢, 身長cm, 体重kg)
people_data = [
    ("田中太郎", 25, 170, 65),
    ("鈴木花子", 30, 160, 50),
    ("佐藤次郎", 35, 175, 70),
    ("山田美咲", 18, 165, 55)
]

df_people = spark.createDataFrame(people_data, ["name", "age", "height", "weight"])

print("人物データ:")
df_people.show()
人物データ:
+--------+---+------+------+
|    name|age|height|weight|
+--------+---+------+------+
|田中太郎| 25|   170|    65|
|鈴木花子| 30|   160|    50|
|佐藤次郎| 35|   175|    70|
|山田美咲| 18|   165|    55|
+--------+---+------+------+

0-3. 商品データの作成

# 商品データを作成

# データ:(商品名, 価格, カテゴリ)
products_data = [
    ("ノートPC", 100000, "電化製品"),
    ("コーヒー", 500, "食品"),
    ("Tシャツ", 3000, "衣料品"),
    ("スマートフォン", 80000, "電化製品")
]

df_products = spark.createDataFrame(products_data, ["product", "price", "category"])
df_products.createOrReplaceTempView("products")

print("商品データ:")
df_products.show()
商品データ:
+--------------+------+--------+
|       product| price|category|
+--------------+------+--------+
|      ノートPC|100000|電化製品|
|      コーヒー|   500|    食品|
|        Tシャツ|  3000|  衣料品|
|スマートフォン| 80000|電化製品|
+--------------+------+--------+

0-4. 全サンプルデータを一括作成するコード

# ========================================
# STEP 17 サンプルデータ一括作成スクリプト
# ========================================

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType, FloatType, BooleanType

spark = SparkSession.builder.appName("UDF Example").getOrCreate()

# 1. 人物データ
people_data = [
    ("田中太郎", 25, 170, 65), ("鈴木花子", 30, 160, 50),
    ("佐藤次郎", 35, 175, 70), ("山田美咲", 18, 165, 55)
]
df_people = spark.createDataFrame(people_data, ["name", "age", "height", "weight"])

# 2. 商品データ
products_data = [
    ("ノートPC", 100000, "電化製品"), ("コーヒー", 500, "食品"),
    ("Tシャツ", 3000, "衣料品"), ("スマートフォン", 80000, "電化製品")
]
df_products = spark.createDataFrame(products_data, ["product", "price", "category"])
df_products.createOrReplaceTempView("products")

print("✅ サンプルデータを作成しました")
print("  - df_people: 人物データ")
print("  - df_products: 商品データ(products ビュー)")
✅ サンプルデータを作成しました
  - df_people: 人物データ
  - df_products: 商品データ(products ビュー)

🔧 1. UDFとは?

1-1. UDF(User Defined Function)の概要

UDFとは、ユーザーが独自に定義できる関数のことです。
Sparkの標準関数でできない処理を、Pythonで自由に実装できます。

🎯 UDFが必要な場面

複雑な文字列処理:正規表現の高度な処理
ビジネスロジック:会社独自の計算式や判定ロジック
データクレンジング:複雑な条件での値の補正

項目 標準関数(推奨) UDF(必要な時のみ)
速度 ✅ 高速(最適化されている) ⚠️ やや遅い(Pythonオーバーヘッド)
柔軟性 限定的 ✅ 自由に実装可能
使用場面 まずこちらを検討 標準関数でできない時
💡 UDFを使う前に

まず標準関数で実現できないかを確認しましょう。
Sparkには豊富な標準関数があります:when(), coalesce(), regexp_replace()など

📝 2. UDFの基本的な作成と使用

2-1. @udfデコレータで作成(推奨)

# @udfデコレータでUDFを作成

@udf(returnType=StringType())
def age_category(age):
    """年齢を年齢層に分類するUDF"""
    if age is None:
        return "不明"
    if age < 20:
        return "10代"
    elif age < 30:
        return "20代"
    elif age < 40:
        return "30代"
    else:
        return "40代以上"

# UDFを使用
result = df_people.withColumn("age_group", age_category("age"))

print("【年齢層を追加】")
result.show()
【年齢層を追加】
+--------+---+------+------+---------+
|    name|age|height|weight|age_group|
+--------+---+------+------+---------+
|田中太郎| 25|   170|    65|     20代|
|鈴木花子| 30|   160|    50|     30代|
|佐藤次郎| 35|   175|    70|     30代|
|山田美咲| 18|   165|    55|     10代|
+--------+---+------+------+---------+

2-2. 数値を返すUDF

# BMIを計算するUDF

@udf(returnType=FloatType())
def calculate_bmi(height_cm, weight_kg):
    """BMIを計算(体重kg / 身長m^2)"""
    if height_cm is None or weight_kg is None:
        return None
    height_m = height_cm / 100
    return weight_kg / (height_m ** 2)

# UDFを使用
result = df_people.withColumn("bmi", F.round(calculate_bmi("height", "weight"), 1))

print("【BMIを計算】")
result.show()
【BMIを計算】
+--------+---+------+------+----+
|    name|age|height|weight| bmi|
+--------+---+------+------+----+
|田中太郎| 25|   170|    65|22.5|
|鈴木花子| 30|   160|    50|19.5|
|佐藤次郎| 35|   175|    70|22.9|
|山田美咲| 18|   165|    55|20.2|
+--------+---+------+------+----+

2-3. Boolean型を返すUDF

# 成人かどうか判定するUDF

@udf(returnType=BooleanType())
def is_adult(age):
    """20歳以上かどうか判定"""
    if age is None:
        return None
    return age >= 20

# UDFを使用
result = df_people.withColumn("is_adult", is_adult("age"))

print("【成人判定】")
result.show()
【成人判定】
+--------+---+------+------+--------+
|    name|age|height|weight|is_adult|
+--------+---+------+------+--------+
|田中太郎| 25|   170|    65|    true|
|鈴木花子| 30|   160|    50|    true|
|佐藤次郎| 35|   175|    70|    true|
|山田美咲| 18|   165|    55|   false|
+--------+---+------+------+--------+
⚠️ UDFの注意点

NULL処理必須:NULLが入る可能性を常に考慮
型の明示:returnTypeを必ず指定
テスト:小さいデータで動作確認してから使用

🔗 3. SQLでのUDF使用

3-1. UDFをSQLに登録する

UDFはSQLでも使用できます。spark.udf.register()で登録します。

# 消費税を計算するUDFをSQLに登録

def calc_tax(price):
    """消費税(10%)を計算"""
    if price is None:
        return None
    return int(price * 0.1)

# SQLに登録
spark.udf.register("calc_tax", calc_tax, IntegerType())

# SQLで使用
print("【消費税計算(SQL)】")
spark.sql("""
    SELECT 
        product,
        price,
        calc_tax(price) AS tax,
        price + calc_tax(price) AS total
    FROM products
""").show()
【消費税計算(SQL)】
+--------------+------+-----+------+
|       product| price|  tax| total|
+--------------+------+-----+------+
|      ノートPC|100000|10000|110000|
|      コーヒー|   500|   50|   550|
|        Tシャツ|  3000|  300|  3300|
|スマートフォン| 80000| 8000| 88000|
+--------------+------+-----+------+

3-2. 複雑なビジネスロジックをSQLに登録

# 割引率を計算するUDF

def calculate_discount(price, category):
    """
    カテゴリ別の割引率を適用
    - 電化製品:5%引き
    - 食品:割引なし
    - 衣料品:10%引き
    """
    if price is None:
        return None
    
    discount_rates = {
        "電化製品": 0.05,
        "食品": 0,
        "衣料品": 0.10
    }
    
    rate = discount_rates.get(category, 0)
    return int(price * (1 - rate))

# SQLに登録
spark.udf.register("calc_discount", calculate_discount, IntegerType())

# SQLで使用
print("【カテゴリ別割引(SQL)】")
spark.sql("""
    SELECT 
        product,
        category,
        price AS 定価,
        calc_discount(price, category) AS 割引後価格
    FROM products
""").show()
【カテゴリ別割引(SQL)】
+--------------+--------+------+----------+
|       product|category|    定価|割引後価格|
+--------------+--------+------+----------+
|      ノートPC|電化製品|100000|     95000|
|      コーヒー|    食品|   500|       500|
|        Tシャツ|  衣料品|  3000|      2700|
|スマートフォン|電化製品| 80000|     76000|
+--------------+--------+------+----------+

⚡ 4. UDFのパフォーマンス注意点

4-1. UDFが遅い理由

UDFはPythonで実行されるため、シリアライズ(データ変換)のオーバーヘッドが発生します。

📊 UDFの実行プロセス

Spark(JVM) → データ変換 → Python → 処理実行 → データ変換 → Spark(JVM)
         ↑ オーバーヘッド発生 ↑

4-2. 標準関数の方が速い例

# 標準関数 vs UDF のパフォーマンス比較

# 【悪い例】UDFで大文字変換
@udf(returnType=StringType())
def to_upper_udf(text):
    return text.upper() if text else None

# UDF版(遅い)
result_udf = df_people.withColumn("name_upper", to_upper_udf("name"))

# 【良い例】標準関数で大文字変換(速い)
result_standard = df_people.withColumn("name_upper", F.upper("name"))

print("UDF版:")
result_udf.select("name", "name_upper").show()

print("標準関数版(こちらを使うべき):")
result_standard.select("name", "name_upper").show()
UDF版:
+--------+----------+
|    name|name_upper|
+--------+----------+
|田中太郎|  田中太郎|
|鈴木花子|  鈴木花子|
|佐藤次郎|  佐藤次郎|
|山田美咲|  山田美咲|
+--------+----------+

標準関数版(こちらを使うべき):
+--------+----------+
|    name|name_upper|
+--------+----------+
|田中太郎|  田中太郎|
|鈴木花子|  鈴木花子|
|佐藤次郎|  佐藤次郎|
|山田美咲|  山田美咲|
+--------+----------+
💡 UDF最適化のポイント

標準関数を優先:まず標準関数で実現できないか検討
Pandas UDFを検討:ベクトル化処理で高速化(Spark 2.3以降)
重い処理は避ける:外部API呼び出しなどはNG

⚠️ UDFを避けるべき場面

外部API呼び出し:並列処理で大量リクエストが発生
ファイルI/O:各ノードでファイル操作は困難
状態を持つ処理:グローバル変数、カウンターなど

📝 練習問題

問題 1 基礎

文字数をカウントするUDF

名前の文字数をカウントするUDFを作成してください。

【解答】
@udf(returnType=IntegerType())
def count_chars(text):
    if text is None:
        return 0
    return len(text)

result = df_people.withColumn("name_length", count_chars("name"))
result.show()
+--------+---+------+------+-----------+
|    name|age|height|weight|name_length|
+--------+---+------+------+-----------+
|田中太郎| 25|   170|    65|          4|
|鈴木花子| 30|   160|    50|          4|
|佐藤次郎| 35|   175|    70|          4|
|山田美咲| 18|   165|    55|          4|
+--------+---+------+------+-----------+
問題 2 応用

合格判定UDF

点数を受け取り、60点以上なら「合格」、未満なら「不合格」を返すUDFを作成してください。

【解答】
@udf(returnType=StringType())
def pass_or_fail(score):
    if score is None:
        return "不明"
    return "合格" if score >= 60 else "不合格"

# 使用例
scores = [("田中", 75), ("鈴木", 55), ("佐藤", 80)]
df = spark.createDataFrame(scores, ["name", "score"])
df.withColumn("result", pass_or_fail("score")).show()
+----+-----+------+
|name|score|result|
+----+-----+------+
|田中|   75|  合格|
|鈴木|   55|不合格|
|佐藤|   80|  合格|
+----+-----+------+
問題 3 応用

SQLでUDFを使用

消費税(10%)を計算するUDFをSQLに登録し、商品の税込価格を計算してください。

【解答】
def calc_tax(price):
    if price is None:
        return None
    return int(price * 0.1)

spark.udf.register("calc_tax", calc_tax, IntegerType())

spark.sql("""
    SELECT 
        product,
        price,
        calc_tax(price) AS tax,
        price + calc_tax(price) AS total
    FROM products
""").show()
+--------------+------+-----+------+
|       product| price|  tax| total|
+--------------+------+-----+------+
|      ノートPC|100000|10000|110000|
|      コーヒー|   500|   50|   550|
|        Tシャツ|  3000|  300|  3300|
|スマートフォン| 80000| 8000| 88000|
+--------------+------+-----+------+
問題 4 実践

BMIカテゴリ判定UDF

BMI値から「低体重」「標準」「肥満」を判定するUDFを作成してください。
BMI < 18.5:低体重、18.5〜25:標準、25以上:肥満

【解答】
@udf(returnType=StringType())
def bmi_category(height_cm, weight_kg):
    if height_cm is None or weight_kg is None:
        return "不明"
    
    height_m = height_cm / 100
    bmi = weight_kg / (height_m ** 2)
    
    if bmi < 18.5:
        return "低体重"
    elif bmi < 25:
        return "標準"
    else:
        return "肥満"

result = df_people.withColumn("bmi_category", bmi_category("height", "weight"))
result.show()
+--------+---+------+------+------------+
|    name|age|height|weight|bmi_category|
+--------+---+------+------+------------+
|田中太郎| 25|   170|    65|        標準|
|鈴木花子| 30|   160|    50|        標準|
|佐藤次郎| 35|   175|    70|        標準|
|山田美咲| 18|   165|    55|        標準|
+--------+---+------+------+------------+

❓ よくある質問

Q1: UDFはいつ使うべきですか?
標準関数でできない場合のみ使います。まず標準関数で実現できないか確認し、UDFは最後の手段として使用してください。
Q2: UDFでエラーが出た時のデバッグ方法は?
1. 小さいデータで試す2. Python関数として単体テスト3. try-except追加の順で進めます。
Q3: NULL値はどう扱えばいいですか?
UDF内で必ずNULLチェックしてください。if value is None: return Noneのように明示的に処理します。
Q4: UDFで外部APIを呼び出せますか?
技術的には可能ですが、推奨されません。並列処理で大量のリクエストが発生し、APIの制限に引っかかる可能性があります。

📝 STEP 17 のまとめ

✅ このステップで学んだこと

UDF:独自の関数を定義してSparkを拡張
@udfデコレータ:簡単にUDFを作成
spark.udf.register():SQLでUDFを使用
パフォーマンス:標準関数より遅いので注意

💡 重要ポイント

標準関数を優先:UDFは最後の手段
NULL処理必須:NULLが来ることを想定
型を明示:returnTypeを必ず指定
小さいデータでテスト:本番前に動作確認

🎯 Part 4(Spark SQL)の完了!

STEP 14〜17で、Spark SQLの基礎からUDFまで学びました。
次のPart 5では、「パフォーマンス最適化」を学びます!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 17

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE