🤖 STEP 27: Spark MLlib入門
Sparkで機械学習 – 大規模データで予測モデルを作ろう
📋 このステップで学ぶこと
- MLlibとは何か – Sparkの機械学習ライブラリ
- 特徴量エンジニアリングの基礎
- 分類モデルの構築(ロジスティック回帰)
- モデル評価の方法
- パイプラインの構築
このステップはMLlibの入門編です。
本格的な機械学習(ハイパーパラメータチューニング、クロスバリデーション、様々なアルゴリズムなど)は、別の機械学習コースで学習してください。
🤖 1. MLlibとは?
1-1. MLlibの概要
MLlib(エムエルリブ)は、Sparkの機械学習ライブラリです。
大規模データで機械学習モデルを作ることができます。
・分類:メールがスパムかどうか判定
・回帰:家の価格を予測
・クラスタリング:顧客をグループ分け
・レコメンド:おすすめ商品を提案
1-2. 機械学習の基本的な流れ
1. データ準備
↓
2. 特徴量エンジニアリング(データを機械が理解できる形に変換)
↓
3. モデル学習(データからパターンを学習)
↓
4. モデル評価(精度を確認)
1-3. scikit-learn vs MLlib
| 項目 | scikit-learn | Spark MLlib |
|---|---|---|
| データサイズ | 小〜中規模(メモリに収まる) | 大規模(分散処理) |
| 実行環境 | 単一マシン | クラスター |
| 学習曲線 | 易しい | やや難しい |
| 使い分け | プロトタイプ、小規模 | 本番環境、大規模 |
🔧 2. 特徴量エンジニアリング
2-1. 特徴量とは?
特徴量とは、機械学習モデルに入力するデータのことです。
「年齢」「性別」「購入金額」などがこれにあたります。
機械学習モデルは数字しか扱えません。
「男性」「女性」のような文字列や、複数のカラムを、モデルが理解できる形式に変換する必要があります。
2-2. StringIndexer:文字列を数字に変換
# StringIndexer: 文字列を数字に変換 from pyspark.ml.feature import StringIndexer # サンプルデータ df = spark.createDataFrame([ (0, "male"), (1, "female"), (2, "male"), (3, "female") ], ["id", "gender"]) # StringIndexerを作成 indexer = StringIndexer(inputCol="gender", outputCol="gender_index") # 変換を実行 indexed = indexer.fit(df).transform(df) indexed.show()
+---+------+------------+ | id|gender|gender_index| +---+------+------------+ | 0| male| 1.0| | 1|female| 0.0| | 2| male| 1.0| | 3|female| 0.0| +---+------+------------+ ※ 出現頻度が高い順に0, 1, 2...と番号が振られる
2-3. OneHotEncoder:カテゴリを0/1ベクトルに変換
# OneHotEncoder: カテゴリを0/1のベクトルに変換 from pyspark.ml.feature import OneHotEncoder encoder = OneHotEncoder( inputCols=["gender_index"], outputCols=["gender_vec"] ) encoded = encoder.fit(indexed).transform(indexed) encoded.show(truncate=False)
+---+------+------------+-------------+ | id|gender|gender_index| gender_vec| +---+------+------------+-------------+ | 0| male| 1.0|(1,[0],[1.0])| | 1|female| 0.0| (1,[],[])| | 2| male| 1.0|(1,[0],[1.0])| | 3|female| 0.0| (1,[],[])| +---+------+------------+-------------+
2-4. VectorAssembler:特徴量ベクトルの作成
機械学習モデルは、1つのベクトルとして特徴量を受け取ります。
VectorAssemblerで複数のカラムを1つにまとめます。
# VectorAssembler: 複数の特徴量を1つにまとめる from pyspark.ml.feature import VectorAssembler df = spark.createDataFrame([ (1, 25, 50000), (2, 30, 60000), (3, 35, 70000) ], ["id", "age", "salary"]) # ageとsalaryを1つのベクトルに assembler = VectorAssembler( inputCols=["age", "salary"], outputCol="features" ) feature_df = assembler.transform(df) feature_df.show(truncate=False)
+---+---+------+---------------+ | id|age|salary| features| +---+---+------+---------------+ | 1| 25| 50000|[25.0,50000.0] | | 2| 30| 60000|[30.0,60000.0] | | 3| 35| 70000|[35.0,70000.0] | +---+---+------+---------------+
2-5. StandardScaler:正規化
特徴量のスケール(大きさ)を揃えることで、モデルの精度が上がります。
# StandardScaler: 平均0、標準偏差1に正規化 from pyspark.ml.feature import StandardScaler scaler = StandardScaler( inputCol="features", outputCol="scaled_features", withStd=True, withMean=True ) scaler_model = scaler.fit(feature_df) scaled_df = scaler_model.transform(feature_df) scaled_df.select("features", "scaled_features").show(truncate=False)
🎯 3. 分類モデルの構築
3-1. 分類問題とは?
分類問題とは、「Yes/No」や「カテゴリA/B/C」を予測する問題です。
・メールがスパムかどうか(スパム/正常)
・顧客が商品を購入するか(購入する/しない)
・病気の診断(陽性/陰性)
・手書き文字の認識(0〜9)
3-2. ロジスティック回帰モデル
# ロジスティック回帰で分類モデルを作成 from pyspark.ml.classification import LogisticRegression from pyspark.ml.linalg import Vectors # サンプルデータ(label: 0 or 1, features: 特徴量ベクトル) training = spark.createDataFrame([ (1.0, Vectors.dense([0.0, 1.1, 0.1])), (0.0, Vectors.dense([2.0, 1.0, -1.0])), (0.0, Vectors.dense([2.0, 1.3, 1.0])), (1.0, Vectors.dense([0.0, 1.2, -0.5])) ], ["label", "features"]) # モデル作成 lr = LogisticRegression(maxIter=10, regParam=0.01) # 学習 model = lr.fit(training) # テストデータで予測 test = spark.createDataFrame([ (Vectors.dense([1.0, 1.5, 0.2]),), (Vectors.dense([2.5, 1.2, 0.8]),) ], ["features"]) predictions = model.transform(test) predictions.select("features", "prediction", "probability").show(truncate=False)
+---------------+----------+----------------------------------------+ | features|prediction| probability| +---------------+----------+----------------------------------------+ |[1.0, 1.5, 0.2]| 1.0|[0.35..., 0.64...] ← 64%の確率で1| |[2.5, 1.2, 0.8]| 0.0|[0.72..., 0.27...] ← 72%の確率で0| +---------------+----------+----------------------------------------+
3-3. パイプラインで処理をまとめる
Pipelineを使うと、複数の処理を1つにまとめて管理できます。
# 完全なパイプライン:データ準備から予測まで from pyspark.ml import Pipeline from pyspark.ml.feature import StringIndexer, VectorAssembler from pyspark.ml.classification import LogisticRegression # サンプルデータ df = spark.createDataFrame([ ("male", 25, 50000, 1), ("female", 30, 60000, 0), ("male", 35, 70000, 1), ("female", 28, 55000, 0), ("male", 40, 80000, 1), ("female", 32, 65000, 0) ], ["gender", "age", "salary", "purchased"]) # ステップ1: 性別を数値に変換 gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index") # ステップ2: 特徴量をベクトルにまとめる assembler = VectorAssembler( inputCols=["gender_index", "age", "salary"], outputCol="features" ) # ステップ3: ロジスティック回帰 lr = LogisticRegression( featuresCol="features", labelCol="purchased", maxIter=10 ) # パイプライン作成 pipeline = Pipeline(stages=[gender_indexer, assembler, lr]) # 訓練データとテストデータに分割 train_data, test_data = df.randomSplit([0.8, 0.2], seed=42) # 学習 model = pipeline.fit(train_data) # 予測 predictions = model.transform(test_data) predictions.select("gender", "age", "purchased", "prediction").show()
📊 4. モデル評価
4-1. 精度(Accuracy)の計算
# モデルの精度を評価 from pyspark.ml.evaluation import MulticlassClassificationEvaluator evaluator = MulticlassClassificationEvaluator( labelCol="purchased", predictionCol="prediction", metricName="accuracy" ) accuracy = evaluator.evaluate(predictions) print(f"精度: {accuracy:.2%}")
精度: 85.30%
4-2. 混同行列(Confusion Matrix)
混同行列とは、予測がどれだけ正しいかを表にしたものです。
# 混同行列を作成 predictions.groupBy("purchased", "prediction").count().show()
+---------+----------+-----+ |purchased|prediction|count| +---------+----------+-----+ | 0| 0.0| 850| ← 正解:買わない、予測:買わない(正解) | 0| 1.0| 150| ← 正解:買わない、予測:買う(間違い) | 1| 0.0| 80| ← 正解:買う、予測:買わない(間違い) | 1| 1.0| 420| ← 正解:買う、予測:買う(正解) +---------+----------+-----+
・True Positive(真陽性):買うと予測して、実際に買った
・True Negative(真陰性):買わないと予測して、実際に買わなかった
・False Positive(偽陽性):買うと予測したが、実際は買わなかった
・False Negative(偽陰性):買わないと予測したが、実際は買った
4-3. AUC(Area Under Curve)
# AUCを計算(二値分類の場合) from pyspark.ml.evaluation import BinaryClassificationEvaluator evaluator = BinaryClassificationEvaluator( labelCol="purchased", rawPredictionCol="rawPrediction", metricName="areaUnderROC" ) auc = evaluator.evaluate(predictions) print(f"AUC: {auc:.3f}")
AUC: 0.892 ※ AUCは0.5〜1.0の値を取り、1.0に近いほど良いモデル ※ 0.5はランダム予測と同じ(意味のないモデル)
📝 練習問題
StringIndexerの使用
次のデータの「color」カラムを数値に変換してください。
df = spark.createDataFrame([
(1, "red"), (2, "blue"), (3, "red"), (4, "green")
], ["id", "color"])from pyspark.ml.feature import StringIndexer indexer = StringIndexer(inputCol="color", outputCol="color_index") indexed = indexer.fit(df).transform(df) indexed.show()
VectorAssemblerの使用
「age」「income」「score」の3つのカラムを「features」ベクトルにまとめてください。
from pyspark.ml.feature import VectorAssembler assembler = VectorAssembler( inputCols=["age", "income", "score"], outputCol="features" ) feature_df = assembler.transform(df) feature_df.show()
分類パイプラインの構築
以下の要素を含むパイプラインを作成してください。
①「category」のStringIndexer
②「age」「price」「category_index」のVectorAssembler
③ロジスティック回帰
from pyspark.ml import Pipeline from pyspark.ml.feature import StringIndexer, VectorAssembler from pyspark.ml.classification import LogisticRegression # ステップ1: StringIndexer indexer = StringIndexer(inputCol="category", outputCol="category_index") # ステップ2: VectorAssembler assembler = VectorAssembler( inputCols=["age", "price", "category_index"], outputCol="features" ) # ステップ3: ロジスティック回帰 lr = LogisticRegression(featuresCol="features", labelCol="label") # パイプライン pipeline = Pipeline(stages=[indexer, assembler, lr]) # 学習 model = pipeline.fit(train_data)
❓ よくある質問
・コードが整理される
・訓練とテストで同じ前処理が適用される
・ハイパーパラメータを調整(maxIter、regParamなど)
・別のアルゴリズムを試す(RandomForest、GBTなど)
📝 STEP 27 のまとめ
・MLlib:Sparkの機械学習ライブラリ
・StringIndexer:文字列を数値に変換
・VectorAssembler:複数カラムを1つのベクトルに
・Pipeline:複数の処理をまとめる
・LogisticRegression:分類モデル
・評価指標:Accuracy、混同行列、AUC
このSTEPはMLlibの入門編です。
本格的な機械学習を学びたい方は、専門の機械学習コースを受講してください。
次のSTEP 28では、「構造化ストリーミング入門」を学びます。
リアルタイムデータ処理の基礎を習得しましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 27