STEP 24:GCP Dataproc入門

🌐 STEP 24: GCP Dataproc入門

Google Cloud PlatformでSparkクラスターを構築し、GCSと連携しよう

📋 このステップで学ぶこと

  • GCP Dataprocとは何か
  • Dataprocクラスターの作成方法
  • Google Cloud Storage(GCS)との連携
  • Sparkジョブの投入方法
  • クラスター自動削除の設定
💰 コストに関する重要な注意

GCP Dataprocも従量課金です。クラスターを起動している間、料金が発生します。
学習後は必ずクラスターを削除してください。
目安:n1-standard-4 × 3台で約1時間 $0.30〜0.60程度

🌐 1. GCP Dataprocとは?

1-1. Dataprocの概要

Google Cloud Dataprocは、Google Cloud Platformが提供するマネージド型のビッグデータ処理サービスです。
Apache Spark、Hadoop、Prestoなどを、高速にクラスター構築して実行できます。

⚡ Dataprocの特徴

高速起動:クラスター作成が約90秒(EMRより高速)
秒単位課金:1分単位ではなく、秒単位で課金
自動スケーリング:負荷に応じてノード数を自動調整
BigQueryとの連携:BigQueryのデータを直接処理

1-2. AWS EMR vs GCP Dataproc

項目 AWS EMR GCP Dataproc
起動時間 10〜15分 約90秒
課金単位 分単位 秒単位
ストレージ Amazon S3 Google Cloud Storage
DWH連携 Redshift BigQuery
価格(目安) やや高め やや安め
💡 どちらを選ぶべきか?

既にAWSを使っている → EMR
既にGCPを使っている → Dataproc
BigQueryを使いたい → Dataproc
コスト重視 → Dataproc(秒単位課金)
どちらも初めて → 好みで選んでOK(機能は同等)

1-3. Dataprocのアーキテクチャ

🏗️ Dataprocクラスターの構成

【Dataprocクラスター】

┌─────────────────────────────────────┐
│ マスターノード(1台)        │
│ ・Spark Driver           │
│ ・YARN ResourceManager       │
│ ・Jupyter Notebook(オプション)  │
└─────────────────────────────────────┘
       ↓
┌─────────────────────────────────────┐
│ ワーカーノード(複数台)      │
│ ・Spark Executor          │
│ ・データ処理の実行         │
└─────────────────────────────────────┘
       ↓
┌─────────────────────────────────────┐
│ Google Cloud Storage (GCS)   │
│ ・入力データの保存         │
│ ・出力結果の保存          │
└─────────────────────────────────────┘

🔧 2. 事前準備

2-1. GCPアカウントの準備

Dataprocを使用するには、Google Cloudアカウントが必要です。
新規アカウントには$300の無料クレジット(90日間有効)が付与されます。

  1. Google Cloudにアクセス
  2. 「無料で開始」をクリック
  3. Googleアカウントでログイン
  4. 請求先情報を登録(無料クレジット内は課金されません)
  5. 新規プロジェクトを作成

2-2. APIの有効化

Dataprocを使用するには、いくつかのAPIを有効化する必要があります。

  1. GCPコンソールで「APIとサービス」→「ライブラリ」を開く
  2. 「Dataproc API」を検索して有効化
  3. 「Compute Engine API」を検索して有効化
  4. 「Cloud Storage API」を検索して有効化

2-3. GCSバケットの作成

Dataprocで処理するデータは、Google Cloud Storage(GCS)に保存します。

  1. GCPコンソールで「Cloud Storage」を開く
  2. 「バケットを作成」をクリック
  3. バケット名を入力(例:my-spark-bucket-12345)※グローバルで一意
  4. リージョンを選択(例:asia-northeast1 東京)
  5. 「作成」をクリック

2-4. サンプルデータのアップロード

# サンプルCSVデータ(sales_data.csv)

date,product,category,amount,region
2024-01-01,商品A,食品,1000,東京
2024-01-01,商品B,電化製品,50000,大阪
2024-01-02,商品C,衣料品,15000,東京
2024-01-02,商品D,食品,2500,名古屋
2024-01-03,商品E,電化製品,80000,大阪

作成したCSVファイルを、GCSバケットにアップロードします。

  1. Cloud Storageコンソールで作成したバケットを開く
  2. 「ファイルをアップロード」をクリック
  3. CSVファイルを選択してアップロード
アップロード先:
gs://my-spark-bucket-12345/input/sales_data.csv

🚀 3. Dataprocクラスターの作成

3-1. コンソールからクラスター作成

  1. GCPコンソールで「Dataproc」を検索して開く
  2. 「クラスターを作成」をクリック
  3. 「Compute Engine上のクラスター」を選択
  4. クラスター名を入力(例:my-spark-cluster
  5. リージョン・ゾーンを選択(例:asia-northeast1-a

3-2. ノード構成の設定

ノードタイプ 推奨マシンタイプ 台数 役割
マスター n1-standard-4 1台 クラスター管理
ワーカー n1-standard-4 2台 データ処理
💡 マシンタイプの選び方

n1-standard-4:4vCPU, 15GB(学習用に最適)
n1-standard-8:8vCPU, 30GB(中規模処理)
n1-highmem-8:8vCPU, 52GB(メモリ重視の処理)

3-3. オプション設定

# 推奨オプション設定

# Jupyterコンポーネントを追加(対話的な開発用)
オプショナルコンポーネント:
  ☑ Jupyter Notebook
  ☑ Component Gateway(Webアクセス用)

# 自動削除設定(コスト管理)
「クラスター スケジュールされた削除」を設定:
  アイドル状態が 30分 続いたら削除

3-4. gcloudコマンドでクラスター作成(CLI)

コマンドライン(Cloud Shell または ローカル)でも作成できます。

# gcloudコマンドでクラスター作成

gcloud dataproc clusters create my-spark-cluster \
    --region=asia-northeast1 \
    --zone=asia-northeast1-a \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --optional-components=JUPYTER \
    --enable-component-gateway \
    --max-idle=30m
Waiting on operation [projects/my-project/regions/asia-northeast1/operations/xxx]
Cluster created successfully.

クラスターのステータス:
CREATING → RUNNING ← この状態になればOK!

起動時間:約90秒

📦 4. GCSとの連携

4-1. GCSからデータを読み込む

Dataprocでは、GCSのデータを直接読み込むことができます。
GCSのパスはgs://バケット名/パスの形式で指定します。

# GCSからCSVを読み込む

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Dataproc GCS Example") \
    .getOrCreate()

# GCSからデータを読み込み
df = spark.read.csv(
    "gs://my-spark-bucket-12345/input/sales_data.csv",
    header=True,
    inferSchema=True
)

df.show()
+----------+-------+--------+------+------+
|      date|product|category|amount|region|
+----------+-------+--------+------+------+
|2024-01-01|  商品A|    食品|  1000|  東京|
|2024-01-01|  商品B|電化製品| 50000|  大阪|
|2024-01-02|  商品C|  衣料品| 15000|  東京|
|2024-01-02|  商品D|    食品|  2500|名古屋|
|2024-01-03|  商品E|電化製品| 80000|  大阪|
+----------+-------+--------+------+------+

4-2. GCSにデータを書き込む

# 集計結果をGCSに保存

from pyspark.sql import functions as F

# カテゴリ別の売上集計
result = df.groupBy("category").agg(
    F.sum("amount").alias("total_sales")
)

# Parquet形式でGCSに保存
result.write.parquet(
    "gs://my-spark-bucket-12345/output/category_sales/",
    mode="overwrite"
)

print("GCSへの保存完了")
GCSへの保存完了

保存先:
gs://my-spark-bucket-12345/output/category_sales/
├── part-00000-xxx.parquet
├── part-00001-xxx.parquet
└── _SUCCESS
📝 S3 vs GCS パス形式の比較

AWS S3:s3://bucket-name/path/file.csv
GCP GCS:gs://bucket-name/path/file.csv

※プロトコル部分(s3:// vs gs://)のみ異なる

▶️ 5. Sparkジョブの投入方法

5-1. Jupyter Notebookを使う(対話的)

クラスター作成時にJupyterコンポーネントを有効にした場合、Webブラウザからアクセスできます。

  1. Dataprocコンソールでクラスターを選択
  2. 「Webインターフェース」タブを開く
  3. 「Jupyter」のリンクをクリック
  4. 新しいノートブックを作成(PySpark)

5-2. gcloudコマンドでジョブ投入

バッチ処理の場合は、gcloud dataproc jobs submitでPythonスクリプトを実行します。

# Pythonスクリプト(sales_analysis.py)をGCSにアップロード

# sales_analysis.py の内容
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("Sales Analysis").getOrCreate()

# GCSから読み込み
df = spark.read.csv("gs://my-spark-bucket-12345/input/sales_data.csv", 
                    header=True, inferSchema=True)

# 地域別集計
result = df.groupBy("region").agg(
    F.sum("amount").alias("total_sales"),
    F.count("*").alias("order_count")
)

# GCSに保存
result.write.parquet("gs://my-spark-bucket-12345/output/region_sales/", 
                     mode="overwrite")

spark.stop()

5-3. ジョブの投入コマンド

# gcloudコマンドでジョブ投入

gcloud dataproc jobs submit pyspark \
    gs://my-spark-bucket-12345/scripts/sales_analysis.py \
    --cluster=my-spark-cluster \
    --region=asia-northeast1
Job [xxx] submitted.
Waiting for job output...
...
Job [xxx] finished successfully.

ジョブのステータス:
PENDING → RUNNING → DONE

5-4. コンソールからジョブ投入

  1. Dataprocコンソールで「ジョブ」を選択
  2. 「ジョブを送信」をクリック
  3. クラスターを選択
  4. ジョブタイプ:「PySpark」を選択
  5. メインPythonファイル:gs://bucket/scripts/sales_analysis.py
  6. 「送信」をクリック

💰 6. クラスター自動削除とコスト管理

6-1. 自動削除の設定(重要!)

Dataprocには、自動削除機能があります。
アイドル状態が続いたら自動でクラスターを削除し、課金を止めることができます。

# クラスター作成時に自動削除を設定

gcloud dataproc clusters create my-spark-cluster \
    --region=asia-northeast1 \
    --max-idle=30m        # アイドル30分で削除

# または、特定時刻に削除
gcloud dataproc clusters create my-spark-cluster \
    --region=asia-northeast1 \
    --expiration-time=2024-01-15T18:00:00Z  # この時刻に削除

6-2. 手動でクラスターを削除

⚠️ 必ずクラスターを削除してください!

クラスターを起動したままにすると、課金が続きます
学習が終わったら、必ずクラスターを削除してください。

# gcloudコマンドでクラスター削除

gcloud dataproc clusters delete my-spark-cluster \
    --region=asia-northeast1

# 確認メッセージが出るので「Y」を入力

または、GCPコンソールから削除:

  1. Dataprocコンソールでクラスターを選択
  2. 「削除」をクリック
  3. 確認ダイアログで「削除」をクリック

6-3. コストの目安

構成 1時間あたりの目安 用途
n1-standard-4 × 3台 約 $0.30〜0.60 学習・開発
n1-standard-8 × 5台 約 $1.50〜2.00 中規模処理
n1-highmem-16 × 10台 約 $10.00〜15.00 大規模本番
💡 GCP無料クレジット

新規アカウントには$300の無料クレジット(90日間)があります。
学習用途であれば、無料クレジット内で十分に試せます。

📝 練習問題

問題 1 基礎

GCSパスの指定

GCSバケット「data-bucket」の「input」フォルダにある「sales.csv」を読み込むパスを書いてください。

【解答】
gs://data-bucket/input/sales.csv
問題 2 応用

GCSからの読み込みコード

GCSの「gs://my-bucket/data.parquet」からParquetファイルを読み込むコードを書いてください。

【解答】
df = spark.read.parquet("gs://my-bucket/data.parquet")
df.show()
問題 3 応用

gcloudコマンドでクラスター作成

以下の条件でクラスターを作成するgcloudコマンドを書いてください。
・クラスター名:test-cluster
・リージョン:asia-northeast1
・ワーカー数:3台
・アイドル30分で自動削除

【解答】
gcloud dataproc clusters create test-cluster \
    --region=asia-northeast1 \
    --num-workers=3 \
    --max-idle=30m
問題 4 実践

Dataprocでの売上分析

GCSからCSVを読み込み、地域別の売上合計を集計し、結果をParquetで保存する一連のコードを書いてください。

【解答】
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("Sales Analysis").getOrCreate()

# GCSから読み込み
df = spark.read.csv(
    "gs://my-bucket/input/sales.csv",
    header=True,
    inferSchema=True
)

# 地域別集計
result = df.groupBy("region").agg(
    F.sum("amount").alias("total_sales")
)

# GCSに保存
result.write.parquet(
    "gs://my-bucket/output/region_sales/",
    mode="overwrite"
)

spark.stop()

❓ よくある質問

Q1: EMRとDataproc、どちらが良い?
機能面ではほぼ同等です。既存のクラウド環境に合わせるのが最も重要です。AWSを使っているならEMR、GCPならDataprocを選びましょう。
Q2: クラスターを削除したらデータは消える?
GCSに保存したデータは消えません。クラスター内のローカルストレージに保存したデータは消えます。重要なデータは必ずGCSに保存してください。
Q3: BigQueryとの連携方法は?
SparkからBigQueryに直接クエリできます。spark.read.format("bigquery")を使用します。詳細はGoogle Cloud公式ドキュメントを参照してください。
Q4: ローカルで開発したコードはそのまま動く?
基本的にはそのまま動きます。ファイルパスを「ローカルパス」から「GCSパス(gs://)」に変更するだけでOKです。

📝 STEP 24 のまとめ

✅ このステップで学んだこと

GCP Dataproc:高速起動(90秒)、秒単位課金
GCS連携gs://bucket/pathでデータ読み書き
自動削除--max-idleでアイドル時に自動削除
ジョブ投入:gcloudコマンドまたはコンソールから

⚠️ 最重要ポイント

クラスターを削除し忘れると課金が続きます!
学習が終わったら必ず「削除」するか、–max-idleで自動削除を設定してください。

🎯 次のステップの予告

次のSTEP 25では、「クラスター管理とコスト最適化」を学びます。
スポットインスタンスや自動スケーリングで、コストを大幅に削減しましょう!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 24

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE