📋 このプロジェクトで学ぶこと
- 1億レコードの大規模ログ分析の実践
- 正規表現を使ったログのパース(解析)方法
- データの前処理とクリーニング
- 時系列集計(時間帯別・曜日別分析)
- 異常検知(エラー多発・アクセス急増の検出)
- パフォーマンスチューニング
1. プロジェクト概要
1.1 あなたのミッション
📊 シナリオ
あなたは大手ECサイトのデータアナリストです。
1億レコード(約10GB)のアクセスログを分析し、以下を明らかにしてください:
- 時間帯別・曜日別のアクセス傾向
- 人気ページTOP 20
- エラーが多発している箇所
- 異常なアクセスパターンの検出
- ユーザーの行動パターン分析
1.2 データ形式の理解
Webサーバーは「アクセスログ」と呼ばれる記録を残します。今回はApache Combined Log Formatという形式のログを分析します。
💡 ログの各フィールドの意味
| フィールド |
意味 |
例 |
| IPアドレス |
アクセス元のコンピュータを識別 |
192.168.1.100 |
| タイムスタンプ |
アクセス日時 |
[20/Nov/2024:10:15:30 +0000] |
| HTTPメソッド |
リクエストの種類(GET=取得、POST=送信) |
GET, POST |
| URL |
アクセスされたページのパス |
/index.html |
| ステータスコード |
処理結果(200=成功、404=ページなし、500=サーバーエラー) |
200, 404, 500 |
| レスポンスサイズ |
返したデータのサイズ(バイト) |
1234 |
| リファラー |
どこから来たか(参照元URL) |
https://google.com |
| ユーザーエージェント |
ブラウザやデバイスの情報 |
Mozilla/5.0… |
実際のログは以下のような1行のテキストです:
192.168.1.100 – – [20/Nov/2024:10:15:30 +0000] “GET /index.html HTTP/1.1” 200 1234 “https://google.com” “Mozilla/5.0…”
1.3 成果物一覧
✅ このプロジェクトで作成するもの
- クリーニング済みデータ(Parquet形式)
- 時間帯別・曜日別の統計
- 人気ページTOP 20
- エラー分析レポート
- 異常検知リスト
- 疑わしいIPリスト
- 分析サマリー
2. SparkSessionの設定
2.1 最適化オプション付きでSparkを起動
まず、大規模データを効率的に処理するために、最適化オプションを有効にしてSparkSessionを作成します。
💡 各設定の意味
appName:アプリケーションの名前(管理画面で識別するため)
spark.sql.adaptive.enabled:実行時にクエリを最適化する機能を有効化
spark.sql.adaptive.coalescePartitions.enabled:パーティション数を自動調整する機能を有効化
以下のコードを入力してください(スマートフォンでは横スクロールできます):
# log_analysis_project.py
# ============================================
# 1. 必要なライブラリのインポート
# ============================================
# SparkSessionは、Sparkを使うための入口となるオブジェクト
from pyspark.sql import SparkSession
# データ加工に使う関数群
# col: カラム参照, regexp_extract: 正規表現で文字列抽出
# to_timestamp: 文字列を日時に変換, など多数の関数が含まれる
from pyspark.sql.functions import *
# データ型を定義するためのクラス群
from pyspark.sql.types import *
# ============================================
# 2. SparkSession の作成
# ============================================
# SparkSession.builderで設定を組み立てていく
spark = SparkSession.builder \
.appName(“LogAnalysisProject”) \
.config(“spark.sql.adaptive.enabled”, “true”) \
.config(“spark.sql.adaptive.coalescePartitions.enabled”, “true”) \
.getOrCreate()
# 設定完了メッセージを表示
print(“[INFO] SparkSession起動完了”)
📤 実行結果
[INFO] SparkSession起動完了
3. データの読み込み
3.1 ログファイルの読み込み
ログファイルは通常のテキストファイルです。spark.read.text()を使って1行ずつ読み込みます。
💡 なぜ text() を使うのか?
ログファイルはCSVやJSONのような構造化データではありません。
1行が1つの文字列として記録されているため、まずテキストとして読み込み、
その後、正規表現で必要な部分を抽出します。
# ============================================
# 3. ログファイルの読み込み
# ============================================
print(“[INFO] ログファイル読み込み開始”)
# text()メソッドでテキストファイルを読み込む
# ワイルドカード(*.log)で複数ファイルを一括読み込み
# 各行は “value” というカラムに格納される
logs_raw = spark.read.text(“gs://my-bucket/logs/*.log”)
# 読み込んだレコード数を確認
# count()はActionなので、ここで実際に処理が実行される
record_count = logs_raw.count()
print(f”[INFO] 読み込み完了: {record_count:,}件”)
💡 ローカル環境で試す場合
GCSではなくローカルファイルを使う場合は、パスを変更してください:
logs_raw = spark.read.text("./logs/*.log")
4. ログのパース(解析)
4.1 正規表現でログを分解する
ログは1行の文字列なので、「正規表現」を使って各フィールドを抽出します。
💡 正規表現とは?
文字列のパターンを表現する記法です。例えば:
\S+ → 空白以外の文字が1文字以上続く
\d+ → 数字が1文字以上続く
[^\]]+ → ] 以外の文字が1文字以上続く
"([^"]*)" → ダブルクォートで囲まれた文字列
Apache Combined Log Formatに対応した正規表現パターンを定義します:
# ============================================
# 4. ログのパース(正規表現で各フィールドを抽出)
# ============================================
print(“[INFO] ログ解析開始”)
# Apache Combined Log Formatに対応した正規表現パターン
# 各グループ(括弧で囲んだ部分)が1つのフィールドに対応
log_pattern = r'(\S+) – – \[([^\]]+)\] “(\S+) (\S+) (\S+)” (\d+) (\d+) “([^”]*)” “([^”]*)”‘
# パターンの意味:
# (\S+) → グループ1: IPアドレス(空白以外の連続)
# – – → 固定文字(認証情報、通常は – -)
# \[([^\]]+)\] → グループ2: タイムスタンプ([ ]で囲まれた部分)
# “(\S+) → グループ3: HTTPメソッド(GET, POSTなど)
# (\S+) → グループ4: URL
# (\S+)” → グループ5: プロトコル(HTTP/1.1など)
# (\d+) → グループ6: ステータスコード(数字)
# (\d+) → グループ7: レスポンスサイズ(数字)
# “([^”]*)” → グループ8: リファラー
# “([^”]*)” → グループ9: ユーザーエージェント
4.2 regexp_extract() で各フィールドを抽出
regexp_extract()関数を使って、正規表現の各グループからデータを取り出します。
💡 regexp_extract() の使い方
regexp_extract(カラム名, 正規表現, グループ番号)
グループ番号は1から始まります(最初の括弧が1)
# regexp_extract()で各フィールドを抽出
# select()で新しいカラムを作成
logs_parsed = logs_raw.select(
# グループ1からIPアドレスを抽出
regexp_extract(‘value’, log_pattern, 1).alias(‘ip’),
# グループ2からタイムスタンプ文字列を抽出
regexp_extract(‘value’, log_pattern, 2).alias(‘timestamp_str’),
# グループ3からHTTPメソッドを抽出
regexp_extract(‘value’, log_pattern, 3).alias(‘method’),
# グループ4からURLを抽出
regexp_extract(‘value’, log_pattern, 4).alias(‘url’),
# グループ5からプロトコルを抽出
regexp_extract(‘value’, log_pattern, 5).alias(‘protocol’),
# グループ6からステータスコードを抽出し、整数に変換
regexp_extract(‘value’, log_pattern, 6).cast(‘int’).alias(‘status’),
# グループ7からバイト数を抽出し、整数に変換
regexp_extract(‘value’, log_pattern, 7).cast(‘int’).alias(‘bytes’),
# グループ8からリファラーを抽出
regexp_extract(‘value’, log_pattern, 8).alias(‘referer’),
# グループ9からユーザーエージェントを抽出
regexp_extract(‘value’, log_pattern, 9).alias(‘user_agent’)
)
# スキーマ(データ構造)を確認
logs_parsed.printSchema()
📤 スキーマ確認結果
root
|-- ip: string (nullable = true)
|-- timestamp_str: string (nullable = true)
|-- method: string (nullable = true)
|-- url: string (nullable = true)
|-- protocol: string (nullable = true)
|-- status: integer (nullable = true)
|-- bytes: integer (nullable = true)
|-- referer: string (nullable = true)
|-- user_agent: string (nullable = true)
4.3 タイムスタンプを日時型に変換
文字列のタイムスタンプを、Sparkで日時計算ができる形式に変換します。
# タイムスタンプ文字列を日時型に変換
# to_timestamp(カラム, フォーマット)
# フォーマット: dd=日, MMM=月(英語略), yyyy=年, HH:mm:ss=時分秒
logs_with_time = logs_parsed.withColumn(
‘timestamp’,
to_timestamp(‘timestamp_str’, ‘dd/MMM/yyyy:HH:mm:ss’)
)
# 変換結果を確認(最初の5行を表示)
logs_with_time.select(‘timestamp_str’, ‘timestamp’).show(5, truncate=False)
5. データクリーニング
5.1 不正なレコードを除外
ログには、フォーマットが崩れた不正なレコードが含まれていることがあります。これらを除外します。
💡 クリーニングの必要性
不正なデータを残したまま分析すると、結果が歪んでしまいます。
例:ステータスコードがNULLのレコードは集計から除外すべきです。
# ============================================
# 5. データクリーニング
# ============================================
print(“[INFO] データクリーニング開始”)
# filter()で条件に合うレコードだけを残す
logs_clean = logs_with_time \
.filter(col(‘ip’).isNotNull()) \
.filter(col(‘timestamp’).isNotNull()) \
.filter(col(‘status’).isNotNull()) \
.filter(col(‘status’) > 0) \
.filter(col(‘bytes’) >= 0)
# 各フィルター条件の意味:
# ip.isNotNull() → IPアドレスが空でない
# timestamp.isNotNull() → タイムスタンプが正しく変換できた
# status.isNotNull() → ステータスコードが存在する
# status > 0 → ステータスコードが正の数
# bytes >= 0 → バイト数が0以上
5.2 URLをクリーニング
URLに付いているクエリパラメータ(?以降)を除去して、ページ単位で集計できるようにします。
# URLからクエリパラメータを除去
# 例: /product?id=123 → /product
logs_clean = logs_clean.withColumn(
‘url_clean’,
regexp_replace(‘url’, r’\?.*’, ”) # ?以降を空文字に置換
)
5.3 ボットアクセスを除外
検索エンジンのクローラー(ボット)は人間のアクセスではないため、分析から除外します。
# ボットアクセスを除外
# user_agentに「bot」「crawler」「spider」が含まれるものを除外
# rlike()は正規表現でマッチを確認
# (?i)は大文字小文字を区別しないオプション
logs_clean = logs_clean.filter(
~col(‘user_agent’).rlike(‘(?i)bot|crawler|spider’)
)
# クリーニング後の件数を確認
print(f”[INFO] クリーニング後: {logs_clean.count():,}件”)
5.4 分析用の追加カラムを生成
時間帯別・曜日別の分析のために、タイムスタンプから必要な情報を抽出します。
# ============================================
# 6. 追加カラムの生成
# ============================================
logs_enriched = logs_clean \
.withColumn(‘hour’, hour(‘timestamp’)) \
.withColumn(‘date’, to_date(‘timestamp’)) \
.withColumn(‘day_of_week’, dayofweek(‘timestamp’)) \
.withColumn(‘status_category’,
when(col(‘status’) < 300, ‘success’)
.when(col(‘status’) < 400, ‘redirect’)
.when(col(‘status’) < 500, ‘client_error’)
.otherwise(‘server_error’)
)
# 各カラムの意味:
# hour → 0〜23の時間(hour関数で抽出)
# date → 日付部分のみ(to_date関数で抽出)
# day_of_week → 曜日(1=日曜, 2=月曜, … 7=土曜)
# status_category → ステータスコードを4分類に変換
💡 ステータスコードの分類
| 範囲 |
分類 |
意味 |
| 100-299 |
success |
正常に処理完了 |
| 300-399 |
redirect |
リダイレクト(別ページへ転送) |
| 400-499 |
client_error |
クライアント側のエラー(404など) |
| 500-599 |
server_error |
サーバー側のエラー |
5.5 キャッシュして高速化
何度も使うDataFrameはキャッシュ(メモリに保持)することで、処理を高速化できます。
# DataFrameをメモリにキャッシュ
# 以降の処理で繰り返しアクセスするため、キャッシュしておく
logs_enriched.cache()
# count()を実行してキャッシュを確定
print(f”[INFO] エンリッチ完了: {logs_enriched.count():,}件”)
6. データ品質チェック
分析の前に、データの品質を確認しておきましょう。
# ============================================
# 7. データ品質チェック
# ============================================
print(“\n[INFO] データ品質チェック”)
# — NULL値の確認 —
# 各カラムのNULL値の数をカウント
null_counts = logs_enriched.select([
sum(col(c).isNull().cast(‘int’)).alias(c)
for c in [‘ip’, ‘timestamp’, ‘url’, ‘status’]
]).collect()[0].asDict()
print(“NULL値の数:”)
for col_name, count in null_counts.items():
print(f” {col_name}: {count:,}件”)
# — ステータスコード分布 —
print(“\nステータスコード分布:”)
logs_enriched.groupBy(‘status_category’) \
.count() \
.orderBy(desc(‘count’)) \
.show()
# — データ期間 —
print(“\nデータ期間:”)
logs_enriched.select(
min(‘date’).alias(‘開始日’),
max(‘date’).alias(‘終了日’)
).show()
📤 品質チェック結果例
NULL値の数:
ip: 0件
timestamp: 0件
url: 0件
status: 0件
ステータスコード分布:
+---------------+--------+
|status_category| count|
+---------------+--------+
| success|85000000|
| client_error|10000000|
| redirect| 4000000|
| server_error| 1000000|
+---------------+--------+
データ期間:
+----------+----------+
| 開始日| 終了日|
+----------+----------+
|2024-11-01|2024-11-30|
+----------+----------+
7. 時系列集計
7.1 時間帯別アクセス分析
何時にアクセスが多いかを分析します。ECサイトでは深夜より昼間の方がアクセスが多いはずです。
# ============================================
# 8. 時間帯別分析
# ============================================
print(“\n[INFO] 時間帯別分析開始”)
hourly_stats = logs_enriched \
.groupBy(‘hour’) \
.agg(
count(‘*’).alias(‘access_count’),
countDistinct(‘ip’).alias(‘unique_users’),
sum(when(col(‘status’) >= 400, 1).otherwise(0)).alias(‘error_count’),
avg(‘bytes’).alias(‘avg_bytes’)
) \
.orderBy(‘hour’)
# 各集計の意味:
# count(‘*’) → 総アクセス数
# countDistinct(‘ip’) → ユニークユーザー数(重複IPを除外)
# sum(when(…)) → エラー数(ステータス400以上をカウント)
# avg(‘bytes’) → 平均レスポンスサイズ
# 結果を表示
hourly_stats.show(24)
7.2 曜日別アクセス分析
曜日によるアクセス傾向の違いを分析します。
# ============================================
# 9. 曜日別分析
# ============================================
print(“\n[INFO] 曜日別分析開始”)
# 曜日番号を曜日名に変換しながら集計
daily_stats = logs_enriched \
.groupBy(‘day_of_week’) \
.agg(
count(‘*’).alias(‘access_count’),
countDistinct(‘ip’).alias(‘unique_users’),
avg(‘bytes’).alias(‘avg_bytes’)
) \
.withColumn(‘day_name’,
when(col(‘day_of_week’) == 1, ‘日曜’)
.when(col(‘day_of_week’) == 2, ‘月曜’)
.when(col(‘day_of_week’) == 3, ‘火曜’)
.when(col(‘day_of_week’) == 4, ‘水曜’)
.when(col(‘day_of_week’) == 5, ‘木曜’)
.when(col(‘day_of_week’) == 6, ‘金曜’)
.otherwise(‘土曜’)
) \
.orderBy(‘day_of_week’)
daily_stats.show()
7.3 人気ページTOP 20
最もアクセスされているページを特定します。
# ============================================
# 10. 人気ページ分析
# ============================================
print(“\n[INFO] 人気ページ分析開始”)
popular_pages = logs_enriched \
.filter(col(‘status’) == 200) \
.groupBy(‘url_clean’) \
.agg(
count(‘*’).alias(‘view_count’),
countDistinct(‘ip’).alias(‘unique_visitors’)
) \
.orderBy(desc(‘view_count’)) \
.limit(20)
# filter(status == 200) → 正常にアクセスできたページのみ集計
# desc(‘view_count’) → アクセス数の多い順に並べ替え
# limit(20) → 上位20件に絞る
print(“\n人気ページTOP 20:”)
popular_pages.show(20, truncate=False)
8. 異常検知
8.1 エラー多発ページの検出
エラーが多いページを特定し、問題の原因を調査する手がかりにします。
# ============================================
# 11. エラー分析
# ============================================
print(“\n[INFO] エラー分析開始”)
# ステータス400以上(エラー)のページを集計
error_pages = logs_enriched \
.filter(col(‘status’) >= 400) \
.groupBy(‘url_clean’, ‘status’) \
.agg(
count(‘*’).alias(‘error_count’)
) \
.orderBy(desc(‘error_count’)) \
.limit(20)
print(“\nエラー多発ページTOP 20:”)
error_pages.show(20, truncate=False)
# 404エラー(ページが見つからない)の分析
not_found = logs_enriched \
.filter(col(‘status’) == 404) \
.groupBy(‘url_clean’) \
.count() \
.orderBy(desc(‘count’)) \
.limit(10)
print(“\n404エラーTOP 10:”)
not_found.show(10, truncate=False)
8.2 アクセス急増の検出(統計的手法)
「平均 + 2σ(標準偏差の2倍)」を超えるアクセスを異常として検出します。
💡 統計的異常検知の考え方
正規分布では、平均±2σの範囲に約95%のデータが収まります。
つまり、この範囲を超えるアクセスは「通常とは異なる」と判断できます。
# ============================================
# 12. 異常なアクセスパターンの検出
# ============================================
print(“\n[INFO] 異常検知開始”)
# 1時間あたりのアクセス数を計算
hourly_access = logs_enriched \
.groupBy(‘date’, ‘hour’) \
.count() \
.withColumnRenamed(‘count’, ‘access_count’)
# 統計値(平均と標準偏差)を計算
stats = hourly_access.select(
avg(‘access_count’).alias(‘mean’),
stddev(‘access_count’).alias(‘stddev’)
).collect()[0]
mean_val = stats[‘mean’]
stddev_val = stats[‘stddev’]
# 閾値 = 平均 + 2×標準偏差
threshold = mean_val + (2 * stddev_val)
print(f”\n平均アクセス数: {mean_val:,.0f}”)
print(f”標準偏差: {stddev_val:,.0f}”)
print(f”異常閾値(平均+2σ): {threshold:,.0f}”)
# 閾値を超えるアクセスを検出
anomalies = hourly_access \
.filter(col(‘access_count’) > threshold) \
.orderBy(desc(‘access_count’))
print(f”\n異常検知: {anomalies.count()}件”)
anomalies.show(10)
8.3 疑わしいIPアドレスの検出
異常に多くアクセスしているIPや、エラー率が高いIPを検出します。
# ============================================
# 13. 疑わしいIPアドレスの検出
# ============================================
print(“\n[INFO] 疑わしいIP検出開始”)
suspicious_ips = logs_enriched \
.groupBy(‘ip’) \
.agg(
count(‘*’).alias(‘access_count’),
countDistinct(‘url_clean’).alias(‘unique_urls’),
sum(when(col(‘status’) >= 400, 1).otherwise(0)).alias(‘error_count’)
) \
.filter(
(col(‘access_count’) > 10000) |
((col(‘error_count’) / col(‘access_count’)) > 0.5)
) \
.orderBy(desc(‘access_count’))
# フィルター条件:
# access_count > 10000 → 1万回以上アクセス(異常に多い)
# error_count / access_count > 0.5 → エラー率50%以上(攻撃の可能性)
print(f”\n疑わしいIP: {suspicious_ips.count()}件”)
suspicious_ips.show(10)
9. 結果の保存
分析結果をParquet形式で保存します。Parquetは列指向フォーマットで、大規模データの保存・読み込みに最適です。
# ============================================
# 14. 結果の保存
# ============================================
print(“\n[INFO] 結果を保存中…”)
# 時間帯別統計を保存
hourly_stats.write \
.mode(‘overwrite’) \
.parquet(‘gs://my-bucket/output/hourly_stats.parquet’)
# 曜日別統計を保存
daily_stats.write \
.mode(‘overwrite’) \
.parquet(‘gs://my-bucket/output/daily_stats.parquet’)
# 人気ページを保存
popular_pages.write \
.mode(‘overwrite’) \
.parquet(‘gs://my-bucket/output/popular_pages.parquet’)
# エラー分析を保存
error_pages.write \
.mode(‘overwrite’) \
.parquet(‘gs://my-bucket/output/error_pages.parquet’)
# 異常検知結果を保存
anomalies.write \
.mode(‘overwrite’) \
.parquet(‘gs://my-bucket/output/anomalies.parquet’)
# 疑わしいIPを保存
suspicious_ips.write \
.mode(‘overwrite’) \
.parquet(‘gs://my-bucket/output/suspicious_ips.parquet’)
# 処理済みログ全体を日付でパーティション分割して保存
logs_enriched.write \
.mode(‘overwrite’) \
.partitionBy(‘date’) \
.parquet(‘gs://my-bucket/output/logs_processed.parquet’)
print(“[INFO] 保存完了”)
10. 分析サマリーの生成
# ============================================
# 15. 分析サマリーの生成
# ============================================
summary = {
“total_records”: logs_enriched.count(),
“date_range”: {
“start”: logs_enriched.select(min(‘date’)).collect()[0][0],
“end”: logs_enriched.select(max(‘date’)).collect()[0][0]
},
“unique_users”: logs_enriched.select(countDistinct(‘ip’)).collect()[0][0],
“total_errors”: logs_enriched.filter(col(‘status’) >= 400).count(),
“error_rate”: logs_enriched.filter(col(‘status’) >= 400).count() / logs_enriched.count(),
“avg_response_size”: logs_enriched.select(avg(‘bytes’)).collect()[0][0]
}
print(“\n” + “=”*50)
print(“📊 分析サマリー”)
print(“=”*50)
print(f”総レコード数: {summary[‘total_records’]:,}件”)
print(f”期間: {summary[‘date_range’][‘start’]} 〜 {summary[‘date_range’][‘end’]}”)
print(f”ユニークユーザー数: {summary[‘unique_users’]:,}人”)
print(f”総エラー数: {summary[‘total_errors’]:,}件”)
print(f”エラー率: {summary[‘error_rate’]:.2%}”)
print(f”平均レスポンスサイズ: {summary[‘avg_response_size’]:,.0f}バイト”)
print(“=”*50)
# SparkSessionを終了
spark.stop()
print(“\n[INFO] 処理完了”)
📤 サマリー出力例
==================================================
📊 分析サマリー
==================================================
総レコード数: 100,000,000件
期間: 2024-11-01 〜 2024-11-30
ユニークユーザー数: 5,000,000人
総エラー数: 11,000,000件
エラー率: 11.00%
平均レスポンスサイズ: 15,234バイト
==================================================
[INFO] 処理完了
11. まとめ
✅ このプロジェクトで学んだこと
- 正規表現を使ったログのパース
- データクリーニングの重要性(NULL除外、ボット除外)
- 時系列集計(時間帯別・曜日別)
- 異常検知(統計的手法:平均+2σ)
- Parquet形式での効率的な保存
- キャッシュによる処理高速化
🎯 成果物チェックリスト
- □ クリーニング済みデータ(Parquet形式)
- □ 時間帯別統計
- □ 曜日別統計
- □ 人気ページTOP 20
- □ エラー分析レポート
- □ 異常検知リスト
- □ 疑わしいIPリスト
- □ 分析サマリー
📝 次のステップ
お疲れ様でした!1億レコードのログ分析プロジェクトを完了しました。
次のSTEP 30では、複数のデータソースを統合するETLパイプライン構築に挑戦します。
artnasekai
#artnasekai #学習メモ