STEP 31:プロジェクト③ 機械学習向けデータ前処理

🤖 STEP 31: プロジェクト③ 機械学習向けデータ前処理

レコメンデーションシステム用データ準備 – MLパイプラインを構築しよう

📋 このプロジェクトで学ぶこと

  • 大規模データの特徴量エンジニアリング
  • カテゴリ変数のOne-Hot Encoding
  • 数値データの正規化(StandardScaler)
  • 訓練データとテストデータの分割
  • 機械学習パイプラインの構築
  • 処理時間・コストレポート作成

前提条件: STEP 1-30の全知識

🎯 1. プロジェクト概要

💡 特徴量エンジニアリングとは?

特徴量エンジニアリングは、機械学習モデルの精度を左右する最も重要なプロセスの1つです。生のデータから、モデルが学習しやすい形式の「特徴量」を作成します。

  • カテゴリ変数:文字列を数値に変換(例:男性→0、女性→1)
  • 数値変数:スケールを揃える(正規化・標準化)
  • 派生特徴量:既存データから新しい指標を計算

📊 あなたのミッション

あなたはMLエンジニアです。
ECサイトのレコメンデーションシステムのために、500万件のユーザー行動データから機械学習用の特徴量を作成してください:

  1. ユーザーの行動データから特徴量を抽出
  2. カテゴリ変数をエンコーディング
  3. 数値データを正規化
  4. 訓練データ・テストデータに分割
  5. 機械学習パイプラインで使用できる形式で保存

1.1 データ構造

【入力データ】 user_id : ユーザーID age : 年齢 gender : 性別(M/F) region : 地域(東京、大阪、名古屋など) product_category: 閲覧した商品カテゴリ view_count : 閲覧回数 purchase_count : 購入回数 total_amount : 総購入金額 avg_rating : 平均評価 last_visit_days : 最終訪問からの日数 【出力データ】 features: 特徴量ベクトル(機械学習モデルに入力) label : 目的変数(次回購入するかどうか 0/1)

🔧 2. 環境設定とデータ読み込み

💡 Spark MLlibとは?

Spark MLlibは、分散処理に対応した機械学習ライブラリです。大規模データに対して、以下のような処理を高速に実行できます:

  • 特徴量変換:StringIndexer、OneHotEncoder、StandardScaler等
  • 機械学習モデル:分類、回帰、クラスタリング、協調フィルタリング
  • パイプライン:複数の処理ステップを連結

2.1 必要なライブラリのインポート

# ml_feature_engineering.py
# ===================================
# 必要なライブラリのインポート
# ===================================

# SparkSession: Spark処理の入り口
from pyspark.sql import SparkSession

# functions: データ変換で使う関数群
from pyspark.sql.functions import *

# types: スキーマ定義で使う型
from pyspark.sql.types import *

# ===================================
# Spark MLlib関連のインポート
# ===================================

# 特徴量変換器
from pyspark.ml.feature import (
    StringIndexer,   # 文字列 → 数値インデックス
    OneHotEncoder,   # 数値 → 0/1ベクトル
    VectorAssembler, # 複数カラム → 1つのベクトル
    StandardScaler   # 数値の正規化
)

# パイプライン
from pyspark.ml import Pipeline

# 時間計測用
import time

2.2 SparkSessionの作成とデータ読み込み

# ===================================
# SparkSession作成
# ===================================
spark = SparkSession.builder \
    .appName("ML-Feature-Engineering") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print("[INFO] データ読み込み開始")
start_time = time.time()

# ===================================
# データ読み込み
# ===================================
# Parquet形式でユーザー行動データを読み込み
df_raw = spark.read.parquet("gs://my-bucket/user_behavior/")
print(f"[INFO] 読み込み完了: {df_raw.count():,}件")

# ===================================
# 基本統計の確認
# ===================================
print("\n[INFO] 基本統計")
df_raw.select('age', 'view_count', 'purchase_count', 'total_amount') \
    .describe() \
    .show()

# describe()の出力内容:
# - count: レコード数
# - mean: 平均値
# - stddev: 標準偏差
# - min: 最小値
# - max: 最大値

📊 describe()の出力例

+-------+------------------+------------------+------------------+
|summary|               age|        view_count|    purchase_count|
+-------+------------------+------------------+------------------+
|  count|           5000000|           5000000|           5000000|
|   mean|             35.42|             45.67|              3.21|
| stddev|             12.34|             28.45|              2.15|
|    min|                18|                 0|                 0|
|    max|                80|               500|                50|
+-------+------------------+------------------+------------------+

🧹 3. データクリーニング

💡 機械学習向けデータクリーニングのポイント

機械学習モデルは、不正なデータに対して敏感です。以下の点に注意してクリーニングを行います:

  • NULL値:多くのアルゴリズムはNULLを扱えないため、削除または補完
  • 異常値:年齢が負の値、120歳以上など明らかにおかしい値を除外
  • 範囲チェック:閲覧回数、購入回数が負でないことを確認
# ===================================
# データクリーニング
# ===================================
print("\n[INFO] データクリーニング開始")

df_clean = df_raw \
    .dropna() \
    .filter(col('age') > 0) \
    .filter(col('age') < 120) \
    .filter(col('view_count') >= 0) \
    .filter(col('purchase_count') >= 0) \
    .filter(col('total_amount') >= 0)

# 【処理の解説】
# dropna(): NULL値を含むレコードを全て削除
# filter(age > 0): 年齢が0以下のレコードを除外
# filter(age < 120): 年齢が120以上のレコードを除外
# filter(view_count >= 0): 閲覧回数が負のレコードを除外
# filter(purchase_count >= 0): 購入回数が負のレコードを除外
# filter(total_amount >= 0): 総購入金額が負のレコードを除外

print(f"[INFO] クリーニング後: {df_clean.count():,}件")

🔄 4. カテゴリ変数のエンコーディング

💡 なぜエンコーディングが必要か?

機械学習アルゴリズムは、基本的に数値しか処理できません。そのため、文字列(男性/女性、東京/大阪等)を数値に変換する必要があります。

2段階の変換プロセス:

  1. StringIndexer:文字列 → 数値インデックス(例:東京→0, 大阪→1)
  2. OneHotEncoder:数値 → 0/1ベクトル(例:0→[1,0,0], 1→[0,1,0])

4.1 StringIndexerの設定

# ===================================
# カテゴリ変数のエンコーディング設定
# ===================================
print("\n[INFO] カテゴリ変数エンコーディング開始")

# ----------------------------------------
# ステップ1: StringIndexer
# 文字列を数値インデックスに変換
# ----------------------------------------

# 性別(M/F)を数値に変換
gender_indexer = StringIndexer(
    inputCol="gender",           # 入力カラム
    outputCol="gender_index",    # 出力カラム
    handleInvalid="keep"         # 未知の値は特別なインデックスに
)

# 地域を数値に変換
region_indexer = StringIndexer(
    inputCol="region",
    outputCol="region_index",
    handleInvalid="keep"
)

# 商品カテゴリを数値に変換
category_indexer = StringIndexer(
    inputCol="product_category",
    outputCol="category_index",
    handleInvalid="keep"
)

# 【handleInvalidの解説】
# "error": 未知の値があるとエラー(デフォルト)
# "skip": 未知の値を含む行をスキップ
# "keep": 未知の値に特別なインデックスを割り当て

4.2 OneHotEncoderの設定

💡 One-Hot Encodingとは?

One-Hot Encodingは、カテゴリを「0と1のベクトル」で表現する方法です。

例:地域が3種類(東京、大阪、名古屋)の場合

  • 東京 → [1, 0, 0]
  • 大阪 → [0, 1, 0]
  • 名古屋 → [0, 0, 1]

これにより、「東京と大阪は数値的に近い」といった誤った解釈を避けられます。

# ----------------------------------------
# ステップ2: OneHotEncoder
# 数値インデックスを0/1ベクトルに変換
# ----------------------------------------

# 性別のOne-Hot Encoding
gender_encoder = OneHotEncoder(
    inputCols=["gender_index"],  # 入力(配列で指定)
    outputCols=["gender_vec"]    # 出力(配列で指定)
)

# 地域のOne-Hot Encoding
region_encoder = OneHotEncoder(
    inputCols=["region_index"],
    outputCols=["region_vec"]
)

# 商品カテゴリのOne-Hot Encoding
category_encoder = OneHotEncoder(
    inputCols=["category_index"],
    outputCols=["category_vec"]
)

print("[INFO] エンコーディング設定完了")

📊 5. 数値特徴量の作成

💡 派生特徴量(Feature Engineering)

既存のデータから新しい特徴量を作成することで、モデルの予測精度が向上します。ビジネス知識を活かして、有用な指標を設計することが重要です。

5.1 派生特徴量の作成

# ===================================
# 数値特徴量の作成
# ===================================
print("\n[INFO] 数値特徴量作成開始")

df_with_features = df_clean \
    .withColumn('purchase_rate',
        col('purchase_count') / (col('view_count') + 1)
    ) \
    .withColumn('avg_order_value',
        col('total_amount') / (col('purchase_count') + 1)
    ) \
    .withColumn('engagement_score',
        (col('view_count') + col('purchase_count') * 10) / (col('last_visit_days') + 1)
    ) \
    .withColumn('is_premium',
        when(col('total_amount') > 100000, 1.0).otherwise(0.0)
    )

# 【派生特徴量の解説】
#
# purchase_rate(購入率):
#   = 購入回数 / (閲覧回数 + 1)
#   → 閲覧したうち何割が購入に至ったか
#   → +1はゼロ除算を防ぐため
#
# avg_order_value(平均注文額):
#   = 総購入金額 / (購入回数 + 1)
#   → 1回あたりの平均購入金額
#
# engagement_score(エンゲージメントスコア):
#   = (閲覧回数 + 購入回数×10) / (最終訪問日数 + 1)
#   → 最近のアクティブ度を表す独自指標
#   → 購入を閲覧より10倍重要視
#
# is_premium(プレミアム会員フラグ):
#   = 総購入金額が10万円以上なら1、そうでなければ0
#   → 高額購入者を識別するフラグ

5.2 数値特徴量のリスト定義

# ===================================
# 数値カラムのリスト定義
# ===================================

# 機械学習モデルに入力する数値特徴量
numeric_cols = [
    'age',              # 年齢
    'view_count',       # 閲覧回数
    'purchase_count',   # 購入回数
    'total_amount',     # 総購入金額
    'avg_rating',       # 平均評価
    'last_visit_days',  # 最終訪問からの日数
    'purchase_rate',    # 購入率(派生)
    'avg_order_value',  # 平均注文額(派生)
    'engagement_score'  # エンゲージメントスコア(派生)
]

print(f"[INFO] 数値特徴量: {len(numeric_cols)}個")

🔗 6. 特徴量ベクトルの作成

💡 VectorAssemblerとStandardScaler

VectorAssemblerは、複数のカラムを1つの特徴量ベクトルにまとめます。機械学習モデルは、このベクトル形式で入力を受け取ります。

StandardScalerは、各特徴量を「平均0、標準偏差1」に変換します。これにより、スケールの異なる特徴量(年齢:18-80、金額:0-1000000)を同等に扱えます。

# ===================================
# 特徴量ベクトルの作成設定
# ===================================
print("\n[INFO] 特徴量ベクトル作成開始")

# ----------------------------------------
# ステップ1: 数値特徴量をまとめる
# ----------------------------------------
numeric_assembler = VectorAssembler(
    inputCols=numeric_cols,         # 入力: 数値カラムのリスト
    outputCol="numeric_features"    # 出力: 1つのベクトル
)

# ----------------------------------------
# ステップ2: 数値特徴量を正規化
# ----------------------------------------
scaler = StandardScaler(
    inputCol="numeric_features",        # 入力: まとめた数値ベクトル
    outputCol="scaled_numeric_features",# 出力: 正規化後のベクトル
    withStd=True,    # 標準偏差で割る(分散を1に)
    withMean=True    # 平均を引く(平均を0に)
)

# 【StandardScalerの計算式】
# 正規化後の値 = (元の値 - 平均) / 標準偏差
# 例: 年齢30歳、平均35歳、標準偏差10の場合
#     → (30 - 35) / 10 = -0.5

# ----------------------------------------
# ステップ3: 全特徴量を1つのベクトルにまとめる
# ----------------------------------------
feature_assembler = VectorAssembler(
    inputCols=[
        "scaled_numeric_features",  # 正規化済み数値特徴量
        "gender_vec",               # 性別(One-Hot)
        "region_vec",               # 地域(One-Hot)
        "category_vec",             # カテゴリ(One-Hot)
        "is_premium"                # プレミアムフラグ
    ],
    outputCol="features"            # 最終的な特徴量ベクトル
)

print("[INFO] 特徴量ベクトル設定完了")

🔗 7. 機械学習パイプラインの構築

💡 パイプラインの利点

Pipelineを使うと、複数の処理ステップを1つのオブジェクトにまとめられます。これにより:

  • 処理の順序が明確になる
  • モデルと前処理を一緒に保存・再利用できる
  • 新しいデータに対して同じ変換を適用しやすい

7.1 ラベル(目的変数)の作成

# ===================================
# ラベル(目的変数)の作成
# ===================================
print("\n[INFO] パイプライン構築開始")

# 「次回購入するかどうか」をラベルとして定義
# 最終訪問が30日以内 → 1(購入する見込み)
# 最終訪問が30日超過 → 0(購入しない見込み)

df_with_label = df_with_features.withColumn(
    'label',
    when(col('last_visit_days') < 30, 1.0).otherwise(0.0)
)

# 【ラベル設計の解説】
# 実際のプロジェクトでは、過去データから
# 「30日以内に訪問した人の何%が購入したか」を
# 分析し、閾値を決定します

7.2 パイプラインの構築と実行

# ===================================
# パイプラインの構築
# ===================================

# 全ての処理ステップを順番に定義
pipeline = Pipeline(stages=[
    # ステップ1-3: カテゴリ変数のインデックス化
    gender_indexer,    # 性別を数値に
    region_indexer,    # 地域を数値に
    category_indexer,  # カテゴリを数値に
    
    # ステップ4-6: One-Hot Encoding
    gender_encoder,    # 性別を0/1ベクトルに
    region_encoder,    # 地域を0/1ベクトルに
    category_encoder,  # カテゴリを0/1ベクトルに
    
    # ステップ7: 数値特徴量の集約
    numeric_assembler, # 数値カラムを1ベクトルに
    
    # ステップ8: 正規化
    scaler,            # 平均0、分散1に変換
    
    # ステップ9: 全特徴量の集約
    feature_assembler  # 最終特徴量ベクトル作成
])

# ===================================
# パイプラインの実行
# ===================================
print("[INFO] パイプライン実行中...")

# fit(): データを使ってパイプラインを学習
# transform(): 学習済みパイプラインでデータを変換
pipeline_model = pipeline.fit(df_with_label)
df_transformed = pipeline_model.transform(df_with_label)

print(f"[INFO] 変換後: {df_transformed.count():,}件")

📊 8. データ分割と保存

💡 訓練データとテストデータの分割

機械学習では、データを訓練データテストデータに分割します:

  • 訓練データ(80%):モデルの学習に使用
  • テストデータ(20%):モデルの性能評価に使用

テストデータは学習には使わず、「未知のデータ」としてモデルの汎化性能を測定します。

8.1 データ分割

# ===================================
# データ分割
# ===================================
print("\n[INFO] データ分割開始")

# 必要なカラムだけ選択
df_final = df_transformed.select(
    'user_id',   # ユーザーID(識別用)
    'features',  # 特徴量ベクトル
    'label'      # 目的変数
)

# 訓練データ80%、テストデータ20%に分割
# seed=42: 乱数シードを固定して再現性を確保
train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)

print(f"[INFO] 訓練データ: {train_data.count():,}件")
print(f"[INFO] テストデータ: {test_data.count():,}件")

# ===================================
# ラベルの分布を確認
# ===================================
print("\n[INFO] 訓練データのラベル分布")
train_data.groupBy('label').count().show()

print("\n[INFO] テストデータのラベル分布")
test_data.groupBy('label').count().show()

📊 ラベル分布の出力例

+-----+-------+
|label|  count|
+-----+-------+
|  0.0|2400000|
|  1.0|1600000|
+-----+-------+

8.2 データの保存

# ===================================
# データ保存
# ===================================
print("\n[INFO] データ保存開始")

output_path = "gs://my-bucket/ml_features/"

# 訓練データを保存
print("[INFO] 訓練データ保存中...")
train_data.write \
    .mode('overwrite') \
    .parquet(f"{output_path}/train/")

# テストデータを保存
print("[INFO] テストデータ保存中...")
test_data.write \
    .mode('overwrite') \
    .parquet(f"{output_path}/test/")

# ===================================
# パイプラインモデルを保存(再利用可能)
# ===================================
print("[INFO] パイプラインモデル保存中...")
pipeline_model.write().overwrite().save(f"{output_path}/pipeline_model/")

# 【パイプラインモデル保存の利点】
# 新しいデータに対して、同じ前処理を適用できる
# 読み込み方法:
#   from pyspark.ml import PipelineModel
#   loaded_model = PipelineModel.load(path)
#   new_data_transformed = loaded_model.transform(new_data)

print("[INFO] データ保存完了")

📊 9. 処理時間・コストレポート

# ===================================
# パフォーマンス測定
# ===================================
end_time = time.time()
elapsed_time = end_time - start_time

# サンプルの特徴量を取得
sample_features = train_data.select('features').first()[0]

print("\n" + "=" * 60)
print("処理完了レポート")
print("=" * 60)

# 処理時間
print(f"総処理時間: {elapsed_time:.2f}秒 ({elapsed_time/60:.2f}分)")
print(f"1秒あたり処理件数: {df_raw.count() / elapsed_time:,.0f}件/秒")

# データサイズ
print(f"\n入力データ: {df_raw.count():,}件")
print(f"出力データ(訓練): {train_data.count():,}件")
print(f"出力データ(テスト): {test_data.count():,}件")

# 特徴量情報
print(f"\n特徴量次元数: {len(sample_features)}次元")
print(f"数値特徴量: {len(numeric_cols)}個")
print(f"カテゴリ特徴量: 3個(gender, region, category)")

# ===================================
# コスト見積もり
# ===================================
print("\n[INFO] コスト見積もり")

# クラスター構成例
cluster_config = {
    "platform": "GCP Dataproc",
    "machine_type": "n1-standard-4",
    "num_workers": 5,
    "hourly_cost": 0.23 * 5  # $0.23/台 × 5台
}

processing_hours = elapsed_time / 3600
estimated_cost = cluster_config['hourly_cost'] * processing_hours

print(f"クラスター: {cluster_config['machine_type']} × {cluster_config['num_workers']}台")
print(f"時間単価: ${cluster_config['hourly_cost']:.2f}/時間")
print(f"処理時間: {processing_hours:.4f}時間")
print(f"推定コスト: ${estimated_cost:.4f}")
print("=" * 60)

🤖 10. 機械学習モデルの学習(参考)

💡 このセクションについて

特徴量の準備が完了したら、実際に機械学習モデルを学習してみましょう。ここではRandom Forestという代表的なアルゴリズムを使った分類モデルを紹介します。

# ===================================
# 機械学習モデルの学習(参考)
# ===================================
print("\n[INFO] 機械学習モデル学習開始(参考)")

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# ----------------------------------------
# Random Forestモデルの設定
# ----------------------------------------
rf = RandomForestClassifier(
    featuresCol='features',  # 特徴量カラム
    labelCol='label',        # ラベルカラム
    numTrees=100,            # 木の数(多いほど精度向上、計算コスト増)
    maxDepth=10,             # 木の最大深さ(深いほど複雑なパターンを学習)
    seed=42                  # 再現性のための乱数シード
)

# ----------------------------------------
# モデルの学習
# ----------------------------------------
print("[INFO] モデル学習中...")
rf_model = rf.fit(train_data)

# ----------------------------------------
# 予測の実行
# ----------------------------------------
print("[INFO] 予測実行中...")
predictions = rf_model.transform(test_data)

# ----------------------------------------
# モデルの評価
# ----------------------------------------
evaluator = BinaryClassificationEvaluator(
    labelCol='label',
    metricName='areaUnderROC'  # AUC(0.5=ランダム、1.0=完璧)
)

auc = evaluator.evaluate(predictions)
print(f"\n[INFO] AUC: {auc:.4f}")

# 【AUCの解釈】
# 0.5: ランダムな予測と同等(意味なし)
# 0.7: まずまずの精度
# 0.8: 良い精度
# 0.9+: 非常に良い精度

10.1 特徴量重要度と予測結果の確認

# ----------------------------------------
# 特徴量重要度の確認
# ----------------------------------------
feature_importance = rf_model.featureImportances
print("\n[INFO] 特徴量重要度(上位5個):")
for i in range(min(5, len(feature_importance))):
    print(f"  特徴量 {i}: {feature_importance[i]:.4f}")

# ----------------------------------------
# 予測結果のサンプル表示
# ----------------------------------------
print("\n[INFO] 予測結果サンプル")
predictions.select('user_id', 'label', 'prediction', 'probability') \
    .show(10, truncate=False)

# ----------------------------------------
# 混同行列
# ----------------------------------------
print("\n[INFO] 混同行列")
predictions.groupBy('label', 'prediction').count().show()

# 【混同行列の解釈】
# label=1, prediction=1: 正解(購入者を購入と予測)
# label=0, prediction=0: 正解(非購入者を非購入と予測)
# label=1, prediction=0: 見逃し(購入者を非購入と予測)
# label=0, prediction=1: 誤検知(非購入者を購入と予測)

spark.stop()
print("\n[INFO] 全処理完了")

📝 まとめ

✅ このプロジェクトで学んだこと

  • 特徴量エンジニアリングの実践
  • カテゴリ変数のエンコーディング(StringIndexer + OneHotEncoder)
  • 数値データの正規化(StandardScaler)
  • パイプライン構築で処理を自動化
  • データ分割(訓練/テスト)
  • パフォーマンス測定とコスト見積もり
  • 機械学習モデルとの連携

🎯 成果物チェックリスト

  • □ 訓練データ(Parquet形式)
  • □ テストデータ(Parquet形式)
  • □ パイプラインモデル(再利用可能)
  • □ 処理時間レポート
  • □ コスト見積もりレポート
  • □ モデル評価結果(AUC)

🎉 Sparkコース完全修了おめでとうございます!

STEP 1からSTEP 31まで、お疲れ様でした!
あなたは今、大規模データ処理のプロフェッショナルです。

習得したスキル:

✅ Sparkの基礎から応用まで完全マスター
✅ 大規模データ処理(1億レコード以上)
✅ パフォーマンスチューニング
✅ クラウド環境(AWS EMR、GCP Dataproc)
✅ 本番レベルのETLパイプライン構築
✅ 機械学習向けデータ前処理

これらのスキルは、データエンジニア・データサイエンティストとして即戦力になります!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 31

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE