STEP 23:AWS EMR入門

☁️ STEP 23: AWS EMR入門

AWS EMRでSparkクラスターを構築し、大規模データ処理を実行しよう

📋 このステップで学ぶこと

  • AWS EMRとは何か
  • EMRクラスターの作成方法
  • S3との連携
  • Sparkジョブの実行
  • クラスターの停止とコスト管理
💰 コストに関する重要な注意

AWS EMRは従量課金です。クラスターを起動している間、料金が発生します。
学習後は必ずクラスターを終了してください。
目安:m5.xlarge × 3台で約1時間 $0.50〜1.00程度

☁️ 1. AWS EMRとは?

1-1. EMR(Elastic MapReduce)の概要

AWS EMRは、AWSが提供するマネージド型のビッグデータ処理サービスです。
Apache Spark、Hadoop、Prestoなどの分散処理フレームワークを、簡単にクラスター上で実行できます。

📊 ローカルモード vs EMR

【ローカルモード(これまでの学習)】
・1台のPCで実行
・データサイズ:数GB程度が限界
・コスト:無料

【AWS EMR(本番環境)】
・複数台のサーバーで並列実行
・データサイズ:数TB〜PBも処理可能
・コスト:従量課金

1-2. EMRを使うメリット

メリット 説明
簡単セットアップ 数クリックでSparkクラスターを構築
スケーラビリティ 必要に応じてノード数を増減
S3連携 S3のデータを直接読み書き
コスト効率 使った分だけ課金、スポットインスタンス対応

1-3. EMRのアーキテクチャ

🏗️ EMRクラスターの構成

【EMRクラスター】

┌─────────────────────────────────────┐
│ マスターノード(1台)        │
│ ・Spark Driver           │
│ ・クラスター管理          │
│ ・Jupyter Notebook         │
└─────────────────────────────────────┘
       ↓
┌─────────────────────────────────────┐
│ コアノード(複数台)        │
│ ・Spark Executor          │
│ ・データ処理の実行         │
│ ・HDFS(オプション)        │
└─────────────────────────────────────┘
       ↓
┌─────────────────────────────────────┐
│ Amazon S3             │
│ ・入力データの保存         │
│ ・出力結果の保存          │
└─────────────────────────────────────┘

🔧 2. 事前準備

2-1. AWSアカウントの準備

EMRを使用するには、AWSアカウントが必要です。
まだアカウントがない場合は、AWS公式サイトから作成してください。

⚠️ 初期設定の注意点

ルートユーザーは使わない:IAMユーザーを作成して使用
MFA(多要素認証)を有効化:セキュリティ強化
請求アラートを設定:予期せぬ課金を防止

2-2. S3バケットの作成

EMRで処理するデータは、S3(Simple Storage Service)に保存します。
まず、データ保存用のS3バケットを作成しましょう。

  1. AWSコンソールにログイン
  2. 「S3」を検索してS3コンソールを開く
  3. 「バケットを作成」をクリック
  4. バケット名を入力(例:my-spark-data-bucket
  5. リージョンを選択(例:ap-northeast-1 東京)
  6. 「バケットを作成」をクリック

2-3. サンプルデータのアップロード

# ローカルでサンプルCSVを作成
# sales_data.csv

date,product,category,amount,region
2024-01-01,商品A,食品,1000,東京
2024-01-01,商品B,電化製品,50000,大阪
2024-01-02,商品C,衣料品,15000,東京
2024-01-02,商品D,食品,2500,名古屋
2024-01-03,商品E,電化製品,80000,大阪

作成したCSVファイルを、S3バケットにアップロードします。

  1. S3コンソールで作成したバケットを開く
  2. 「アップロード」をクリック
  3. CSVファイルをドラッグ&ドロップ
  4. 「アップロード」をクリック
アップロード先:
s3://my-spark-data-bucket/input/sales_data.csv

🚀 3. EMRクラスターの作成

3-1. EMRコンソールからクラスター作成

  1. AWSコンソールで「EMR」を検索してEMRコンソールを開く
  2. 「クラスターを作成」をクリック
  3. クラスター名を入力(例:my-spark-cluster
  4. EMRリリースを選択(最新版を推奨:emr-7.0.0など)
  5. アプリケーションで「Spark」を選択

3-2. インスタンス設定

ノードタイプ 推奨インスタンス 台数 役割
マスター m5.xlarge 1台 クラスター管理
コア m5.xlarge 2台 データ処理
💡 インスタンスタイプの選び方

学習用:m5.xlarge(4vCPU, 16GB)で十分
本番用:データサイズに応じてr5(メモリ重視)やc5(CPU重視)を選択
コスト重視:スポットインスタンスを活用(次のSTEPで詳しく説明)

3-3. その他の設定

# キーペア設定
EC2キーペア:既存のキーペアを選択、または新規作成
(SSHでマスターノードに接続する場合に必要)

# ログ設定
S3バケット:s3://my-spark-data-bucket/logs/

# セキュリティ設定
・EMRマネージドセキュリティグループを使用(デフォルト)
・必要に応じてSSHアクセス(ポート22)を許可

3-4. クラスター作成の完了

設定が完了したら「クラスターを作成」をクリックします。
クラスターの起動には約10〜15分かかります。

クラスターのステータス:
Starting → Bootstrapping → Running ← この状態になればOK!

※「Waiting」と表示される場合もあります(ジョブ待機中)

📦 4. S3との連携

4-1. S3からデータを読み込む

EMRでは、S3のデータを直接読み込むことができます。
S3のパスはs3://バケット名/パスの形式で指定します。

# S3からCSVを読み込む

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EMR S3 Example") \
    .getOrCreate()

# S3からデータを読み込み
df = spark.read.csv(
    "s3://my-spark-data-bucket/input/sales_data.csv",
    header=True,
    inferSchema=True
)

df.show()
+----------+-------+--------+------+------+
|      date|product|category|amount|region|
+----------+-------+--------+------+------+
|2024-01-01|  商品A|    食品|  1000|  東京|
|2024-01-01|  商品B|電化製品| 50000|  大阪|
|2024-01-02|  商品C|  衣料品| 15000|  東京|
|2024-01-02|  商品D|    食品|  2500|名古屋|
|2024-01-03|  商品E|電化製品| 80000|  大阪|
+----------+-------+--------+------+------+

4-2. S3にデータを書き込む

# 集計結果をS3に保存

# カテゴリ別の売上集計
result = df.groupBy("category").sum("amount")

# Parquet形式でS3に保存(推奨)
result.write.parquet(
    "s3://my-spark-data-bucket/output/category_sales/",
    mode="overwrite"
)

print("S3への保存完了")
S3への保存完了

保存先:
s3://my-spark-data-bucket/output/category_sales/
├── part-00000-xxx.parquet
├── part-00001-xxx.parquet
└── _SUCCESS
✅ S3連携のポイント

Parquet形式を推奨:圧縮効率が高く、読み込みも高速
パーティション分割:大量データは日付などでパーティション
EMRとS3は同じリージョン:転送コスト削減

▶️ 5. Sparkジョブの実行方法

5-1. EMR Notebooks(推奨)

EMR Notebooksは、Jupyter Notebookベースの対話的な開発環境です。
ローカルのJupyter Notebookと同じ感覚でSparkコードを実行できます。

  1. EMRコンソールで「Notebooks」を選択
  2. 「ノートブックを作成」をクリック
  3. ノートブック名を入力
  4. クラスターを選択(作成したクラスター)
  5. 「ノートブックを作成」をクリック
  6. ステータスが「Ready」になったら「開く」をクリック

5-2. spark-submitでジョブ投入

バッチ処理の場合は、spark-submitでPythonスクリプトを実行します。

# Pythonスクリプト(sales_analysis.py)をS3にアップロード

# sales_analysis.py の内容
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("Sales Analysis").getOrCreate()

# S3から読み込み
df = spark.read.csv("s3://my-spark-data-bucket/input/sales_data.csv", 
                    header=True, inferSchema=True)

# 集計
result = df.groupBy("region").agg(
    F.sum("amount").alias("total_sales"),
    F.count("*").alias("order_count")
)

# S3に保存
result.write.parquet("s3://my-spark-data-bucket/output/region_sales/", 
                     mode="overwrite")

spark.stop()

5-3. EMRステップとしてジョブを追加

  1. EMRコンソールでクラスターを選択
  2. 「ステップ」タブを開く
  3. 「ステップを追加」をクリック
  4. ステップタイプ:「Spark アプリケーション」を選択
  5. Spark-submit オプション:--deploy-mode cluster
  6. アプリケーションの場所:s3://my-spark-data-bucket/scripts/sales_analysis.py
  7. 「ステップを追加」をクリック
ステップのステータス:
Pending → Running → Completed

ログの確認先:
s3://my-spark-data-bucket/logs/

💰 6. クラスターの停止とコスト管理

6-1. クラスターの終了(重要!)

⚠️ 必ずクラスターを終了してください!

クラスターを起動したままにすると、課金が続きます
学習が終わったら、必ずクラスターを終了(Terminate)してください。

  1. EMRコンソールでクラスターを選択
  2. 「終了」をクリック
  3. 確認ダイアログで「終了」をクリック
  4. ステータスが「Terminated」になったことを確認

6-2. コストの目安

構成 1時間あたりの目安 用途
m5.xlarge × 3台 約 $0.50〜1.00 学習・開発
m5.2xlarge × 5台 約 $2.00〜3.00 中規模処理
r5.4xlarge × 10台 約 $15.00〜20.00 大規模本番

6-3. コスト削減のポイント

💡 コスト削減テクニック

自動終了設定:アイドル状態が続いたら自動終了
スポットインスタンス:オンデマンドの50〜90%OFF(次のSTEPで詳しく)
適切なサイズ:必要最小限のインスタンス
請求アラート:予算超過を通知

📝 練習問題

問題 1 基礎

S3パスの指定

S3バケット「data-bucket」の「input」フォルダにある「sales.csv」を読み込むパスを書いてください。

【解答】
s3://data-bucket/input/sales.csv
問題 2 応用

S3からの読み込みコード

S3の「s3://my-bucket/data.parquet」からParquetファイルを読み込むコードを書いてください。

【解答】
df = spark.read.parquet("s3://my-bucket/data.parquet")
df.show()
問題 3 応用

S3への書き込みコード

DataFrameをS3の「s3://my-bucket/output/」にParquet形式で上書き保存するコードを書いてください。

【解答】
df.write.parquet(
    "s3://my-bucket/output/",
    mode="overwrite"
)
問題 4 実践

EMRでの売上分析

S3からCSVを読み込み、カテゴリ別の売上合計を集計し、結果をParquetで保存する一連のコードを書いてください。

【解答】
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("Sales Analysis").getOrCreate()

# S3から読み込み
df = spark.read.csv(
    "s3://my-bucket/input/sales.csv",
    header=True,
    inferSchema=True
)

# カテゴリ別集計
result = df.groupBy("category").agg(
    F.sum("amount").alias("total_sales")
)

# S3に保存
result.write.parquet(
    "s3://my-bucket/output/category_sales/",
    mode="overwrite"
)

spark.stop()

❓ よくある質問

Q1: EMRとEC2の違いは?
EC2は汎用的な仮想サーバーで、自分でSparkをインストール・設定する必要があります。EMRはSparkが最初から設定済みのマネージドサービスで、すぐに使い始められます。
Q2: クラスターを終了したらデータは消える?
S3に保存したデータは消えません。EMRクラスター内のHDFSに保存したデータは消えます。重要なデータは必ずS3に保存してください。
Q3: ローカルで開発したコードはそのまま動く?
基本的にはそのまま動きます。ただし、ファイルパスを「ローカルパス」から「S3パス」に変更する必要があります。
Q4: 無料枠はある?
EMR自体に無料枠はありませんが、EC2の無料枠(t2.micro/t3.microの750時間/月)は適用されます。ただし、t2.microではSparkの実行は現実的ではありません。

📝 STEP 23 のまとめ

✅ このステップで学んだこと

AWS EMR:マネージド型のSpark実行環境
クラスター構成:マスター + コアノード
S3連携s3://bucket/pathでデータ読み書き
コスト管理:使用後は必ずクラスターを終了

⚠️ 最重要ポイント

クラスターを終了し忘れると課金が続きます!
学習が終わったら必ず「終了(Terminate)」してください。

🎯 次のステップの予告

次のSTEP 24では、「GCP Dataproc入門」を学びます。
Google Cloud PlatformでのSpark実行方法を習得しましょう!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 23

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE