STEP 29:プロジェクト① 大規模ログ分析

🎯 STEP 29: プロジェクト① 大規模ログ分析

1億レコードのアクセスログを分析しよう – 総合演習で実力を試す

📋 このプロジェクトで学ぶこと
  • 1億レコードの大規模ログ分析の実践
  • 正規表現を使ったログのパース(解析)方法
  • データの前処理とクリーニング
  • 時系列集計(時間帯別・曜日別分析)
  • 異常検知(エラー多発・アクセス急増の検出)
  • パフォーマンスチューニング

1. プロジェクト概要

1.1 あなたのミッション

📊 シナリオ

あなたは大手ECサイトのデータアナリストです。
1億レコード(約10GB)のアクセスログを分析し、以下を明らかにしてください:

  1. 時間帯別・曜日別のアクセス傾向
  2. 人気ページTOP 20
  3. エラーが多発している箇所
  4. 異常なアクセスパターンの検出
  5. ユーザーの行動パターン分析

1.2 データ形式の理解

Webサーバーは「アクセスログ」と呼ばれる記録を残します。今回はApache Combined Log Formatという形式のログを分析します。

💡 ログの各フィールドの意味
フィールド 意味
IPアドレス アクセス元のコンピュータを識別 192.168.1.100
タイムスタンプ アクセス日時 [20/Nov/2024:10:15:30 +0000]
HTTPメソッド リクエストの種類(GET=取得、POST=送信) GET, POST
URL アクセスされたページのパス /index.html
ステータスコード 処理結果(200=成功、404=ページなし、500=サーバーエラー) 200, 404, 500
レスポンスサイズ 返したデータのサイズ(バイト) 1234
リファラー どこから来たか(参照元URL) https://google.com
ユーザーエージェント ブラウザやデバイスの情報 Mozilla/5.0…

実際のログは以下のような1行のテキストです:

192.168.1.100 – – [20/Nov/2024:10:15:30 +0000] “GET /index.html HTTP/1.1” 200 1234 “https://google.com” “Mozilla/5.0…”

1.3 成果物一覧

✅ このプロジェクトで作成するもの
  • クリーニング済みデータ(Parquet形式)
  • 時間帯別・曜日別の統計
  • 人気ページTOP 20
  • エラー分析レポート
  • 異常検知リスト
  • 疑わしいIPリスト
  • 分析サマリー

2. SparkSessionの設定

2.1 最適化オプション付きでSparkを起動

まず、大規模データを効率的に処理するために、最適化オプションを有効にしてSparkSessionを作成します。

💡 各設定の意味

appName:アプリケーションの名前(管理画面で識別するため)

spark.sql.adaptive.enabled:実行時にクエリを最適化する機能を有効化

spark.sql.adaptive.coalescePartitions.enabled:パーティション数を自動調整する機能を有効化

以下のコードを入力してください(スマートフォンでは横スクロールできます):

# log_analysis_project.py # ============================================ # 1. 必要なライブラリのインポート # ============================================ # SparkSessionは、Sparkを使うための入口となるオブジェクト from pyspark.sql import SparkSession # データ加工に使う関数群 # col: カラム参照, regexp_extract: 正規表現で文字列抽出 # to_timestamp: 文字列を日時に変換, など多数の関数が含まれる from pyspark.sql.functions import * # データ型を定義するためのクラス群 from pyspark.sql.types import * # ============================================ # 2. SparkSession の作成 # ============================================ # SparkSession.builderで設定を組み立てていく spark = SparkSession.builder \ .appName(“LogAnalysisProject”) \ .config(“spark.sql.adaptive.enabled”, “true”) \ .config(“spark.sql.adaptive.coalescePartitions.enabled”, “true”) \ .getOrCreate() # 設定完了メッセージを表示 print(“[INFO] SparkSession起動完了”)
📤 実行結果
[INFO] SparkSession起動完了

3. データの読み込み

3.1 ログファイルの読み込み

ログファイルは通常のテキストファイルです。spark.read.text()を使って1行ずつ読み込みます。

💡 なぜ text() を使うのか?

ログファイルはCSVやJSONのような構造化データではありません。
1行が1つの文字列として記録されているため、まずテキストとして読み込み、
その後、正規表現で必要な部分を抽出します。

# ============================================ # 3. ログファイルの読み込み # ============================================ print(“[INFO] ログファイル読み込み開始”) # text()メソッドでテキストファイルを読み込む # ワイルドカード(*.log)で複数ファイルを一括読み込み # 各行は “value” というカラムに格納される logs_raw = spark.read.text(“gs://my-bucket/logs/*.log”) # 読み込んだレコード数を確認 # count()はActionなので、ここで実際に処理が実行される record_count = logs_raw.count() print(f”[INFO] 読み込み完了: {record_count:,}件”)
💡 ローカル環境で試す場合

GCSではなくローカルファイルを使う場合は、パスを変更してください:
logs_raw = spark.read.text("./logs/*.log")

4. ログのパース(解析)

4.1 正規表現でログを分解する

ログは1行の文字列なので、「正規表現」を使って各フィールドを抽出します。

💡 正規表現とは?

文字列のパターンを表現する記法です。例えば:

  • \S+ → 空白以外の文字が1文字以上続く
  • \d+ → 数字が1文字以上続く
  • [^\]]+ → ] 以外の文字が1文字以上続く
  • "([^"]*)" → ダブルクォートで囲まれた文字列

Apache Combined Log Formatに対応した正規表現パターンを定義します:

# ============================================ # 4. ログのパース(正規表現で各フィールドを抽出) # ============================================ print(“[INFO] ログ解析開始”) # Apache Combined Log Formatに対応した正規表現パターン # 各グループ(括弧で囲んだ部分)が1つのフィールドに対応 log_pattern = r'(\S+) – – \[([^\]]+)\] “(\S+) (\S+) (\S+)” (\d+) (\d+) “([^”]*)” “([^”]*)”‘ # パターンの意味: # (\S+) → グループ1: IPアドレス(空白以外の連続) # – – → 固定文字(認証情報、通常は – -) # \[([^\]]+)\] → グループ2: タイムスタンプ([ ]で囲まれた部分) # “(\S+) → グループ3: HTTPメソッド(GET, POSTなど) # (\S+) → グループ4: URL # (\S+)” → グループ5: プロトコル(HTTP/1.1など) # (\d+) → グループ6: ステータスコード(数字) # (\d+) → グループ7: レスポンスサイズ(数字) # “([^”]*)” → グループ8: リファラー # “([^”]*)” → グループ9: ユーザーエージェント

4.2 regexp_extract() で各フィールドを抽出

regexp_extract()関数を使って、正規表現の各グループからデータを取り出します。

💡 regexp_extract() の使い方

regexp_extract(カラム名, 正規表現, グループ番号)
グループ番号は1から始まります(最初の括弧が1)

# regexp_extract()で各フィールドを抽出 # select()で新しいカラムを作成 logs_parsed = logs_raw.select( # グループ1からIPアドレスを抽出 regexp_extract(‘value’, log_pattern, 1).alias(‘ip’), # グループ2からタイムスタンプ文字列を抽出 regexp_extract(‘value’, log_pattern, 2).alias(‘timestamp_str’), # グループ3からHTTPメソッドを抽出 regexp_extract(‘value’, log_pattern, 3).alias(‘method’), # グループ4からURLを抽出 regexp_extract(‘value’, log_pattern, 4).alias(‘url’), # グループ5からプロトコルを抽出 regexp_extract(‘value’, log_pattern, 5).alias(‘protocol’), # グループ6からステータスコードを抽出し、整数に変換 regexp_extract(‘value’, log_pattern, 6).cast(‘int’).alias(‘status’), # グループ7からバイト数を抽出し、整数に変換 regexp_extract(‘value’, log_pattern, 7).cast(‘int’).alias(‘bytes’), # グループ8からリファラーを抽出 regexp_extract(‘value’, log_pattern, 8).alias(‘referer’), # グループ9からユーザーエージェントを抽出 regexp_extract(‘value’, log_pattern, 9).alias(‘user_agent’) ) # スキーマ(データ構造)を確認 logs_parsed.printSchema()
📤 スキーマ確認結果
root
 |-- ip: string (nullable = true)
 |-- timestamp_str: string (nullable = true)
 |-- method: string (nullable = true)
 |-- url: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- bytes: integer (nullable = true)
 |-- referer: string (nullable = true)
 |-- user_agent: string (nullable = true)

4.3 タイムスタンプを日時型に変換

文字列のタイムスタンプを、Sparkで日時計算ができる形式に変換します。

# タイムスタンプ文字列を日時型に変換 # to_timestamp(カラム, フォーマット) # フォーマット: dd=日, MMM=月(英語略), yyyy=年, HH:mm:ss=時分秒 logs_with_time = logs_parsed.withColumn( ‘timestamp’, to_timestamp(‘timestamp_str’, ‘dd/MMM/yyyy:HH:mm:ss’) ) # 変換結果を確認(最初の5行を表示) logs_with_time.select(‘timestamp_str’, ‘timestamp’).show(5, truncate=False)

5. データクリーニング

5.1 不正なレコードを除外

ログには、フォーマットが崩れた不正なレコードが含まれていることがあります。これらを除外します。

💡 クリーニングの必要性

不正なデータを残したまま分析すると、結果が歪んでしまいます。
例:ステータスコードがNULLのレコードは集計から除外すべきです。

# ============================================ # 5. データクリーニング # ============================================ print(“[INFO] データクリーニング開始”) # filter()で条件に合うレコードだけを残す logs_clean = logs_with_time \ .filter(col(‘ip’).isNotNull()) \ .filter(col(‘timestamp’).isNotNull()) \ .filter(col(‘status’).isNotNull()) \ .filter(col(‘status’) > 0) \ .filter(col(‘bytes’) >= 0) # 各フィルター条件の意味: # ip.isNotNull() → IPアドレスが空でない # timestamp.isNotNull() → タイムスタンプが正しく変換できた # status.isNotNull() → ステータスコードが存在する # status > 0 → ステータスコードが正の数 # bytes >= 0 → バイト数が0以上

5.2 URLをクリーニング

URLに付いているクエリパラメータ(?以降)を除去して、ページ単位で集計できるようにします。

# URLからクエリパラメータを除去 # 例: /product?id=123 → /product logs_clean = logs_clean.withColumn( ‘url_clean’, regexp_replace(‘url’, r’\?.*’, ”) # ?以降を空文字に置換 )

5.3 ボットアクセスを除外

検索エンジンのクローラー(ボット)は人間のアクセスではないため、分析から除外します。

# ボットアクセスを除外 # user_agentに「bot」「crawler」「spider」が含まれるものを除外 # rlike()は正規表現でマッチを確認 # (?i)は大文字小文字を区別しないオプション logs_clean = logs_clean.filter( ~col(‘user_agent’).rlike(‘(?i)bot|crawler|spider’) ) # クリーニング後の件数を確認 print(f”[INFO] クリーニング後: {logs_clean.count():,}件”)

5.4 分析用の追加カラムを生成

時間帯別・曜日別の分析のために、タイムスタンプから必要な情報を抽出します。

# ============================================ # 6. 追加カラムの生成 # ============================================ logs_enriched = logs_clean \ .withColumn(‘hour’, hour(‘timestamp’)) \ .withColumn(‘date’, to_date(‘timestamp’)) \ .withColumn(‘day_of_week’, dayofweek(‘timestamp’)) \ .withColumn(‘status_category’, when(col(‘status’) < 300, ‘success’) .when(col(‘status’) < 400, ‘redirect’) .when(col(‘status’) < 500, ‘client_error’) .otherwise(‘server_error’) ) # 各カラムの意味: # hour → 0〜23の時間(hour関数で抽出) # date → 日付部分のみ(to_date関数で抽出) # day_of_week → 曜日(1=日曜, 2=月曜, … 7=土曜) # status_category → ステータスコードを4分類に変換
💡 ステータスコードの分類
範囲 分類 意味
100-299 success 正常に処理完了
300-399 redirect リダイレクト(別ページへ転送)
400-499 client_error クライアント側のエラー(404など)
500-599 server_error サーバー側のエラー

5.5 キャッシュして高速化

何度も使うDataFrameはキャッシュ(メモリに保持)することで、処理を高速化できます。

# DataFrameをメモリにキャッシュ # 以降の処理で繰り返しアクセスするため、キャッシュしておく logs_enriched.cache() # count()を実行してキャッシュを確定 print(f”[INFO] エンリッチ完了: {logs_enriched.count():,}件”)

6. データ品質チェック

分析の前に、データの品質を確認しておきましょう。

# ============================================ # 7. データ品質チェック # ============================================ print(“\n[INFO] データ品質チェック”) # — NULL値の確認 — # 各カラムのNULL値の数をカウント null_counts = logs_enriched.select([ sum(col(c).isNull().cast(‘int’)).alias(c) for c in [‘ip’, ‘timestamp’, ‘url’, ‘status’] ]).collect()[0].asDict() print(“NULL値の数:”) for col_name, count in null_counts.items(): print(f” {col_name}: {count:,}件”) # — ステータスコード分布 — print(“\nステータスコード分布:”) logs_enriched.groupBy(‘status_category’) \ .count() \ .orderBy(desc(‘count’)) \ .show() # — データ期間 — print(“\nデータ期間:”) logs_enriched.select( min(‘date’).alias(‘開始日’), max(‘date’).alias(‘終了日’) ).show()
📤 品質チェック結果例
NULL値の数:
  ip: 0件
  timestamp: 0件
  url: 0件
  status: 0件

ステータスコード分布:
+---------------+--------+
|status_category|   count|
+---------------+--------+
|        success|85000000|
|   client_error|10000000|
|       redirect| 4000000|
|   server_error| 1000000|
+---------------+--------+

データ期間:
+----------+----------+
|    開始日|    終了日|
+----------+----------+
|2024-11-01|2024-11-30|
+----------+----------+

7. 時系列集計

7.1 時間帯別アクセス分析

何時にアクセスが多いかを分析します。ECサイトでは深夜より昼間の方がアクセスが多いはずです。

# ============================================ # 8. 時間帯別分析 # ============================================ print(“\n[INFO] 時間帯別分析開始”) hourly_stats = logs_enriched \ .groupBy(‘hour’) \ .agg( count(‘*’).alias(‘access_count’), countDistinct(‘ip’).alias(‘unique_users’), sum(when(col(‘status’) >= 400, 1).otherwise(0)).alias(‘error_count’), avg(‘bytes’).alias(‘avg_bytes’) ) \ .orderBy(‘hour’) # 各集計の意味: # count(‘*’) → 総アクセス数 # countDistinct(‘ip’) → ユニークユーザー数(重複IPを除外) # sum(when(…)) → エラー数(ステータス400以上をカウント) # avg(‘bytes’) → 平均レスポンスサイズ # 結果を表示 hourly_stats.show(24)

7.2 曜日別アクセス分析

曜日によるアクセス傾向の違いを分析します。

# ============================================ # 9. 曜日別分析 # ============================================ print(“\n[INFO] 曜日別分析開始”) # 曜日番号を曜日名に変換しながら集計 daily_stats = logs_enriched \ .groupBy(‘day_of_week’) \ .agg( count(‘*’).alias(‘access_count’), countDistinct(‘ip’).alias(‘unique_users’), avg(‘bytes’).alias(‘avg_bytes’) ) \ .withColumn(‘day_name’, when(col(‘day_of_week’) == 1, ‘日曜’) .when(col(‘day_of_week’) == 2, ‘月曜’) .when(col(‘day_of_week’) == 3, ‘火曜’) .when(col(‘day_of_week’) == 4, ‘水曜’) .when(col(‘day_of_week’) == 5, ‘木曜’) .when(col(‘day_of_week’) == 6, ‘金曜’) .otherwise(‘土曜’) ) \ .orderBy(‘day_of_week’) daily_stats.show()

7.3 人気ページTOP 20

最もアクセスされているページを特定します。

# ============================================ # 10. 人気ページ分析 # ============================================ print(“\n[INFO] 人気ページ分析開始”) popular_pages = logs_enriched \ .filter(col(‘status’) == 200) \ .groupBy(‘url_clean’) \ .agg( count(‘*’).alias(‘view_count’), countDistinct(‘ip’).alias(‘unique_visitors’) ) \ .orderBy(desc(‘view_count’)) \ .limit(20) # filter(status == 200) → 正常にアクセスできたページのみ集計 # desc(‘view_count’) → アクセス数の多い順に並べ替え # limit(20) → 上位20件に絞る print(“\n人気ページTOP 20:”) popular_pages.show(20, truncate=False)

8. 異常検知

8.1 エラー多発ページの検出

エラーが多いページを特定し、問題の原因を調査する手がかりにします。

# ============================================ # 11. エラー分析 # ============================================ print(“\n[INFO] エラー分析開始”) # ステータス400以上(エラー)のページを集計 error_pages = logs_enriched \ .filter(col(‘status’) >= 400) \ .groupBy(‘url_clean’, ‘status’) \ .agg( count(‘*’).alias(‘error_count’) ) \ .orderBy(desc(‘error_count’)) \ .limit(20) print(“\nエラー多発ページTOP 20:”) error_pages.show(20, truncate=False) # 404エラー(ページが見つからない)の分析 not_found = logs_enriched \ .filter(col(‘status’) == 404) \ .groupBy(‘url_clean’) \ .count() \ .orderBy(desc(‘count’)) \ .limit(10) print(“\n404エラーTOP 10:”) not_found.show(10, truncate=False)

8.2 アクセス急増の検出(統計的手法)

「平均 + 2σ(標準偏差の2倍)」を超えるアクセスを異常として検出します。

💡 統計的異常検知の考え方

正規分布では、平均±2σの範囲に約95%のデータが収まります。
つまり、この範囲を超えるアクセスは「通常とは異なる」と判断できます。

# ============================================ # 12. 異常なアクセスパターンの検出 # ============================================ print(“\n[INFO] 異常検知開始”) # 1時間あたりのアクセス数を計算 hourly_access = logs_enriched \ .groupBy(‘date’, ‘hour’) \ .count() \ .withColumnRenamed(‘count’, ‘access_count’) # 統計値(平均と標準偏差)を計算 stats = hourly_access.select( avg(‘access_count’).alias(‘mean’), stddev(‘access_count’).alias(‘stddev’) ).collect()[0] mean_val = stats[‘mean’] stddev_val = stats[‘stddev’] # 閾値 = 平均 + 2×標準偏差 threshold = mean_val + (2 * stddev_val) print(f”\n平均アクセス数: {mean_val:,.0f}”) print(f”標準偏差: {stddev_val:,.0f}”) print(f”異常閾値(平均+2σ): {threshold:,.0f}”) # 閾値を超えるアクセスを検出 anomalies = hourly_access \ .filter(col(‘access_count’) > threshold) \ .orderBy(desc(‘access_count’)) print(f”\n異常検知: {anomalies.count()}件”) anomalies.show(10)

8.3 疑わしいIPアドレスの検出

異常に多くアクセスしているIPや、エラー率が高いIPを検出します。

# ============================================ # 13. 疑わしいIPアドレスの検出 # ============================================ print(“\n[INFO] 疑わしいIP検出開始”) suspicious_ips = logs_enriched \ .groupBy(‘ip’) \ .agg( count(‘*’).alias(‘access_count’), countDistinct(‘url_clean’).alias(‘unique_urls’), sum(when(col(‘status’) >= 400, 1).otherwise(0)).alias(‘error_count’) ) \ .filter( (col(‘access_count’) > 10000) | ((col(‘error_count’) / col(‘access_count’)) > 0.5) ) \ .orderBy(desc(‘access_count’)) # フィルター条件: # access_count > 10000 → 1万回以上アクセス(異常に多い) # error_count / access_count > 0.5 → エラー率50%以上(攻撃の可能性) print(f”\n疑わしいIP: {suspicious_ips.count()}件”) suspicious_ips.show(10)

9. 結果の保存

分析結果をParquet形式で保存します。Parquetは列指向フォーマットで、大規模データの保存・読み込みに最適です。

# ============================================ # 14. 結果の保存 # ============================================ print(“\n[INFO] 結果を保存中…”) # 時間帯別統計を保存 hourly_stats.write \ .mode(‘overwrite’) \ .parquet(‘gs://my-bucket/output/hourly_stats.parquet’) # 曜日別統計を保存 daily_stats.write \ .mode(‘overwrite’) \ .parquet(‘gs://my-bucket/output/daily_stats.parquet’) # 人気ページを保存 popular_pages.write \ .mode(‘overwrite’) \ .parquet(‘gs://my-bucket/output/popular_pages.parquet’) # エラー分析を保存 error_pages.write \ .mode(‘overwrite’) \ .parquet(‘gs://my-bucket/output/error_pages.parquet’) # 異常検知結果を保存 anomalies.write \ .mode(‘overwrite’) \ .parquet(‘gs://my-bucket/output/anomalies.parquet’) # 疑わしいIPを保存 suspicious_ips.write \ .mode(‘overwrite’) \ .parquet(‘gs://my-bucket/output/suspicious_ips.parquet’) # 処理済みログ全体を日付でパーティション分割して保存 logs_enriched.write \ .mode(‘overwrite’) \ .partitionBy(‘date’) \ .parquet(‘gs://my-bucket/output/logs_processed.parquet’) print(“[INFO] 保存完了”)

10. 分析サマリーの生成

# ============================================ # 15. 分析サマリーの生成 # ============================================ summary = { “total_records”: logs_enriched.count(), “date_range”: { “start”: logs_enriched.select(min(‘date’)).collect()[0][0], “end”: logs_enriched.select(max(‘date’)).collect()[0][0] }, “unique_users”: logs_enriched.select(countDistinct(‘ip’)).collect()[0][0], “total_errors”: logs_enriched.filter(col(‘status’) >= 400).count(), “error_rate”: logs_enriched.filter(col(‘status’) >= 400).count() / logs_enriched.count(), “avg_response_size”: logs_enriched.select(avg(‘bytes’)).collect()[0][0] } print(“\n” + “=”*50) print(“📊 分析サマリー”) print(“=”*50) print(f”総レコード数: {summary[‘total_records’]:,}件”) print(f”期間: {summary[‘date_range’][‘start’]} 〜 {summary[‘date_range’][‘end’]}”) print(f”ユニークユーザー数: {summary[‘unique_users’]:,}人”) print(f”総エラー数: {summary[‘total_errors’]:,}件”) print(f”エラー率: {summary[‘error_rate’]:.2%}”) print(f”平均レスポンスサイズ: {summary[‘avg_response_size’]:,.0f}バイト”) print(“=”*50) # SparkSessionを終了 spark.stop() print(“\n[INFO] 処理完了”)
📤 サマリー出力例
==================================================
📊 分析サマリー
==================================================
総レコード数: 100,000,000件
期間: 2024-11-01 〜 2024-11-30
ユニークユーザー数: 5,000,000人
総エラー数: 11,000,000件
エラー率: 11.00%
平均レスポンスサイズ: 15,234バイト
==================================================

[INFO] 処理完了

11. まとめ

✅ このプロジェクトで学んだこと
  • 正規表現を使ったログのパース
  • データクリーニングの重要性(NULL除外、ボット除外)
  • 時系列集計(時間帯別・曜日別)
  • 異常検知(統計的手法:平均+2σ)
  • Parquet形式での効率的な保存
  • キャッシュによる処理高速化
🎯 成果物チェックリスト
  • □ クリーニング済みデータ(Parquet形式)
  • □ 時間帯別統計
  • □ 曜日別統計
  • □ 人気ページTOP 20
  • □ エラー分析レポート
  • □ 異常検知リスト
  • □ 疑わしいIPリスト
  • □ 分析サマリー
📝 次のステップ

お疲れ様でした!1億レコードのログ分析プロジェクトを完了しました。
次のSTEP 30では、複数のデータソースを統合するETLパイプライン構築に挑戦します。

📝

学習メモ

ビッグデータ処理(Apache Spark) - Step 29

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE