STEP 27:Spark MLlib入門

🤖 STEP 27: Spark MLlib入門

Sparkで機械学習 – 大規模データで予測モデルを作ろう

📋 このステップで学ぶこと

  • MLlibとは何か – Sparkの機械学習ライブラリ
  • 特徴量エンジニアリングの基礎
  • 分類モデルの構築(ロジスティック回帰)
  • モデル評価の方法
  • パイプラインの構築
📌 このステップの位置づけ

このステップはMLlibの入門編です。
本格的な機械学習(ハイパーパラメータチューニング、クロスバリデーション、様々なアルゴリズムなど)は、別の機械学習コースで学習してください。

🤖 1. MLlibとは?

1-1. MLlibの概要

MLlib(エムエルリブ)は、Sparkの機械学習ライブラリです。
大規模データで機械学習モデルを作ることができます。

📊 MLlibでできること

分類:メールがスパムかどうか判定
回帰:家の価格を予測
クラスタリング:顧客をグループ分け
レコメンド:おすすめ商品を提案

1-2. 機械学習の基本的な流れ

🔄 機械学習の4ステップ

1. データ準備
  ↓
2. 特徴量エンジニアリング(データを機械が理解できる形に変換)
  ↓
3. モデル学習(データからパターンを学習)
  ↓
4. モデル評価(精度を確認)

1-3. scikit-learn vs MLlib

項目 scikit-learn Spark MLlib
データサイズ 小〜中規模(メモリに収まる) 大規模(分散処理)
実行環境 単一マシン クラスター
学習曲線 易しい やや難しい
使い分け プロトタイプ、小規模 本番環境、大規模

🔧 2. 特徴量エンジニアリング

2-1. 特徴量とは?

特徴量とは、機械学習モデルに入力するデータのことです。
「年齢」「性別」「購入金額」などがこれにあたります。

💡 特徴量エンジニアリングが重要な理由

機械学習モデルは数字しか扱えません
「男性」「女性」のような文字列や、複数のカラムを、モデルが理解できる形式に変換する必要があります。

2-2. StringIndexer:文字列を数字に変換

# StringIndexer: 文字列を数字に変換

from pyspark.ml.feature import StringIndexer

# サンプルデータ
df = spark.createDataFrame([
    (0, "male"),
    (1, "female"),
    (2, "male"),
    (3, "female")
], ["id", "gender"])

# StringIndexerを作成
indexer = StringIndexer(inputCol="gender", outputCol="gender_index")

# 変換を実行
indexed = indexer.fit(df).transform(df)
indexed.show()
+---+------+------------+
| id|gender|gender_index|
+---+------+------------+
|  0|  male|         1.0|
|  1|female|         0.0|
|  2|  male|         1.0|
|  3|female|         0.0|
+---+------+------------+

※ 出現頻度が高い順に0, 1, 2...と番号が振られる

2-3. OneHotEncoder:カテゴリを0/1ベクトルに変換

# OneHotEncoder: カテゴリを0/1のベクトルに変換

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(
    inputCols=["gender_index"], 
    outputCols=["gender_vec"]
)

encoded = encoder.fit(indexed).transform(indexed)
encoded.show(truncate=False)
+---+------+------------+-------------+
| id|gender|gender_index|   gender_vec|
+---+------+------------+-------------+
|  0|  male|         1.0|(1,[0],[1.0])|
|  1|female|         0.0|    (1,[],[])|
|  2|  male|         1.0|(1,[0],[1.0])|
|  3|female|         0.0|    (1,[],[])|
+---+------+------------+-------------+

2-4. VectorAssembler:特徴量ベクトルの作成

機械学習モデルは、1つのベクトルとして特徴量を受け取ります。
VectorAssemblerで複数のカラムを1つにまとめます。

# VectorAssembler: 複数の特徴量を1つにまとめる

from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    (1, 25, 50000),
    (2, 30, 60000),
    (3, 35, 70000)
], ["id", "age", "salary"])

# ageとsalaryを1つのベクトルに
assembler = VectorAssembler(
    inputCols=["age", "salary"],
    outputCol="features"
)

feature_df = assembler.transform(df)
feature_df.show(truncate=False)
+---+---+------+---------------+
| id|age|salary|       features|
+---+---+------+---------------+
|  1| 25| 50000|[25.0,50000.0] |
|  2| 30| 60000|[30.0,60000.0] |
|  3| 35| 70000|[35.0,70000.0] |
+---+---+------+---------------+

2-5. StandardScaler:正規化

特徴量のスケール(大きさ)を揃えることで、モデルの精度が上がります。

# StandardScaler: 平均0、標準偏差1に正規化

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(
    inputCol="features", 
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)

scaler_model = scaler.fit(feature_df)
scaled_df = scaler_model.transform(feature_df)
scaled_df.select("features", "scaled_features").show(truncate=False)

🎯 3. 分類モデルの構築

3-1. 分類問題とは?

分類問題とは、「Yes/No」や「カテゴリA/B/C」を予測する問題です。

📊 分類問題の例

・メールがスパムかどうか(スパム/正常)
・顧客が商品を購入するか(購入する/しない)
・病気の診断(陽性/陰性)
・手書き文字の認識(0〜9)

3-2. ロジスティック回帰モデル

# ロジスティック回帰で分類モデルを作成

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# サンプルデータ(label: 0 or 1, features: 特徴量ベクトル)
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))
], ["label", "features"])

# モデル作成
lr = LogisticRegression(maxIter=10, regParam=0.01)

# 学習
model = lr.fit(training)

# テストデータで予測
test = spark.createDataFrame([
    (Vectors.dense([1.0, 1.5, 0.2]),),
    (Vectors.dense([2.5, 1.2, 0.8]),)
], ["features"])

predictions = model.transform(test)
predictions.select("features", "prediction", "probability").show(truncate=False)
+---------------+----------+----------------------------------------+
|       features|prediction|                             probability|
+---------------+----------+----------------------------------------+
|[1.0, 1.5, 0.2]|       1.0|[0.35..., 0.64...]  ← 64%の確率で1|
|[2.5, 1.2, 0.8]|       0.0|[0.72..., 0.27...]  ← 72%の確率で0|
+---------------+----------+----------------------------------------+

3-3. パイプラインで処理をまとめる

Pipelineを使うと、複数の処理を1つにまとめて管理できます。

# 完全なパイプライン:データ準備から予測まで

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

# サンプルデータ
df = spark.createDataFrame([
    ("male", 25, 50000, 1),
    ("female", 30, 60000, 0),
    ("male", 35, 70000, 1),
    ("female", 28, 55000, 0),
    ("male", 40, 80000, 1),
    ("female", 32, 65000, 0)
], ["gender", "age", "salary", "purchased"])

# ステップ1: 性別を数値に変換
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index")

# ステップ2: 特徴量をベクトルにまとめる
assembler = VectorAssembler(
    inputCols=["gender_index", "age", "salary"],
    outputCol="features"
)

# ステップ3: ロジスティック回帰
lr = LogisticRegression(
    featuresCol="features",
    labelCol="purchased",
    maxIter=10
)

# パイプライン作成
pipeline = Pipeline(stages=[gender_indexer, assembler, lr])

# 訓練データとテストデータに分割
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# 学習
model = pipeline.fit(train_data)

# 予測
predictions = model.transform(test_data)
predictions.select("gender", "age", "purchased", "prediction").show()

📊 4. モデル評価

4-1. 精度(Accuracy)の計算

# モデルの精度を評価

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="purchased",
    predictionCol="prediction",
    metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions)
print(f"精度: {accuracy:.2%}")
精度: 85.30%

4-2. 混同行列(Confusion Matrix)

混同行列とは、予測がどれだけ正しいかを表にしたものです。

# 混同行列を作成

predictions.groupBy("purchased", "prediction").count().show()
+---------+----------+-----+
|purchased|prediction|count|
+---------+----------+-----+
|        0|       0.0|  850|  ← 正解:買わない、予測:買わない(正解)
|        0|       1.0|  150|  ← 正解:買わない、予測:買う(間違い)
|        1|       0.0|   80|  ← 正解:買う、予測:買わない(間違い)
|        1|       1.0|  420|  ← 正解:買う、予測:買う(正解)
+---------+----------+-----+
💡 混同行列の読み方

True Positive(真陽性):買うと予測して、実際に買った
True Negative(真陰性):買わないと予測して、実際に買わなかった
False Positive(偽陽性):買うと予測したが、実際は買わなかった
False Negative(偽陰性):買わないと予測したが、実際は買った

4-3. AUC(Area Under Curve)

# AUCを計算(二値分類の場合)

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    labelCol="purchased",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = evaluator.evaluate(predictions)
print(f"AUC: {auc:.3f}")
AUC: 0.892

※ AUCは0.5〜1.0の値を取り、1.0に近いほど良いモデル
※ 0.5はランダム予測と同じ(意味のないモデル)

📝 練習問題

問題 1 基礎

StringIndexerの使用

次のデータの「color」カラムを数値に変換してください。

df = spark.createDataFrame([
    (1, "red"), (2, "blue"), (3, "red"), (4, "green")
], ["id", "color"])
【解答】
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="color", outputCol="color_index")
indexed = indexer.fit(df).transform(df)
indexed.show()
問題 2 応用

VectorAssemblerの使用

「age」「income」「score」の3つのカラムを「features」ベクトルにまとめてください。

【解答】
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["age", "income", "score"],
    outputCol="features"
)
feature_df = assembler.transform(df)
feature_df.show()
問題 3 実践

分類パイプラインの構築

以下の要素を含むパイプラインを作成してください。
①「category」のStringIndexer
②「age」「price」「category_index」のVectorAssembler
③ロジスティック回帰

【解答】
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

# ステップ1: StringIndexer
indexer = StringIndexer(inputCol="category", outputCol="category_index")

# ステップ2: VectorAssembler
assembler = VectorAssembler(
    inputCols=["age", "price", "category_index"],
    outputCol="features"
)

# ステップ3: ロジスティック回帰
lr = LogisticRegression(featuresCol="features", labelCol="label")

# パイプライン
pipeline = Pipeline(stages=[indexer, assembler, lr])

# 学習
model = pipeline.fit(train_data)

❓ よくある質問

Q1: MLlibとscikit-learnどちらを使うべき?
データサイズで判断します。メモリに収まる(数GB程度)ならscikit-learn、それ以上ならMLlibを使います。
Q2: VectorAssemblerは必須?
はい、必須です。MLlibのモデルは特徴量を1つのベクトルとして受け取るため、VectorAssemblerで複数のカラムをまとめる必要があります。
Q3: パイプラインを使うメリットは?
・処理の再利用性が高まる
コードが整理される
・訓練とテストで同じ前処理が適用される
Q4: 精度が低い時はどうする?
特徴量を増やす(新しい情報を追加)
ハイパーパラメータを調整(maxIter、regParamなど)
別のアルゴリズムを試す(RandomForest、GBTなど)

📝 STEP 27 のまとめ

✅ このステップで学んだこと

MLlib:Sparkの機械学習ライブラリ
StringIndexer:文字列を数値に変換
VectorAssembler:複数カラムを1つのベクトルに
Pipeline:複数の処理をまとめる
LogisticRegression:分類モデル
評価指標:Accuracy、混同行列、AUC

💡 次のステップへ

このSTEPはMLlibの入門編です。
本格的な機械学習を学びたい方は、専門の機械学習コースを受講してください。

🎯 次のステップの予告

次のSTEP 28では、「構造化ストリーミング入門」を学びます。
リアルタイムデータ処理の基礎を習得しましょう!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 27

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE