STEP 28:構造化ストリーミング入門

🌊 STEP 28: 構造化ストリーミング入門

リアルタイムデータ処理の基礎 – 流れてくるデータを連続的に処理しよう

📋 このステップで学ぶこと

  • ストリーミング処理とは何か
  • バッチ処理との違い
  • 構造化ストリーミングの基礎
  • ウィンドウ集計
  • 出力モードの使い分け
  • 実践例:リアルタイム集計
📌 このステップの位置づけ

このステップはストリーミング処理の入門編です。
本格的なストリーミング処理(Kafka連携、Exactly-onceセマンティクス、状態管理など)は、別のストリーミングコースで学習してください。

🌊 1. ストリーミング処理とは?

1-1. バッチ処理 vs ストリーミング処理

これまで学んできたのはバッチ処理です。データを一括で処理する方式でした。
一方、ストリーミング処理は、データが流れてくるのをリアルタイムで処理します。

項目 バッチ処理 ストリーミング処理
データ まとめて処理 流れてきたら即処理
実行タイミング 定期実行(毎日、毎時など) 常時実行
遅延 分〜時間単位 秒〜ミリ秒単位
用途 日次集計、レポート リアルタイムダッシュボード、異常検知
📊 ストリーミング処理の例

【バッチ処理(これまで)】
昨日のログ → 集計 → レポート
(1日遅れで結果がわかる)

【ストリーミング処理(今回)】
アクセスログ → 即時集計 → リアルタイムダッシュボード
(数秒で結果がわかる)

1-2. ストリーミング処理の用途

🎯 ストリーミング処理が必要な場面

リアルタイムダッシュボード:アクセス数、売上をリアルタイム表示
異常検知:サーバー障害、不正アクセスを即座に検出
IoT:センサーデータを連続処理
金融:株価の変動をリアルタイムで分析

1-3. Sparkの構造化ストリーミング

Structured Streaming(構造化ストリーミング)は、Sparkのストリーミング処理機能です。
DataFrameと同じAPIで書けるので、バッチ処理の知識がそのまま活かせます。

💡 構造化ストリーミングの特徴

DataFrameと同じAPI:学習コストが低い
Exactly-once:データの重複・欠損なし
イベント時刻処理:遅延データも正しく処理
統合:バッチとストリーミングを同じコードで

🔰 2. 構造化ストリーミングの基本

2-1. ストリーミングの読み込み

バッチ処理ではspark.readを使いましたが、
ストリーミングではspark.readStreamを使います。

# バッチ処理(これまで)
df = spark.read.csv("path/to/data")

# ストリーミング処理(今回)
df = spark.readStream.csv("path/to/data")

2-2. 簡単な例:ファイルの監視

ディレクトリにファイルが追加されたら、自動的に処理する例です。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType

spark = SparkSession.builder \
    .appName("Streaming Example") \
    .getOrCreate()

# スキーマを定義(ストリーミングでは必須)
schema = StructType() \
    .add("user_id", StringType()) \
    .add("action", StringType()) \
    .add("timestamp", TimestampType())

# ストリーミングでCSVを読み込み
streaming_df = spark.readStream \
    .schema(schema) \
    .csv("input_directory/")

# 処理(バッチと同じ書き方)
result = streaming_df.groupBy("action").count()

# 結果を出力
query = result.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# 停止するまで待機
query.awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
|action|count|
+------+-----+
| click|  150|
|  view|  320|
|  buy |   45|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|action|count|
+------+-----+
| click|  180|
|  view|  410|
|  buy |   52|
+------+-----+

(新しいファイルが来るたびに更新される)

2-3. 出力モード(Output Mode)

モード 説明 用途
append 新しい行のみ出力 ログ追記、イベント記録
complete 全結果を毎回出力 集計結果、ダッシュボード
update 更新された行のみ出力 差分更新
# append: 新しいデータのみ出力(集計なしの場合)
query = df.writeStream.outputMode("append").format("console").start()

# complete: 全結果を出力(集計ありの場合)
query = df.writeStream.outputMode("complete").format("console").start()

# update: 更新された行のみ出力
query = df.writeStream.outputMode("update").format("console").start()

⏰ 3. ウィンドウ集計

3-1. ウィンドウとは?

ウィンドウとは、時間の区切りのことです。
「直近5分間のアクセス数」のように、時間範囲でデータを集計します。

⏰ ウィンドウの種類

【固定ウィンドウ(Tumbling Window)】
|–5分–|–5分–|–5分–|
重複なし、隙間なし

【スライディングウィンドウ(Sliding Window)】
|—-5分—-|
  |—-5分—-|
    |—-5分—-|
1分ごとにスライド(重複あり)

3-2. 固定ウィンドウの例

from pyspark.sql import functions as F

# スキーマ定義
schema = StructType() \
    .add("user_id", StringType()) \
    .add("action", StringType()) \
    .add("event_time", TimestampType())

# ストリーミング読み込み
events = spark.readStream \
    .schema(schema) \
    .json("events/")

# 5分間の固定ウィンドウで集計
windowed_counts = events \
    .groupBy(
        F.window("event_time", "5 minutes"),
        "action"
    ) \
    .count()

# 出力
query = windowed_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()
+------------------------------------------+------+-----+
|                                    window|action|count|
+------------------------------------------+------+-----+
|{2024-01-15 10:00:00, 2024-01-15 10:05:00}| click|   45|
|{2024-01-15 10:00:00, 2024-01-15 10:05:00}|  view|  120|
|{2024-01-15 10:05:00, 2024-01-15 10:10:00}| click|   52|
|{2024-01-15 10:05:00, 2024-01-15 10:10:00}|  view|  135|
+------------------------------------------+------+-----+

3-3. スライディングウィンドウの例

# 5分間のウィンドウを1分ごとにスライド
windowed_counts = events \
    .groupBy(
        F.window("event_time", "5 minutes", "1 minute"),
        "action"
    ) \
    .count()

# window関数の引数:
# - 第1引数: イベント時刻カラム
# - 第2引数: ウィンドウサイズ
# - 第3引数: スライド間隔(省略時は固定ウィンドウ)
💡 ウィンドウの使い分け

固定ウィンドウ:「毎時の売上」「5分ごとのアクセス数」
スライディングウィンドウ:「直近10分の移動平均」「1分更新のダッシュボード」

💾 4. 出力先の設定

4-1. 様々な出力先

出力先 format 用途
コンソール console デバッグ、開発時
ファイル parquet, csv, json 永続化、後続処理
メモリ memory テスト、一時保存
Kafka kafka 他システムへ連携

4-2. ファイルへの出力

# Parquetファイルに出力
query = result.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "output/streaming_results/") \
    .option("checkpointLocation", "checkpoint/") \
    .start()

# checkpointLocationは必須(障害復旧用)

4-3. メモリテーブルへの出力

# メモリ上のテーブルに出力
query = result.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("realtime_counts") \
    .start()

# SQLでクエリ可能
spark.sql("SELECT * FROM realtime_counts").show()

4-4. トリガー設定

トリガーで、処理の実行間隔を設定できます。

# デフォルト:できるだけ速く処理
query = result.writeStream \
    .trigger(processingTime="0 seconds") \
    .start()

# 10秒ごとに処理
query = result.writeStream \
    .trigger(processingTime="10 seconds") \
    .start()

# 1回だけ処理(バッチ的な使い方)
query = result.writeStream \
    .trigger(once=True) \
    .start()

🎯 5. 実践例:リアルタイムアクセス集計

5-1. シナリオ

Webサーバーのアクセスログをリアルタイムで集計し、
5分ごとのページ別アクセス数を計算します。

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("Realtime Access Log") \
    .getOrCreate()

# アクセスログのスキーマ
log_schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("user_id", StringType(), True),
    StructField("page", StringType(), True),
    StructField("action", StringType(), True)
])

# ストリーミング読み込み(新しいJSONファイルを監視)
access_logs = spark.readStream \
    .schema(log_schema) \
    .json("logs/access/")

# 5分間のウィンドウでページ別アクセス数を集計
page_counts = access_logs \
    .groupBy(
        F.window("timestamp", "5 minutes"),
        "page"
    ) \
    .agg(
        F.count("*").alias("access_count"),
        F.countDistinct("user_id").alias("unique_users")
    )

# 結果をコンソールに出力
query = page_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="10 seconds") \
    .start()

query.awaitTermination()
-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+--------+------------+------------+
|                                    window|    page|access_count|unique_users|
+------------------------------------------+--------+------------+------------+
|{2024-01-15 10:00:00, 2024-01-15 10:05:00}|   /home|         450|         320|
|{2024-01-15 10:00:00, 2024-01-15 10:05:00}|/product|         280|         150|
|{2024-01-15 10:00:00, 2024-01-15 10:05:00}|   /cart|          95|          80|
|{2024-01-15 10:05:00, 2024-01-15 10:10:00}|   /home|         520|         380|
|{2024-01-15 10:05:00, 2024-01-15 10:10:00}|/product|         310|         175|
+------------------------------------------+--------+------------+------------+

5-2. ストリーミングジョブの停止

# クエリの停止
query.stop()

# 全てのストリーミングクエリを停止
for q in spark.streams.active:
    q.stop()

📝 練習問題

問題 1 基礎

基本的なストリーミング読み込み

CSVファイルをストリーミングで読み込むコードを書いてください。スキーマは「id(Integer)、name(String)」です。

【解答】
from pyspark.sql.types import StructType, IntegerType, StringType

schema = StructType() \
    .add("id", IntegerType()) \
    .add("name", StringType())

streaming_df = spark.readStream \
    .schema(schema) \
    .csv("input_directory/")
問題 2 応用

ウィンドウ集計

「event_time」カラムで10分間の固定ウィンドウを作成し、「category」ごとのカウントを集計するコードを書いてください。

【解答】
from pyspark.sql import functions as F

result = df.groupBy(
    F.window("event_time", "10 minutes"),
    "category"
).count()
問題 3 応用

出力設定

集計結果をParquetファイルに出力し、30秒ごとに処理を実行するコードを書いてください。checkpointLocationも設定してください。

【解答】
query = result.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "output/") \
    .option("checkpointLocation", "checkpoint/") \
    .trigger(processingTime="30 seconds") \
    .start()
問題 4 実践

リアルタイム売上集計

売上データ(timestamp, product, amount)をストリーミングで読み込み、5分間のウィンドウで商品別売上合計を集計するコードを書いてください。

【解答】
from pyspark.sql import functions as F
from pyspark.sql.types import *

schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("product", StringType()),
    StructField("amount", IntegerType())
])

sales = spark.readStream.schema(schema).json("sales/")

result = sales.groupBy(
    F.window("timestamp", "5 minutes"),
    "product"
).agg(
    F.sum("amount").alias("total_sales")
)

query = result.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

❓ よくある質問

Q1: バッチ処理とストリーミング処理、どちらを使うべき?
遅延が許容できるかで判断します。日次レポートならバッチ、リアルタイムダッシュボードならストリーミングです。
Q2: checkpointLocationは必須?
ファイル出力時は必須です。障害発生時に、どこまで処理したかを記録しておくために必要です。
Q3: Kafkaとは何?
Apache Kafkaは、リアルタイムデータパイプラインのためのメッセージングシステムです。大規模なストリーミング処理では、SparkとKafkaを組み合わせて使うことが多いです。
Q4: 遅延データ(Late Data)はどう扱う?
ウォーターマークという機能で、遅延データを一定期間待つことができます。本格的なストリーミングコースで学習してください。

📝 STEP 28 のまとめ

✅ このステップで学んだこと

ストリーミング処理:リアルタイムでデータを処理
readStream / writeStream:ストリーミングの読み書き
出力モード:append / complete / update
ウィンドウ集計:時間範囲でデータを集計
トリガー:処理の実行間隔を設定

🎉 Part 7(高度なトピック)完了!

STEP 27〜28で、Sparkの高度なトピックの入門を学びました。
・MLlib:機械学習の基礎
・構造化ストリーミング:リアルタイム処理の基礎

本格的に学びたい方は、それぞれの専門コースを受講してください。

🎯 次のステップの予告

次のPart 8(STEP 29〜31)は総合演習です。
これまで学んだ知識を活かして、本格的なプロジェクトに挑戦しましょう!

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 28

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE