STEP 20:Sparkクラスターのコンテナ化

⚡ STEP 20: Sparkクラスターのコンテナ化

分散処理環境をローカルで構築する

📋 このステップで学ぶこと

  • Apache Sparkの概要とアーキテクチャ
  • Spark Master + Worker構成
  • docker-compose.ymlでのクラスター構築
  • Jupyter NotebookからSpark接続
  • ボリュームマウントでデータ共有
  • Spark UIによるジョブ監視
  • 実践演習:PySparkでデータ処理

🔧 0. このステップの前提知識

📚 これまでの学習の復習

このステップでは、以下の知識を使います。忘れた場合は復習してから進めましょう。

  • Docker Compose:複数コンテナの管理(STEP 15-16)
  • サービス間通信:サービス名でのアクセス(STEP 17)
  • ネットワーク:カスタムネットワークの作成(STEP 18)
  • ボリュームマウント:データの共有(STEP 10, 16)
  • depends_on:起動順序の制御(STEP 16)

0-1. 作業ディレクトリの準備

# 作業ディレクトリを作成して移動 mkdir -p ~/docker-practice/step20 cd ~/docker-practice/step20 # 現在の場所を確認 pwd

0-2. Sparkクラスターのアーキテクチャ

【Spark クラスター構成】 ┌─────────────────────────────────────────────────────────────┐ │ ユーザー(ブラウザ) │ ├──────────────────────┬──────────────────────────────────────┤ │ Jupyter Notebook │ Spark Master Web UI │ │ http://localhost:8888│ http://localhost:8080 │ └──────────┬───────────┴──────────────┬───────────────────────┘ │ PySpark接続 │ 監視 ↓ ↓ ┌─────────────────────────────────────────────────────────────┐ │ Spark Master │ │ (spark-master:7077) │ │ • クラスター管理 │ │ • リソース割り当て │ │ • タスク分配 │ └───────────────────────┬─────────────────────────────────────┘ │ タスク割り当て ┌───────────────┼───────────────┐ ↓ ↓ ↓ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ Spark Worker │ │ Spark Worker │ │ Spark Worker │ │ 1 │ │ 2 │ │ 3 │ │ • 1 Core │ │ • 1 Core │ │ • 1 Core │ │ • 1GB RAM │ │ • 1GB RAM │ │ • 1GB RAM │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │ │ └────────────────┴────────────────┘ │ ↓ ┌──────────────────┐ │ 共有ボリューム │ │ ./data │ │ ./notebooks │ └──────────────────┘

0-3. Sparkの主要コンポーネント

コンポーネント 役割 ポート
Master クラスター管理、リソース割り当て 7077(通信), 8080(Web UI)
Worker タスクの実行、データ処理 8081〜(各WorkerのWeb UI)
Driver アプリケーション制御、SparkContext管理 4040(アプリUI)
Executor Worker内でタスクを実行するプロセス

0-4. Sparkの特徴

⚡ Sparkの強み
  • インメモリ処理:Hadoopより最大100倍高速
  • 統合API:SQL、ストリーミング、MLを統合
  • 言語サポート:Python、Scala、Java、R
  • 分散処理:複数マシンで並列実行
💡 このステップで構築する環境
  • Spark Master × 1
  • Spark Worker × 2
  • Jupyter Notebook(PySpark)
  • 共有データボリューム

🎯 1. Apache Sparkの概要

1-1. Sparkとは?

Apache Sparkは、大規模データの分散処理を高速に実行するフレームワークです。 データエンジニアリングでは、ETL処理、データ変換、分析に広く使われています。

1-2. SparkSessionとは?

SparkSessionは、Sparkアプリケーションのエントリーポイントです。 すべてのSpark操作はSparkSessionを通じて行います。

【SparkSessionの役割】 ┌─────────────────────────────────────────────────────────────┐ │ SparkSession │ │ │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ DataFrame │ │ SQL │ │ Catalog │ │ │ │ API │ │ API │ │ API │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ │ │ ↓ ↓ ↓ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ SparkContext │ │ │ │ (クラスターとの接続管理) │ │ │ └──────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘

1-3. RDD、DataFrame、DataSetの違い

Sparkのデータ構造比較
データ構造 特徴 用途
RDD 低レベルAPI、型安全でない レガシーコード、細かい制御
DataFrame 列ベース、スキーマあり、最適化 一般的なデータ処理(推奨)
DataSet 型安全、Scala/Java向け 型安全が必要な場合
💡 PySparkでの推奨

PythonではDataFrame APIを使用します。DataSetはScala/Java専用です。

🏗️ 2. docker-compose.ymlの構成

2-1. 必要なサービス

【Sparkクラスター構成】 ┌─────────────────────────────────────────────────────────────┐ │ サービス一覧 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. spark-master ← クラスター管理(ポート8080, 7077) │ │ 2. spark-worker-1 ← タスク実行(1GB RAM, 1 Core) │ │ 3. spark-worker-2 ← タスク実行(1GB RAM, 1 Core) │ │ 4. jupyter ← PySpark開発環境(ポート8888) │ │ │ │ ネットワーク: spark-network(bridgeドライバー) │ │ 共有ボリューム: ./data, ./notebooks │ │ │ └─────────────────────────────────────────────────────────────┘

2-2. 主要な環境変数

Spark環境変数(bitnami/spark イメージ)
環境変数 説明
SPARK_MODE master または worker
SPARK_MASTER_URL MasterのURL(Worker用)
SPARK_WORKER_MEMORY Workerのメモリ割り当て
SPARK_WORKER_CORES WorkerのCPUコア数

2-3. ポート構成

公開ポート一覧
サービス ポート 用途
Spark Master 8080:8080 Master Web UI(クラスター監視)
Spark Master 7077:7077 Master通信ポート
Jupyter 8888:8888 Jupyter Notebook
Spark App 4040:4040 アプリケーションUI(実行時のみ)

🛠️ 3. 実践演習:Sparkクラスター構築

3-1. プロジェクトディレクトリを作成

# プロジェクトディレクトリを作成 mkdir -p ~/docker-practice/spark-docker/{data,notebooks,data/output} cd ~/docker-practice/spark-docker # 現在の場所を確認 pwd
📁 完成時のプロジェクト構成
spark-docker/ ├── docker-compose.yml ├── data/ │ ├── sample.csv │ └── output/ └── notebooks/ └── spark_test.ipynb(自動生成)

3-2. サンプルデータを作成

# サンプルCSVデータを作成 cat > data/sample.csv << ‘EOF’ name,age,city,salary Alice,28,Tokyo,450000 Bob,35,Osaka,520000 Charlie,42,Nagoya,480000 David,31,Fukuoka,410000 Eve,29,Sapporo,430000 Frank,38,Tokyo,550000 Grace,26,Osaka,390000 Henry,45,Tokyo,620000 Ivy,33,Nagoya,470000 Jack,27,Fukuoka,380000 EOF # 内容を確認 cat data/sample.csv

3-3. docker-compose.ymlを作成

# docker-compose.ymlを作成 cat > docker-compose.yml << ‘EOF’ version: ‘3.8’ services: # Spark Master spark-master: image: bitnami/spark:3.4 container_name: spark-master environment: – SPARK_MODE=master – SPARK_RPC_AUTHENTICATION_ENABLED=no – SPARK_RPC_ENCRYPTION_ENABLED=no – SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no – SPARK_SSL_ENABLED=no ports: – “8080:8080” # Master Web UI – “7077:7077” # Master通信ポート volumes: – ./data:/data – ./notebooks:/notebooks networks: – spark-network healthcheck: test: [“CMD”, “curl”, “-f”, “http://localhost:8080”] interval: 30s timeout: 10s retries: 5 start_period: 30s # Spark Worker 1 spark-worker-1: image: bitnami/spark:3.4 container_name: spark-worker-1 environment: – SPARK_MODE=worker – SPARK_MASTER_URL=spark://spark-master:7077 – SPARK_WORKER_MEMORY=1G – SPARK_WORKER_CORES=1 – SPARK_RPC_AUTHENTICATION_ENABLED=no – SPARK_RPC_ENCRYPTION_ENABLED=no – SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no – SPARK_SSL_ENABLED=no volumes: – ./data:/data depends_on: spark-master: condition: service_healthy networks: – spark-network # Spark Worker 2 spark-worker-2: image: bitnami/spark:3.4 container_name: spark-worker-2 environment: – SPARK_MODE=worker – SPARK_MASTER_URL=spark://spark-master:7077 – SPARK_WORKER_MEMORY=1G – SPARK_WORKER_CORES=1 – SPARK_RPC_AUTHENTICATION_ENABLED=no – SPARK_RPC_ENCRYPTION_ENABLED=no – SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no – SPARK_SSL_ENABLED=no volumes: – ./data:/data depends_on: spark-master: condition: service_healthy networks: – spark-network # Jupyter Notebook (PySpark) jupyter: image: jupyter/pyspark-notebook:latest container_name: jupyter-pyspark environment: – JUPYTER_ENABLE_LAB=yes – SPARK_MASTER=spark://spark-master:7077 ports: – “8888:8888” # Jupyter Notebook – “4040:4040″ # Spark Application UI volumes: – ./notebooks:/home/jovyan/work – ./data:/home/jovyan/data depends_on: – spark-master – spark-worker-1 – spark-worker-2 networks: – spark-network command: start-notebook.sh –NotebookApp.token=” –NotebookApp.password=” networks: spark-network: driver: bridge EOF # 内容を確認 cat docker-compose.yml
🎯 docker-compose.ymlのポイント
  • bitnami/spark:3.4 – Bitnamiの公式Sparkイメージ
  • jupyter/pyspark-notebook – JupyterとPySparkが統合されたイメージ
  • healthcheck – Masterの準備完了を確認してからWorkerを起動
  • NotebookApp.token='' – Jupyterのトークン認証を無効化(開発用)

3-4. 起動

# 起動(初回はイメージのダウンロードに時間がかかります) docker-compose up -d # 起動状況を確認 docker-compose ps
NAME COMMAND STATUS PORTS jupyter-pyspark “tini -g — start-no…” Up 0.0.0.0:4040->4040/tcp, 0.0.0.0:8888->8888/tcp spark-master “/opt/bitnami/script…” Up (healthy) 0.0.0.0:7077->7077/tcp, 0.0.0.0:8080->8080/tcp spark-worker-1 “/opt/bitnami/script…” Up spark-worker-2 “/opt/bitnami/script…” Up
# ログを確認(Ctrl+Cで終了) docker-compose logs -f spark-master

3-5. Web UIにアクセス

🌐 各サービスへのアクセス
Spark Master UI http://localhost:8080
Jupyter Notebook http://localhost:8888
Spark App UI http://localhost:4040(実行時のみ)

3-6. Spark Master UIで確認

📊 Spark Master UIでの確認ポイント
  1. ブラウザでhttp://localhost:8080を開く
  2. Workersセクションで2つのWorkerが表示されていることを確認
  3. 各Workerのメモリ(1GB)とコア数(1)を確認
  4. Running Applicationsは最初は空(アプリ実行後に表示)

💻 4. Jupyter NotebookからSpark接続

4-1. Jupyter Notebookにアクセス

🌐 Jupyterへのアクセス手順
  1. ブラウザでhttp://localhost:8888を開く
  2. トークンなしで直接アクセス可能(開発用設定)
  3. 「work」フォルダをクリック
  4. 「New」→「Python 3」で新しいノートブック作成

4-2. SparkSessionの作成

Jupyter Notebookで以下のコードを順番に実行します。

# セル1: SparkSessionの作成 from pyspark.sql import SparkSession # SparkSession作成(Sparkクラスターに接続) spark = SparkSession.builder \ .appName(“Spark Docker Test”) \ .master(“spark://spark-master:7077”) \ .config(“spark.executor.memory”, “512m”) \ .config(“spark.executor.cores”, “1”) \ .getOrCreate() # バージョン確認 print(f”Spark version: {spark.version}”) print(f”Application ID: {spark.sparkContext.applicationId}”)
Spark version: 3.4.0 Application ID: app-20241210123456-0001

4-3. CSVファイルの読み込み

# セル2: CSVファイル読み込み df = spark.read.csv( “/home/jovyan/data/sample.csv”, header=True, inferSchema=True ) # データ確認 print(“=== データプレビュー ===”) df.show() # スキーマ確認 print(“\n=== スキーマ ===”) df.printSchema() # レコード数 print(f”\n=== レコード数: {df.count()} ===”)
=== データプレビュー === +——-+—+——-+——+ | name|age| city|salary| +——-+—+——-+——+ | Alice| 28| Tokyo|450000| | Bob| 35| Osaka|520000| |Charlie| 42| Nagoya|480000| | David| 31|Fukuoka|410000| | Eve| 29|Sapporo|430000| | Frank| 38| Tokyo|550000| | Grace| 26| Osaka|390000| | Henry| 45| Tokyo|620000| | Ivy| 33| Nagoya|470000| | Jack| 27|Fukuoka|380000| +——-+—+——-+——+ === スキーマ === root |– name: string (nullable = true) |– age: integer (nullable = true) |– city: string (nullable = true) |– salary: integer (nullable = true) === レコード数: 10 ===

4-4. データ処理と集計

# セル3: データ処理と集計 from pyspark.sql.functions import avg, sum as spark_sum, count, max as spark_max, min as spark_min # 都市別の集計 print(“=== 都市別統計 ===”) city_stats = df.groupBy(“city”).agg( count(“*”).alias(“employee_count”), avg(“salary”).cast(“integer”).alias(“avg_salary”), spark_max(“salary”).alias(“max_salary”), spark_min(“salary”).alias(“min_salary”) ).orderBy(“avg_salary”, ascending=False) city_stats.show() # 年齢層別の集計 print(“\n=== 年齢層別統計 ===”) from pyspark.sql.functions import when df_with_age_group = df.withColumn( “age_group”, when(df.age < 30, "20代") .when(df.age < 40, "30代") .otherwise("40代以上") ) age_stats = df_with_age_group.groupBy("age_group").agg( count("*").alias("count"), avg("salary").cast("integer").alias("avg_salary") ) age_stats.show()
=== 都市別統計 === +——-+————–+———-+———-+———-+ | city|employee_count|avg_salary|max_salary|min_salary| +——-+————–+———-+———-+———-+ | Tokyo| 3| 540000| 620000| 450000| | Nagoya| 2| 475000| 480000| 470000| | Osaka| 2| 455000| 520000| 390000| |Sapporo| 1| 430000| 430000| 430000| |Fukuoka| 2| 395000| 410000| 380000| +——-+————–+———-+———-+———-+ === 年齢層別統計 === +———+—–+———-+ |age_group|count|avg_salary| +———+—–+———-+ | 20代| 4| 412500| | 30代| 3| 493333| | 40代以上| 3| 526666| +———+—–+———-+

4-5. SQLクエリの実行

# セル4: SQLクエリ実行 # 一時ビュー作成 df.createOrReplaceTempView(“employees”) # SQLで集計 result = spark.sql(“”” SELECT city, COUNT(*) as employee_count, AVG(salary) as avg_salary, SUM(salary) as total_salary FROM employees WHERE age >= 30 GROUP BY city HAVING COUNT(*) >= 1 ORDER BY avg_salary DESC “””) print(“=== 30歳以上の都市別統計(SQL) ===”) result.show()

4-6. 結果をParquetで保存

# セル5: 結果をParquetで保存 # 集計結果を保存 city_stats.write.mode(“overwrite”).parquet(“/home/jovyan/data/output/city_stats.parquet”) # 元データをPartitionして保存 df.write.mode(“overwrite”).partitionBy(“city”).parquet(“/home/jovyan/data/output/employees_by_city.parquet”) print(“Parquetファイルを保存しました!”) # 保存したファイルを読み込んで確認 loaded_df = spark.read.parquet(“/home/jovyan/data/output/city_stats.parquet”) print(“\n=== 保存したファイルの読み込み ===”) loaded_df.show()

4-7. SparkSessionの終了

# セル6: SparkSessionの終了 spark.stop() print(“SparkSessionを終了しました”)

📊 5. Spark UIによるジョブ監視

5-1. Spark Master UI

【Spark Master UI(http://localhost:8080)】 ┌─────────────────────────────────────────────────────────────┐ │ Spark Master at spark://spark-master:7077 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Workers: 2 │ │ Cores in use: 2 Total, 0 Used │ │ Memory in use: 2.0 GB Total, 0 B Used │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Workers │ │ │ ├─────────────────────────────────────────────────────┤ │ │ │ Worker ID │ Address │ State │ Cores│ Mem │ │ │ │ worker-xxx-1 │ 172.x.x.x │ ALIVE │ 1 │ 1GB │ │ │ │ worker-xxx-2 │ 172.x.x.x │ ALIVE │ 1 │ 1GB │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ Running Applications: 1 │ │ Completed Applications: 0 │ │ │ └─────────────────────────────────────────────────────────────┘

5-2. Application UI

📊 Application UI(http://localhost:4040)

SparkSessionが実行中のときのみアクセス可能です。以下の情報を確認できます:

  • Jobs:実行されたジョブの一覧
  • Stages:各ジョブのステージ詳細
  • Storage:キャッシュされたRDD/DataFrame
  • Executors:各Executorのメトリクス
  • SQL:実行されたSQLクエリ

🔍 6. トラブルシューティング

6-1. よくある問題と解決策

❌ 問題1:WorkerがMasterに接続できない

症状:

Spark Master UIにWorkerが表示されない

解決策:

# ネットワーク確認 docker network ls docker network inspect spark-docker_spark-network # Masterのログ確認 docker-compose logs spark-master | grep -i “registered” # 再起動 docker-compose down docker-compose up -d
❌ 問題2:JupyterからSparkに接続できない

症状:

SparkSession作成時にタイムアウト

解決策:

# Masterの状態確認 docker-compose exec jupyter ping spark-master # 正しいURLか確認(spark://spark-master:7077) # サービス名「spark-master」を使用すること # Jupyterコンテナの再起動 docker-compose restart jupyter
❌ 問題3:メモリ不足エラー

症状:

「java.lang.OutOfMemoryError」が発生

解決策:

# Workerのメモリを増やす(docker-compose.yml) SPARK_WORKER_MEMORY=2G # または、Executorのメモリを調整(SparkSession作成時) .config(“spark.executor.memory”, “1g”)

6-2. デバッグコマンド

# コンテナの状態確認 docker-compose ps # 各サービスのログ確認 docker-compose logs spark-master docker-compose logs spark-worker-1 docker-compose logs jupyter # コンテナ内でコマンド実行 docker-compose exec spark-master spark-shell –version docker-compose exec jupyter python -c “import pyspark; print(pyspark.__version__)” # ネットワーク接続テスト docker-compose exec jupyter nc -zv spark-master 7077

6-3. クリーンアップ

# 停止 docker-compose down # ボリュームも含めて完全削除 docker-compose down -v # 出力ファイルの削除 rm -rf data/output/*

💪 7. 練習問題

練習問題 1 基礎

Spark MasterとWorkerの役割を説明してください。

【解答】
  • Spark Master:クラスター全体を管理。リソース割り当て、タスク分配、Worker管理を担当。
  • Spark Worker:実際のデータ処理を実行。Masterから割り当てられたタスクを並列実行。
練習問題 2 基礎

SparkSessionを作成するコードを書いてください。

【解答】
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName(“MyApp”) \ .master(“spark://spark-master:7077”) \ .getOrCreate()
練習問題 3 応用

CSVファイルを読み込んで、特定の条件でフィルタリングし、結果を表示するコードを書いてください。

【解答】
# CSVファイル読み込み df = spark.read.csv(“/home/jovyan/data/sample.csv”, header=True, inferSchema=True) # 給与が450000以上のデータをフィルタリング filtered_df = df.filter(df.salary >= 450000) # 結果を表示 filtered_df.show() # または、where句を使用 filtered_df = df.where(“salary >= 450000”) filtered_df.show()
練習問題 4 応用

DataFrameをグループ化して集計するコードを書いてください。

【解答】
from pyspark.sql.functions import avg, count, sum as spark_sum # 都市別に集計 result = df.groupBy(“city”).agg( count(“*”).alias(“employee_count”), avg(“salary”).alias(“avg_salary”), spark_sum(“salary”).alias(“total_salary”) ).orderBy(“avg_salary”, ascending=False) result.show()
練習問題 5 応用

SQLクエリを使ってデータを集計する方法を書いてください。

【解答】
# 一時ビュー作成 df.createOrReplaceTempView(“employees”) # SQLクエリ実行 result = spark.sql(“”” SELECT city, COUNT(*) as count, AVG(salary) as avg_salary FROM employees GROUP BY city ORDER BY avg_salary DESC “””) result.show()
練習問題 6 発展

結果をParquet形式で保存し、再度読み込むコードを書いてください。

【解答】
# Parquet形式で保存 result.write.mode(“overwrite”).parquet(“/home/jovyan/data/output/result.parquet”) # 再度読み込み loaded_df = spark.read.parquet(“/home/jovyan/data/output/result.parquet”) loaded_df.show() # パーティション分割して保存 df.write.mode(“overwrite”).partitionBy(“city”).parquet(“/home/jovyan/data/output/by_city.parquet”)
練習問題 7 発展

大量データを生成して処理時間を計測するコードを書いてください。

【解答】
from pyspark.sql.functions import rand, randn import time # 100万行のランダムデータ生成 large_df = spark.range(0, 1000000).select( (rand() * 1000).cast(“integer”).alias(“id”), (rand() * 100).alias(“value1”), (randn() * 50).alias(“value2”) ) # 処理時間計測 start = time.time() result = large_df \ .filter(large_df.value1 > 50) \ .groupBy(“id”) \ .agg({“value1”: “avg”, “value2”: “sum”}) \ .count() end = time.time() print(f”処理レコード数: {result}”) print(f”処理時間: {end – start:.2f}秒”)
練習問題 8 基礎

Spark Master UIとApplication UIの違いを説明してください。

【解答】
項目 Master UI (8080) Application UI (4040)
表示内容 クラスター全体の状態 実行中アプリの詳細
常時アクセス 可能 アプリ実行中のみ
主な用途 Worker管理、リソース確認 ジョブ監視、デバッグ

📝 STEP 20 のまとめ

✅ このステップで学んだこと
  • Sparkクラスター:Master + Worker構成
  • Docker構成:マルチコンテナでクラスター構築
  • Jupyter統合:ブラウザでPySpark実行
  • データ処理:DataFrame API、SQL、集計
  • データ保存:Parquet形式での保存と読み込み
  • Spark UI:ジョブ監視とデバッグ
📊 Sparkクラスター構成 早見表
サービス 役割 アクセス
spark-master クラスター管理 http://localhost:8080
spark-worker-1/2 タスク実行 Master UI経由で確認
jupyter PySpark開発 http://localhost:8888
💡 重要ポイント
  • spark://spark-master:7077でMasterに接続
  • healthcheckでMaster準備完了を待つ
  • 共有ボリュームでデータを全コンテナで共有
  • Parquet形式は高速で圧縮効率が良い
  • spark.stop()でセッション終了を忘れずに
🎯 次のステップの予告

次のSTEP 21では、「環境変数とシークレット管理」を学びます。

  • ENV命令とARG命令の違い
  • .envファイルの活用
  • Docker Secretsの使い方
  • セキュアな認証情報管理

本番環境に向けたセキュリティ対策を学びましょう!

📝

学習メモ

Docker・コンテナ技術入門 - Step 20

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE