📋 このステップで学ぶこと
- 大容量ファイル処理の問題点と解決アプローチ
- チャンク処理(chunksize)による分割読み込み
- dtype指定によるメモリ削減テクニック
- category型の効果的な活用
- Daskを使った並列処理
- ストリーミング処理の基礎
- 実践演習:大容量CSVファイルの処理
🎯 1. 大容量ファイルの問題点
1-1. なぜ大容量ファイルは難しいのか?
通常の方法でファイルを読み込むと、すべてのデータをメモリ(RAM)に展開します。
ファイルがメモリ容量を超えると、エラーになったり、PCが極端に遅くなります。
❌ 大容量ファイルでNG なコード
import pandas as pd
# 10GBのCSVファイルを読み込もうとすると…
df = pd.read_csv(‘huge_file.csv’) # MemoryError!
PCのメモリが8GBの場合、10GBのファイルは読み込めません!
実際には、CSVをDataFrameに変換すると2〜10倍のメモリを使うこともあります。
🚚 例え話:引っ越しの荷物
引っ越しで大量の荷物を運ぶとき:
普通の方法:すべての荷物を一度にトラックに積もうとする
→ トラックに入りきらない!
賢い方法:荷物を少しずつ運ぶ
→ 何往復かすれば、すべて運べる!
データも同じです。少しずつ処理すればメモリに収まります。
1-2. ファイルサイズと処理方法の目安
| ファイルサイズ |
推奨処理方法 |
説明 |
| 〜100MB |
通常の読み込み |
そのままpd.read_csv()でOK |
| 100MB〜1GB |
dtype指定 |
データ型を最適化してメモリ削減 |
| 1GB〜10GB |
チャンク処理 |
少しずつ読み込んで処理 |
| 10GB以上 |
並列処理+チャンク |
複数のコアで並列処理(Dask等) |
📝 実際のメモリ使用量の目安
CSVファイルをDataFrameに読み込むと、ファイルサイズの2〜10倍のメモリを使います。
- 100MBのCSV → 200MB〜1GBのメモリ
- 1GBのCSV → 2GB〜10GBのメモリ
- 文字列が多いとメモリ使用量が増加
1-3. 大容量ファイル処理の4つのアプローチ
1. dtype指定
→
2. チャンク処理
→
3. 並列処理
→
4. ストリーミング
| アプローチ |
仕組み |
使いどころ |
| dtype指定 |
データ型を最適化してメモリ削減 |
最初に試すべき方法 |
| チャンク処理 |
ファイルを小さな塊に分けて処理 |
dtype指定でも足りない場合 |
| 並列処理 |
複数のCPUコアで同時処理 |
処理速度を上げたい場合 |
| ストリーミング |
1行ずつ読み込んで処理 |
シンプルな処理、メモリ最小化 |
🔄 2. チャンク処理(chunksize)
2-1. チャンク処理とは?
チャンク(chunk)とは、「かたまり」という意味です。
大きなファイルを小さなかたまりに分けて、少しずつ処理します。
📚 チャンクのイメージ
1000万行のCSVファイルを100万行ずつ処理する場合:
- チャンク1:1〜100万行目を読み込み → 処理 → メモリから解放
- チャンク2:101〜200万行目を読み込み → 処理 → メモリから解放
- チャンク3:201〜300万行目を読み込み → 処理 → メモリから解放
- … 10回繰り返し
常に100万行分のメモリしか使わないので、メモリ不足になりません!
2-2. 基本的なチャンク処理
# ===== chunksizeを使った読み込み =====
import pandas as pd
# 10万行ずつ読み込む
chunk_size = 100000
# read_csvにchunksizeを指定すると、イテレータが返る
for chunk in pd.read_csv(‘large_file.csv’, chunksize=chunk_size):
# chunkはDataFrame(10万行分)
print(f”チャンクサイズ: {len(chunk)}行”)
print(f”メモリ使用量: {chunk.memory_usage(deep=True).sum() / 1024**2:.2f} MB”)
# ここでデータ処理(例:特定の条件でフィルタ)
filtered = chunk[chunk[‘price’] > 1000]
print(f”フィルタ後: {len(filtered)}行”)
print(“—“)
📝 コードの説明
chunksize=100000:1回に読み込む行数を指定
for chunk in pd.read_csv(...):チャンクごとにループ
- 各
chunkは通常のDataFrame(Pandasの全機能が使える)
- ループが終わると、そのチャンクはメモリから解放される
2-3. チャンクを処理して結合する
フィルタリングした結果を結合して1つのDataFrameにする方法です。
# ===== チャンクを処理して結合 =====
import pandas as pd
chunk_size = 100000
result_chunks = [] # 結果を格納するリスト
# 各チャンクを処理
for chunk in pd.read_csv(‘large_file.csv’, chunksize=chunk_size):
# 条件でフィルタリング(例:価格1000円以上)
filtered = chunk[chunk[‘price’] > 1000]
# 結果が空でなければリストに追加
if len(filtered) > 0:
result_chunks.append(filtered)
# すべてのチャンクを結合
result = pd.concat(result_chunks, ignore_index=True)
print(f”結果: {len(result)}行”)
print(result.head())
⚠️ 注意点
結合後のDataFrameがメモリに収まる場合のみ有効です。
フィルタリングで十分にデータが絞られる場合に使いましょう。
2-4. チャンクごとに集計する
全データの合計や平均を計算する場合、チャンクごとに集計して累積します。
# ===== チャンクごとに集計して合計 =====
import pandas as pd
chunk_size = 100000
total_sales = 0 # 売上合計
total_count = 0 # 行数カウント
processed_chunks = 0 # 処理済みチャンク数
for chunk in pd.read_csv(‘sales.csv’, chunksize=chunk_size):
# 各チャンクで集計
chunk_sales = chunk[‘sales’].sum()
chunk_count = len(chunk)
# 合計に加算
total_sales += chunk_sales
total_count += chunk_count
processed_chunks += 1
print(f”チャンク {processed_chunks}: 売上 {chunk_sales:,.0f}円, 行数 {chunk_count:,}行”)
print(“=” * 50)
print(f”総売上: {total_sales:,.0f}円”)
print(f”総行数: {total_count:,}行”)
print(f”平均売上: {total_sales / total_count:,.2f}円”)
2-5. グループ別に集計する
カテゴリ別の集計など、グループ別の集計をチャンク処理で行う方法です。
# ===== カテゴリ別集計(チャンク処理) =====
import pandas as pd
from collections import defaultdict
chunk_size = 100000
# カテゴリごとの売上を格納する辞書
category_sales = defaultdict(float)
category_count = defaultdict(int)
for chunk in pd.read_csv(‘sales.csv’, chunksize=chunk_size):
# チャンク内でgroupby集計
chunk_summary = chunk.groupby(‘category’).agg({
‘sales’: ‘sum’,
‘order_id’: ‘count’
})
# 辞書に追加
for category, row in chunk_summary.iterrows():
category_sales[category] += row[‘sales’]
category_count[category] += row[‘order_id’]
# 結果をDataFrameに変換
result = pd.DataFrame({
‘category’: list(category_sales.keys()),
‘total_sales’: list(category_sales.values()),
‘order_count’: list(category_count.values())
})
# 売上順にソート
result = result.sort_values(‘total_sales’, ascending=False)
print(result)
2-6. 結果をファイルに追記する
処理結果をメモリに溜めずに、直接ファイルに追記する方法です。
これにより、どんなに大きなファイルでも処理できます。
# ===== 結果をファイルに追記 =====
import pandas as pd
chunk_size = 100000
output_file = ‘filtered_data.csv’
first_chunk = True # ヘッダーを書くかどうかのフラグ
for chunk in pd.read_csv(‘huge_file.csv’, chunksize=chunk_size):
# 条件でフィルタ(例:2024年1月のデータ)
filtered = chunk[
(chunk[‘date’] >= ‘2024-01-01’) &
(chunk[‘date’] < '2024-02-01')
]
# 結果があればファイルに追記
if len(filtered) > 0:
filtered.to_csv(
output_file,
mode=’a’, # 追記モード(’w’は上書き)
header=first_chunk, # 最初だけヘッダーを書く
index=False
)
first_chunk = False
print(f”処理済み: {chunk.index[-1] + 1:,}行”)
print(f”完了!結果は {output_file} に保存されました”)
✅ chunksizeの決め方
- 小さすぎる(1,000行):処理が遅い(読み込み回数が多い)
- 大きすぎる(1000万行):メモリ不足になる
- 推奨値:
- メモリ8GB → 5万〜10万行
- メモリ16GB → 20万〜50万行
- メモリ32GB以上 → 50万〜100万行
💾 3. dtype指定によるメモリ削減
3-1. dtypeとは?
dtype(データ型)を適切に指定することで、メモリ使用量を大幅に削減できます。
Pandasのデフォルトは「余裕を持った」データ型なので、最適化の余地があります。
💡 例え話:収納ボックス
靴を収納するとき:
- 大きすぎる箱:スペースの無駄(= int64で小さな数値を格納)
- ぴったりの箱:効率的(= int8で小さな数値を格納)
データ型も「ぴったりサイズ」を選ぶことで、メモリを節約できます。
3-2. データ型とメモリ使用量
| データ型 |
メモリ |
値の範囲 |
使用例 |
int64 |
8バイト |
-9京 〜 9京 |
Pandasのデフォルト |
int32 |
4バイト |
-21億 〜 21億 |
ID、数量など |
int16 |
2バイト |
-32,768 〜 32,767 |
年、小さな数値 |
int8 |
1バイト |
-128 〜 127 |
年齢、フラグ |
float64 |
8バイト |
倍精度小数 |
Pandasのデフォルト |
float32 |
4バイト |
単精度小数 |
価格、スコアなど |
category |
少ない |
繰り返しの多いデータ |
性別、都道府県 |
3-3. dtype指定の基本
# ===== dtypeを指定して読み込み =====
import pandas as pd
# データ型を辞書で定義
dtypes = {
‘user_id’: ‘int32’, # int64 → int32(メモリ半減)
‘age’: ‘int8’, # 0〜120なのでint8で十分
‘gender’: ‘category’, # ‘M’/’F’のような限られた値
‘prefecture’: ‘category’, # 47都道府県(繰り返しが多い)
‘price’: ‘float32’, # float64 → float32(メモリ半減)
‘product_name’: ‘str’ # 文字列
}
# dtype指定で読み込み
df = pd.read_csv(‘data.csv’, dtype=dtypes)
# メモリ使用量を確認
print(“=== カラムごとのメモリ使用量 ===”)
print(df.memory_usage(deep=True))
print()
print(f”合計: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB”)
3-4. メモリ使用量の確認と最適化
# ===== 読み込み後の最適化 =====
import pandas as pd
# まず普通に読み込み
df = pd.read_csv(‘data.csv’)
# 最適化前のメモリ使用量
print(“=== 最適化前 ===”)
print(f”合計: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB”)
print()
print(“データ型:”)
print(df.dtypes)
print()
# 各カラムの値の範囲を確認
print(“数値カラムの範囲:”)
for col in df.select_dtypes(include=[‘int64’, ‘float64’]).columns:
print(f” {col}: {df[col].min()} 〜 {df[col].max()}”)
# 最適化
df[‘user_id’] = df[‘user_id’].astype(‘int32’) # 範囲に合わせて変更
df[‘age’] = df[‘age’].astype(‘int8’) # 0〜120なのでint8
df[‘prefecture’] = df[‘prefecture’].astype(‘category’) # 繰り返しが多い
df[‘price’] = df[‘price’].astype(‘float32’) # 精度を落としてOK
# 最適化後のメモリ使用量
print(“\n=== 最適化後 ===”)
print(f”合計: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB”)
3-5. 自動メモリ最適化関数
データ型を自動的に最適化する便利な関数を用意しました。
# ===== 自動メモリ最適化関数 =====
import pandas as pd
import numpy as np
def optimize_memory(df, verbose=True):
“””
DataFrameのメモリを自動最適化
Parameters:
df: 最適化するDataFrame
verbose: 詳細を表示するか
Returns:
最適化されたDataFrame
“””
start_mem = df.memory_usage(deep=True).sum() / 1024**2
for col in df.columns:
col_type = df[col].dtype
# 数値型の最適化
if col_type != ‘object’ and col_type.name != ‘category’:
c_min = df[col].min()
c_max = df[col].max()
# 整数型
if str(col_type)[:3] == ‘int’:
if c_min >= 0: # 非負整数
if c_max < 255:
df[col] = df[col].astype(np.uint8)
elif c_max < 65535:
df[col] = df[col].astype(np.uint16)
elif c_max < 4294967295:
df[col] = df[col].astype(np.uint32)
else: # 負の値あり
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
# 浮動小数点型
elif str(col_type)[:5] == 'float':
if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df[col] = df[col].astype(np.float32)
# 文字列型 → カテゴリ型への変換
elif col_type == 'object':
num_unique = df[col].nunique()
num_total = len(df[col])
# ユニーク値が全体の50%以下ならカテゴリ化
if num_unique / num_total < 0.5:
df[col] = df[col].astype('category')
end_mem = df.memory_usage(deep=True).sum() / 1024**2
if verbose:
print(f"最適化前: {start_mem:.2f} MB")
print(f"最適化後: {end_mem:.2f} MB")
print(f"削減率: {100 * (start_mem - end_mem) / start_mem:.1f}%")
return df
# 使用例
df = pd.read_csv('data.csv')
df = optimize_memory(df)
3-6. category型の威力
category型は、繰り返しの多いデータで劇的にメモリを削減できます。
📊 category型の効果例
100万行のデータで「都道府県」カラム(47種類)がある場合:
object型:約60MB(各行に文字列を格納)
category型:約1MB(番号のみ格納、辞書で変換)
→ 約60分の1に削減!
# ===== category型の効果を確認 =====
import pandas as pd
import numpy as np
# サンプルデータ作成
n = 1000000
prefectures = [‘東京都’, ‘大阪府’, ‘愛知県’, ‘北海道’, ‘福岡県’] * (n // 5)
df = pd.DataFrame({‘prefecture’: prefectures})
# object型のメモリ
print(“=== object型 ===”)
print(f”メモリ: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB”)
# category型に変換
df[‘prefecture’] = df[‘prefecture’].astype(‘category’)
# category型のメモリ
print(“\n=== category型 ===”)
print(f”メモリ: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB”)
⚡ 4. 並列処理(Dask)
4-1. 並列処理とは?
並列処理とは、複数のCPUコアを同時に使って処理する方法です。
処理時間を大幅に短縮できます。
🏪 例え話:スーパーのレジ
100人のお客さんを処理する場合:
- 通常処理:レジが1台だけ → 行列ができて遅い
- 並列処理:レジが4台 → 4倍速く処理できる
CPUコアも同じです。4コアなら、4つの処理を同時にできます!
4-2. Daskとは?
Daskは、Pandasに似たAPIで大規模データを並列処理できるライブラリです。
# Daskのインストール
pip install “dask[complete]”
4-3. Daskの基本的な使い方
# ===== Daskで大容量ファイル読み込み =====
import dask.dataframe as dd
# Daskで読み込み(この時点ではまだ読み込まない = 遅延評価)
ddf = dd.read_csv(‘large_file.csv’)
# DataFrameの情報を確認
print(f”カラム: {ddf.columns.tolist()}”)
print(f”パーティション数: {ddf.npartitions}”)
# 最初の5行を表示(この時点で必要な部分だけ読み込む)
print(ddf.head())
# 集計(遅延評価 = まだ計算しない)
total_sales = ddf[‘sales’].sum()
print(type(total_sales)) # dask.dataframe.core.Scalar
# 実際に計算を実行(.compute()で計算開始)
result = total_sales.compute()
print(f”売上合計: {result:,.0f}円”)
📝 Daskの特徴:遅延評価
Daskは.compute()を呼ぶまで実際の計算を行いません(遅延評価)。
これにより、不要な計算を避け、効率的に処理できます。
4-4. 複数ファイルを一度に読み込む
# ===== 複数CSVを一度に読み込む =====
import dask.dataframe as dd
# ワイルドカードで複数ファイルを指定
# data/sales_2024_01.csv, sales_2024_02.csv, … を全部読み込む
ddf = dd.read_csv(‘data/sales_*.csv’)
# フィルタリング
filtered = ddf[ddf[‘price’] > 1000]
# グループ集計
result = filtered.groupby(‘category’)[‘sales’].sum()
# 計算実行(並列処理される)
summary = result.compute()
print(summary.sort_values(ascending=False))
4-5. PandasとDaskの比較
| 項目 |
Pandas |
Dask |
| データサイズ |
メモリに収まる範囲 |
メモリ以上でもOK |
| 処理方式 |
シングルコア |
マルチコア(並列) |
| 実行タイミング |
即座に実行 |
遅延評価(.compute()で実行) |
| メモリ使用 |
全データをメモリに |
必要な部分だけ |
| 学習コスト |
低い |
やや高い(遅延評価の理解が必要) |
✅ Daskを使うべきとき
- ファイルサイズがメモリ容量を超える
- 複数の大きなファイルをまとめて処理したい
- 処理時間を短縮したい
❌ Daskが不要なとき
- ファイルが小さい(100MB以下)
- 複雑な処理(Daskは全操作に対応していない)
- インタラクティブな分析
🌊 5. ストリーミング処理
5-1. ストリーミング処理とは?
ストリーミング処理とは、データを1行ずつ読み込んで処理する方法です。
メモリ使用量を最小限に抑えられます。
📺 例え話:テレビの視聴方法
- 録画(= 通常処理):全部録画してから見る → ディスク容量が必要
- ストリーミング(= ストリーミング処理):見ながらデータを受信 → 容量が少なくて済む
5-2. csvモジュールでストリーミング処理
# ===== 1行ずつ処理(ストリーミング) =====
import csv
# 集計用変数
total_sales = 0
count = 0
# ファイルを1行ずつ読み込み
with open(‘sales.csv’, ‘r’, encoding=’utf-8′) as f:
reader = csv.DictReader(f) # ヘッダー行を自動的に辞書のキーに
for row in reader:
# 各行を処理
total_sales += float(row[‘sales’])
count += 1
# 10万行ごとに進捗表示
if count % 100000 == 0:
print(f”処理済み: {count:,}行”)
print(“=” * 40)
print(f”総売上: {total_sales:,.0f}円”)
print(f”総行数: {count:,}行”)
print(f”平均売上: {total_sales / count:,.2f}円”)
5-3. フィルタリングしてファイル出力
# ===== ストリーミングでフィルタリング =====
import csv
input_file = ‘large_file.csv’
output_file = ‘filtered.csv’
# 出力ファイルを準備
with open(output_file, ‘w’, newline=”, encoding=’utf-8′) as out_f:
writer = None
# 入力ファイルを1行ずつ処理
with open(input_file, ‘r’, encoding=’utf-8′) as in_f:
reader = csv.DictReader(in_f)
for row in reader:
# 条件でフィルタ(例:年齢20以上)
if int(row[‘age’]) >= 20:
# 最初の行でwriterを初期化(ヘッダー出力)
if writer is None:
writer = csv.DictWriter(out_f, fieldnames=reader.fieldnames)
writer.writeheader()
writer.writerow(row)
print(f”完了!結果は {output_file} に保存されました”)
5-4. チャンク処理 vs ストリーミング処理
| 項目 |
チャンク処理 |
ストリーミング処理 |
| 処理単位 |
まとまった単位(10万行など) |
1行ずつ |
| メモリ |
チャンクサイズ分 |
最小(1行分) |
| 処理速度 |
速い |
やや遅い |
| Pandas機能 |
使える |
使えない |
| 向いている処理 |
集計、変換、分析 |
シンプルなフィルタ、集計 |
🏋️ 6. 実践演習:大容量CSVの処理
6-1. 課題:売上データの集計
10GBの売上データから、2024年1月のデータだけを抽出して、商品別の売上を集計します。
# ===== 完全な実装例 =====
import pandas as pd
from datetime import datetime
from collections import defaultdict
# 設定
input_file = ‘sales_10gb.csv’
output_file = ‘summary_2024_01.csv’
chunk_size = 100000
# dtype指定でメモリ削減
dtypes = {
‘order_id’: ‘int32’,
‘product_id’: ‘int32’,
‘product_name’: ‘str’, # 後でcategory化
‘quantity’: ‘int16’,
‘price’: ‘float32’,
‘sales’: ‘float32’
}
# 集計用の辞書
product_sales = defaultdict(float)
product_count = defaultdict(int)
# 処理開始
total_rows = 0
start_time = datetime.now()
print(“処理開始…”)
# チャンクごとに処理
for i, chunk in enumerate(pd.read_csv(
input_file,
chunksize=chunk_size,
dtype=dtypes,
parse_dates=[‘order_date’] # 日付カラムを自動パース
)):
# 2024年1月のデータだけフィルタ
filtered = chunk[
(chunk[‘order_date’] >= ‘2024-01-01’) &
(chunk[‘order_date’] < '2024-02-01')
]
# 商品別に集計(groupbyで高速化)
if len(filtered) > 0:
chunk_summary = filtered.groupby(‘product_name’).agg({
‘sales’: ‘sum’,
‘order_id’: ‘count’
})
for product, row in chunk_summary.iterrows():
product_sales[product] += row[‘sales’]
product_count[product] += row[‘order_id’]
# 進捗表示
total_rows += len(chunk)
if (i + 1) % 10 == 0: # 10チャンクごと
elapsed = (datetime.now() – start_time).total_seconds()
speed = total_rows / elapsed
print(f”処理済み: {total_rows:,}行 ({elapsed:.1f}秒, {speed:,.0f}行/秒)”)
# 結果をDataFrameに変換
result_df = pd.DataFrame({
‘product_name’: list(product_sales.keys()),
‘total_sales’: list(product_sales.values()),
‘order_count’: list(product_count.values())
})
# 売上順にソート
result_df = result_df.sort_values(‘total_sales’, ascending=False)
# CSVに保存
result_df.to_csv(output_file, index=False)
# 結果表示
print(“\n” + “=” * 50)
print(“処理完了!”)
print(f”総行数: {total_rows:,}行”)
print(f”処理時間: {(datetime.now() – start_time).total_seconds():.1f}秒”)
print(f”\n売上トップ10:”)
print(result_df.head(10).to_string(index=False))
💡 このコードのポイント
- dtype指定:メモリを削減(int64→int32、float64→float32)
- チャンク処理:10万行ずつ処理してメモリ節約
- defaultdict:キーの存在チェックが不要で便利
- 進捗表示:処理状況を確認できる
- groupby:チャンク内の集計を高速化
📝 STEP 6 のまとめ
✅ このステップで学んだこと
- 大容量ファイルの問題:メモリに収まらないとエラーになる
- チャンク処理:
chunksizeで少しずつ読み込む
- dtype指定:データ型を最適化してメモリ削減
- category型:繰り返しの多いデータで劇的に削減
- 並列処理(Dask):複数コアで高速処理
- ストリーミング処理:1行ずつ処理してメモリ最小化
💡 大容量ファイル処理のベストプラクティス
- まずdtype指定でメモリ削減を試す
- それでも足りなければチャンク処理
- さらに速度が欲しければDaskで並列処理
- シンプルな処理ならストリーミング処理も検討
- 定期的に進捗を表示して処理状況を確認
🎯 次のステップの予告
次のSTEP 7では、「Pandasによるデータクリーニング①」を学びます。
- 欠損値の処理(削除、補完、検出)
- 重複データの処理
- データ型の確認と変換
データをきれいにする基本技術を習得しましょう!
📝 練習問題
問題 1
基礎
chunksizeを指定してCSVファイルを読み込み、各チャンクの行数を表示するコードを書いてください。
【解答例】
import pandas as pd
chunk_size = 10000
for i, chunk in enumerate(pd.read_csv(‘data.csv’, chunksize=chunk_size)):
print(f”チャンク {i+1}: {len(chunk)}行”)
問題 2
基礎
DataFrameのメモリ使用量を確認するコードを書いてください。
【解答例】
import pandas as pd
df = pd.read_csv(‘data.csv’)
# カラムごとのメモリ使用量
print(df.memory_usage(deep=True))
# 合計(MB単位)
total_mb = df.memory_usage(deep=True).sum() / 1024**2
print(f”\n合計: {total_mb:.2f} MB”)
問題 3
基礎
int64型のカラムをint32型に変換するコードを書いてください。
【解答例】
import pandas as pd
df = pd.read_csv(‘data.csv’)
# 変換前のデータ型を確認
print(“変換前:”, df[‘user_id’].dtype)
# int32に変換
df[‘user_id’] = df[‘user_id’].astype(‘int32’)
# 変換後のデータ型を確認
print(“変換後:”, df[‘user_id’].dtype)
問題 4
基礎
文字列カラムをcategory型に変換するコードを書いてください。
【解答例】
import pandas as pd
df = pd.read_csv(‘data.csv’)
# 変換前のメモリ使用量
before = df[‘prefecture’].memory_usage(deep=True)
# category型に変換
df[‘prefecture’] = df[‘prefecture’].astype(‘category’)
# 変換後のメモリ使用量
after = df[‘prefecture’].memory_usage(deep=True)
print(f”変換前: {before / 1024:.2f} KB”)
print(f”変換後: {after / 1024:.2f} KB”)
問題 5
応用
チャンク処理を使って、全データの売上合計を計算するコードを書いてください。
【解答例】
import pandas as pd
chunk_size = 100000
total_sales = 0
total_rows = 0
for chunk in pd.read_csv(‘sales.csv’, chunksize=chunk_size):
total_sales += chunk[‘sales’].sum()
total_rows += len(chunk)
print(f”総売上: {total_sales:,.0f}円”)
print(f”総行数: {total_rows:,}行”)
問題 6
応用
チャンク処理でフィルタリングした結果を新しいCSVファイルに保存するコードを書いてください。条件:priceが1000以上
【解答例】
import pandas as pd
chunk_size = 100000
output_file = ‘filtered.csv’
first_chunk = True
for chunk in pd.read_csv(‘data.csv’, chunksize=chunk_size):
# フィルタリング
filtered = chunk[chunk[‘price’] >= 1000]
# 結果を追記
if len(filtered) > 0:
filtered.to_csv(
output_file,
mode=’a’,
header=first_chunk,
index=False
)
first_chunk = False
print(“完了!”)
問題 7
応用
dtype指定とchunksizeを組み合わせてCSVを読み込むコードを書いてください。
【解答例】
import pandas as pd
# dtype指定
dtypes = {
‘user_id’: ‘int32’,
‘age’: ‘int8’,
‘prefecture’: ‘category’,
‘price’: ‘float32’
}
# dtype + chunksize
for chunk in pd.read_csv(‘data.csv’, dtype=dtypes, chunksize=100000):
print(f”行数: {len(chunk)}”)
print(f”メモリ: {chunk.memory_usage(deep=True).sum() / 1024**2:.2f} MB”)
break # 最初のチャンクだけ表示
問題 8
応用
Daskを使って複数のCSVファイル(data/*.csv)を読み込み、売上合計を計算するコードを書いてください。
【解答例】
import dask.dataframe as dd
# 複数ファイルを読み込み
ddf = dd.read_csv(‘data/*.csv’)
# 売上合計を計算
total = ddf[‘sales’].sum().compute()
print(f”売上合計: {total:,.0f}円”)
問題 9
発展
チャンク処理を使って、カテゴリ別の売上合計を計算するコードを書いてください。
【解答例】
import pandas as pd
from collections import defaultdict
chunk_size = 100000
category_sales = defaultdict(float)
for chunk in pd.read_csv(‘sales.csv’, chunksize=chunk_size):
# チャンク内でgroupby集計
chunk_summary = chunk.groupby(‘category’)[‘sales’].sum()
# 辞書に加算
for category, sales in chunk_summary.items():
category_sales[category] += sales
# 結果をDataFrameに変換
result = pd.DataFrame({
‘category’: list(category_sales.keys()),
‘total_sales’: list(category_sales.values())
})
# ソートして表示
result = result.sort_values(‘total_sales’, ascending=False)
print(result)
問題 10
発展
自動メモリ最適化関数を使って、DataFrameのメモリを削減し、削減率を表示するコードを書いてください。
【解答例】
import pandas as pd
import numpy as np
def optimize_memory(df):
“””DataFrameのメモリを自動最適化”””
start_mem = df.memory_usage(deep=True).sum() / 1024**2
for col in df.columns:
col_type = df[col].dtype
if col_type != ‘object’ and col_type.name != ‘category’:
c_min, c_max = df[col].min(), df[col].max()
if str(col_type)[:3] == ‘int’:
if c_min >= 0 and c_max < 255:
df[col] = df[col].astype(np.uint8)
elif c_min > -128 and c_max < 127:
df[col] = df[col].astype(np.int8)
elif c_min > -32768 and c_max < 32767:
df[col] = df[col].astype(np.int16)
elif str(col_type)[:5] == 'float':
df[col] = df[col].astype(np.float32)
elif col_type == 'object':
if df[col].nunique() / len(df) < 0.5:
df[col] = df[col].astype('category')
end_mem = df.memory_usage(deep=True).sum() / 1024**2
print(f"最適化前: {start_mem:.2f} MB")
print(f"最適化後: {end_mem:.2f} MB")
print(f"削減率: {100 * (start_mem - end_mem) / start_mem:.1f}%")
return df
# 使用
df = pd.read_csv('data.csv')
df = optimize_memory(df)