☁️ STEP 23: AWS EMR入門
AWS EMRでSparkクラスターを構築し、大規模データ処理を実行しよう
📋 このステップで学ぶこと
- AWS EMRとは何か
- EMRクラスターの作成方法
- S3との連携
- Sparkジョブの実行
- クラスターの停止とコスト管理
AWS EMRは従量課金です。クラスターを起動している間、料金が発生します。
学習後は必ずクラスターを終了してください。
目安:m5.xlarge × 3台で約1時間 $0.50〜1.00程度
☁️ 1. AWS EMRとは?
1-1. EMR(Elastic MapReduce)の概要
AWS EMRは、AWSが提供するマネージド型のビッグデータ処理サービスです。
Apache Spark、Hadoop、Prestoなどの分散処理フレームワークを、簡単にクラスター上で実行できます。
【ローカルモード(これまでの学習)】
・1台のPCで実行
・データサイズ:数GB程度が限界
・コスト:無料
【AWS EMR(本番環境)】
・複数台のサーバーで並列実行
・データサイズ:数TB〜PBも処理可能
・コスト:従量課金
1-2. EMRを使うメリット
| メリット | 説明 |
|---|---|
| 簡単セットアップ | 数クリックでSparkクラスターを構築 |
| スケーラビリティ | 必要に応じてノード数を増減 |
| S3連携 | S3のデータを直接読み書き |
| コスト効率 | 使った分だけ課金、スポットインスタンス対応 |
1-3. EMRのアーキテクチャ
【EMRクラスター】
┌─────────────────────────────────────┐
│ マスターノード(1台) │
│ ・Spark Driver │
│ ・クラスター管理 │
│ ・Jupyter Notebook │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ コアノード(複数台) │
│ ・Spark Executor │
│ ・データ処理の実行 │
│ ・HDFS(オプション) │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Amazon S3 │
│ ・入力データの保存 │
│ ・出力結果の保存 │
└─────────────────────────────────────┘
🔧 2. 事前準備
2-1. AWSアカウントの準備
EMRを使用するには、AWSアカウントが必要です。
まだアカウントがない場合は、AWS公式サイトから作成してください。
・ルートユーザーは使わない:IAMユーザーを作成して使用
・MFA(多要素認証)を有効化:セキュリティ強化
・請求アラートを設定:予期せぬ課金を防止
2-2. S3バケットの作成
EMRで処理するデータは、S3(Simple Storage Service)に保存します。
まず、データ保存用のS3バケットを作成しましょう。
- AWSコンソールにログイン
- 「S3」を検索してS3コンソールを開く
- 「バケットを作成」をクリック
- バケット名を入力(例:
my-spark-data-bucket) - リージョンを選択(例:
ap-northeast-1東京) - 「バケットを作成」をクリック
2-3. サンプルデータのアップロード
# ローカルでサンプルCSVを作成 # sales_data.csv date,product,category,amount,region 2024-01-01,商品A,食品,1000,東京 2024-01-01,商品B,電化製品,50000,大阪 2024-01-02,商品C,衣料品,15000,東京 2024-01-02,商品D,食品,2500,名古屋 2024-01-03,商品E,電化製品,80000,大阪
作成したCSVファイルを、S3バケットにアップロードします。
- S3コンソールで作成したバケットを開く
- 「アップロード」をクリック
- CSVファイルをドラッグ&ドロップ
- 「アップロード」をクリック
アップロード先: s3://my-spark-data-bucket/input/sales_data.csv
🚀 3. EMRクラスターの作成
3-1. EMRコンソールからクラスター作成
- AWSコンソールで「EMR」を検索してEMRコンソールを開く
- 「クラスターを作成」をクリック
- クラスター名を入力(例:
my-spark-cluster) - EMRリリースを選択(最新版を推奨:
emr-7.0.0など) - アプリケーションで「Spark」を選択
3-2. インスタンス設定
| ノードタイプ | 推奨インスタンス | 台数 | 役割 |
|---|---|---|---|
| マスター | m5.xlarge | 1台 | クラスター管理 |
| コア | m5.xlarge | 2台 | データ処理 |
・学習用:m5.xlarge(4vCPU, 16GB)で十分
・本番用:データサイズに応じてr5(メモリ重視)やc5(CPU重視)を選択
・コスト重視:スポットインスタンスを活用(次のSTEPで詳しく説明)
3-3. その他の設定
# キーペア設定 EC2キーペア:既存のキーペアを選択、または新規作成 (SSHでマスターノードに接続する場合に必要) # ログ設定 S3バケット:s3://my-spark-data-bucket/logs/ # セキュリティ設定 ・EMRマネージドセキュリティグループを使用(デフォルト) ・必要に応じてSSHアクセス(ポート22)を許可
3-4. クラスター作成の完了
設定が完了したら「クラスターを作成」をクリックします。
クラスターの起動には約10〜15分かかります。
クラスターのステータス: Starting → Bootstrapping → Running ← この状態になればOK! ※「Waiting」と表示される場合もあります(ジョブ待機中)
📦 4. S3との連携
4-1. S3からデータを読み込む
EMRでは、S3のデータを直接読み込むことができます。
S3のパスはs3://バケット名/パスの形式で指定します。
# S3からCSVを読み込む from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("EMR S3 Example") \ .getOrCreate() # S3からデータを読み込み df = spark.read.csv( "s3://my-spark-data-bucket/input/sales_data.csv", header=True, inferSchema=True ) df.show()
+----------+-------+--------+------+------+ | date|product|category|amount|region| +----------+-------+--------+------+------+ |2024-01-01| 商品A| 食品| 1000| 東京| |2024-01-01| 商品B|電化製品| 50000| 大阪| |2024-01-02| 商品C| 衣料品| 15000| 東京| |2024-01-02| 商品D| 食品| 2500|名古屋| |2024-01-03| 商品E|電化製品| 80000| 大阪| +----------+-------+--------+------+------+
4-2. S3にデータを書き込む
# 集計結果をS3に保存 # カテゴリ別の売上集計 result = df.groupBy("category").sum("amount") # Parquet形式でS3に保存(推奨) result.write.parquet( "s3://my-spark-data-bucket/output/category_sales/", mode="overwrite" ) print("S3への保存完了")
S3への保存完了 保存先: s3://my-spark-data-bucket/output/category_sales/ ├── part-00000-xxx.parquet ├── part-00001-xxx.parquet └── _SUCCESS
・Parquet形式を推奨:圧縮効率が高く、読み込みも高速
・パーティション分割:大量データは日付などでパーティション
・EMRとS3は同じリージョン:転送コスト削減
▶️ 5. Sparkジョブの実行方法
5-1. EMR Notebooks(推奨)
EMR Notebooksは、Jupyter Notebookベースの対話的な開発環境です。
ローカルのJupyter Notebookと同じ感覚でSparkコードを実行できます。
- EMRコンソールで「Notebooks」を選択
- 「ノートブックを作成」をクリック
- ノートブック名を入力
- クラスターを選択(作成したクラスター)
- 「ノートブックを作成」をクリック
- ステータスが「Ready」になったら「開く」をクリック
5-2. spark-submitでジョブ投入
バッチ処理の場合は、spark-submitでPythonスクリプトを実行します。
# Pythonスクリプト(sales_analysis.py)をS3にアップロード # sales_analysis.py の内容 from pyspark.sql import SparkSession from pyspark.sql import functions as F spark = SparkSession.builder.appName("Sales Analysis").getOrCreate() # S3から読み込み df = spark.read.csv("s3://my-spark-data-bucket/input/sales_data.csv", header=True, inferSchema=True) # 集計 result = df.groupBy("region").agg( F.sum("amount").alias("total_sales"), F.count("*").alias("order_count") ) # S3に保存 result.write.parquet("s3://my-spark-data-bucket/output/region_sales/", mode="overwrite") spark.stop()
5-3. EMRステップとしてジョブを追加
- EMRコンソールでクラスターを選択
- 「ステップ」タブを開く
- 「ステップを追加」をクリック
- ステップタイプ:「Spark アプリケーション」を選択
- Spark-submit オプション:
--deploy-mode cluster - アプリケーションの場所:
s3://my-spark-data-bucket/scripts/sales_analysis.py - 「ステップを追加」をクリック
ステップのステータス: Pending → Running → Completed ログの確認先: s3://my-spark-data-bucket/logs/
💰 6. クラスターの停止とコスト管理
6-1. クラスターの終了(重要!)
クラスターを起動したままにすると、課金が続きます。
学習が終わったら、必ずクラスターを終了(Terminate)してください。
- EMRコンソールでクラスターを選択
- 「終了」をクリック
- 確認ダイアログで「終了」をクリック
- ステータスが「Terminated」になったことを確認
6-2. コストの目安
| 構成 | 1時間あたりの目安 | 用途 |
|---|---|---|
| m5.xlarge × 3台 | 約 $0.50〜1.00 | 学習・開発 |
| m5.2xlarge × 5台 | 約 $2.00〜3.00 | 中規模処理 |
| r5.4xlarge × 10台 | 約 $15.00〜20.00 | 大規模本番 |
6-3. コスト削減のポイント
・自動終了設定:アイドル状態が続いたら自動終了
・スポットインスタンス:オンデマンドの50〜90%OFF(次のSTEPで詳しく)
・適切なサイズ:必要最小限のインスタンス
・請求アラート:予算超過を通知
📝 練習問題
S3パスの指定
S3バケット「data-bucket」の「input」フォルダにある「sales.csv」を読み込むパスを書いてください。
s3://data-bucket/input/sales.csv
S3からの読み込みコード
S3の「s3://my-bucket/data.parquet」からParquetファイルを読み込むコードを書いてください。
df = spark.read.parquet("s3://my-bucket/data.parquet")
df.show()S3への書き込みコード
DataFrameをS3の「s3://my-bucket/output/」にParquet形式で上書き保存するコードを書いてください。
df.write.parquet(
"s3://my-bucket/output/",
mode="overwrite"
)EMRでの売上分析
S3からCSVを読み込み、カテゴリ別の売上合計を集計し、結果をParquetで保存する一連のコードを書いてください。
from pyspark.sql import SparkSession from pyspark.sql import functions as F spark = SparkSession.builder.appName("Sales Analysis").getOrCreate() # S3から読み込み df = spark.read.csv( "s3://my-bucket/input/sales.csv", header=True, inferSchema=True ) # カテゴリ別集計 result = df.groupBy("category").agg( F.sum("amount").alias("total_sales") ) # S3に保存 result.write.parquet( "s3://my-bucket/output/category_sales/", mode="overwrite" ) spark.stop()
❓ よくある質問
📝 STEP 23 のまとめ
・AWS EMR:マネージド型のSpark実行環境
・クラスター構成:マスター + コアノード
・S3連携:s3://bucket/pathでデータ読み書き
・コスト管理:使用後は必ずクラスターを終了
クラスターを終了し忘れると課金が続きます!
学習が終わったら必ず「終了(Terminate)」してください。
次のSTEP 24では、「GCP Dataproc入門」を学びます。
Google Cloud PlatformでのSpark実行方法を習得しましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 23