🤖 STEP 31: プロジェクト③ 機械学習向けデータ前処理
レコメンデーションシステム用データ準備 – MLパイプラインを構築しよう
📋 このプロジェクトで学ぶこと
- 大規模データの特徴量エンジニアリング
- カテゴリ変数のOne-Hot Encoding
- 数値データの正規化(StandardScaler)
- 訓練データとテストデータの分割
- 機械学習パイプラインの構築
- 処理時間・コストレポート作成
前提条件: STEP 1-30の全知識
🎯 1. プロジェクト概要
💡 特徴量エンジニアリングとは?
特徴量エンジニアリングは、機械学習モデルの精度を左右する最も重要なプロセスの1つです。生のデータから、モデルが学習しやすい形式の「特徴量」を作成します。
- カテゴリ変数:文字列を数値に変換(例:男性→0、女性→1)
- 数値変数:スケールを揃える(正規化・標準化)
- 派生特徴量:既存データから新しい指標を計算
📊 あなたのミッション
あなたはMLエンジニアです。
ECサイトのレコメンデーションシステムのために、500万件のユーザー行動データから機械学習用の特徴量を作成してください:
- ユーザーの行動データから特徴量を抽出
- カテゴリ変数をエンコーディング
- 数値データを正規化
- 訓練データ・テストデータに分割
- 機械学習パイプラインで使用できる形式で保存
1.1 データ構造
🔧 2. 環境設定とデータ読み込み
💡 Spark MLlibとは?
Spark MLlibは、分散処理に対応した機械学習ライブラリです。大規模データに対して、以下のような処理を高速に実行できます:
- 特徴量変換:StringIndexer、OneHotEncoder、StandardScaler等
- 機械学習モデル:分類、回帰、クラスタリング、協調フィルタリング
- パイプライン:複数の処理ステップを連結
2.1 必要なライブラリのインポート
# ml_feature_engineering.py
# ===================================
# 必要なライブラリのインポート
# ===================================
# SparkSession: Spark処理の入り口
from pyspark.sql import SparkSession
# functions: データ変換で使う関数群
from pyspark.sql.functions import *
# types: スキーマ定義で使う型
from pyspark.sql.types import *
# ===================================
# Spark MLlib関連のインポート
# ===================================
# 特徴量変換器
from pyspark.ml.feature import (
StringIndexer, # 文字列 → 数値インデックス
OneHotEncoder, # 数値 → 0/1ベクトル
VectorAssembler, # 複数カラム → 1つのベクトル
StandardScaler # 数値の正規化
)
# パイプライン
from pyspark.ml import Pipeline
# 時間計測用
import time
2.2 SparkSessionの作成とデータ読み込み
# ===================================
# SparkSession作成
# ===================================
spark = SparkSession.builder \
.appName("ML-Feature-Engineering") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
print("[INFO] データ読み込み開始")
start_time = time.time()
# ===================================
# データ読み込み
# ===================================
# Parquet形式でユーザー行動データを読み込み
df_raw = spark.read.parquet("gs://my-bucket/user_behavior/")
print(f"[INFO] 読み込み完了: {df_raw.count():,}件")
# ===================================
# 基本統計の確認
# ===================================
print("\n[INFO] 基本統計")
df_raw.select('age', 'view_count', 'purchase_count', 'total_amount') \
.describe() \
.show()
# describe()の出力内容:
# - count: レコード数
# - mean: 平均値
# - stddev: 標準偏差
# - min: 最小値
# - max: 最大値
📊 describe()の出力例
+-------+------------------+------------------+------------------+ |summary| age| view_count| purchase_count| +-------+------------------+------------------+------------------+ | count| 5000000| 5000000| 5000000| | mean| 35.42| 45.67| 3.21| | stddev| 12.34| 28.45| 2.15| | min| 18| 0| 0| | max| 80| 500| 50| +-------+------------------+------------------+------------------+
🧹 3. データクリーニング
💡 機械学習向けデータクリーニングのポイント
機械学習モデルは、不正なデータに対して敏感です。以下の点に注意してクリーニングを行います:
- NULL値:多くのアルゴリズムはNULLを扱えないため、削除または補完
- 異常値:年齢が負の値、120歳以上など明らかにおかしい値を除外
- 範囲チェック:閲覧回数、購入回数が負でないことを確認
# ===================================
# データクリーニング
# ===================================
print("\n[INFO] データクリーニング開始")
df_clean = df_raw \
.dropna() \
.filter(col('age') > 0) \
.filter(col('age') < 120) \
.filter(col('view_count') >= 0) \
.filter(col('purchase_count') >= 0) \
.filter(col('total_amount') >= 0)
# 【処理の解説】
# dropna(): NULL値を含むレコードを全て削除
# filter(age > 0): 年齢が0以下のレコードを除外
# filter(age < 120): 年齢が120以上のレコードを除外
# filter(view_count >= 0): 閲覧回数が負のレコードを除外
# filter(purchase_count >= 0): 購入回数が負のレコードを除外
# filter(total_amount >= 0): 総購入金額が負のレコードを除外
print(f"[INFO] クリーニング後: {df_clean.count():,}件")
🔄 4. カテゴリ変数のエンコーディング
💡 なぜエンコーディングが必要か?
機械学習アルゴリズムは、基本的に数値しか処理できません。そのため、文字列(男性/女性、東京/大阪等)を数値に変換する必要があります。
2段階の変換プロセス:
- StringIndexer:文字列 → 数値インデックス(例:東京→0, 大阪→1)
- OneHotEncoder:数値 → 0/1ベクトル(例:0→[1,0,0], 1→[0,1,0])
4.1 StringIndexerの設定
# ===================================
# カテゴリ変数のエンコーディング設定
# ===================================
print("\n[INFO] カテゴリ変数エンコーディング開始")
# ----------------------------------------
# ステップ1: StringIndexer
# 文字列を数値インデックスに変換
# ----------------------------------------
# 性別(M/F)を数値に変換
gender_indexer = StringIndexer(
inputCol="gender", # 入力カラム
outputCol="gender_index", # 出力カラム
handleInvalid="keep" # 未知の値は特別なインデックスに
)
# 地域を数値に変換
region_indexer = StringIndexer(
inputCol="region",
outputCol="region_index",
handleInvalid="keep"
)
# 商品カテゴリを数値に変換
category_indexer = StringIndexer(
inputCol="product_category",
outputCol="category_index",
handleInvalid="keep"
)
# 【handleInvalidの解説】
# "error": 未知の値があるとエラー(デフォルト)
# "skip": 未知の値を含む行をスキップ
# "keep": 未知の値に特別なインデックスを割り当て
4.2 OneHotEncoderの設定
💡 One-Hot Encodingとは?
One-Hot Encodingは、カテゴリを「0と1のベクトル」で表現する方法です。
例:地域が3種類(東京、大阪、名古屋)の場合
- 東京 → [1, 0, 0]
- 大阪 → [0, 1, 0]
- 名古屋 → [0, 0, 1]
これにより、「東京と大阪は数値的に近い」といった誤った解釈を避けられます。
# ----------------------------------------
# ステップ2: OneHotEncoder
# 数値インデックスを0/1ベクトルに変換
# ----------------------------------------
# 性別のOne-Hot Encoding
gender_encoder = OneHotEncoder(
inputCols=["gender_index"], # 入力(配列で指定)
outputCols=["gender_vec"] # 出力(配列で指定)
)
# 地域のOne-Hot Encoding
region_encoder = OneHotEncoder(
inputCols=["region_index"],
outputCols=["region_vec"]
)
# 商品カテゴリのOne-Hot Encoding
category_encoder = OneHotEncoder(
inputCols=["category_index"],
outputCols=["category_vec"]
)
print("[INFO] エンコーディング設定完了")
📊 5. 数値特徴量の作成
💡 派生特徴量(Feature Engineering)
既存のデータから新しい特徴量を作成することで、モデルの予測精度が向上します。ビジネス知識を活かして、有用な指標を設計することが重要です。
5.1 派生特徴量の作成
# ===================================
# 数値特徴量の作成
# ===================================
print("\n[INFO] 数値特徴量作成開始")
df_with_features = df_clean \
.withColumn('purchase_rate',
col('purchase_count') / (col('view_count') + 1)
) \
.withColumn('avg_order_value',
col('total_amount') / (col('purchase_count') + 1)
) \
.withColumn('engagement_score',
(col('view_count') + col('purchase_count') * 10) / (col('last_visit_days') + 1)
) \
.withColumn('is_premium',
when(col('total_amount') > 100000, 1.0).otherwise(0.0)
)
# 【派生特徴量の解説】
#
# purchase_rate(購入率):
# = 購入回数 / (閲覧回数 + 1)
# → 閲覧したうち何割が購入に至ったか
# → +1はゼロ除算を防ぐため
#
# avg_order_value(平均注文額):
# = 総購入金額 / (購入回数 + 1)
# → 1回あたりの平均購入金額
#
# engagement_score(エンゲージメントスコア):
# = (閲覧回数 + 購入回数×10) / (最終訪問日数 + 1)
# → 最近のアクティブ度を表す独自指標
# → 購入を閲覧より10倍重要視
#
# is_premium(プレミアム会員フラグ):
# = 総購入金額が10万円以上なら1、そうでなければ0
# → 高額購入者を識別するフラグ
5.2 数値特徴量のリスト定義
# ===================================
# 数値カラムのリスト定義
# ===================================
# 機械学習モデルに入力する数値特徴量
numeric_cols = [
'age', # 年齢
'view_count', # 閲覧回数
'purchase_count', # 購入回数
'total_amount', # 総購入金額
'avg_rating', # 平均評価
'last_visit_days', # 最終訪問からの日数
'purchase_rate', # 購入率(派生)
'avg_order_value', # 平均注文額(派生)
'engagement_score' # エンゲージメントスコア(派生)
]
print(f"[INFO] 数値特徴量: {len(numeric_cols)}個")
🔗 6. 特徴量ベクトルの作成
💡 VectorAssemblerとStandardScaler
VectorAssemblerは、複数のカラムを1つの特徴量ベクトルにまとめます。機械学習モデルは、このベクトル形式で入力を受け取ります。
StandardScalerは、各特徴量を「平均0、標準偏差1」に変換します。これにより、スケールの異なる特徴量(年齢:18-80、金額:0-1000000)を同等に扱えます。
# ===================================
# 特徴量ベクトルの作成設定
# ===================================
print("\n[INFO] 特徴量ベクトル作成開始")
# ----------------------------------------
# ステップ1: 数値特徴量をまとめる
# ----------------------------------------
numeric_assembler = VectorAssembler(
inputCols=numeric_cols, # 入力: 数値カラムのリスト
outputCol="numeric_features" # 出力: 1つのベクトル
)
# ----------------------------------------
# ステップ2: 数値特徴量を正規化
# ----------------------------------------
scaler = StandardScaler(
inputCol="numeric_features", # 入力: まとめた数値ベクトル
outputCol="scaled_numeric_features",# 出力: 正規化後のベクトル
withStd=True, # 標準偏差で割る(分散を1に)
withMean=True # 平均を引く(平均を0に)
)
# 【StandardScalerの計算式】
# 正規化後の値 = (元の値 - 平均) / 標準偏差
# 例: 年齢30歳、平均35歳、標準偏差10の場合
# → (30 - 35) / 10 = -0.5
# ----------------------------------------
# ステップ3: 全特徴量を1つのベクトルにまとめる
# ----------------------------------------
feature_assembler = VectorAssembler(
inputCols=[
"scaled_numeric_features", # 正規化済み数値特徴量
"gender_vec", # 性別(One-Hot)
"region_vec", # 地域(One-Hot)
"category_vec", # カテゴリ(One-Hot)
"is_premium" # プレミアムフラグ
],
outputCol="features" # 最終的な特徴量ベクトル
)
print("[INFO] 特徴量ベクトル設定完了")
🔗 7. 機械学習パイプラインの構築
💡 パイプラインの利点
Pipelineを使うと、複数の処理ステップを1つのオブジェクトにまとめられます。これにより:
- 処理の順序が明確になる
- モデルと前処理を一緒に保存・再利用できる
- 新しいデータに対して同じ変換を適用しやすい
7.1 ラベル(目的変数)の作成
# ===================================
# ラベル(目的変数)の作成
# ===================================
print("\n[INFO] パイプライン構築開始")
# 「次回購入するかどうか」をラベルとして定義
# 最終訪問が30日以内 → 1(購入する見込み)
# 最終訪問が30日超過 → 0(購入しない見込み)
df_with_label = df_with_features.withColumn(
'label',
when(col('last_visit_days') < 30, 1.0).otherwise(0.0)
)
# 【ラベル設計の解説】
# 実際のプロジェクトでは、過去データから
# 「30日以内に訪問した人の何%が購入したか」を
# 分析し、閾値を決定します
7.2 パイプラインの構築と実行
# ===================================
# パイプラインの構築
# ===================================
# 全ての処理ステップを順番に定義
pipeline = Pipeline(stages=[
# ステップ1-3: カテゴリ変数のインデックス化
gender_indexer, # 性別を数値に
region_indexer, # 地域を数値に
category_indexer, # カテゴリを数値に
# ステップ4-6: One-Hot Encoding
gender_encoder, # 性別を0/1ベクトルに
region_encoder, # 地域を0/1ベクトルに
category_encoder, # カテゴリを0/1ベクトルに
# ステップ7: 数値特徴量の集約
numeric_assembler, # 数値カラムを1ベクトルに
# ステップ8: 正規化
scaler, # 平均0、分散1に変換
# ステップ9: 全特徴量の集約
feature_assembler # 最終特徴量ベクトル作成
])
# ===================================
# パイプラインの実行
# ===================================
print("[INFO] パイプライン実行中...")
# fit(): データを使ってパイプラインを学習
# transform(): 学習済みパイプラインでデータを変換
pipeline_model = pipeline.fit(df_with_label)
df_transformed = pipeline_model.transform(df_with_label)
print(f"[INFO] 変換後: {df_transformed.count():,}件")
📊 8. データ分割と保存
💡 訓練データとテストデータの分割
機械学習では、データを訓練データとテストデータに分割します:
- 訓練データ(80%):モデルの学習に使用
- テストデータ(20%):モデルの性能評価に使用
テストデータは学習には使わず、「未知のデータ」としてモデルの汎化性能を測定します。
8.1 データ分割
# ===================================
# データ分割
# ===================================
print("\n[INFO] データ分割開始")
# 必要なカラムだけ選択
df_final = df_transformed.select(
'user_id', # ユーザーID(識別用)
'features', # 特徴量ベクトル
'label' # 目的変数
)
# 訓練データ80%、テストデータ20%に分割
# seed=42: 乱数シードを固定して再現性を確保
train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)
print(f"[INFO] 訓練データ: {train_data.count():,}件")
print(f"[INFO] テストデータ: {test_data.count():,}件")
# ===================================
# ラベルの分布を確認
# ===================================
print("\n[INFO] 訓練データのラベル分布")
train_data.groupBy('label').count().show()
print("\n[INFO] テストデータのラベル分布")
test_data.groupBy('label').count().show()
📊 ラベル分布の出力例
+-----+-------+ |label| count| +-----+-------+ | 0.0|2400000| | 1.0|1600000| +-----+-------+
8.2 データの保存
# ===================================
# データ保存
# ===================================
print("\n[INFO] データ保存開始")
output_path = "gs://my-bucket/ml_features/"
# 訓練データを保存
print("[INFO] 訓練データ保存中...")
train_data.write \
.mode('overwrite') \
.parquet(f"{output_path}/train/")
# テストデータを保存
print("[INFO] テストデータ保存中...")
test_data.write \
.mode('overwrite') \
.parquet(f"{output_path}/test/")
# ===================================
# パイプラインモデルを保存(再利用可能)
# ===================================
print("[INFO] パイプラインモデル保存中...")
pipeline_model.write().overwrite().save(f"{output_path}/pipeline_model/")
# 【パイプラインモデル保存の利点】
# 新しいデータに対して、同じ前処理を適用できる
# 読み込み方法:
# from pyspark.ml import PipelineModel
# loaded_model = PipelineModel.load(path)
# new_data_transformed = loaded_model.transform(new_data)
print("[INFO] データ保存完了")
📊 9. 処理時間・コストレポート
# ===================================
# パフォーマンス測定
# ===================================
end_time = time.time()
elapsed_time = end_time - start_time
# サンプルの特徴量を取得
sample_features = train_data.select('features').first()[0]
print("\n" + "=" * 60)
print("処理完了レポート")
print("=" * 60)
# 処理時間
print(f"総処理時間: {elapsed_time:.2f}秒 ({elapsed_time/60:.2f}分)")
print(f"1秒あたり処理件数: {df_raw.count() / elapsed_time:,.0f}件/秒")
# データサイズ
print(f"\n入力データ: {df_raw.count():,}件")
print(f"出力データ(訓練): {train_data.count():,}件")
print(f"出力データ(テスト): {test_data.count():,}件")
# 特徴量情報
print(f"\n特徴量次元数: {len(sample_features)}次元")
print(f"数値特徴量: {len(numeric_cols)}個")
print(f"カテゴリ特徴量: 3個(gender, region, category)")
# ===================================
# コスト見積もり
# ===================================
print("\n[INFO] コスト見積もり")
# クラスター構成例
cluster_config = {
"platform": "GCP Dataproc",
"machine_type": "n1-standard-4",
"num_workers": 5,
"hourly_cost": 0.23 * 5 # $0.23/台 × 5台
}
processing_hours = elapsed_time / 3600
estimated_cost = cluster_config['hourly_cost'] * processing_hours
print(f"クラスター: {cluster_config['machine_type']} × {cluster_config['num_workers']}台")
print(f"時間単価: ${cluster_config['hourly_cost']:.2f}/時間")
print(f"処理時間: {processing_hours:.4f}時間")
print(f"推定コスト: ${estimated_cost:.4f}")
print("=" * 60)
🤖 10. 機械学習モデルの学習(参考)
💡 このセクションについて
特徴量の準備が完了したら、実際に機械学習モデルを学習してみましょう。ここではRandom Forestという代表的なアルゴリズムを使った分類モデルを紹介します。
# ===================================
# 機械学習モデルの学習(参考)
# ===================================
print("\n[INFO] 機械学習モデル学習開始(参考)")
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# ----------------------------------------
# Random Forestモデルの設定
# ----------------------------------------
rf = RandomForestClassifier(
featuresCol='features', # 特徴量カラム
labelCol='label', # ラベルカラム
numTrees=100, # 木の数(多いほど精度向上、計算コスト増)
maxDepth=10, # 木の最大深さ(深いほど複雑なパターンを学習)
seed=42 # 再現性のための乱数シード
)
# ----------------------------------------
# モデルの学習
# ----------------------------------------
print("[INFO] モデル学習中...")
rf_model = rf.fit(train_data)
# ----------------------------------------
# 予測の実行
# ----------------------------------------
print("[INFO] 予測実行中...")
predictions = rf_model.transform(test_data)
# ----------------------------------------
# モデルの評価
# ----------------------------------------
evaluator = BinaryClassificationEvaluator(
labelCol='label',
metricName='areaUnderROC' # AUC(0.5=ランダム、1.0=完璧)
)
auc = evaluator.evaluate(predictions)
print(f"\n[INFO] AUC: {auc:.4f}")
# 【AUCの解釈】
# 0.5: ランダムな予測と同等(意味なし)
# 0.7: まずまずの精度
# 0.8: 良い精度
# 0.9+: 非常に良い精度
10.1 特徴量重要度と予測結果の確認
# ----------------------------------------
# 特徴量重要度の確認
# ----------------------------------------
feature_importance = rf_model.featureImportances
print("\n[INFO] 特徴量重要度(上位5個):")
for i in range(min(5, len(feature_importance))):
print(f" 特徴量 {i}: {feature_importance[i]:.4f}")
# ----------------------------------------
# 予測結果のサンプル表示
# ----------------------------------------
print("\n[INFO] 予測結果サンプル")
predictions.select('user_id', 'label', 'prediction', 'probability') \
.show(10, truncate=False)
# ----------------------------------------
# 混同行列
# ----------------------------------------
print("\n[INFO] 混同行列")
predictions.groupBy('label', 'prediction').count().show()
# 【混同行列の解釈】
# label=1, prediction=1: 正解(購入者を購入と予測)
# label=0, prediction=0: 正解(非購入者を非購入と予測)
# label=1, prediction=0: 見逃し(購入者を非購入と予測)
# label=0, prediction=1: 誤検知(非購入者を購入と予測)
spark.stop()
print("\n[INFO] 全処理完了")
📝 まとめ
✅ このプロジェクトで学んだこと
- 特徴量エンジニアリングの実践
- カテゴリ変数のエンコーディング(StringIndexer + OneHotEncoder)
- 数値データの正規化(StandardScaler)
- パイプライン構築で処理を自動化
- データ分割(訓練/テスト)
- パフォーマンス測定とコスト見積もり
- 機械学習モデルとの連携
🎯 成果物チェックリスト
- □ 訓練データ(Parquet形式)
- □ テストデータ(Parquet形式)
- □ パイプラインモデル(再利用可能)
- □ 処理時間レポート
- □ コスト見積もりレポート
- □ モデル評価結果(AUC)
🎉 Sparkコース完全修了おめでとうございます!
STEP 1からSTEP 31まで、お疲れ様でした!
あなたは今、大規模データ処理のプロフェッショナルです。
習得したスキル:
これらのスキルは、データエンジニア・データサイエンティストとして即戦力になります!
学習メモ
ビッグデータ処理(Apache Spark) - Step 31