🌊 STEP 28: 構造化ストリーミング入門
リアルタイムデータ処理の基礎 – 流れてくるデータを連続的に処理しよう
📋 このステップで学ぶこと
- ストリーミング処理とは何か
- バッチ処理との違い
- 構造化ストリーミングの基礎
- ウィンドウ集計
- 出力モードの使い分け
- 実践例:リアルタイム集計
このステップはストリーミング処理の入門編です。
本格的なストリーミング処理(Kafka連携、Exactly-onceセマンティクス、状態管理など)は、別のストリーミングコースで学習してください。
🌊 1. ストリーミング処理とは?
1-1. バッチ処理 vs ストリーミング処理
これまで学んできたのはバッチ処理です。データを一括で処理する方式でした。
一方、ストリーミング処理は、データが流れてくるのをリアルタイムで処理します。
| 項目 | バッチ処理 | ストリーミング処理 |
|---|---|---|
| データ | まとめて処理 | 流れてきたら即処理 |
| 実行タイミング | 定期実行(毎日、毎時など) | 常時実行 |
| 遅延 | 分〜時間単位 | 秒〜ミリ秒単位 |
| 用途 | 日次集計、レポート | リアルタイムダッシュボード、異常検知 |
【バッチ処理(これまで)】
昨日のログ → 集計 → レポート
(1日遅れで結果がわかる)
【ストリーミング処理(今回)】
アクセスログ → 即時集計 → リアルタイムダッシュボード
(数秒で結果がわかる)
1-2. ストリーミング処理の用途
・リアルタイムダッシュボード:アクセス数、売上をリアルタイム表示
・異常検知:サーバー障害、不正アクセスを即座に検出
・IoT:センサーデータを連続処理
・金融:株価の変動をリアルタイムで分析
1-3. Sparkの構造化ストリーミング
Structured Streaming(構造化ストリーミング)は、Sparkのストリーミング処理機能です。
DataFrameと同じAPIで書けるので、バッチ処理の知識がそのまま活かせます。
・DataFrameと同じAPI:学習コストが低い
・Exactly-once:データの重複・欠損なし
・イベント時刻処理:遅延データも正しく処理
・統合:バッチとストリーミングを同じコードで
🔰 2. 構造化ストリーミングの基本
2-1. ストリーミングの読み込み
バッチ処理ではspark.readを使いましたが、
ストリーミングではspark.readStreamを使います。
# バッチ処理(これまで) df = spark.read.csv("path/to/data") # ストリーミング処理(今回) df = spark.readStream.csv("path/to/data")
2-2. 簡単な例:ファイルの監視
ディレクトリにファイルが追加されたら、自動的に処理する例です。
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType spark = SparkSession.builder \ .appName("Streaming Example") \ .getOrCreate() # スキーマを定義(ストリーミングでは必須) schema = StructType() \ .add("user_id", StringType()) \ .add("action", StringType()) \ .add("timestamp", TimestampType()) # ストリーミングでCSVを読み込み streaming_df = spark.readStream \ .schema(schema) \ .csv("input_directory/") # 処理(バッチと同じ書き方) result = streaming_df.groupBy("action").count() # 結果を出力 query = result.writeStream \ .outputMode("complete") \ .format("console") \ .start() # 停止するまで待機 query.awaitTermination()
------------------------------------------- Batch: 0 ------------------------------------------- +------+-----+ |action|count| +------+-----+ | click| 150| | view| 320| | buy | 45| +------+-----+ ------------------------------------------- Batch: 1 ------------------------------------------- +------+-----+ |action|count| +------+-----+ | click| 180| | view| 410| | buy | 52| +------+-----+ (新しいファイルが来るたびに更新される)
2-3. 出力モード(Output Mode)
| モード | 説明 | 用途 |
|---|---|---|
| append | 新しい行のみ出力 | ログ追記、イベント記録 |
| complete | 全結果を毎回出力 | 集計結果、ダッシュボード |
| update | 更新された行のみ出力 | 差分更新 |
# append: 新しいデータのみ出力(集計なしの場合) query = df.writeStream.outputMode("append").format("console").start() # complete: 全結果を出力(集計ありの場合) query = df.writeStream.outputMode("complete").format("console").start() # update: 更新された行のみ出力 query = df.writeStream.outputMode("update").format("console").start()
⏰ 3. ウィンドウ集計
3-1. ウィンドウとは?
ウィンドウとは、時間の区切りのことです。
「直近5分間のアクセス数」のように、時間範囲でデータを集計します。
【固定ウィンドウ(Tumbling Window)】
|–5分–|–5分–|–5分–|
重複なし、隙間なし
【スライディングウィンドウ(Sliding Window)】
|—-5分—-|
|—-5分—-|
|—-5分—-|
1分ごとにスライド(重複あり)
3-2. 固定ウィンドウの例
from pyspark.sql import functions as F # スキーマ定義 schema = StructType() \ .add("user_id", StringType()) \ .add("action", StringType()) \ .add("event_time", TimestampType()) # ストリーミング読み込み events = spark.readStream \ .schema(schema) \ .json("events/") # 5分間の固定ウィンドウで集計 windowed_counts = events \ .groupBy( F.window("event_time", "5 minutes"), "action" ) \ .count() # 出力 query = windowed_counts.writeStream \ .outputMode("complete") \ .format("console") \ .option("truncate", "false") \ .start()
+------------------------------------------+------+-----+
| window|action|count|
+------------------------------------------+------+-----+
|{2024-01-15 10:00:00, 2024-01-15 10:05:00}| click| 45|
|{2024-01-15 10:00:00, 2024-01-15 10:05:00}| view| 120|
|{2024-01-15 10:05:00, 2024-01-15 10:10:00}| click| 52|
|{2024-01-15 10:05:00, 2024-01-15 10:10:00}| view| 135|
+------------------------------------------+------+-----+3-3. スライディングウィンドウの例
# 5分間のウィンドウを1分ごとにスライド windowed_counts = events \ .groupBy( F.window("event_time", "5 minutes", "1 minute"), "action" ) \ .count() # window関数の引数: # - 第1引数: イベント時刻カラム # - 第2引数: ウィンドウサイズ # - 第3引数: スライド間隔(省略時は固定ウィンドウ)
・固定ウィンドウ:「毎時の売上」「5分ごとのアクセス数」
・スライディングウィンドウ:「直近10分の移動平均」「1分更新のダッシュボード」
💾 4. 出力先の設定
4-1. 様々な出力先
| 出力先 | format | 用途 |
|---|---|---|
| コンソール | console | デバッグ、開発時 |
| ファイル | parquet, csv, json | 永続化、後続処理 |
| メモリ | memory | テスト、一時保存 |
| Kafka | kafka | 他システムへ連携 |
4-2. ファイルへの出力
# Parquetファイルに出力 query = result.writeStream \ .outputMode("append") \ .format("parquet") \ .option("path", "output/streaming_results/") \ .option("checkpointLocation", "checkpoint/") \ .start() # checkpointLocationは必須(障害復旧用)
4-3. メモリテーブルへの出力
# メモリ上のテーブルに出力 query = result.writeStream \ .outputMode("complete") \ .format("memory") \ .queryName("realtime_counts") \ .start() # SQLでクエリ可能 spark.sql("SELECT * FROM realtime_counts").show()
4-4. トリガー設定
トリガーで、処理の実行間隔を設定できます。
# デフォルト:できるだけ速く処理 query = result.writeStream \ .trigger(processingTime="0 seconds") \ .start() # 10秒ごとに処理 query = result.writeStream \ .trigger(processingTime="10 seconds") \ .start() # 1回だけ処理(バッチ的な使い方) query = result.writeStream \ .trigger(once=True) \ .start()
🎯 5. 実践例:リアルタイムアクセス集計
5-1. シナリオ
Webサーバーのアクセスログをリアルタイムで集計し、
5分ごとのページ別アクセス数を計算します。
from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark.sql.types import * spark = SparkSession.builder \ .appName("Realtime Access Log") \ .getOrCreate() # アクセスログのスキーマ log_schema = StructType([ StructField("timestamp", TimestampType(), True), StructField("user_id", StringType(), True), StructField("page", StringType(), True), StructField("action", StringType(), True) ]) # ストリーミング読み込み(新しいJSONファイルを監視) access_logs = spark.readStream \ .schema(log_schema) \ .json("logs/access/") # 5分間のウィンドウでページ別アクセス数を集計 page_counts = access_logs \ .groupBy( F.window("timestamp", "5 minutes"), "page" ) \ .agg( F.count("*").alias("access_count"), F.countDistinct("user_id").alias("unique_users") ) # 結果をコンソールに出力 query = page_counts.writeStream \ .outputMode("complete") \ .format("console") \ .option("truncate", "false") \ .trigger(processingTime="10 seconds") \ .start() query.awaitTermination()
-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+--------+------------+------------+
| window| page|access_count|unique_users|
+------------------------------------------+--------+------------+------------+
|{2024-01-15 10:00:00, 2024-01-15 10:05:00}| /home| 450| 320|
|{2024-01-15 10:00:00, 2024-01-15 10:05:00}|/product| 280| 150|
|{2024-01-15 10:00:00, 2024-01-15 10:05:00}| /cart| 95| 80|
|{2024-01-15 10:05:00, 2024-01-15 10:10:00}| /home| 520| 380|
|{2024-01-15 10:05:00, 2024-01-15 10:10:00}|/product| 310| 175|
+------------------------------------------+--------+------------+------------+5-2. ストリーミングジョブの停止
# クエリの停止 query.stop() # 全てのストリーミングクエリを停止 for q in spark.streams.active: q.stop()
📝 練習問題
基本的なストリーミング読み込み
CSVファイルをストリーミングで読み込むコードを書いてください。スキーマは「id(Integer)、name(String)」です。
from pyspark.sql.types import StructType, IntegerType, StringType schema = StructType() \ .add("id", IntegerType()) \ .add("name", StringType()) streaming_df = spark.readStream \ .schema(schema) \ .csv("input_directory/")
ウィンドウ集計
「event_time」カラムで10分間の固定ウィンドウを作成し、「category」ごとのカウントを集計するコードを書いてください。
from pyspark.sql import functions as F result = df.groupBy( F.window("event_time", "10 minutes"), "category" ).count()
出力設定
集計結果をParquetファイルに出力し、30秒ごとに処理を実行するコードを書いてください。checkpointLocationも設定してください。
query = result.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "output/") \
.option("checkpointLocation", "checkpoint/") \
.trigger(processingTime="30 seconds") \
.start()リアルタイム売上集計
売上データ(timestamp, product, amount)をストリーミングで読み込み、5分間のウィンドウで商品別売上合計を集計するコードを書いてください。
from pyspark.sql import functions as F from pyspark.sql.types import * schema = StructType([ StructField("timestamp", TimestampType()), StructField("product", StringType()), StructField("amount", IntegerType()) ]) sales = spark.readStream.schema(schema).json("sales/") result = sales.groupBy( F.window("timestamp", "5 minutes"), "product" ).agg( F.sum("amount").alias("total_sales") ) query = result.writeStream \ .outputMode("complete") \ .format("console") \ .start()
❓ よくある質問
📝 STEP 28 のまとめ
・ストリーミング処理:リアルタイムでデータを処理
・readStream / writeStream:ストリーミングの読み書き
・出力モード:append / complete / update
・ウィンドウ集計:時間範囲でデータを集計
・トリガー:処理の実行間隔を設定
STEP 27〜28で、Sparkの高度なトピックの入門を学びました。
・MLlib:機械学習の基礎
・構造化ストリーミング:リアルタイム処理の基礎
本格的に学びたい方は、それぞれの専門コースを受講してください。
次のPart 8(STEP 29〜31)は総合演習です。
これまで学んだ知識を活かして、本格的なプロジェクトに挑戦しましょう!
学習メモ
ビッグデータ処理(Apache Spark) - Step 28