STEP 6:大容量ファイル処理

📦 STEP 6: 大容量ファイル処理

メモリに収まらない巨大ファイルを効率的に処理する方法を学びます

📋 このステップで学ぶこと

  • 大容量ファイル処理の問題点と解決アプローチ
  • チャンク処理(chunksize)による分割読み込み
  • dtype指定によるメモリ削減テクニック
  • category型の効果的な活用
  • Daskを使った並列処理
  • ストリーミング処理の基礎
  • 実践演習:大容量CSVファイルの処理

🎯 1. 大容量ファイルの問題点

1-1. なぜ大容量ファイルは難しいのか?

通常の方法でファイルを読み込むと、すべてのデータをメモリ(RAM)に展開します。
ファイルがメモリ容量を超えると、エラーになったり、PCが極端に遅くなります。

❌ 大容量ファイルでNG なコード
import pandas as pd # 10GBのCSVファイルを読み込もうとすると… df = pd.read_csv(‘huge_file.csv’) # MemoryError!

PCのメモリが8GBの場合、10GBのファイルは読み込めません!
実際には、CSVをDataFrameに変換すると2〜10倍のメモリを使うこともあります。

🚚 例え話:引っ越しの荷物

引っ越しで大量の荷物を運ぶとき:

普通の方法:すべての荷物を一度にトラックに積もうとする
→ トラックに入りきらない!

賢い方法:荷物を少しずつ運ぶ
→ 何往復かすれば、すべて運べる!

データも同じです。少しずつ処理すればメモリに収まります。

1-2. ファイルサイズと処理方法の目安

ファイルサイズ 推奨処理方法 説明
〜100MB 通常の読み込み そのままpd.read_csv()でOK
100MB〜1GB dtype指定 データ型を最適化してメモリ削減
1GB〜10GB チャンク処理 少しずつ読み込んで処理
10GB以上 並列処理+チャンク 複数のコアで並列処理(Dask等)
📝 実際のメモリ使用量の目安

CSVファイルをDataFrameに読み込むと、ファイルサイズの2〜10倍のメモリを使います。

  • 100MBのCSV → 200MB〜1GBのメモリ
  • 1GBのCSV → 2GB〜10GBのメモリ
  • 文字列が多いとメモリ使用量が増加

1-3. 大容量ファイル処理の4つのアプローチ

1. dtype指定
2. チャンク処理
3. 並列処理
4. ストリーミング
アプローチ 仕組み 使いどころ
dtype指定 データ型を最適化してメモリ削減 最初に試すべき方法
チャンク処理 ファイルを小さな塊に分けて処理 dtype指定でも足りない場合
並列処理 複数のCPUコアで同時処理 処理速度を上げたい場合
ストリーミング 1行ずつ読み込んで処理 シンプルな処理、メモリ最小化

🔄 2. チャンク処理(chunksize)

2-1. チャンク処理とは?

チャンク(chunk)とは、「かたまり」という意味です。
大きなファイルを小さなかたまりに分けて、少しずつ処理します。

📚 チャンクのイメージ

1000万行のCSVファイルを100万行ずつ処理する場合:

  • チャンク1:1〜100万行目を読み込み → 処理 → メモリから解放
  • チャンク2:101〜200万行目を読み込み → 処理 → メモリから解放
  • チャンク3:201〜300万行目を読み込み → 処理 → メモリから解放
  • … 10回繰り返し

常に100万行分のメモリしか使わないので、メモリ不足になりません!

2-2. 基本的なチャンク処理

# ===== chunksizeを使った読み込み ===== import pandas as pd # 10万行ずつ読み込む chunk_size = 100000 # read_csvにchunksizeを指定すると、イテレータが返る for chunk in pd.read_csv(‘large_file.csv’, chunksize=chunk_size): # chunkはDataFrame(10万行分) print(f”チャンクサイズ: {len(chunk)}行”) print(f”メモリ使用量: {chunk.memory_usage(deep=True).sum() / 1024**2:.2f} MB”) # ここでデータ処理(例:特定の条件でフィルタ) filtered = chunk[chunk[‘price’] > 1000] print(f”フィルタ後: {len(filtered)}行”) print(“—“)
📝 コードの説明
  • chunksize=100000:1回に読み込む行数を指定
  • for chunk in pd.read_csv(...):チャンクごとにループ
  • chunkは通常のDataFrame(Pandasの全機能が使える)
  • ループが終わると、そのチャンクはメモリから解放される

2-3. チャンクを処理して結合する

フィルタリングした結果を結合して1つのDataFrameにする方法です。

# ===== チャンクを処理して結合 ===== import pandas as pd chunk_size = 100000 result_chunks = [] # 結果を格納するリスト # 各チャンクを処理 for chunk in pd.read_csv(‘large_file.csv’, chunksize=chunk_size): # 条件でフィルタリング(例:価格1000円以上) filtered = chunk[chunk[‘price’] > 1000] # 結果が空でなければリストに追加 if len(filtered) > 0: result_chunks.append(filtered) # すべてのチャンクを結合 result = pd.concat(result_chunks, ignore_index=True) print(f”結果: {len(result)}行”) print(result.head())
⚠️ 注意点

結合後のDataFrameがメモリに収まる場合のみ有効です。
フィルタリングで十分にデータが絞られる場合に使いましょう。

2-4. チャンクごとに集計する

全データの合計や平均を計算する場合、チャンクごとに集計して累積します。

# ===== チャンクごとに集計して合計 ===== import pandas as pd chunk_size = 100000 total_sales = 0 # 売上合計 total_count = 0 # 行数カウント processed_chunks = 0 # 処理済みチャンク数 for chunk in pd.read_csv(‘sales.csv’, chunksize=chunk_size): # 各チャンクで集計 chunk_sales = chunk[‘sales’].sum() chunk_count = len(chunk) # 合計に加算 total_sales += chunk_sales total_count += chunk_count processed_chunks += 1 print(f”チャンク {processed_chunks}: 売上 {chunk_sales:,.0f}円, 行数 {chunk_count:,}行”) print(“=” * 50) print(f”総売上: {total_sales:,.0f}円”) print(f”総行数: {total_count:,}行”) print(f”平均売上: {total_sales / total_count:,.2f}円”)

2-5. グループ別に集計する

カテゴリ別の集計など、グループ別の集計をチャンク処理で行う方法です。

# ===== カテゴリ別集計(チャンク処理) ===== import pandas as pd from collections import defaultdict chunk_size = 100000 # カテゴリごとの売上を格納する辞書 category_sales = defaultdict(float) category_count = defaultdict(int) for chunk in pd.read_csv(‘sales.csv’, chunksize=chunk_size): # チャンク内でgroupby集計 chunk_summary = chunk.groupby(‘category’).agg({ ‘sales’: ‘sum’, ‘order_id’: ‘count’ }) # 辞書に追加 for category, row in chunk_summary.iterrows(): category_sales[category] += row[‘sales’] category_count[category] += row[‘order_id’] # 結果をDataFrameに変換 result = pd.DataFrame({ ‘category’: list(category_sales.keys()), ‘total_sales’: list(category_sales.values()), ‘order_count’: list(category_count.values()) }) # 売上順にソート result = result.sort_values(‘total_sales’, ascending=False) print(result)

2-6. 結果をファイルに追記する

処理結果をメモリに溜めずに、直接ファイルに追記する方法です。
これにより、どんなに大きなファイルでも処理できます。

# ===== 結果をファイルに追記 ===== import pandas as pd chunk_size = 100000 output_file = ‘filtered_data.csv’ first_chunk = True # ヘッダーを書くかどうかのフラグ for chunk in pd.read_csv(‘huge_file.csv’, chunksize=chunk_size): # 条件でフィルタ(例:2024年1月のデータ) filtered = chunk[ (chunk[‘date’] >= ‘2024-01-01’) & (chunk[‘date’] < '2024-02-01') ] # 結果があればファイルに追記 if len(filtered) > 0: filtered.to_csv( output_file, mode=’a’, # 追記モード(’w’は上書き) header=first_chunk, # 最初だけヘッダーを書く index=False ) first_chunk = False print(f”処理済み: {chunk.index[-1] + 1:,}行”) print(f”完了!結果は {output_file} に保存されました”)
✅ chunksizeの決め方
  • 小さすぎる(1,000行):処理が遅い(読み込み回数が多い)
  • 大きすぎる(1000万行):メモリ不足になる
  • 推奨値
  • メモリ8GB → 5万〜10万行
  • メモリ16GB → 20万〜50万行
  • メモリ32GB以上 → 50万〜100万行

💾 3. dtype指定によるメモリ削減

3-1. dtypeとは?

dtype(データ型)を適切に指定することで、メモリ使用量を大幅に削減できます。
Pandasのデフォルトは「余裕を持った」データ型なので、最適化の余地があります。

💡 例え話:収納ボックス

靴を収納するとき:

  • 大きすぎる箱:スペースの無駄(= int64で小さな数値を格納)
  • ぴったりの箱:効率的(= int8で小さな数値を格納)

データ型も「ぴったりサイズ」を選ぶことで、メモリを節約できます。

3-2. データ型とメモリ使用量

データ型 メモリ 値の範囲 使用例
int64 8バイト -9京 〜 9京 Pandasのデフォルト
int32 4バイト -21億 〜 21億 ID、数量など
int16 2バイト -32,768 〜 32,767 年、小さな数値
int8 1バイト -128 〜 127 年齢、フラグ
float64 8バイト 倍精度小数 Pandasのデフォルト
float32 4バイト 単精度小数 価格、スコアなど
category 少ない 繰り返しの多いデータ 性別、都道府県

3-3. dtype指定の基本

# ===== dtypeを指定して読み込み ===== import pandas as pd # データ型を辞書で定義 dtypes = { ‘user_id’: ‘int32’, # int64 → int32(メモリ半減) ‘age’: ‘int8’, # 0〜120なのでint8で十分 ‘gender’: ‘category’, # ‘M’/’F’のような限られた値 ‘prefecture’: ‘category’, # 47都道府県(繰り返しが多い) ‘price’: ‘float32’, # float64 → float32(メモリ半減) ‘product_name’: ‘str’ # 文字列 } # dtype指定で読み込み df = pd.read_csv(‘data.csv’, dtype=dtypes) # メモリ使用量を確認 print(“=== カラムごとのメモリ使用量 ===”) print(df.memory_usage(deep=True)) print() print(f”合計: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB”)

3-4. メモリ使用量の確認と最適化

# ===== 読み込み後の最適化 ===== import pandas as pd # まず普通に読み込み df = pd.read_csv(‘data.csv’) # 最適化前のメモリ使用量 print(“=== 最適化前 ===”) print(f”合計: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB”) print() print(“データ型:”) print(df.dtypes) print() # 各カラムの値の範囲を確認 print(“数値カラムの範囲:”) for col in df.select_dtypes(include=[‘int64’, ‘float64’]).columns: print(f” {col}: {df[col].min()} 〜 {df[col].max()}”) # 最適化 df[‘user_id’] = df[‘user_id’].astype(‘int32’) # 範囲に合わせて変更 df[‘age’] = df[‘age’].astype(‘int8’) # 0〜120なのでint8 df[‘prefecture’] = df[‘prefecture’].astype(‘category’) # 繰り返しが多い df[‘price’] = df[‘price’].astype(‘float32’) # 精度を落としてOK # 最適化後のメモリ使用量 print(“\n=== 最適化後 ===”) print(f”合計: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB”)

3-5. 自動メモリ最適化関数

データ型を自動的に最適化する便利な関数を用意しました。

# ===== 自動メモリ最適化関数 ===== import pandas as pd import numpy as np def optimize_memory(df, verbose=True): “”” DataFrameのメモリを自動最適化 Parameters: df: 最適化するDataFrame verbose: 詳細を表示するか Returns: 最適化されたDataFrame “”” start_mem = df.memory_usage(deep=True).sum() / 1024**2 for col in df.columns: col_type = df[col].dtype # 数値型の最適化 if col_type != ‘object’ and col_type.name != ‘category’: c_min = df[col].min() c_max = df[col].max() # 整数型 if str(col_type)[:3] == ‘int’: if c_min >= 0: # 非負整数 if c_max < 255: df[col] = df[col].astype(np.uint8) elif c_max < 65535: df[col] = df[col].astype(np.uint16) elif c_max < 4294967295: df[col] = df[col].astype(np.uint32) else: # 負の値あり if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) # 浮動小数点型 elif str(col_type)[:5] == 'float': if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) # 文字列型 → カテゴリ型への変換 elif col_type == 'object': num_unique = df[col].nunique() num_total = len(df[col]) # ユニーク値が全体の50%以下ならカテゴリ化 if num_unique / num_total < 0.5: df[col] = df[col].astype('category') end_mem = df.memory_usage(deep=True).sum() / 1024**2 if verbose: print(f"最適化前: {start_mem:.2f} MB") print(f"最適化後: {end_mem:.2f} MB") print(f"削減率: {100 * (start_mem - end_mem) / start_mem:.1f}%") return df # 使用例 df = pd.read_csv('data.csv') df = optimize_memory(df)

3-6. category型の威力

category型は、繰り返しの多いデータで劇的にメモリを削減できます。

📊 category型の効果例

100万行のデータで「都道府県」カラム(47種類)がある場合:

  • object型:約60MB(各行に文字列を格納)
  • category型:約1MB(番号のみ格納、辞書で変換)

約60分の1に削減!

# ===== category型の効果を確認 ===== import pandas as pd import numpy as np # サンプルデータ作成 n = 1000000 prefectures = [‘東京都’, ‘大阪府’, ‘愛知県’, ‘北海道’, ‘福岡県’] * (n // 5) df = pd.DataFrame({‘prefecture’: prefectures}) # object型のメモリ print(“=== object型 ===”) print(f”メモリ: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB”) # category型に変換 df[‘prefecture’] = df[‘prefecture’].astype(‘category’) # category型のメモリ print(“\n=== category型 ===”) print(f”メモリ: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB”)

⚡ 4. 並列処理(Dask)

4-1. 並列処理とは?

並列処理とは、複数のCPUコアを同時に使って処理する方法です。
処理時間を大幅に短縮できます。

🏪 例え話:スーパーのレジ

100人のお客さんを処理する場合:

  • 通常処理:レジが1台だけ → 行列ができて遅い
  • 並列処理:レジが4台 → 4倍速く処理できる

CPUコアも同じです。4コアなら、4つの処理を同時にできます!

4-2. Daskとは?

Daskは、Pandasに似たAPIで大規模データを並列処理できるライブラリです。

# Daskのインストール pip install “dask[complete]”

4-3. Daskの基本的な使い方

# ===== Daskで大容量ファイル読み込み ===== import dask.dataframe as dd # Daskで読み込み(この時点ではまだ読み込まない = 遅延評価) ddf = dd.read_csv(‘large_file.csv’) # DataFrameの情報を確認 print(f”カラム: {ddf.columns.tolist()}”) print(f”パーティション数: {ddf.npartitions}”) # 最初の5行を表示(この時点で必要な部分だけ読み込む) print(ddf.head()) # 集計(遅延評価 = まだ計算しない) total_sales = ddf[‘sales’].sum() print(type(total_sales)) # dask.dataframe.core.Scalar # 実際に計算を実行(.compute()で計算開始) result = total_sales.compute() print(f”売上合計: {result:,.0f}円”)
📝 Daskの特徴:遅延評価

Daskは.compute()を呼ぶまで実際の計算を行いません(遅延評価)。
これにより、不要な計算を避け、効率的に処理できます。

4-4. 複数ファイルを一度に読み込む

# ===== 複数CSVを一度に読み込む ===== import dask.dataframe as dd # ワイルドカードで複数ファイルを指定 # data/sales_2024_01.csv, sales_2024_02.csv, … を全部読み込む ddf = dd.read_csv(‘data/sales_*.csv’) # フィルタリング filtered = ddf[ddf[‘price’] > 1000] # グループ集計 result = filtered.groupby(‘category’)[‘sales’].sum() # 計算実行(並列処理される) summary = result.compute() print(summary.sort_values(ascending=False))

4-5. PandasとDaskの比較

項目 Pandas Dask
データサイズ メモリに収まる範囲 メモリ以上でもOK
処理方式 シングルコア マルチコア(並列)
実行タイミング 即座に実行 遅延評価(.compute()で実行)
メモリ使用 全データをメモリに 必要な部分だけ
学習コスト 低い やや高い(遅延評価の理解が必要)
✅ Daskを使うべきとき
  • ファイルサイズがメモリ容量を超える
  • 複数の大きなファイルをまとめて処理したい
  • 処理時間を短縮したい
❌ Daskが不要なとき
  • ファイルが小さい(100MB以下)
  • 複雑な処理(Daskは全操作に対応していない)
  • インタラクティブな分析

🌊 5. ストリーミング処理

5-1. ストリーミング処理とは?

ストリーミング処理とは、データを1行ずつ読み込んで処理する方法です。
メモリ使用量を最小限に抑えられます。

📺 例え話:テレビの視聴方法
  • 録画(= 通常処理):全部録画してから見る → ディスク容量が必要
  • ストリーミング(= ストリーミング処理):見ながらデータを受信 → 容量が少なくて済む

5-2. csvモジュールでストリーミング処理

# ===== 1行ずつ処理(ストリーミング) ===== import csv # 集計用変数 total_sales = 0 count = 0 # ファイルを1行ずつ読み込み with open(‘sales.csv’, ‘r’, encoding=’utf-8′) as f: reader = csv.DictReader(f) # ヘッダー行を自動的に辞書のキーに for row in reader: # 各行を処理 total_sales += float(row[‘sales’]) count += 1 # 10万行ごとに進捗表示 if count % 100000 == 0: print(f”処理済み: {count:,}行”) print(“=” * 40) print(f”総売上: {total_sales:,.0f}円”) print(f”総行数: {count:,}行”) print(f”平均売上: {total_sales / count:,.2f}円”)

5-3. フィルタリングしてファイル出力

# ===== ストリーミングでフィルタリング ===== import csv input_file = ‘large_file.csv’ output_file = ‘filtered.csv’ # 出力ファイルを準備 with open(output_file, ‘w’, newline=”, encoding=’utf-8′) as out_f: writer = None # 入力ファイルを1行ずつ処理 with open(input_file, ‘r’, encoding=’utf-8′) as in_f: reader = csv.DictReader(in_f) for row in reader: # 条件でフィルタ(例:年齢20以上) if int(row[‘age’]) >= 20: # 最初の行でwriterを初期化(ヘッダー出力) if writer is None: writer = csv.DictWriter(out_f, fieldnames=reader.fieldnames) writer.writeheader() writer.writerow(row) print(f”完了!結果は {output_file} に保存されました”)

5-4. チャンク処理 vs ストリーミング処理

項目 チャンク処理 ストリーミング処理
処理単位 まとまった単位(10万行など) 1行ずつ
メモリ チャンクサイズ分 最小(1行分)
処理速度 速い やや遅い
Pandas機能 使える 使えない
向いている処理 集計、変換、分析 シンプルなフィルタ、集計

🏋️ 6. 実践演習:大容量CSVの処理

6-1. 課題:売上データの集計

10GBの売上データから、2024年1月のデータだけを抽出して、商品別の売上を集計します。

# ===== 完全な実装例 ===== import pandas as pd from datetime import datetime from collections import defaultdict # 設定 input_file = ‘sales_10gb.csv’ output_file = ‘summary_2024_01.csv’ chunk_size = 100000 # dtype指定でメモリ削減 dtypes = { ‘order_id’: ‘int32’, ‘product_id’: ‘int32’, ‘product_name’: ‘str’, # 後でcategory化 ‘quantity’: ‘int16’, ‘price’: ‘float32’, ‘sales’: ‘float32’ } # 集計用の辞書 product_sales = defaultdict(float) product_count = defaultdict(int) # 処理開始 total_rows = 0 start_time = datetime.now() print(“処理開始…”) # チャンクごとに処理 for i, chunk in enumerate(pd.read_csv( input_file, chunksize=chunk_size, dtype=dtypes, parse_dates=[‘order_date’] # 日付カラムを自動パース )): # 2024年1月のデータだけフィルタ filtered = chunk[ (chunk[‘order_date’] >= ‘2024-01-01’) & (chunk[‘order_date’] < '2024-02-01') ] # 商品別に集計(groupbyで高速化) if len(filtered) > 0: chunk_summary = filtered.groupby(‘product_name’).agg({ ‘sales’: ‘sum’, ‘order_id’: ‘count’ }) for product, row in chunk_summary.iterrows(): product_sales[product] += row[‘sales’] product_count[product] += row[‘order_id’] # 進捗表示 total_rows += len(chunk) if (i + 1) % 10 == 0: # 10チャンクごと elapsed = (datetime.now() – start_time).total_seconds() speed = total_rows / elapsed print(f”処理済み: {total_rows:,}行 ({elapsed:.1f}秒, {speed:,.0f}行/秒)”) # 結果をDataFrameに変換 result_df = pd.DataFrame({ ‘product_name’: list(product_sales.keys()), ‘total_sales’: list(product_sales.values()), ‘order_count’: list(product_count.values()) }) # 売上順にソート result_df = result_df.sort_values(‘total_sales’, ascending=False) # CSVに保存 result_df.to_csv(output_file, index=False) # 結果表示 print(“\n” + “=” * 50) print(“処理完了!”) print(f”総行数: {total_rows:,}行”) print(f”処理時間: {(datetime.now() – start_time).total_seconds():.1f}秒”) print(f”\n売上トップ10:”) print(result_df.head(10).to_string(index=False))
💡 このコードのポイント
  • dtype指定:メモリを削減(int64→int32、float64→float32)
  • チャンク処理:10万行ずつ処理してメモリ節約
  • defaultdict:キーの存在チェックが不要で便利
  • 進捗表示:処理状況を確認できる
  • groupby:チャンク内の集計を高速化

📝 STEP 6 のまとめ

✅ このステップで学んだこと
  • 大容量ファイルの問題:メモリに収まらないとエラーになる
  • チャンク処理chunksizeで少しずつ読み込む
  • dtype指定:データ型を最適化してメモリ削減
  • category型:繰り返しの多いデータで劇的に削減
  • 並列処理(Dask):複数コアで高速処理
  • ストリーミング処理:1行ずつ処理してメモリ最小化
💡 大容量ファイル処理のベストプラクティス
  1. まずdtype指定でメモリ削減を試す
  2. それでも足りなければチャンク処理
  3. さらに速度が欲しければDaskで並列処理
  4. シンプルな処理ならストリーミング処理も検討
  5. 定期的に進捗を表示して処理状況を確認
🎯 次のステップの予告

次のSTEP 7では、「Pandasによるデータクリーニング①」を学びます。

  • 欠損値の処理(削除、補完、検出)
  • 重複データの処理
  • データ型の確認と変換

データをきれいにする基本技術を習得しましょう!

📝 練習問題

問題 1 基礎

chunksizeを指定してCSVファイルを読み込み、各チャンクの行数を表示するコードを書いてください。

【解答例】
import pandas as pd chunk_size = 10000 for i, chunk in enumerate(pd.read_csv(‘data.csv’, chunksize=chunk_size)): print(f”チャンク {i+1}: {len(chunk)}行”)
問題 2 基礎

DataFrameのメモリ使用量を確認するコードを書いてください。

【解答例】
import pandas as pd df = pd.read_csv(‘data.csv’) # カラムごとのメモリ使用量 print(df.memory_usage(deep=True)) # 合計(MB単位) total_mb = df.memory_usage(deep=True).sum() / 1024**2 print(f”\n合計: {total_mb:.2f} MB”)
問題 3 基礎

int64型のカラムをint32型に変換するコードを書いてください。

【解答例】
import pandas as pd df = pd.read_csv(‘data.csv’) # 変換前のデータ型を確認 print(“変換前:”, df[‘user_id’].dtype) # int32に変換 df[‘user_id’] = df[‘user_id’].astype(‘int32’) # 変換後のデータ型を確認 print(“変換後:”, df[‘user_id’].dtype)
問題 4 基礎

文字列カラムをcategory型に変換するコードを書いてください。

【解答例】
import pandas as pd df = pd.read_csv(‘data.csv’) # 変換前のメモリ使用量 before = df[‘prefecture’].memory_usage(deep=True) # category型に変換 df[‘prefecture’] = df[‘prefecture’].astype(‘category’) # 変換後のメモリ使用量 after = df[‘prefecture’].memory_usage(deep=True) print(f”変換前: {before / 1024:.2f} KB”) print(f”変換後: {after / 1024:.2f} KB”)
問題 5 応用

チャンク処理を使って、全データの売上合計を計算するコードを書いてください。

【解答例】
import pandas as pd chunk_size = 100000 total_sales = 0 total_rows = 0 for chunk in pd.read_csv(‘sales.csv’, chunksize=chunk_size): total_sales += chunk[‘sales’].sum() total_rows += len(chunk) print(f”総売上: {total_sales:,.0f}円”) print(f”総行数: {total_rows:,}行”)
問題 6 応用

チャンク処理でフィルタリングした結果を新しいCSVファイルに保存するコードを書いてください。条件:priceが1000以上

【解答例】
import pandas as pd chunk_size = 100000 output_file = ‘filtered.csv’ first_chunk = True for chunk in pd.read_csv(‘data.csv’, chunksize=chunk_size): # フィルタリング filtered = chunk[chunk[‘price’] >= 1000] # 結果を追記 if len(filtered) > 0: filtered.to_csv( output_file, mode=’a’, header=first_chunk, index=False ) first_chunk = False print(“完了!”)
問題 7 応用

dtype指定とchunksizeを組み合わせてCSVを読み込むコードを書いてください。

【解答例】
import pandas as pd # dtype指定 dtypes = { ‘user_id’: ‘int32’, ‘age’: ‘int8’, ‘prefecture’: ‘category’, ‘price’: ‘float32’ } # dtype + chunksize for chunk in pd.read_csv(‘data.csv’, dtype=dtypes, chunksize=100000): print(f”行数: {len(chunk)}”) print(f”メモリ: {chunk.memory_usage(deep=True).sum() / 1024**2:.2f} MB”) break # 最初のチャンクだけ表示
問題 8 応用

Daskを使って複数のCSVファイル(data/*.csv)を読み込み、売上合計を計算するコードを書いてください。

【解答例】
import dask.dataframe as dd # 複数ファイルを読み込み ddf = dd.read_csv(‘data/*.csv’) # 売上合計を計算 total = ddf[‘sales’].sum().compute() print(f”売上合計: {total:,.0f}円”)
問題 9 発展

チャンク処理を使って、カテゴリ別の売上合計を計算するコードを書いてください。

【解答例】
import pandas as pd from collections import defaultdict chunk_size = 100000 category_sales = defaultdict(float) for chunk in pd.read_csv(‘sales.csv’, chunksize=chunk_size): # チャンク内でgroupby集計 chunk_summary = chunk.groupby(‘category’)[‘sales’].sum() # 辞書に加算 for category, sales in chunk_summary.items(): category_sales[category] += sales # 結果をDataFrameに変換 result = pd.DataFrame({ ‘category’: list(category_sales.keys()), ‘total_sales’: list(category_sales.values()) }) # ソートして表示 result = result.sort_values(‘total_sales’, ascending=False) print(result)
問題 10 発展

自動メモリ最適化関数を使って、DataFrameのメモリを削減し、削減率を表示するコードを書いてください。

【解答例】
import pandas as pd import numpy as np def optimize_memory(df): “””DataFrameのメモリを自動最適化””” start_mem = df.memory_usage(deep=True).sum() / 1024**2 for col in df.columns: col_type = df[col].dtype if col_type != ‘object’ and col_type.name != ‘category’: c_min, c_max = df[col].min(), df[col].max() if str(col_type)[:3] == ‘int’: if c_min >= 0 and c_max < 255: df[col] = df[col].astype(np.uint8) elif c_min > -128 and c_max < 127: df[col] = df[col].astype(np.int8) elif c_min > -32768 and c_max < 32767: df[col] = df[col].astype(np.int16) elif str(col_type)[:5] == 'float': df[col] = df[col].astype(np.float32) elif col_type == 'object': if df[col].nunique() / len(df) < 0.5: df[col] = df[col].astype('category') end_mem = df.memory_usage(deep=True).sum() / 1024**2 print(f"最適化前: {start_mem:.2f} MB") print(f"最適化後: {end_mem:.2f} MB") print(f"削減率: {100 * (start_mem - end_mem) / start_mem:.1f}%") return df # 使用 df = pd.read_csv('data.csv') df = optimize_memory(df)
📝

学習メモ

ETL・データパイプライン構築 - Step 6

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE