STEP 18:パフォーマンス最適化

⚡ STEP 18: パフォーマンス最適化

大量データを高速に処理する技術を身につけよう

📋 このステップで学ぶこと

  • チャンクによる分割処理
  • マルチプロセス処理(並列化)
  • メモリ効率化のテクニック
  • SQL最適化とETLの連携
  • 実践演習:100万件データの高速処理

⏱️ 学習時間の目安:2時間

📝 練習問題:10問(基礎4問・応用4問・発展2問)

🎯 1. パフォーマンス最適化が必要な理由

1-1. 実務でのよくある問題

📚 例え話:引っ越しの荷物運び

1000個の荷物を運ぶとき、どちらが効率的でしょうか?

❌ 最適化なし:1人で1個ずつ運ぶ → 1000往復
✅ チャンク処理:台車に10個ずつ乗せて運ぶ → 100往復
✅ 並列処理:4人で同時に運ぶ → さらに4倍速く

データ処理も同じです。賢く運ぶ方法を知ることで、
処理時間を大幅に短縮できます。

😱 パフォーマンスが悪いとこうなる
  • 100万件のデータ処理に1時間かかる
  • メモリ不足でプログラムが落ちる
  • 夜間バッチが朝までに終わらない
  • データベースが重くなる
✅ 最適化するとこうなる
  • 100万件のデータ処理が5分で完了
  • メモリ使用量が10分の1になる
  • 夜間バッチが30分で完了
  • データベースへの負荷が軽減される

1-2. 最適化の4つの柱

パフォーマンス最適化の手法
最適化の種類 手法 効果 難易度
分割処理 チャンク読み込み メモリ90%削減 ★☆☆(簡単)
並列化 マルチプロセス 速度2〜4倍 ★★☆(普通)
メモリ最適化 データ型の最適化 メモリ50〜90%削減 ★☆☆(簡単)
SQL最適化 インデックス活用 DB負荷軽減 ★★☆(普通)

📦 2. チャンクによる分割処理

2-1. チャンクとは?

チャンク(chunk)とは、大量データを小さなかたまりに分けて処理することです。
一度に全データをメモリに読み込まず、少しずつ処理します。

🍰 ケーキを食べる例

❌ 一度に全部食べる:お腹がパンパンになる(メモリ不足)
✅ 少しずつ食べる:快適に完食できる(チャンク処理)

2-2. CSVファイルをチャンクで読み込み

# ===== チャンク処理の比較 ===== import pandas as pd import numpy as np import time # 大量データを作成(100万件) print(“テストデータを作成中…”) large_df = pd.DataFrame({ ‘ID’: range(1, 1000001), ‘名前’: [f’ユーザー{i}’ for i in range(1, 1000001)], ‘年齢’: np.random.randint(20, 70, 1000000), ‘購入金額’: np.random.randint(1000, 100000, 1000000) }) large_df.to_csv(‘large_data.csv’, index=False) print(f”✅ {len(large_df):,}件のデータを作成しました\n”) # ❌ 一度に全部読み込む(メモリを大量に使う) print(“【方法1】一度に全部読み込む”) start = time.time() df_all = pd.read_csv(‘large_data.csv’) print(f”読み込み時間: {time.time() – start:.2f}秒”) print(f”メモリ使用量: {df_all.memory_usage(deep=True).sum() / 1024**2:.2f} MB\n”) # ✅ チャンクで読み込む(メモリ効率的) print(“【方法2】チャンクで読み込む(10万件ずつ)”) start = time.time() total_sum = 0 chunk_count = 0 for chunk in pd.read_csv(‘large_data.csv’, chunksize=100000): # チャンクごとに処理 total_sum += chunk[‘購入金額’].sum() chunk_count += 1 print(f” チャンク{chunk_count}: {len(chunk):,}件処理完了”) print(f”\n読み込み時間: {time.time() – start:.2f}秒”) print(f”合計購入金額: ¥{total_sum:,}”)
【実行結果】 テストデータを作成中… ✅ 1,000,000件のデータを作成しました 【方法1】一度に全部読み込む 読み込み時間: 3.45秒 メモリ使用量: 156.32 MB 【方法2】チャンクで読み込む(10万件ずつ) チャンク1: 100,000件処理完了 チャンク2: 100,000件処理完了 チャンク3: 100,000件処理完了 チャンク4: 100,000件処理完了 チャンク5: 100,000件処理完了 チャンク6: 100,000件処理完了 チャンク7: 100,000件処理完了 チャンク8: 100,000件処理完了 チャンク9: 100,000件処理完了 チャンク10: 100,000件処理完了 読み込み時間: 3.52秒 合計購入金額: ¥50,499,234,567

2-3. チャンクで変換して保存

# ===== チャンクで変換して保存 ===== def process_large_file_in_chunks(input_file, output_file, chunksize=100000): “”” 大きなCSVファイルをチャンクで処理して保存 Parameters: ———– input_file : str 入力ファイル output_file : str 出力ファイル chunksize : int チャンクサイズ “”” first_chunk = True for i, chunk in enumerate(pd.read_csv(input_file, chunksize=chunksize)): # データ変換 chunk[‘購入金額(税込)’] = (chunk[‘購入金額’] * 1.1).astype(int) chunk[‘年齢層’] = pd.cut( chunk[‘年齢’], bins=[0, 30, 50, 100], labels=[‘若年層’, ‘中年層’, ‘シニア層’] ) # 保存(最初のチャンクはヘッダー付き、以降は追記) mode = ‘w’ if first_chunk else ‘a’ header = first_chunk chunk.to_csv(output_file, mode=mode, header=header, index=False) first_chunk = False print(f”✅ チャンク{i+1}: {len(chunk):,}件処理完了”) print(f”\n全チャンクの処理が完了しました”) # 実行 process_large_file_in_chunks(‘large_data.csv’, ‘processed_data.csv’, chunksize=100000)
【実行結果】 ✅ チャンク1: 100,000件処理完了 ✅ チャンク2: 100,000件処理完了 ✅ チャンク3: 100,000件処理完了 … ✅ チャンク10: 100,000件処理完了 全チャンクの処理が完了しました
チャンクサイズの目安
チャンクサイズ メモリ目安 使い所
1,000件 〜1MB メモリが非常に少ない環境
10,000件 〜10MB 一般的なノートPC
100,000件 〜100MB 推奨(バランスが良い)
500,000件 〜500MB メモリに余裕がある環境

🔀 3. マルチプロセス処理(並列化)

3-1. 並列処理とは?

並列処理とは、複数のCPUコアを使って同時に処理することです。
4コアのCPUなら、理論上4倍速くなります。

👥 レストランの例

❌ 1人のシェフ:10皿作るのに10分
✅ 4人のシェフ:10皿作るのに2.5分(4倍速い)

3-2. 基本的な並列処理

# ===== 基本的な並列処理 ===== from multiprocessing import Pool import time import numpy as np def process_chunk(data): “””チャンクを処理する関数””” # 重い処理をシミュレート result = np.sum(data ** 2) return result # テストデータ data = np.random.randint(1, 100, 10000000) chunks = np.array_split(data, 8) # 8つに分割 # ❌ 逐次処理(1つずつ) print(“【逐次処理】”) start = time.time() results_sequential = [] for chunk in chunks: result = process_chunk(chunk) results_sequential.append(result) time_sequential = time.time() – start print(f”処理時間: {time_sequential:.2f}秒\n”) # ✅ 並列処理(同時に) print(“【並列処理】(4プロセス)”) start = time.time() with Pool(processes=4) as pool: results_parallel = pool.map(process_chunk, chunks) time_parallel = time.time() – start print(f”処理時間: {time_parallel:.2f}秒”) print(f”速度向上: {time_sequential/time_parallel:.1f}倍”)
【実行結果】 【逐次処理】 処理時間: 8.45秒 【並列処理】(4プロセス) 処理時間: 2.34秒 速度向上: 3.6倍

3-3. DataFrameを並列処理

# ===== DataFrameを並列処理 ===== import pandas as pd from multiprocessing import Pool import numpy as np import time def process_dataframe_chunk(chunk): “””DataFrameのチャンクを処理””” # データ変換 chunk[‘購入金額(税込)’] = chunk[‘購入金額’] * 1.1 chunk[‘ポイント’] = (chunk[‘購入金額’] * 0.01).astype(int) # 年齢層の分類 chunk[‘年齢層’] = pd.cut( chunk[‘年齢’], bins=[0, 30, 50, 100], labels=[‘若年層’, ‘中年層’, ‘シニア層’] ) return chunk def parallel_process_dataframe(df, n_processes=4): “””DataFrameを並列処理””” # DataFrameを分割 chunks = np.array_split(df, n_processes) # 並列処理 with Pool(processes=n_processes) as pool: processed_chunks = pool.map(process_dataframe_chunk, chunks) # 結合 result = pd.concat(processed_chunks, ignore_index=True) return result # テストデータ df = pd.DataFrame({ ‘ID’: range(1, 100001), ‘年齢’: np.random.randint(20, 70, 100000), ‘購入金額’: np.random.randint(1000, 100000, 100000) }) print(“【逐次処理】”) start = time.time() df_sequential = process_dataframe_chunk(df.copy()) time_sequential = time.time() – start print(f”処理時間: {time_sequential:.2f}秒\n”) print(“【並列処理】(4プロセス)”) start = time.time() df_parallel = parallel_process_dataframe(df.copy(), n_processes=4) time_parallel = time.time() – start print(f”処理時間: {time_parallel:.2f}秒”) print(f”速度向上: {time_sequential/time_parallel:.1f}倍”)
【実行結果】 【逐次処理】 処理時間: 0.45秒 【並列処理】(4プロセス) 処理時間: 0.18秒 速度向上: 2.5倍
⚠️ 並列処理の注意点
  • プロセス起動にオーバーヘッドがある(小さいデータは逆に遅くなる)
  • 10万件以上のデータで効果大
  • CPUコア数以上にプロセスを増やしても意味なし
  • メモリ使用量はプロセス数倍になる

💾 4. メモリ効率化

4-1. データ型の最適化

Pandasはデフォルトで大きめのデータ型を使います。
これを最適化すると、メモリ使用量を大幅に削減できます。

# ===== データ型の最適化 ===== import pandas as pd import numpy as np # サンプルデータ df = pd.DataFrame({ ‘ID’: range(1, 1000001), ‘年齢’: np.random.randint(0, 100, 1000000), ‘購入回数’: np.random.randint(0, 50, 1000000), ‘購入金額’: np.random.randint(0, 1000000, 1000000) }) print(“【最適化前】”) print(f”メモリ使用量: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB\n”) print(df.dtypes) print() # データ型を最適化 df_optimized = df.copy() df_optimized[‘ID’] = df_optimized[‘ID’].astype(‘int32’) df_optimized[‘年齢’] = df_optimized[‘年齢’].astype(‘int8’) df_optimized[‘購入回数’] = df_optimized[‘購入回数’].astype(‘int8’) df_optimized[‘購入金額’] = df_optimized[‘購入金額’].astype(‘int32’) print(“【最適化後】”) print(f”メモリ使用量: {df_optimized.memory_usage(deep=True).sum() / 1024**2:.2f} MB\n”) print(df_optimized.dtypes) # 削減率 original = df.memory_usage(deep=True).sum() optimized = df_optimized.memory_usage(deep=True).sum() print(f”\nメモリ削減率: {(1 – optimized / original) * 100:.1f}%”)
【実行結果】 【最適化前】 メモリ使用量: 30.52 MB ID int64 年齢 int64 購入回数 int64 購入金額 int64 dtype: object 【最適化後】 メモリ使用量: 9.54 MB ID int32 年齢 int8 購入回数 int8 購入金額 int32 dtype: object メモリ削減率: 68.7%

4-2. 整数型のサイズ一覧

整数型のサイズと範囲
データ型 範囲 メモリ 使い所
int8 -128 〜 127 1 byte 年齢、月、フラグ
int16 -32,768 〜 32,767 2 bytes 日数、小さなID
int32 -21億 〜 21億 4 bytes ID、金額、カウント
int64 -922京 〜 922京 8 bytes 大きな数値(デフォルト)

4-3. カテゴリ型の活用

# ===== カテゴリ型で大幅削減 ===== import pandas as pd # 都道府県のような重複の多いデータ df = pd.DataFrame({ ‘都道府県’: [‘東京都’] * 250000 + [‘大阪府’] * 250000 + [‘愛知県’] * 250000 + [‘福岡県’] * 250000 }) print(“【通常のobject型】”) memory_before = df.memory_usage(deep=True).sum() / 1024**2 print(f”メモリ使用量: {memory_before:.2f} MB\n”) # カテゴリ型に変換 df[‘都道府県’] = df[‘都道府県’].astype(‘category’) print(“【category型】”) memory_after = df.memory_usage(deep=True).sum() / 1024**2 print(f”メモリ使用量: {memory_after:.2f} MB\n”) print(f”メモリ削減率: {(1 – memory_after / memory_before) * 100:.1f}%”)
【実行結果】 【通常のobject型】 メモリ使用量: 54.93 MB 【category型】 メモリ使用量: 0.98 MB メモリ削減率: 98.2%

4-4. 自動最適化関数

# ===== 自動最適化関数 ===== import pandas as pd import numpy as np def optimize_dataframe(df): “”” DataFrameのデータ型を自動最適化 Parameters: ———– df : DataFrame 最適化するDataFrame Returns: ——– DataFrame : 最適化されたDataFrame “”” memory_before = df.memory_usage(deep=True).sum() / 1024**2 print(f”最適化前: {memory_before:.2f} MB”) for col in df.columns: col_type = df[col].dtype # 整数型の最適化 if col_type == ‘int64’: c_min = df[col].min() c_max = df[col].max() if c_min >= np.iinfo(np.int8).min and c_max <= np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min >= np.iinfo(np.int16).min and c_max <= np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min >= np.iinfo(np.int32).min and c_max <= np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) # 浮動小数点型の最適化 elif col_type == 'float64': df[col] = df[col].astype(np.float32) # オブジェクト型をカテゴリ型に elif col_type == 'object': num_unique = df[col].nunique() num_total = len(df[col]) if num_unique / num_total < 0.5: # ユニーク値が50%未満 df[col] = df[col].astype('category') memory_after = df.memory_usage(deep=True).sum() / 1024**2 print(f"最適化後: {memory_after:.2f} MB") print(f"削減率: {(1 - memory_after / memory_before) * 100:.1f}%") return df # 使用例 df = pd.DataFrame({ 'ID': range(1, 100001), '年齢': np.random.randint(0, 100, 100000), '都道府県': np.random.choice(['東京', '大阪', '愛知'], 100000) }) df = optimize_dataframe(df)
【実行結果】 最適化前: 3.05 MB 最適化後: 0.68 MB 削減率: 77.7%

🗄️ 5. SQL最適化とETLの連携

5-1. SQLクエリの最適化

データベースからデータを取得するとき、SQLを最適化することで大幅に高速化できます。

SQL最適化のポイント
テクニック ❌ 悪い例 ✅ 良い例
列の選択 SELECT * SELECT col1, col2
絞り込み Python側でフィルタ WHERE句で絞り込み
集計 Python側でgroupby SQLでGROUP BY
インデックス フルスキャン インデックス活用

5-2. 必要な列だけ取得

# ===== 必要な列だけ取得 ===== import pandas as pd from sqlalchemy import create_engine engine = create_engine(‘sqlite:///sample.db’) # ❌ 全列を取得(遅い) query_bad = “SELECT * FROM sales” # ✅ 必要な列だけ取得(速い) query_good = “SELECT date, product_id, amount FROM sales” df = pd.read_sql(query_good, engine) print(“必要な列だけを取得しました”)

5-3. WHERE句で絞り込んでから取得

# ===== SQLで絞り込み ===== # ❌ 全データを取得してからPythonで絞り込み(遅い) query_bad = “SELECT * FROM sales” df = pd.read_sql(query_bad, engine) df = df[df[‘date’] >= ‘2024-01-01’] # ✅ SQLで絞り込んでから取得(速い) query_good = “”” SELECT * FROM sales WHERE date >= ‘2024-01-01’ “”” df = pd.read_sql(query_good, engine) print(“SQLで絞り込んでから取得しました”)

5-4. 集計はSQLで行う

# ===== SQLで集計 ===== # ❌ 全データを取得してからPythonで集計(遅い) query_bad = “SELECT * FROM sales” df = pd.read_sql(query_bad, engine) result = df.groupby(‘product_id’)[‘amount’].sum() # ✅ SQLで集計してから取得(速い) query_good = “”” SELECT product_id, SUM(amount) as total_amount FROM sales GROUP BY product_id “”” df = pd.read_sql(query_good, engine) print(“SQLで集計してから取得しました”)
💡 SQL最適化の原則
  • SELECT *は避ける(必要な列だけ指定)
  • WHERE句で先に絞り込む
  • 集計はSQLで実行する(GROUP BY)
  • よく使う列にはインデックスを作成
  • LIMITでデータ量を制限

💼 6. 実践演習:100万件データの高速処理

6-1. すべての最適化テクニックを組み合わせる

# ===== 100万件データの高速処理 ===== import pandas as pd import numpy as np import time from multiprocessing import Pool # ===================================== # 1. データ準備 # ===================================== print(“テストデータを作成中…”) df = pd.DataFrame({ ‘ID’: range(1, 1000001), ‘名前’: [f’ユーザー{i}’ for i in range(1, 1000001)], ‘年齢’: np.random.randint(20, 70, 1000000), ‘都道府県’: np.random.choice([‘東京’, ‘大阪’, ‘愛知’, ‘福岡’], 1000000), ‘購入金額’: np.random.randint(1000, 100000, 1000000) }) df.to_csv(‘test_1m.csv’, index=False) print(f”✅ {len(df):,}件のテストデータを作成\n”) # ===================================== # 2. 最適化なし(ベースライン) # ===================================== print(“=” * 50) print(“【最適化なし】”) print(“=” * 50) start = time.time() df = pd.read_csv(‘test_1m.csv’) df[‘購入金額(税込)’] = df[‘購入金額’] * 1.1 df[‘年齢層’] = pd.cut(df[‘年齢’], bins=[0, 30, 50, 100], labels=[‘若年層’, ‘中年層’, ‘シニア層’]) summary = df.groupby([‘都道府県’, ‘年齢層’])[‘購入金額(税込)’].sum().reset_index() summary.to_csv(‘result_no_opt.csv’, index=False) time_no_opt = time.time() – start memory_no_opt = df.memory_usage(deep=True).sum() / 1024**2 print(f”処理時間: {time_no_opt:.2f}秒”) print(f”メモリ使用量: {memory_no_opt:.2f} MB\n”) # ===================================== # 3. チャンク処理 + データ型最適化 # ===================================== print(“=” * 50) print(“【チャンク処理 + データ型最適化】”) print(“=” * 50) start = time.time() dtype_dict = { ‘ID’: ‘int32’, ‘名前’: ‘object’, ‘年齢’: ‘int8’, ‘都道府県’: ‘category’, ‘購入金額’: ‘int32’ } summaries = [] for chunk in pd.read_csv(‘test_1m.csv’, chunksize=100000, dtype=dtype_dict): chunk[‘購入金額(税込)’] = chunk[‘購入金額’] * 1.1 chunk[‘年齢層’] = pd.cut(chunk[‘年齢’], bins=[0, 30, 50, 100], labels=[‘若年層’, ‘中年層’, ‘シニア層’]) chunk_summary = chunk.groupby([‘都道府県’, ‘年齢層’])[‘購入金額(税込)’].sum().reset_index() summaries.append(chunk_summary) summary = pd.concat(summaries, ignore_index=True) summary = summary.groupby([‘都道府県’, ‘年齢層’])[‘購入金額(税込)’].sum().reset_index() summary.to_csv(‘result_optimized.csv’, index=False) time_optimized = time.time() – start print(f”処理時間: {time_optimized:.2f}秒”) print(f”速度改善: {time_no_opt/time_optimized:.1f}倍\n”) print(“=” * 50) print(“【結果サマリー】”) print(“=” * 50) print(f”最適化なし: {time_no_opt:.2f}秒, {memory_no_opt:.2f} MB”) print(f”最適化あり: {time_optimized:.2f}秒, メモリ大幅削減”)
【実行結果】 テストデータを作成中… ✅ 1,000,000件のテストデータを作成 ================================================== 【最適化なし】 ================================================== 処理時間: 8.45秒 メモリ使用量: 156.32 MB ================================================== 【チャンク処理 + データ型最適化】 ================================================== 処理時間: 5.23秒 速度改善: 1.6倍 ================================================== 【結果サマリー】 ================================================== 最適化なし: 8.45秒, 156.32 MB 最適化あり: 5.23秒, メモリ大幅削減

📝 STEP 18 のまとめ

✅ このステップで学んだこと
  • チャンク処理:メモリ使用量を90%削減
  • 並列処理:処理速度を2〜4倍向上
  • データ型最適化:メモリ使用量を50〜90%削減
  • SQL最適化:DB負荷を大幅に軽減
💡 重要ポイント
  • 大量データはチャンクで処理
  • CPUコアが複数あれば並列処理
  • データ型を最適化してメモリ削減
  • SQLはデータベース側で実行
  • 最適化は組み合わせると効果大
🎯 次のステップの予告

次のSTEP 19では、「Apache Airflow」を学び始めます。

  • ワークフローオーケストレーションの必要性
  • Airflowのアーキテクチャ
  • DAG(有向非巡回グラフ)の概念

📝 練習問題

問題 1 基礎

CSVファイルをチャンク(10万件ずつ)で読み込んで、各チャンクの件数を表示してください。

【解答】
import pandas as pd for i, chunk in enumerate(pd.read_csv(‘large_data.csv’, chunksize=100000)): print(f”チャンク{i+1}: {len(chunk):,}件”) print(“読み込み完了”)
問題 2 基礎

DataFrameのメモリ使用量を確認してください。

【解答】
import pandas as pd import numpy as np df = pd.DataFrame({ ‘ID’: range(1, 100001), ‘年齢’: np.random.randint(0, 100, 100000) }) # メモリ使用量を確認 memory_mb = df.memory_usage(deep=True).sum() / 1024**2 print(f”メモリ使用量: {memory_mb:.2f} MB”) # 列ごとのメモリ使用量 print(“\n列ごとのメモリ使用量:”) print(df.memory_usage(deep=True))
問題 3 基礎

int64型の列をint32型に変換して、メモリ削減率を計算してください。

【解答】
import pandas as pd import numpy as np df = pd.DataFrame({ ‘ID’: range(1, 100001), ‘金額’: np.random.randint(0, 1000000, 100000) }) memory_before = df.memory_usage(deep=True).sum() print(f”最適化前: {memory_before / 1024**2:.2f} MB”) # int32に変換 df[‘ID’] = df[‘ID’].astype(‘int32’) df[‘金額’] = df[‘金額’].astype(‘int32’) memory_after = df.memory_usage(deep=True).sum() print(f”最適化後: {memory_after / 1024**2:.2f} MB”) reduction = (1 – memory_after / memory_before) * 100 print(f”メモリ削減率: {reduction:.1f}%”)
問題 4 基礎

重複の多い文字列列をcategory型に変換してください。

【解答】
import pandas as pd import numpy as np df = pd.DataFrame({ ‘都道府県’: np.random.choice([‘東京’, ‘大阪’, ‘愛知’, ‘福岡’], 100000) }) print(f”変換前: {df[‘都道府県’].dtype}”) memory_before = df.memory_usage(deep=True).sum() # category型に変換 df[‘都道府県’] = df[‘都道府県’].astype(‘category’) print(f”変換後: {df[‘都道府県’].dtype}”) memory_after = df.memory_usage(deep=True).sum() print(f”\nメモリ削減率: {(1 – memory_after / memory_before) * 100:.1f}%”)
問題 5 応用

チャンクで読み込みながら、各チャンクの購入金額の合計を計算し、最終的な総合計を出力してください。

【解答】
import pandas as pd total_sum = 0 chunk_count = 0 for chunk in pd.read_csv(‘large_data.csv’, chunksize=100000): chunk_sum = chunk[‘購入金額’].sum() total_sum += chunk_sum chunk_count += 1 print(f”チャンク{chunk_count}: ¥{chunk_sum:,}”) print(f”\n総合計: ¥{total_sum:,}”)
問題 6 応用

マルチプロセスで配列の要素を2乗する処理を並列化してください。

【解答】
from multiprocessing import Pool import numpy as np import time def square_chunk(chunk): “””チャンクの要素を2乗””” return np.sum(chunk ** 2) # テストデータ data = np.random.randint(1, 100, 10000000) chunks = np.array_split(data, 4) # 並列処理 print(“【並列処理】”) start = time.time() with Pool(processes=4) as pool: results = pool.map(square_chunk, chunks) total = sum(results) print(f”処理時間: {time.time() – start:.2f}秒”) print(f”合計: {total:,}”)
問題 7 応用

DataFrameのデータ型を自動最適化する関数を作成してください。

【解答】
import pandas as pd import numpy as np def auto_optimize(df): “””DataFrameのデータ型を自動最適化””” for col in df.columns: col_type = df[col].dtype if col_type == ‘int64’: c_min, c_max = df[col].min(), df[col].max() if c_min >= -128 and c_max <= 127: df[col] = df[col].astype('int8') elif c_min >= -32768 and c_max <= 32767: df[col] = df[col].astype('int16') else: df[col] = df[col].astype('int32') elif col_type == 'float64': df[col] = df[col].astype('float32') elif col_type == 'object': if df[col].nunique() / len(df) < 0.5: df[col] = df[col].astype('category') return df # テスト df = pd.DataFrame({ 'ID': range(100000), '年齢': np.random.randint(0, 100, 100000), '地域': np.random.choice(['東京', '大阪'], 100000) }) print(f"最適化前: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB") df = auto_optimize(df) print(f"最適化後: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
問題 8 応用

チャンク処理でCSVファイルを変換し、別のファイルに保存してください。

【解答】
import pandas as pd def transform_and_save_chunks(input_file, output_file, chunksize=100000): “””チャンクで変換して保存””” first_chunk = True for i, chunk in enumerate(pd.read_csv(input_file, chunksize=chunksize)): # 変換処理 chunk[‘購入金額(税込)’] = (chunk[‘購入金額’] * 1.1).astype(int) # 保存 mode = ‘w’ if first_chunk else ‘a’ header = first_chunk chunk.to_csv(output_file, mode=mode, header=header, index=False) first_chunk = False print(f”✅ チャンク{i+1}処理完了”) print(“全チャンク処理完了”) # 実行 transform_and_save_chunks(‘large_data.csv’, ‘output.csv’)
問題 9 発展

チャンク処理、データ型最適化、並列処理を組み合わせた高速処理関数を作成してください。

【解答】
import pandas as pd import numpy as np from multiprocessing import Pool import time def process_chunk(chunk_data): “””チャンクを処理””” chunk = pd.DataFrame(chunk_data) chunk[‘購入金額(税込)’] = chunk[‘購入金額’] * 1.1 return chunk.groupby(‘都道府県’)[‘購入金額(税込)’].sum().to_dict() def optimized_process(input_file, n_processes=4): “””最適化された処理””” start = time.time() # データ型を指定して読み込み dtype_dict = { ‘ID’: ‘int32’, ‘年齢’: ‘int8’, ‘都道府県’: ‘category’, ‘購入金額’: ‘int32’ } # チャンクで読み込み chunks = [] for chunk in pd.read_csv(input_file, chunksize=100000, dtype=dtype_dict): chunks.append(chunk.to_dict(‘records’)) # 並列処理 with Pool(processes=n_processes) as pool: results = pool.map(process_chunk, chunks) # 結果を集約 final_result = {} for result in results: for key, value in result.items(): final_result[key] = final_result.get(key, 0) + value print(f”処理時間: {time.time() – start:.2f}秒”) return final_result # 実行 result = optimized_process(‘test_1m.csv’) print(result)
問題 10 発展

処理前後のメモリ使用量と処理時間を比較するベンチマーク関数を作成してください。

【解答】
import pandas as pd import numpy as np import time def benchmark(func, df, name): “””処理のベンチマークを実行””” start_time = time.time() memory_before = df.memory_usage(deep=True).sum() / 1024**2 result = func(df.copy()) elapsed = time.time() – start_time memory_after = result.memory_usage(deep=True).sum() / 1024**2 print(f”【{name}】”) print(f” 処理時間: {elapsed:.3f}秒”) print(f” メモリ: {memory_before:.2f} MB → {memory_after:.2f} MB”) print(f” 削減率: {(1 – memory_after / memory_before) * 100:.1f}%\n”) return result def process_normal(df): “””通常処理””” df[‘購入金額(税込)’] = df[‘購入金額’] * 1.1 return df def process_optimized(df): “””最適化処理””” df[‘ID’] = df[‘ID’].astype(‘int32’) df[‘年齢’] = df[‘年齢’].astype(‘int8’) df[‘購入金額’] = df[‘購入金額’].astype(‘int32’) df[‘購入金額(税込)’] = df[‘購入金額’] * 1.1 return df # テスト df = pd.DataFrame({ ‘ID’: range(1, 100001), ‘年齢’: np.random.randint(0, 100, 100000), ‘購入金額’: np.random.randint(0, 1000000, 100000) }) benchmark(process_normal, df, “通常処理”) benchmark(process_optimized, df, “最適化処理”)

🎉 Part 3 完了!次はAirflowでパイプラインを自動化

🏆 ここまでで習得したスキル

Part 1〜3で、ETLパイプラインに必要なすべての要素を学びました:

  • Extract:DB接続、API呼び出し、ファイル読み込み
  • Transform:クリーニング、変換、結合、集計、検証
  • Load:DB保存、ファイル出力、クラウドアップロード
  • 品質管理:エラーハンドリング、ロギング、最適化
😱 でも、まだ問題が残っています…

ここまで作ったスクリプトを毎日自動実行するには、どうしますか?

  • ⏰ 毎朝9時に手動で実行? → 休日は?病気の日は?
  • 📋 cronで実行? → エラーが出たら?依存関係は?
  • 📊 進捗確認は? → ログを1つずつ見る?
  • 🔄 リトライは? → 毎回手動で再実行?
🚀 Part 4で解決!Apache Airflow

次のPart 4では、Apache Airflowを使ってこれらの課題を解決します。

Airflowを使えば、ここまで作ったETLスクリプトを完全に自動化し、
エラー通知リトライ依存関係管理進捗モニタリングまで
すべてを1つのツールで実現できます!

📝

学習メモ

ETL・データパイプライン構築 - Step 18

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE