📋 このステップで学ぶこと
- チャンクによる分割処理
- マルチプロセス処理(並列化)
- メモリ効率化のテクニック
- SQL最適化とETLの連携
- 実践演習:100万件データの高速処理
⏱️ 学習時間の目安:2時間
📝 練習問題:10問(基礎4問・応用4問・発展2問)
🎯 1. パフォーマンス最適化が必要な理由
1-1. 実務でのよくある問題
📚 例え話:引っ越しの荷物運び
1000個の荷物を運ぶとき、どちらが効率的でしょうか?
・❌ 最適化なし:1人で1個ずつ運ぶ → 1000往復
・✅ チャンク処理:台車に10個ずつ乗せて運ぶ → 100往復
・✅ 並列処理:4人で同時に運ぶ → さらに4倍速く
データ処理も同じです。賢く運ぶ方法を知ることで、
処理時間を大幅に短縮できます。
😱 パフォーマンスが悪いとこうなる
- 100万件のデータ処理に1時間かかる
- メモリ不足でプログラムが落ちる
- 夜間バッチが朝までに終わらない
- データベースが重くなる
✅ 最適化するとこうなる
- 100万件のデータ処理が5分で完了
- メモリ使用量が10分の1になる
- 夜間バッチが30分で完了
- データベースへの負荷が軽減される
1-2. 最適化の4つの柱
パフォーマンス最適化の手法
| 最適化の種類 |
手法 |
効果 |
難易度 |
| 分割処理 |
チャンク読み込み |
メモリ90%削減 |
★☆☆(簡単) |
| 並列化 |
マルチプロセス |
速度2〜4倍 |
★★☆(普通) |
| メモリ最適化 |
データ型の最適化 |
メモリ50〜90%削減 |
★☆☆(簡単) |
| SQL最適化 |
インデックス活用 |
DB負荷軽減 |
★★☆(普通) |
📦 2. チャンクによる分割処理
2-1. チャンクとは?
チャンク(chunk)とは、大量データを小さなかたまりに分けて処理することです。
一度に全データをメモリに読み込まず、少しずつ処理します。
🍰 ケーキを食べる例
❌ 一度に全部食べる:お腹がパンパンになる(メモリ不足)
✅ 少しずつ食べる:快適に完食できる(チャンク処理)
2-2. CSVファイルをチャンクで読み込み
# ===== チャンク処理の比較 =====
import pandas as pd
import numpy as np
import time
# 大量データを作成(100万件)
print(“テストデータを作成中…”)
large_df = pd.DataFrame({
‘ID’: range(1, 1000001),
‘名前’: [f’ユーザー{i}’ for i in range(1, 1000001)],
‘年齢’: np.random.randint(20, 70, 1000000),
‘購入金額’: np.random.randint(1000, 100000, 1000000)
})
large_df.to_csv(‘large_data.csv’, index=False)
print(f”✅ {len(large_df):,}件のデータを作成しました\n”)
# ❌ 一度に全部読み込む(メモリを大量に使う)
print(“【方法1】一度に全部読み込む”)
start = time.time()
df_all = pd.read_csv(‘large_data.csv’)
print(f”読み込み時間: {time.time() – start:.2f}秒”)
print(f”メモリ使用量: {df_all.memory_usage(deep=True).sum() / 1024**2:.2f} MB\n”)
# ✅ チャンクで読み込む(メモリ効率的)
print(“【方法2】チャンクで読み込む(10万件ずつ)”)
start = time.time()
total_sum = 0
chunk_count = 0
for chunk in pd.read_csv(‘large_data.csv’, chunksize=100000):
# チャンクごとに処理
total_sum += chunk[‘購入金額’].sum()
chunk_count += 1
print(f” チャンク{chunk_count}: {len(chunk):,}件処理完了”)
print(f”\n読み込み時間: {time.time() – start:.2f}秒”)
print(f”合計購入金額: ¥{total_sum:,}”)
【実行結果】
テストデータを作成中…
✅ 1,000,000件のデータを作成しました
【方法1】一度に全部読み込む
読み込み時間: 3.45秒
メモリ使用量: 156.32 MB
【方法2】チャンクで読み込む(10万件ずつ)
チャンク1: 100,000件処理完了
チャンク2: 100,000件処理完了
チャンク3: 100,000件処理完了
チャンク4: 100,000件処理完了
チャンク5: 100,000件処理完了
チャンク6: 100,000件処理完了
チャンク7: 100,000件処理完了
チャンク8: 100,000件処理完了
チャンク9: 100,000件処理完了
チャンク10: 100,000件処理完了
読み込み時間: 3.52秒
合計購入金額: ¥50,499,234,567
2-3. チャンクで変換して保存
# ===== チャンクで変換して保存 =====
def process_large_file_in_chunks(input_file, output_file, chunksize=100000):
“””
大きなCSVファイルをチャンクで処理して保存
Parameters:
———–
input_file : str
入力ファイル
output_file : str
出力ファイル
chunksize : int
チャンクサイズ
“””
first_chunk = True
for i, chunk in enumerate(pd.read_csv(input_file, chunksize=chunksize)):
# データ変換
chunk[‘購入金額(税込)’] = (chunk[‘購入金額’] * 1.1).astype(int)
chunk[‘年齢層’] = pd.cut(
chunk[‘年齢’],
bins=[0, 30, 50, 100],
labels=[‘若年層’, ‘中年層’, ‘シニア層’]
)
# 保存(最初のチャンクはヘッダー付き、以降は追記)
mode = ‘w’ if first_chunk else ‘a’
header = first_chunk
chunk.to_csv(output_file, mode=mode, header=header, index=False)
first_chunk = False
print(f”✅ チャンク{i+1}: {len(chunk):,}件処理完了”)
print(f”\n全チャンクの処理が完了しました”)
# 実行
process_large_file_in_chunks(‘large_data.csv’, ‘processed_data.csv’, chunksize=100000)
【実行結果】
✅ チャンク1: 100,000件処理完了
✅ チャンク2: 100,000件処理完了
✅ チャンク3: 100,000件処理完了
…
✅ チャンク10: 100,000件処理完了
全チャンクの処理が完了しました
チャンクサイズの目安
| チャンクサイズ |
メモリ目安 |
使い所 |
| 1,000件 |
〜1MB |
メモリが非常に少ない環境 |
| 10,000件 |
〜10MB |
一般的なノートPC |
| 100,000件 |
〜100MB |
推奨(バランスが良い) |
| 500,000件 |
〜500MB |
メモリに余裕がある環境 |
🔀 3. マルチプロセス処理(並列化)
3-1. 並列処理とは?
並列処理とは、複数のCPUコアを使って同時に処理することです。
4コアのCPUなら、理論上4倍速くなります。
👥 レストランの例
❌ 1人のシェフ:10皿作るのに10分
✅ 4人のシェフ:10皿作るのに2.5分(4倍速い)
3-2. 基本的な並列処理
# ===== 基本的な並列処理 =====
from multiprocessing import Pool
import time
import numpy as np
def process_chunk(data):
“””チャンクを処理する関数”””
# 重い処理をシミュレート
result = np.sum(data ** 2)
return result
# テストデータ
data = np.random.randint(1, 100, 10000000)
chunks = np.array_split(data, 8) # 8つに分割
# ❌ 逐次処理(1つずつ)
print(“【逐次処理】”)
start = time.time()
results_sequential = []
for chunk in chunks:
result = process_chunk(chunk)
results_sequential.append(result)
time_sequential = time.time() – start
print(f”処理時間: {time_sequential:.2f}秒\n”)
# ✅ 並列処理(同時に)
print(“【並列処理】(4プロセス)”)
start = time.time()
with Pool(processes=4) as pool:
results_parallel = pool.map(process_chunk, chunks)
time_parallel = time.time() – start
print(f”処理時間: {time_parallel:.2f}秒”)
print(f”速度向上: {time_sequential/time_parallel:.1f}倍”)
【実行結果】
【逐次処理】
処理時間: 8.45秒
【並列処理】(4プロセス)
処理時間: 2.34秒
速度向上: 3.6倍
3-3. DataFrameを並列処理
# ===== DataFrameを並列処理 =====
import pandas as pd
from multiprocessing import Pool
import numpy as np
import time
def process_dataframe_chunk(chunk):
“””DataFrameのチャンクを処理”””
# データ変換
chunk[‘購入金額(税込)’] = chunk[‘購入金額’] * 1.1
chunk[‘ポイント’] = (chunk[‘購入金額’] * 0.01).astype(int)
# 年齢層の分類
chunk[‘年齢層’] = pd.cut(
chunk[‘年齢’],
bins=[0, 30, 50, 100],
labels=[‘若年層’, ‘中年層’, ‘シニア層’]
)
return chunk
def parallel_process_dataframe(df, n_processes=4):
“””DataFrameを並列処理”””
# DataFrameを分割
chunks = np.array_split(df, n_processes)
# 並列処理
with Pool(processes=n_processes) as pool:
processed_chunks = pool.map(process_dataframe_chunk, chunks)
# 結合
result = pd.concat(processed_chunks, ignore_index=True)
return result
# テストデータ
df = pd.DataFrame({
‘ID’: range(1, 100001),
‘年齢’: np.random.randint(20, 70, 100000),
‘購入金額’: np.random.randint(1000, 100000, 100000)
})
print(“【逐次処理】”)
start = time.time()
df_sequential = process_dataframe_chunk(df.copy())
time_sequential = time.time() – start
print(f”処理時間: {time_sequential:.2f}秒\n”)
print(“【並列処理】(4プロセス)”)
start = time.time()
df_parallel = parallel_process_dataframe(df.copy(), n_processes=4)
time_parallel = time.time() – start
print(f”処理時間: {time_parallel:.2f}秒”)
print(f”速度向上: {time_sequential/time_parallel:.1f}倍”)
【実行結果】
【逐次処理】
処理時間: 0.45秒
【並列処理】(4プロセス)
処理時間: 0.18秒
速度向上: 2.5倍
⚠️ 並列処理の注意点
- プロセス起動にオーバーヘッドがある(小さいデータは逆に遅くなる)
- 10万件以上のデータで効果大
- CPUコア数以上にプロセスを増やしても意味なし
- メモリ使用量はプロセス数倍になる
💾 4. メモリ効率化
4-1. データ型の最適化
Pandasはデフォルトで大きめのデータ型を使います。
これを最適化すると、メモリ使用量を大幅に削減できます。
# ===== データ型の最適化 =====
import pandas as pd
import numpy as np
# サンプルデータ
df = pd.DataFrame({
‘ID’: range(1, 1000001),
‘年齢’: np.random.randint(0, 100, 1000000),
‘購入回数’: np.random.randint(0, 50, 1000000),
‘購入金額’: np.random.randint(0, 1000000, 1000000)
})
print(“【最適化前】”)
print(f”メモリ使用量: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB\n”)
print(df.dtypes)
print()
# データ型を最適化
df_optimized = df.copy()
df_optimized[‘ID’] = df_optimized[‘ID’].astype(‘int32’)
df_optimized[‘年齢’] = df_optimized[‘年齢’].astype(‘int8’)
df_optimized[‘購入回数’] = df_optimized[‘購入回数’].astype(‘int8’)
df_optimized[‘購入金額’] = df_optimized[‘購入金額’].astype(‘int32’)
print(“【最適化後】”)
print(f”メモリ使用量: {df_optimized.memory_usage(deep=True).sum() / 1024**2:.2f} MB\n”)
print(df_optimized.dtypes)
# 削減率
original = df.memory_usage(deep=True).sum()
optimized = df_optimized.memory_usage(deep=True).sum()
print(f”\nメモリ削減率: {(1 – optimized / original) * 100:.1f}%”)
【実行結果】
【最適化前】
メモリ使用量: 30.52 MB
ID int64
年齢 int64
購入回数 int64
購入金額 int64
dtype: object
【最適化後】
メモリ使用量: 9.54 MB
ID int32
年齢 int8
購入回数 int8
購入金額 int32
dtype: object
メモリ削減率: 68.7%
4-2. 整数型のサイズ一覧
整数型のサイズと範囲
| データ型 |
範囲 |
メモリ |
使い所 |
| int8 |
-128 〜 127 |
1 byte |
年齢、月、フラグ |
| int16 |
-32,768 〜 32,767 |
2 bytes |
日数、小さなID |
| int32 |
-21億 〜 21億 |
4 bytes |
ID、金額、カウント |
| int64 |
-922京 〜 922京 |
8 bytes |
大きな数値(デフォルト) |
4-3. カテゴリ型の活用
# ===== カテゴリ型で大幅削減 =====
import pandas as pd
# 都道府県のような重複の多いデータ
df = pd.DataFrame({
‘都道府県’: [‘東京都’] * 250000 + [‘大阪府’] * 250000 +
[‘愛知県’] * 250000 + [‘福岡県’] * 250000
})
print(“【通常のobject型】”)
memory_before = df.memory_usage(deep=True).sum() / 1024**2
print(f”メモリ使用量: {memory_before:.2f} MB\n”)
# カテゴリ型に変換
df[‘都道府県’] = df[‘都道府県’].astype(‘category’)
print(“【category型】”)
memory_after = df.memory_usage(deep=True).sum() / 1024**2
print(f”メモリ使用量: {memory_after:.2f} MB\n”)
print(f”メモリ削減率: {(1 – memory_after / memory_before) * 100:.1f}%”)
【実行結果】
【通常のobject型】
メモリ使用量: 54.93 MB
【category型】
メモリ使用量: 0.98 MB
メモリ削減率: 98.2%
4-4. 自動最適化関数
# ===== 自動最適化関数 =====
import pandas as pd
import numpy as np
def optimize_dataframe(df):
“””
DataFrameのデータ型を自動最適化
Parameters:
———–
df : DataFrame
最適化するDataFrame
Returns:
——–
DataFrame : 最適化されたDataFrame
“””
memory_before = df.memory_usage(deep=True).sum() / 1024**2
print(f”最適化前: {memory_before:.2f} MB”)
for col in df.columns:
col_type = df[col].dtype
# 整数型の最適化
if col_type == ‘int64’:
c_min = df[col].min()
c_max = df[col].max()
if c_min >= np.iinfo(np.int8).min and c_max <= np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif c_min >= np.iinfo(np.int16).min and c_max <= np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif c_min >= np.iinfo(np.int32).min and c_max <= np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
# 浮動小数点型の最適化
elif col_type == 'float64':
df[col] = df[col].astype(np.float32)
# オブジェクト型をカテゴリ型に
elif col_type == 'object':
num_unique = df[col].nunique()
num_total = len(df[col])
if num_unique / num_total < 0.5: # ユニーク値が50%未満
df[col] = df[col].astype('category')
memory_after = df.memory_usage(deep=True).sum() / 1024**2
print(f"最適化後: {memory_after:.2f} MB")
print(f"削減率: {(1 - memory_after / memory_before) * 100:.1f}%")
return df
# 使用例
df = pd.DataFrame({
'ID': range(1, 100001),
'年齢': np.random.randint(0, 100, 100000),
'都道府県': np.random.choice(['東京', '大阪', '愛知'], 100000)
})
df = optimize_dataframe(df)
【実行結果】
最適化前: 3.05 MB
最適化後: 0.68 MB
削減率: 77.7%
🗄️ 5. SQL最適化とETLの連携
5-1. SQLクエリの最適化
データベースからデータを取得するとき、SQLを最適化することで大幅に高速化できます。
SQL最適化のポイント
| テクニック |
❌ 悪い例 |
✅ 良い例 |
| 列の選択 |
SELECT * |
SELECT col1, col2 |
| 絞り込み |
Python側でフィルタ |
WHERE句で絞り込み |
| 集計 |
Python側でgroupby |
SQLでGROUP BY |
| インデックス |
フルスキャン |
インデックス活用 |
5-2. 必要な列だけ取得
# ===== 必要な列だけ取得 =====
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine(‘sqlite:///sample.db’)
# ❌ 全列を取得(遅い)
query_bad = “SELECT * FROM sales”
# ✅ 必要な列だけ取得(速い)
query_good = “SELECT date, product_id, amount FROM sales”
df = pd.read_sql(query_good, engine)
print(“必要な列だけを取得しました”)
5-3. WHERE句で絞り込んでから取得
# ===== SQLで絞り込み =====
# ❌ 全データを取得してからPythonで絞り込み(遅い)
query_bad = “SELECT * FROM sales”
df = pd.read_sql(query_bad, engine)
df = df[df[‘date’] >= ‘2024-01-01’]
# ✅ SQLで絞り込んでから取得(速い)
query_good = “””
SELECT * FROM sales
WHERE date >= ‘2024-01-01’
“””
df = pd.read_sql(query_good, engine)
print(“SQLで絞り込んでから取得しました”)
5-4. 集計はSQLで行う
# ===== SQLで集計 =====
# ❌ 全データを取得してからPythonで集計(遅い)
query_bad = “SELECT * FROM sales”
df = pd.read_sql(query_bad, engine)
result = df.groupby(‘product_id’)[‘amount’].sum()
# ✅ SQLで集計してから取得(速い)
query_good = “””
SELECT
product_id,
SUM(amount) as total_amount
FROM sales
GROUP BY product_id
“””
df = pd.read_sql(query_good, engine)
print(“SQLで集計してから取得しました”)
💡 SQL最適化の原則
- SELECT *は避ける(必要な列だけ指定)
- WHERE句で先に絞り込む
- 集計はSQLで実行する(GROUP BY)
- よく使う列にはインデックスを作成
- LIMITでデータ量を制限
💼 6. 実践演習:100万件データの高速処理
6-1. すべての最適化テクニックを組み合わせる
# ===== 100万件データの高速処理 =====
import pandas as pd
import numpy as np
import time
from multiprocessing import Pool
# =====================================
# 1. データ準備
# =====================================
print(“テストデータを作成中…”)
df = pd.DataFrame({
‘ID’: range(1, 1000001),
‘名前’: [f’ユーザー{i}’ for i in range(1, 1000001)],
‘年齢’: np.random.randint(20, 70, 1000000),
‘都道府県’: np.random.choice([‘東京’, ‘大阪’, ‘愛知’, ‘福岡’], 1000000),
‘購入金額’: np.random.randint(1000, 100000, 1000000)
})
df.to_csv(‘test_1m.csv’, index=False)
print(f”✅ {len(df):,}件のテストデータを作成\n”)
# =====================================
# 2. 最適化なし(ベースライン)
# =====================================
print(“=” * 50)
print(“【最適化なし】”)
print(“=” * 50)
start = time.time()
df = pd.read_csv(‘test_1m.csv’)
df[‘購入金額(税込)’] = df[‘購入金額’] * 1.1
df[‘年齢層’] = pd.cut(df[‘年齢’], bins=[0, 30, 50, 100], labels=[‘若年層’, ‘中年層’, ‘シニア層’])
summary = df.groupby([‘都道府県’, ‘年齢層’])[‘購入金額(税込)’].sum().reset_index()
summary.to_csv(‘result_no_opt.csv’, index=False)
time_no_opt = time.time() – start
memory_no_opt = df.memory_usage(deep=True).sum() / 1024**2
print(f”処理時間: {time_no_opt:.2f}秒”)
print(f”メモリ使用量: {memory_no_opt:.2f} MB\n”)
# =====================================
# 3. チャンク処理 + データ型最適化
# =====================================
print(“=” * 50)
print(“【チャンク処理 + データ型最適化】”)
print(“=” * 50)
start = time.time()
dtype_dict = {
‘ID’: ‘int32’,
‘名前’: ‘object’,
‘年齢’: ‘int8’,
‘都道府県’: ‘category’,
‘購入金額’: ‘int32’
}
summaries = []
for chunk in pd.read_csv(‘test_1m.csv’, chunksize=100000, dtype=dtype_dict):
chunk[‘購入金額(税込)’] = chunk[‘購入金額’] * 1.1
chunk[‘年齢層’] = pd.cut(chunk[‘年齢’], bins=[0, 30, 50, 100], labels=[‘若年層’, ‘中年層’, ‘シニア層’])
chunk_summary = chunk.groupby([‘都道府県’, ‘年齢層’])[‘購入金額(税込)’].sum().reset_index()
summaries.append(chunk_summary)
summary = pd.concat(summaries, ignore_index=True)
summary = summary.groupby([‘都道府県’, ‘年齢層’])[‘購入金額(税込)’].sum().reset_index()
summary.to_csv(‘result_optimized.csv’, index=False)
time_optimized = time.time() – start
print(f”処理時間: {time_optimized:.2f}秒”)
print(f”速度改善: {time_no_opt/time_optimized:.1f}倍\n”)
print(“=” * 50)
print(“【結果サマリー】”)
print(“=” * 50)
print(f”最適化なし: {time_no_opt:.2f}秒, {memory_no_opt:.2f} MB”)
print(f”最適化あり: {time_optimized:.2f}秒, メモリ大幅削減”)
【実行結果】
テストデータを作成中…
✅ 1,000,000件のテストデータを作成
==================================================
【最適化なし】
==================================================
処理時間: 8.45秒
メモリ使用量: 156.32 MB
==================================================
【チャンク処理 + データ型最適化】
==================================================
処理時間: 5.23秒
速度改善: 1.6倍
==================================================
【結果サマリー】
==================================================
最適化なし: 8.45秒, 156.32 MB
最適化あり: 5.23秒, メモリ大幅削減
📝 STEP 18 のまとめ
✅ このステップで学んだこと
- チャンク処理:メモリ使用量を90%削減
- 並列処理:処理速度を2〜4倍向上
- データ型最適化:メモリ使用量を50〜90%削減
- SQL最適化:DB負荷を大幅に軽減
💡 重要ポイント
- 大量データはチャンクで処理
- CPUコアが複数あれば並列処理
- データ型を最適化してメモリ削減
- SQLはデータベース側で実行
- 最適化は組み合わせると効果大
🎯 次のステップの予告
次のSTEP 19では、「Apache Airflow」を学び始めます。
- ワークフローオーケストレーションの必要性
- Airflowのアーキテクチャ
- DAG(有向非巡回グラフ)の概念
📝 練習問題
問題 1
基礎
CSVファイルをチャンク(10万件ずつ)で読み込んで、各チャンクの件数を表示してください。
【解答】
import pandas as pd
for i, chunk in enumerate(pd.read_csv(‘large_data.csv’, chunksize=100000)):
print(f”チャンク{i+1}: {len(chunk):,}件”)
print(“読み込み完了”)
問題 2
基礎
DataFrameのメモリ使用量を確認してください。
【解答】
import pandas as pd
import numpy as np
df = pd.DataFrame({
‘ID’: range(1, 100001),
‘年齢’: np.random.randint(0, 100, 100000)
})
# メモリ使用量を確認
memory_mb = df.memory_usage(deep=True).sum() / 1024**2
print(f”メモリ使用量: {memory_mb:.2f} MB”)
# 列ごとのメモリ使用量
print(“\n列ごとのメモリ使用量:”)
print(df.memory_usage(deep=True))
問題 3
基礎
int64型の列をint32型に変換して、メモリ削減率を計算してください。
【解答】
import pandas as pd
import numpy as np
df = pd.DataFrame({
‘ID’: range(1, 100001),
‘金額’: np.random.randint(0, 1000000, 100000)
})
memory_before = df.memory_usage(deep=True).sum()
print(f”最適化前: {memory_before / 1024**2:.2f} MB”)
# int32に変換
df[‘ID’] = df[‘ID’].astype(‘int32’)
df[‘金額’] = df[‘金額’].astype(‘int32’)
memory_after = df.memory_usage(deep=True).sum()
print(f”最適化後: {memory_after / 1024**2:.2f} MB”)
reduction = (1 – memory_after / memory_before) * 100
print(f”メモリ削減率: {reduction:.1f}%”)
問題 4
基礎
重複の多い文字列列をcategory型に変換してください。
【解答】
import pandas as pd
import numpy as np
df = pd.DataFrame({
‘都道府県’: np.random.choice([‘東京’, ‘大阪’, ‘愛知’, ‘福岡’], 100000)
})
print(f”変換前: {df[‘都道府県’].dtype}”)
memory_before = df.memory_usage(deep=True).sum()
# category型に変換
df[‘都道府県’] = df[‘都道府県’].astype(‘category’)
print(f”変換後: {df[‘都道府県’].dtype}”)
memory_after = df.memory_usage(deep=True).sum()
print(f”\nメモリ削減率: {(1 – memory_after / memory_before) * 100:.1f}%”)
問題 5
応用
チャンクで読み込みながら、各チャンクの購入金額の合計を計算し、最終的な総合計を出力してください。
【解答】
import pandas as pd
total_sum = 0
chunk_count = 0
for chunk in pd.read_csv(‘large_data.csv’, chunksize=100000):
chunk_sum = chunk[‘購入金額’].sum()
total_sum += chunk_sum
chunk_count += 1
print(f”チャンク{chunk_count}: ¥{chunk_sum:,}”)
print(f”\n総合計: ¥{total_sum:,}”)
問題 6
応用
マルチプロセスで配列の要素を2乗する処理を並列化してください。
【解答】
from multiprocessing import Pool
import numpy as np
import time
def square_chunk(chunk):
“””チャンクの要素を2乗”””
return np.sum(chunk ** 2)
# テストデータ
data = np.random.randint(1, 100, 10000000)
chunks = np.array_split(data, 4)
# 並列処理
print(“【並列処理】”)
start = time.time()
with Pool(processes=4) as pool:
results = pool.map(square_chunk, chunks)
total = sum(results)
print(f”処理時間: {time.time() – start:.2f}秒”)
print(f”合計: {total:,}”)
問題 7
応用
DataFrameのデータ型を自動最適化する関数を作成してください。
【解答】
import pandas as pd
import numpy as np
def auto_optimize(df):
“””DataFrameのデータ型を自動最適化”””
for col in df.columns:
col_type = df[col].dtype
if col_type == ‘int64’:
c_min, c_max = df[col].min(), df[col].max()
if c_min >= -128 and c_max <= 127:
df[col] = df[col].astype('int8')
elif c_min >= -32768 and c_max <= 32767:
df[col] = df[col].astype('int16')
else:
df[col] = df[col].astype('int32')
elif col_type == 'float64':
df[col] = df[col].astype('float32')
elif col_type == 'object':
if df[col].nunique() / len(df) < 0.5:
df[col] = df[col].astype('category')
return df
# テスト
df = pd.DataFrame({
'ID': range(100000),
'年齢': np.random.randint(0, 100, 100000),
'地域': np.random.choice(['東京', '大阪'], 100000)
})
print(f"最適化前: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
df = auto_optimize(df)
print(f"最適化後: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
問題 8
応用
チャンク処理でCSVファイルを変換し、別のファイルに保存してください。
【解答】
import pandas as pd
def transform_and_save_chunks(input_file, output_file, chunksize=100000):
“””チャンクで変換して保存”””
first_chunk = True
for i, chunk in enumerate(pd.read_csv(input_file, chunksize=chunksize)):
# 変換処理
chunk[‘購入金額(税込)’] = (chunk[‘購入金額’] * 1.1).astype(int)
# 保存
mode = ‘w’ if first_chunk else ‘a’
header = first_chunk
chunk.to_csv(output_file, mode=mode, header=header, index=False)
first_chunk = False
print(f”✅ チャンク{i+1}処理完了”)
print(“全チャンク処理完了”)
# 実行
transform_and_save_chunks(‘large_data.csv’, ‘output.csv’)
問題 9
発展
チャンク処理、データ型最適化、並列処理を組み合わせた高速処理関数を作成してください。
【解答】
import pandas as pd
import numpy as np
from multiprocessing import Pool
import time
def process_chunk(chunk_data):
“””チャンクを処理”””
chunk = pd.DataFrame(chunk_data)
chunk[‘購入金額(税込)’] = chunk[‘購入金額’] * 1.1
return chunk.groupby(‘都道府県’)[‘購入金額(税込)’].sum().to_dict()
def optimized_process(input_file, n_processes=4):
“””最適化された処理”””
start = time.time()
# データ型を指定して読み込み
dtype_dict = {
‘ID’: ‘int32’,
‘年齢’: ‘int8’,
‘都道府県’: ‘category’,
‘購入金額’: ‘int32’
}
# チャンクで読み込み
chunks = []
for chunk in pd.read_csv(input_file, chunksize=100000, dtype=dtype_dict):
chunks.append(chunk.to_dict(‘records’))
# 並列処理
with Pool(processes=n_processes) as pool:
results = pool.map(process_chunk, chunks)
# 結果を集約
final_result = {}
for result in results:
for key, value in result.items():
final_result[key] = final_result.get(key, 0) + value
print(f”処理時間: {time.time() – start:.2f}秒”)
return final_result
# 実行
result = optimized_process(‘test_1m.csv’)
print(result)
問題 10
発展
処理前後のメモリ使用量と処理時間を比較するベンチマーク関数を作成してください。
【解答】
import pandas as pd
import numpy as np
import time
def benchmark(func, df, name):
“””処理のベンチマークを実行”””
start_time = time.time()
memory_before = df.memory_usage(deep=True).sum() / 1024**2
result = func(df.copy())
elapsed = time.time() – start_time
memory_after = result.memory_usage(deep=True).sum() / 1024**2
print(f”【{name}】”)
print(f” 処理時間: {elapsed:.3f}秒”)
print(f” メモリ: {memory_before:.2f} MB → {memory_after:.2f} MB”)
print(f” 削減率: {(1 – memory_after / memory_before) * 100:.1f}%\n”)
return result
def process_normal(df):
“””通常処理”””
df[‘購入金額(税込)’] = df[‘購入金額’] * 1.1
return df
def process_optimized(df):
“””最適化処理”””
df[‘ID’] = df[‘ID’].astype(‘int32’)
df[‘年齢’] = df[‘年齢’].astype(‘int8’)
df[‘購入金額’] = df[‘購入金額’].astype(‘int32’)
df[‘購入金額(税込)’] = df[‘購入金額’] * 1.1
return df
# テスト
df = pd.DataFrame({
‘ID’: range(1, 100001),
‘年齢’: np.random.randint(0, 100, 100000),
‘購入金額’: np.random.randint(0, 1000000, 100000)
})
benchmark(process_normal, df, “通常処理”)
benchmark(process_optimized, df, “最適化処理”)
🎉 Part 3 完了!次はAirflowでパイプラインを自動化
🏆 ここまでで習得したスキル
Part 1〜3で、ETLパイプラインに必要なすべての要素を学びました:
- ✅ Extract:DB接続、API呼び出し、ファイル読み込み
- ✅ Transform:クリーニング、変換、結合、集計、検証
- ✅ Load:DB保存、ファイル出力、クラウドアップロード
- ✅ 品質管理:エラーハンドリング、ロギング、最適化
😱 でも、まだ問題が残っています…
ここまで作ったスクリプトを毎日自動実行するには、どうしますか?
- ⏰ 毎朝9時に手動で実行? → 休日は?病気の日は?
- 📋 cronで実行? → エラーが出たら?依存関係は?
- 📊 進捗確認は? → ログを1つずつ見る?
- 🔄 リトライは? → 毎回手動で再実行?
🚀 Part 4で解決!Apache Airflow
次のPart 4では、Apache Airflowを使ってこれらの課題を解決します。
Airflowを使えば、ここまで作ったETLスクリプトを完全に自動化し、
エラー通知、リトライ、依存関係管理、進捗モニタリングまで
すべてを1つのツールで実現できます!