🔧 STEP 17: UDF(ユーザー定義関数)
独自の関数を作ってSparkを拡張!Pythonで自由に処理をカスタマイズしよう
📋 このステップで学ぶこと
- UDF(ユーザー定義関数)とは何か
- UDFの作成と登録方法
- DataFrameでのUDF使用
- SQLでのUDF使用
- UDFのパフォーマンス注意点
📁 0. サンプルデータの準備
このステップでは、UDF(ユーザー定義関数)を使って独自の関数を作成する方法を学びます。
まず、SparkSessionを初期化し、サンプルデータを準備しましょう。
0-1. SparkSessionの初期化
# SparkSessionの初期化とUDFに必要なインポート from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark.sql.functions import udf from pyspark.sql.types import StringType, IntegerType, FloatType, BooleanType # SparkSessionを作成 spark = SparkSession.builder \ .appName("UDF Example") \ .getOrCreate() print("SparkSession準備完了")
SparkSession準備完了
0-2. 人物データの作成
# 人物データを作成 # データ:(名前, 年齢, 身長cm, 体重kg) people_data = [ ("田中太郎", 25, 170, 65), ("鈴木花子", 30, 160, 50), ("佐藤次郎", 35, 175, 70), ("山田美咲", 18, 165, 55) ] df_people = spark.createDataFrame(people_data, ["name", "age", "height", "weight"]) print("人物データ:") df_people.show()
人物データ: +--------+---+------+------+ | name|age|height|weight| +--------+---+------+------+ |田中太郎| 25| 170| 65| |鈴木花子| 30| 160| 50| |佐藤次郎| 35| 175| 70| |山田美咲| 18| 165| 55| +--------+---+------+------+
0-3. 商品データの作成
# 商品データを作成 # データ:(商品名, 価格, カテゴリ) products_data = [ ("ノートPC", 100000, "電化製品"), ("コーヒー", 500, "食品"), ("Tシャツ", 3000, "衣料品"), ("スマートフォン", 80000, "電化製品") ] df_products = spark.createDataFrame(products_data, ["product", "price", "category"]) df_products.createOrReplaceTempView("products") print("商品データ:") df_products.show()
商品データ: +--------------+------+--------+ | product| price|category| +--------------+------+--------+ | ノートPC|100000|電化製品| | コーヒー| 500| 食品| | Tシャツ| 3000| 衣料品| |スマートフォン| 80000|電化製品| +--------------+------+--------+
0-4. 全サンプルデータを一括作成するコード
# ======================================== # STEP 17 サンプルデータ一括作成スクリプト # ======================================== from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark.sql.functions import udf from pyspark.sql.types import StringType, IntegerType, FloatType, BooleanType spark = SparkSession.builder.appName("UDF Example").getOrCreate() # 1. 人物データ people_data = [ ("田中太郎", 25, 170, 65), ("鈴木花子", 30, 160, 50), ("佐藤次郎", 35, 175, 70), ("山田美咲", 18, 165, 55) ] df_people = spark.createDataFrame(people_data, ["name", "age", "height", "weight"]) # 2. 商品データ products_data = [ ("ノートPC", 100000, "電化製品"), ("コーヒー", 500, "食品"), ("Tシャツ", 3000, "衣料品"), ("スマートフォン", 80000, "電化製品") ] df_products = spark.createDataFrame(products_data, ["product", "price", "category"]) df_products.createOrReplaceTempView("products") print("✅ サンプルデータを作成しました") print(" - df_people: 人物データ") print(" - df_products: 商品データ(products ビュー)")
✅ サンプルデータを作成しました - df_people: 人物データ - df_products: 商品データ(products ビュー)
🔧 1. UDFとは?
1-1. UDF(User Defined Function)の概要
UDFとは、ユーザーが独自に定義できる関数のことです。
Sparkの標準関数でできない処理を、Pythonで自由に実装できます。
・複雑な文字列処理:正規表現の高度な処理
・ビジネスロジック:会社独自の計算式や判定ロジック
・データクレンジング:複雑な条件での値の補正
| 項目 | 標準関数(推奨) | UDF(必要な時のみ) |
|---|---|---|
| 速度 | ✅ 高速(最適化されている) | ⚠️ やや遅い(Pythonオーバーヘッド) |
| 柔軟性 | 限定的 | ✅ 自由に実装可能 |
| 使用場面 | まずこちらを検討 | 標準関数でできない時 |
まず標準関数で実現できないかを確認しましょう。
Sparkには豊富な標準関数があります:when(), coalesce(), regexp_replace()など
📝 2. UDFの基本的な作成と使用
2-1. @udfデコレータで作成(推奨)
# @udfデコレータでUDFを作成 @udf(returnType=StringType()) def age_category(age): """年齢を年齢層に分類するUDF""" if age is None: return "不明" if age < 20: return "10代" elif age < 30: return "20代" elif age < 40: return "30代" else: return "40代以上" # UDFを使用 result = df_people.withColumn("age_group", age_category("age")) print("【年齢層を追加】") result.show()
【年齢層を追加】 +--------+---+------+------+---------+ | name|age|height|weight|age_group| +--------+---+------+------+---------+ |田中太郎| 25| 170| 65| 20代| |鈴木花子| 30| 160| 50| 30代| |佐藤次郎| 35| 175| 70| 30代| |山田美咲| 18| 165| 55| 10代| +--------+---+------+------+---------+
2-2. 数値を返すUDF
# BMIを計算するUDF @udf(returnType=FloatType()) def calculate_bmi(height_cm, weight_kg): """BMIを計算(体重kg / 身長m^2)""" if height_cm is None or weight_kg is None: return None height_m = height_cm / 100 return weight_kg / (height_m ** 2) # UDFを使用 result = df_people.withColumn("bmi", F.round(calculate_bmi("height", "weight"), 1)) print("【BMIを計算】") result.show()
【BMIを計算】 +--------+---+------+------+----+ | name|age|height|weight| bmi| +--------+---+------+------+----+ |田中太郎| 25| 170| 65|22.5| |鈴木花子| 30| 160| 50|19.5| |佐藤次郎| 35| 175| 70|22.9| |山田美咲| 18| 165| 55|20.2| +--------+---+------+------+----+
2-3. Boolean型を返すUDF
# 成人かどうか判定するUDF @udf(returnType=BooleanType()) def is_adult(age): """20歳以上かどうか判定""" if age is None: return None return age >= 20 # UDFを使用 result = df_people.withColumn("is_adult", is_adult("age")) print("【成人判定】") result.show()
【成人判定】 +--------+---+------+------+--------+ | name|age|height|weight|is_adult| +--------+---+------+------+--------+ |田中太郎| 25| 170| 65| true| |鈴木花子| 30| 160| 50| true| |佐藤次郎| 35| 175| 70| true| |山田美咲| 18| 165| 55| false| +--------+---+------+------+--------+
・NULL処理必須:NULLが入る可能性を常に考慮
・型の明示:returnTypeを必ず指定
・テスト:小さいデータで動作確認してから使用
🔗 3. SQLでのUDF使用
3-1. UDFをSQLに登録する
UDFはSQLでも使用できます。spark.udf.register()で登録します。
# 消費税を計算するUDFをSQLに登録 def calc_tax(price): """消費税(10%)を計算""" if price is None: return None return int(price * 0.1) # SQLに登録 spark.udf.register("calc_tax", calc_tax, IntegerType()) # SQLで使用 print("【消費税計算(SQL)】") spark.sql(""" SELECT product, price, calc_tax(price) AS tax, price + calc_tax(price) AS total FROM products """).show()
【消費税計算(SQL)】 +--------------+------+-----+------+ | product| price| tax| total| +--------------+------+-----+------+ | ノートPC|100000|10000|110000| | コーヒー| 500| 50| 550| | Tシャツ| 3000| 300| 3300| |スマートフォン| 80000| 8000| 88000| +--------------+------+-----+------+
3-2. 複雑なビジネスロジックをSQLに登録
# 割引率を計算するUDF def calculate_discount(price, category): """ カテゴリ別の割引率を適用 - 電化製品:5%引き - 食品:割引なし - 衣料品:10%引き """ if price is None: return None discount_rates = { "電化製品": 0.05, "食品": 0, "衣料品": 0.10 } rate = discount_rates.get(category, 0) return int(price * (1 - rate)) # SQLに登録 spark.udf.register("calc_discount", calculate_discount, IntegerType()) # SQLで使用 print("【カテゴリ別割引(SQL)】") spark.sql(""" SELECT product, category, price AS 定価, calc_discount(price, category) AS 割引後価格 FROM products """).show()
【カテゴリ別割引(SQL)】 +--------------+--------+------+----------+ | product|category| 定価|割引後価格| +--------------+--------+------+----------+ | ノートPC|電化製品|100000| 95000| | コーヒー| 食品| 500| 500| | Tシャツ| 衣料品| 3000| 2700| |スマートフォン|電化製品| 80000| 76000| +--------------+--------+------+----------+
⚡ 4. UDFのパフォーマンス注意点
4-1. UDFが遅い理由
UDFはPythonで実行されるため、シリアライズ(データ変換)のオーバーヘッドが発生します。
Spark(JVM) → データ変換 → Python → 処理実行 → データ変換 → Spark(JVM)
↑ オーバーヘッド発生 ↑
4-2. 標準関数の方が速い例
# 標準関数 vs UDF のパフォーマンス比較 # 【悪い例】UDFで大文字変換 @udf(returnType=StringType()) def to_upper_udf(text): return text.upper() if text else None # UDF版(遅い) result_udf = df_people.withColumn("name_upper", to_upper_udf("name")) # 【良い例】標準関数で大文字変換(速い) result_standard = df_people.withColumn("name_upper", F.upper("name")) print("UDF版:") result_udf.select("name", "name_upper").show() print("標準関数版(こちらを使うべき):") result_standard.select("name", "name_upper").show()
UDF版: +--------+----------+ | name|name_upper| +--------+----------+ |田中太郎| 田中太郎| |鈴木花子| 鈴木花子| |佐藤次郎| 佐藤次郎| |山田美咲| 山田美咲| +--------+----------+ 標準関数版(こちらを使うべき): +--------+----------+ | name|name_upper| +--------+----------+ |田中太郎| 田中太郎| |鈴木花子| 鈴木花子| |佐藤次郎| 佐藤次郎| |山田美咲| 山田美咲| +--------+----------+
・標準関数を優先:まず標準関数で実現できないか検討
・Pandas UDFを検討:ベクトル化処理で高速化(Spark 2.3以降)
・重い処理は避ける:外部API呼び出しなどはNG
・外部API呼び出し:並列処理で大量リクエストが発生
・ファイルI/O:各ノードでファイル操作は困難
・状態を持つ処理:グローバル変数、カウンターなど
📝 練習問題
文字数をカウントするUDF
名前の文字数をカウントするUDFを作成してください。
@udf(returnType=IntegerType()) def count_chars(text): if text is None: return 0 return len(text) result = df_people.withColumn("name_length", count_chars("name")) result.show()
+--------+---+------+------+-----------+ | name|age|height|weight|name_length| +--------+---+------+------+-----------+ |田中太郎| 25| 170| 65| 4| |鈴木花子| 30| 160| 50| 4| |佐藤次郎| 35| 175| 70| 4| |山田美咲| 18| 165| 55| 4| +--------+---+------+------+-----------+
合格判定UDF
点数を受け取り、60点以上なら「合格」、未満なら「不合格」を返すUDFを作成してください。
@udf(returnType=StringType()) def pass_or_fail(score): if score is None: return "不明" return "合格" if score >= 60 else "不合格" # 使用例 scores = [("田中", 75), ("鈴木", 55), ("佐藤", 80)] df = spark.createDataFrame(scores, ["name", "score"]) df.withColumn("result", pass_or_fail("score")).show()
+----+-----+------+ |name|score|result| +----+-----+------+ |田中| 75| 合格| |鈴木| 55|不合格| |佐藤| 80| 合格| +----+-----+------+
SQLでUDFを使用
消費税(10%)を計算するUDFをSQLに登録し、商品の税込価格を計算してください。
def calc_tax(price): if price is None: return None return int(price * 0.1) spark.udf.register("calc_tax", calc_tax, IntegerType()) spark.sql(""" SELECT product, price, calc_tax(price) AS tax, price + calc_tax(price) AS total FROM products """).show()
+--------------+------+-----+------+ | product| price| tax| total| +--------------+------+-----+------+ | ノートPC|100000|10000|110000| | コーヒー| 500| 50| 550| | Tシャツ| 3000| 300| 3300| |スマートフォン| 80000| 8000| 88000| +--------------+------+-----+------+
BMIカテゴリ判定UDF
BMI値から「低体重」「標準」「肥満」を判定するUDFを作成してください。
BMI < 18.5:低体重、18.5〜25:標準、25以上:肥満
@udf(returnType=StringType()) def bmi_category(height_cm, weight_kg): if height_cm is None or weight_kg is None: return "不明" height_m = height_cm / 100 bmi = weight_kg / (height_m ** 2) if bmi < 18.5: return "低体重" elif bmi < 25: return "標準" else: return "肥満" result = df_people.withColumn("bmi_category", bmi_category("height", "weight")) result.show()
+--------+---+------+------+------------+ | name|age|height|weight|bmi_category| +--------+---+------+------+------------+ |田中太郎| 25| 170| 65| 標準| |鈴木花子| 30| 160| 50| 標準| |佐藤次郎| 35| 175| 70| 標準| |山田美咲| 18| 165| 55| 標準| +--------+---+------+------+------------+
❓ よくある質問
if value is None: return Noneのように明示的に処理します。
📝 STEP 17 のまとめ
・UDF:独自の関数を定義してSparkを拡張
・@udfデコレータ:簡単にUDFを作成
・spark.udf.register():SQLでUDFを使用
・パフォーマンス:標準関数より遅いので注意
・標準関数を優先:UDFは最後の手段
・NULL処理必須:NULLが来ることを想定
・型を明示:returnTypeを必ず指定
・小さいデータでテスト:本番前に動作確認
STEP 14〜17で、Spark SQLの基礎からUDFまで学びました。
次のPart 5では、「パフォーマンス最適化」を学びます!
学習メモ
ビッグデータ処理(Apache Spark) - Step 17