🌐 STEP 24: GCP Dataproc入門
Google Cloud PlatformでSparkクラスターを構築し、GCSと連携しよう
📋 このステップで学ぶこと
- GCP Dataprocとは何か
- Dataprocクラスターの作成方法
- Google Cloud Storage(GCS)との連携
- Sparkジョブの投入方法
- クラスター自動削除の設定
GCP Dataprocも従量課金です。クラスターを起動している間、料金が発生します。
学習後は必ずクラスターを削除してください。
目安:n1-standard-4 × 3台で約1時間 $0.30〜0.60程度
🌐 1. GCP Dataprocとは?
1-1. Dataprocの概要
Google Cloud Dataprocは、Google Cloud Platformが提供するマネージド型のビッグデータ処理サービスです。
Apache Spark、Hadoop、Prestoなどを、高速にクラスター構築して実行できます。
・高速起動:クラスター作成が約90秒(EMRより高速)
・秒単位課金:1分単位ではなく、秒単位で課金
・自動スケーリング:負荷に応じてノード数を自動調整
・BigQueryとの連携:BigQueryのデータを直接処理
1-2. AWS EMR vs GCP Dataproc
| 項目 | AWS EMR | GCP Dataproc |
|---|---|---|
| 起動時間 | 10〜15分 | 約90秒 |
| 課金単位 | 分単位 | 秒単位 |
| ストレージ | Amazon S3 | Google Cloud Storage |
| DWH連携 | Redshift | BigQuery |
| 価格(目安) | やや高め | やや安め |
・既にAWSを使っている → EMR
・既にGCPを使っている → Dataproc
・BigQueryを使いたい → Dataproc
・コスト重視 → Dataproc(秒単位課金)
・どちらも初めて → 好みで選んでOK(機能は同等)
1-3. Dataprocのアーキテクチャ
【Dataprocクラスター】
┌─────────────────────────────────────┐
│ マスターノード(1台) │
│ ・Spark Driver │
│ ・YARN ResourceManager │
│ ・Jupyter Notebook(オプション) │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ ワーカーノード(複数台) │
│ ・Spark Executor │
│ ・データ処理の実行 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Google Cloud Storage (GCS) │
│ ・入力データの保存 │
│ ・出力結果の保存 │
└─────────────────────────────────────┘
🔧 2. 事前準備
2-1. GCPアカウントの準備
Dataprocを使用するには、Google Cloudアカウントが必要です。
新規アカウントには$300の無料クレジット(90日間有効)が付与されます。
- Google Cloudにアクセス
- 「無料で開始」をクリック
- Googleアカウントでログイン
- 請求先情報を登録(無料クレジット内は課金されません)
- 新規プロジェクトを作成
2-2. APIの有効化
Dataprocを使用するには、いくつかのAPIを有効化する必要があります。
- GCPコンソールで「APIとサービス」→「ライブラリ」を開く
- 「Dataproc API」を検索して有効化
- 「Compute Engine API」を検索して有効化
- 「Cloud Storage API」を検索して有効化
2-3. GCSバケットの作成
Dataprocで処理するデータは、Google Cloud Storage(GCS)に保存します。
- GCPコンソールで「Cloud Storage」を開く
- 「バケットを作成」をクリック
- バケット名を入力(例:
my-spark-bucket-12345)※グローバルで一意 - リージョンを選択(例:
asia-northeast1東京) - 「作成」をクリック
2-4. サンプルデータのアップロード
# サンプルCSVデータ(sales_data.csv)
date,product,category,amount,region
2024-01-01,商品A,食品,1000,東京
2024-01-01,商品B,電化製品,50000,大阪
2024-01-02,商品C,衣料品,15000,東京
2024-01-02,商品D,食品,2500,名古屋
2024-01-03,商品E,電化製品,80000,大阪作成したCSVファイルを、GCSバケットにアップロードします。
- Cloud Storageコンソールで作成したバケットを開く
- 「ファイルをアップロード」をクリック
- CSVファイルを選択してアップロード
アップロード先: gs://my-spark-bucket-12345/input/sales_data.csv
🚀 3. Dataprocクラスターの作成
3-1. コンソールからクラスター作成
- GCPコンソールで「Dataproc」を検索して開く
- 「クラスターを作成」をクリック
- 「Compute Engine上のクラスター」を選択
- クラスター名を入力(例:
my-spark-cluster) - リージョン・ゾーンを選択(例:
asia-northeast1-a)
3-2. ノード構成の設定
| ノードタイプ | 推奨マシンタイプ | 台数 | 役割 |
|---|---|---|---|
| マスター | n1-standard-4 | 1台 | クラスター管理 |
| ワーカー | n1-standard-4 | 2台 | データ処理 |
・n1-standard-4:4vCPU, 15GB(学習用に最適)
・n1-standard-8:8vCPU, 30GB(中規模処理)
・n1-highmem-8:8vCPU, 52GB(メモリ重視の処理)
3-3. オプション設定
# 推奨オプション設定 # Jupyterコンポーネントを追加(対話的な開発用) オプショナルコンポーネント: ☑ Jupyter Notebook ☑ Component Gateway(Webアクセス用) # 自動削除設定(コスト管理) 「クラスター スケジュールされた削除」を設定: アイドル状態が 30分 続いたら削除
3-4. gcloudコマンドでクラスター作成(CLI)
コマンドライン(Cloud Shell または ローカル)でも作成できます。
# gcloudコマンドでクラスター作成
gcloud dataproc clusters create my-spark-cluster \
--region=asia-northeast1 \
--zone=asia-northeast1-a \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--optional-components=JUPYTER \
--enable-component-gateway \
--max-idle=30mWaiting on operation [projects/my-project/regions/asia-northeast1/operations/xxx] Cluster created successfully. クラスターのステータス: CREATING → RUNNING ← この状態になればOK! 起動時間:約90秒
📦 4. GCSとの連携
4-1. GCSからデータを読み込む
Dataprocでは、GCSのデータを直接読み込むことができます。
GCSのパスはgs://バケット名/パスの形式で指定します。
# GCSからCSVを読み込む from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("Dataproc GCS Example") \ .getOrCreate() # GCSからデータを読み込み df = spark.read.csv( "gs://my-spark-bucket-12345/input/sales_data.csv", header=True, inferSchema=True ) df.show()
+----------+-------+--------+------+------+ | date|product|category|amount|region| +----------+-------+--------+------+------+ |2024-01-01| 商品A| 食品| 1000| 東京| |2024-01-01| 商品B|電化製品| 50000| 大阪| |2024-01-02| 商品C| 衣料品| 15000| 東京| |2024-01-02| 商品D| 食品| 2500|名古屋| |2024-01-03| 商品E|電化製品| 80000| 大阪| +----------+-------+--------+------+------+
4-2. GCSにデータを書き込む
# 集計結果をGCSに保存 from pyspark.sql import functions as F # カテゴリ別の売上集計 result = df.groupBy("category").agg( F.sum("amount").alias("total_sales") ) # Parquet形式でGCSに保存 result.write.parquet( "gs://my-spark-bucket-12345/output/category_sales/", mode="overwrite" ) print("GCSへの保存完了")
GCSへの保存完了 保存先: gs://my-spark-bucket-12345/output/category_sales/ ├── part-00000-xxx.parquet ├── part-00001-xxx.parquet └── _SUCCESS
AWS S3:s3://bucket-name/path/file.csv
GCP GCS:gs://bucket-name/path/file.csv
※プロトコル部分(s3:// vs gs://)のみ異なる
▶️ 5. Sparkジョブの投入方法
5-1. Jupyter Notebookを使う(対話的)
クラスター作成時にJupyterコンポーネントを有効にした場合、Webブラウザからアクセスできます。
- Dataprocコンソールでクラスターを選択
- 「Webインターフェース」タブを開く
- 「Jupyter」のリンクをクリック
- 新しいノートブックを作成(PySpark)
5-2. gcloudコマンドでジョブ投入
バッチ処理の場合は、gcloud dataproc jobs submitでPythonスクリプトを実行します。
# Pythonスクリプト(sales_analysis.py)をGCSにアップロード # sales_analysis.py の内容 from pyspark.sql import SparkSession from pyspark.sql import functions as F spark = SparkSession.builder.appName("Sales Analysis").getOrCreate() # GCSから読み込み df = spark.read.csv("gs://my-spark-bucket-12345/input/sales_data.csv", header=True, inferSchema=True) # 地域別集計 result = df.groupBy("region").agg( F.sum("amount").alias("total_sales"), F.count("*").alias("order_count") ) # GCSに保存 result.write.parquet("gs://my-spark-bucket-12345/output/region_sales/", mode="overwrite") spark.stop()
5-3. ジョブの投入コマンド
# gcloudコマンドでジョブ投入
gcloud dataproc jobs submit pyspark \
gs://my-spark-bucket-12345/scripts/sales_analysis.py \
--cluster=my-spark-cluster \
--region=asia-northeast1Job [xxx] submitted. Waiting for job output... ... Job [xxx] finished successfully. ジョブのステータス: PENDING → RUNNING → DONE
5-4. コンソールからジョブ投入
- Dataprocコンソールで「ジョブ」を選択
- 「ジョブを送信」をクリック
- クラスターを選択
- ジョブタイプ:「PySpark」を選択
- メインPythonファイル:
gs://bucket/scripts/sales_analysis.py - 「送信」をクリック
💰 6. クラスター自動削除とコスト管理
6-1. 自動削除の設定(重要!)
Dataprocには、自動削除機能があります。
アイドル状態が続いたら自動でクラスターを削除し、課金を止めることができます。
# クラスター作成時に自動削除を設定 gcloud dataproc clusters create my-spark-cluster \ --region=asia-northeast1 \ --max-idle=30m # アイドル30分で削除 # または、特定時刻に削除 gcloud dataproc clusters create my-spark-cluster \ --region=asia-northeast1 \ --expiration-time=2024-01-15T18:00:00Z # この時刻に削除
6-2. 手動でクラスターを削除
クラスターを起動したままにすると、課金が続きます。
学習が終わったら、必ずクラスターを削除してください。
# gcloudコマンドでクラスター削除 gcloud dataproc clusters delete my-spark-cluster \ --region=asia-northeast1 # 確認メッセージが出るので「Y」を入力
または、GCPコンソールから削除:
- Dataprocコンソールでクラスターを選択
- 「削除」をクリック
- 確認ダイアログで「削除」をクリック
6-3. コストの目安
| 構成 | 1時間あたりの目安 | 用途 |
|---|---|---|
| n1-standard-4 × 3台 | 約 $0.30〜0.60 | 学習・開発 |
| n1-standard-8 × 5台 | 約 $1.50〜2.00 | 中規模処理 |
| n1-highmem-16 × 10台 | 約 $10.00〜15.00 | 大規模本番 |
新規アカウントには$300の無料クレジット(90日間)があります。
学習用途であれば、無料クレジット内で十分に試せます。
📝 練習問題
GCSパスの指定
GCSバケット「data-bucket」の「input」フォルダにある「sales.csv」を読み込むパスを書いてください。
gs://data-bucket/input/sales.csv
GCSからの読み込みコード
GCSの「gs://my-bucket/data.parquet」からParquetファイルを読み込むコードを書いてください。
df = spark.read.parquet("gs://my-bucket/data.parquet")
df.show()gcloudコマンドでクラスター作成
以下の条件でクラスターを作成するgcloudコマンドを書いてください。
・クラスター名:test-cluster
・リージョン:asia-northeast1
・ワーカー数:3台
・アイドル30分で自動削除
gcloud dataproc clusters create test-cluster \
--region=asia-northeast1 \
--num-workers=3 \
--max-idle=30mDataprocでの売上分析
GCSからCSVを読み込み、地域別の売上合計を集計し、結果をParquetで保存する一連のコードを書いてください。
from pyspark.sql import SparkSession from pyspark.sql import functions as F spark = SparkSession.builder.appName("Sales Analysis").getOrCreate() # GCSから読み込み df = spark.read.csv( "gs://my-bucket/input/sales.csv", header=True, inferSchema=True ) # 地域別集計 result = df.groupBy("region").agg( F.sum("amount").alias("total_sales") ) # GCSに保存 result.write.parquet( "gs://my-bucket/output/region_sales/", mode="overwrite" ) spark.stop()
❓ よくある質問
spark.read.format("bigquery")を使用します。詳細はGoogle Cloud公式ドキュメントを参照してください。
📝 STEP 24 のまとめ
・GCP Dataproc:高速起動(90秒)、秒単位課金
・GCS連携:gs://bucket/pathでデータ読み書き
・自動削除:--max-idleでアイドル時に自動削除
・ジョブ投入:gcloudコマンドまたはコンソールから
クラスターを削除し忘れると課金が続きます!
学習が終わったら必ず「削除」するか、–max-idleで自動削除を設定してください。
次のSTEP 25では、「クラスター管理とコスト最適化」を学びます。
スポットインスタンスや自動スケーリングで、コストを大幅に削減しましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 24