STEP 14:データベースへのロード

💾 STEP 14: データベースへのロード

変換したデータをデータベースに保存する方法を学ぼう

📋 このステップで学ぶこと

  • to_sqlメソッドの使い方(基本から応用まで)
  • if_existsパラメータの使い分け
  • バルクインサート(一括挿入)の実装
  • UPSERT(更新または挿入)の実装
  • トランザクション管理の基礎

⏱️ 学習時間の目安:2時間

📝 練習問題:10問(基礎4問・応用4問・発展2問)

🎯 1. データベースへのロードとは?

1-1. ロードの基本

ロード(Load)とは、変換したデータをデータベースに保存することです。
ETLの最後のステップで、分析結果や加工したデータを永続化します。

📚 例え話:倉庫への荷物の搬入

データロードは、倉庫に荷物を搬入する作業に似ています。

to_sql:トラックで荷物をまとめて搬入(標準的で便利)
バルクインサート:大型トラックで一気に搬入(大量荷物向け)
UPSERT:既に同じ荷物があれば入れ替え、なければ追加
トランザクション:搬入作業全体を管理(途中で失敗したら全部戻す)

1-2. 主なロード方法の比較

ロード方法の比較
方法 特徴 速度 使い所
to_sql Pandasの標準メソッド。簡単で使いやすい ⭐⭐⭐ ほとんどの場面で推奨
execute_values 複数行を一度に挿入。psycopg2専用 ⭐⭐⭐⭐ PostgreSQLで大量データ
COPY PostgreSQL専用の高速コマンド ⭐⭐⭐⭐⭐ 100万件以上の大量データ
1行ずつINSERT 最も遅い。非推奨 避けるべき
💡 このステップのゴール

まずはto_sqlメソッドを使いこなせるようになりましょう。
これ1つで、ほとんどの場面に対応できます!

📝 2. to_sqlメソッドの基本

2-1. 環境準備(SQLite)

まず、SQLiteを使って練習しましょう。
SQLiteはファイルベースのデータベースで、インストール不要で使えます。

# ===== to_sqlの基本 ===== import pandas as pd from sqlalchemy import create_engine # サンプルデータ df = pd.DataFrame({ ‘商品ID’: [1, 2, 3, 4, 5], ‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’, ‘ぶどう’, ‘いちご’], ‘価格’: [150, 100, 80, 300, 250], ‘在庫’: [50, 100, 80, 30, 40] }) print(“=== 保存するデータ ===”) print(df) # SQLiteデータベースに接続(ファイルがなければ自動作成) engine = create_engine(‘sqlite:///shop.db’) # テーブルに保存(テーブル名: products) df.to_sql(‘products’, engine, if_exists=’replace’, index=False) print(“\n✅ データをproductsテーブルに保存しました!”) # 保存したデータを確認 df_loaded = pd.read_sql(‘SELECT * FROM products’, engine) print(“\n=== 保存されたデータ ===”) print(df_loaded)
【実行結果】 === 保存するデータ === 商品ID 商品名 価格 在庫 0 1 りんご 150 50 1 2 バナナ 100 100 2 3 みかん 80 80 3 4 ぶどう 300 30 4 5 いちご 250 40 ✅ データをproductsテーブルに保存しました! === 保存されたデータ === 商品ID 商品名 価格 在庫 0 1 りんご 150 50 1 2 バナナ 100 100 2 3 みかん 80 80 3 4 ぶどう 300 30 4 5 いちご 250 40
🎯 to_sqlのパラメータ
  • 第1引数:テーブル名(’products’)
  • engine:データベース接続
  • if_exists:テーブルが既に存在する場合の挙動
  • index=False:DataFrameのインデックスを保存しない

2-2. if_existsパラメータの詳細

if_existsの3つのオプション
動作 使い所
‘replace’ テーブルを削除して新規作成(既存データは消える) 初回作成時、全データ入れ替え
‘append’ 既存テーブルに追加(既存データは残る) 日次バッチ、データ追加
‘fail’ エラーを出す(テーブルが存在したら失敗) 誤った上書き防止

2-3. データの追加(append)

# ===== appendモードでデータ追加 ===== # 新しいデータ new_data = pd.DataFrame({ ‘商品ID’: [6, 7], ‘商品名’: [‘メロン’, ‘すいか’], ‘価格’: [500, 400], ‘在庫’: [20, 15] }) print(“=== 追加するデータ ===”) print(new_data) # 追加(appendモード) new_data.to_sql(‘products’, engine, if_exists=’append’, index=False) print(“\n✅ データを追加しました!”) # 確認 df_all = pd.read_sql(‘SELECT * FROM products’, engine) print(“\n=== 追加後のデータ(全件)===”) print(df_all)
【実行結果】 === 追加するデータ === 商品ID 商品名 価格 在庫 0 6 メロン 500 20 1 7 すいか 400 15 ✅ データを追加しました! === 追加後のデータ(全件)=== 商品ID 商品名 価格 在庫 0 1 りんご 150 50 1 2 バナナ 100 100 2 3 みかん 80 80 3 4 ぶどう 300 30 4 5 いちご 250 40 5 6 メロン 500 20 6 7 すいか 400 15

2-4. データ型の指定

# ===== データ型を明示的に指定 ===== from sqlalchemy.types import Integer, String df = pd.DataFrame({ ‘商品コード’: [‘A001’, ‘A002’, ‘A003’], ‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’], ‘価格’: [150, 100, 80], ‘説明’: [‘新鮮なりんご’, ‘甘いバナナ’, ‘ジューシーなみかん’] }) # データ型を定義 dtype_dict = { ‘商品コード’: String(10), # 文字列(最大10文字) ‘商品名’: String(100), # 文字列(最大100文字) ‘価格’: Integer, # 整数 ‘説明’: String(200) # 文字列(最大200文字) } # データ型を指定して保存 df.to_sql( ‘products_typed’, engine, if_exists=’replace’, index=False, dtype=dtype_dict ) print(“✅ データ型を指定して保存しました!”) # 確認 result = pd.read_sql(‘SELECT * FROM products_typed’, engine) print(result)
【実行結果】 ✅ データ型を指定して保存しました! 商品コード 商品名 価格 説明 0 A001 りんご 150 新鮮なりんご 1 A002 バナナ 100 甘いバナナ 2 A003 みかん 80 ジューシーなみかん

🔧 3. PostgreSQLへのロード

3-1. 接続設定

実務ではPostgreSQLMySQLなどのデータベースを使うことが多いです。

# ===== PostgreSQLへの接続 ===== from sqlalchemy import create_engine import os # 環境変数から接続情報を取得(推奨) DB_USER = os.getenv(‘DB_USER’, ‘postgres’) DB_PASS = os.getenv(‘DB_PASS’, ‘password’) DB_HOST = os.getenv(‘DB_HOST’, ‘localhost’) DB_PORT = os.getenv(‘DB_PORT’, ‘5432’) DB_NAME = os.getenv(‘DB_NAME’, ‘mydb’) # 接続文字列を作成 connection_string = f’postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}’ # エンジンを作成 engine = create_engine(connection_string) print(“✅ PostgreSQLに接続しました!”) # データを保存 df.to_sql(‘products’, engine, if_exists=’replace’, index=False) print(“✅ データを保存しました!”)
⚠️ セキュリティ上の注意

パスワードなどの接続情報は環境変数に保存して、コードに直接書かないようにしましょう。
.envファイルを使う場合は、.gitignoreに追加してGitにコミットしないこと!

3-2. 各データベースの接続文字列

主要データベースの接続文字列
データベース 接続文字列
SQLite sqlite:///database.db
PostgreSQL postgresql://user:pass@host:5432/dbname
MySQL mysql+pymysql://user:pass@host:3306/dbname

⚡ 4. バルクインサート(大量データの高速挿入)

4-1. バルクインサートとは?

バルクインサートとは、複数行のデータをまとめて一度に挿入する方法です。
1行ずつ挿入するより、数十倍〜数百倍速いです!

📊 速度の違い(10万件の場合)
  • 1行ずつINSERT:約60秒 😰
  • to_sql(デフォルト):約10秒
  • to_sql + chunksize:約3秒
  • execute_values:約1秒
  • COPY:約0.3秒 🚀

4-2. chunksizeパラメータの活用

# ===== chunksizeを指定して大量データを保存 ===== import pandas as pd import numpy as np import time from sqlalchemy import create_engine engine = create_engine(‘sqlite:///large_data.db’) # 10万件のダミーデータを作成 print(“=== 10万件のデータを作成中… ===”) large_df = pd.DataFrame({ ‘商品ID’: range(1, 100001), ‘商品名’: [f’商品{i}’ for i in range(1, 100001)], ‘価格’: np.random.randint(100, 1000, 100000), ‘在庫’: np.random.randint(0, 100, 100000) }) print(f”データ件数: {len(large_df):,}件”) # 時間計測:chunksizeなし start = time.time() large_df.to_sql(‘products_no_chunk’, engine, if_exists=’replace’, index=False) time_no_chunk = time.time() – start print(f”\nchunksizeなし: {time_no_chunk:.2f}秒”) # 時間計測:chunksize=10000 start = time.time() large_df.to_sql(‘products_with_chunk’, engine, if_exists=’replace’, index=False, chunksize=10000) time_with_chunk = time.time() – start print(f”chunksize=10000: {time_with_chunk:.2f}秒”) # 件数確認 count = pd.read_sql(‘SELECT COUNT(*) as cnt FROM products_with_chunk’, engine) print(f”\n✅ 保存件数: {count[‘cnt’][0]:,}件”)
【実行結果】 === 10万件のデータを作成中… === データ件数: 100,000件 chunksizeなし: 4.52秒 chunksize=10000: 2.34秒 ✅ 保存件数: 100,000件
🎯 chunksizeの目安
  • 1,000〜10,000:一般的なケース
  • 10,000〜50,000:メモリに余裕がある場合
  • 100〜1,000:メモリが少ない場合

大きすぎるとメモリ不足、小さすぎると処理が遅くなります。

🔄 5. UPSERT(更新または挿入)

5-1. UPSERTとは?

UPSERTとは、「既に存在したら更新、なければ挿入」という処理です。
主キーの重複を避けながら、データを最新に保つことができます。

📊 UPSERTが必要な場面
  • 日次で在庫データを更新するとき
  • 顧客情報を最新に保つとき
  • 同じキー(主キー)で重複させたくないとき

5-2. SQLiteでのUPSERT

# ===== SQLiteでのUPSERT ===== import sqlite3 import pandas as pd conn = sqlite3.connect(‘upsert_demo.db’) cur = conn.cursor() # テーブル作成 cur.execute(”’ CREATE TABLE IF NOT EXISTS products ( product_id INTEGER PRIMARY KEY, product_name TEXT, price INTEGER, stock INTEGER ) ”’) # 初期データを挿入 initial_data = [ (1, ‘りんご’, 150, 50), (2, ‘バナナ’, 100, 100), (3, ‘みかん’, 80, 80) ] cur.executemany(”’ INSERT OR REPLACE INTO products VALUES (?, ?, ?, ?) ”’, initial_data) conn.commit() print(“=== 初期データ ===”) df = pd.read_sql(‘SELECT * FROM products’, conn) print(df) # UPSERTするデータ(ID=1は更新、ID=4は新規) upsert_data = [ (1, ‘りんご’, 160, 55), # 更新 (4, ‘ぶどう’, 300, 30) # 新規 ] # SQLiteのUPSERT: INSERT OR REPLACE cur.executemany(”’ INSERT OR REPLACE INTO products VALUES (?, ?, ?, ?) ”’, upsert_data) conn.commit() print(“\n=== UPSERT後 ===”) df = pd.read_sql(‘SELECT * FROM products ORDER BY product_id’, conn) print(df) conn.close()
【実行結果】 === 初期データ === product_id product_name price stock 0 1 りんご 150 50 1 2 バナナ 100 100 2 3 みかん 80 80 === UPSERT後 === product_id product_name price stock 0 1 りんご 160 55 1 2 バナナ 100 100 2 3 みかん 80 80 3 4 ぶどう 300 30

5-3. PostgreSQLでのUPSERT

# ===== PostgreSQLでのUPSERT ===== # PostgreSQLでは ON CONFLICT を使用 upsert_query = ”’ INSERT INTO products (product_id, product_name, price, stock) VALUES (%s, %s, %s, %s) ON CONFLICT (product_id) DO UPDATE SET product_name = EXCLUDED.product_name, price = EXCLUDED.price, stock = EXCLUDED.stock ”’ # EXCLUDEDは「挿入しようとした新しい値」を指す
🎯 UPSERTのポイント
  • SQLiteINSERT OR REPLACE
  • PostgreSQLON CONFLICT ... DO UPDATE
  • MySQLON DUPLICATE KEY UPDATE
  • EXCLUDED:新しく挿入しようとした値を参照

🔐 6. トランザクション管理

6-1. トランザクションとは?

トランザクションとは、複数の処理をまとめて実行し、全て成功したときだけ確定する仕組みです。

🏦 銀行の振込の例
  1. A さんの口座から1万円引く
  2. B さんの口座に1万円足す

もし途中でエラーが出たら?
両方とも取り消す(ロールバック)
→ お金が消えたり、増えたりしない!

6-2. 基本的なトランザクション

# ===== トランザクションの基本 ===== import sqlite3 conn = sqlite3.connect(‘transaction_demo.db’) cur = conn.cursor() # テーブル作成 cur.execute(”’CREATE TABLE IF NOT EXISTS products ( id INTEGER PRIMARY KEY, name TEXT, stock INTEGER )”’) cur.execute(”’CREATE TABLE IF NOT EXISTS sales ( id INTEGER PRIMARY KEY AUTOINCREMENT, product_id INTEGER, amount INTEGER )”’) cur.execute(‘DELETE FROM products’) cur.execute(‘DELETE FROM sales’) cur.execute(“INSERT INTO products VALUES (1, ‘りんご’, 100)”) conn.commit() print(“=== 初期状態 ===”) print(pd.read_sql(‘SELECT * FROM products’, conn)) try: # トランザクション開始 print(“\n=== トランザクション開始 ===”) # 処理1:在庫を減らす cur.execute(‘UPDATE products SET stock = stock – 10 WHERE id = 1’) print(“処理1: 在庫を10減らしました”) # 処理2:売上を記録 cur.execute(‘INSERT INTO sales (product_id, amount) VALUES (1, 10)’) print(“処理2: 売上を記録しました”) # 全て成功したらコミット conn.commit() print(“\n✅ トランザクション成功!(コミット)”) except Exception as e: # エラーが出たらロールバック conn.rollback() print(f”\n❌ エラー: {e}”) print(“トランザクションをロールバックしました”) print(“\n=== 最終状態 ===”) print(“【products】”) print(pd.read_sql(‘SELECT * FROM products’, conn)) print(“\n【sales】”) print(pd.read_sql(‘SELECT * FROM sales’, conn)) conn.close()
【実行結果】 === 初期状態 === id name stock 0 1 りんご 100 === トランザクション開始 === 処理1: 在庫を10減らしました 処理2: 売上を記録しました ✅ トランザクション成功!(コミット) === 最終状態 === 【products】 id name stock 0 1 りんご 90 【sales】 id product_id amount 0 1 1 10
💡 トランザクションの3つのキーワード
  • BEGIN:トランザクション開始(自動で開始されることが多い)
  • COMMIT:変更を確定する
  • ROLLBACK:変更を取り消す

📝 STEP 14 のまとめ

✅ このステップで学んだこと
  • to_sql:Pandasの標準メソッドで簡単にDBにロード
  • if_exists:’replace’(上書き)、’append’(追加)、’fail’(エラー)
  • chunksize:大量データを分割して効率的にロード
  • UPSERT:既存なら更新、なければ挿入
  • トランザクション:commit/rollbackでデータの整合性を保つ
💡 使い分けの指針
場面 推奨方法
テーブル新規作成 to_sql + if_exists=’replace’
データ追加 to_sql + if_exists=’append’
大量データ(10万件以上) to_sql + chunksize
重複更新 UPSERT
複数テーブル同時更新 トランザクション
🎯 次のステップの予告

次のSTEP 15では、「ファイルへのエクスポート」を学びます。

  • CSVファイル出力
  • Excelファイル出力
  • JSONファイル出力
  • Parquet形式(高速&圧縮)

📝 練習問題

問題 1 基礎

以下のDataFrameをSQLiteデータベース(test.db)の「customers」テーブルに保存してください。

df = pd.DataFrame({ ‘顧客ID’: [1, 2, 3], ‘名前’: [‘田中太郎’, ‘佐藤花子’, ‘鈴木一郎’], ‘メール’: [‘tanaka@example.com’, ‘sato@test.jp’, ‘suzuki@mail.jp’] }) # ここにコードを書いてください
【解答】
from sqlalchemy import create_engine engine = create_engine(‘sqlite:///test.db’) df.to_sql(‘customers’, engine, if_exists=’replace’, index=False) # 確認 result = pd.read_sql(‘SELECT * FROM customers’, engine) print(result)
問題 2 基礎

既に存在する「customers」テーブルに、新しい顧客データを追加してください。

new_customers = pd.DataFrame({ ‘顧客ID’: [4, 5], ‘名前’: [‘高橋美咲’, ‘伊藤健太’], ‘メール’: [‘takahashi@example.jp’, ‘ito@test.com’] }) # ここにコードを書いてください
【解答】
new_customers.to_sql(‘customers’, engine, if_exists=’append’, index=False) # 確認 result = pd.read_sql(‘SELECT * FROM customers’, engine) print(result)
問題 3 基礎

保存されたデータの件数を確認するSQLを実行してください。

【解答】
count = pd.read_sql(‘SELECT COUNT(*) as 件数 FROM customers’, engine) print(f”顧客数: {count[‘件数’][0]}件”)
問題 4 基礎

データ型を指定してテーブルを作成してください。商品コード: 文字列(10)、価格: 整数

df = pd.DataFrame({ ‘商品コード’: [‘A001’, ‘A002’], ‘価格’: [150, 100] }) # ここにコードを書いてください
【解答】
from sqlalchemy.types import Integer, String dtype_dict = { ‘商品コード’: String(10), ‘価格’: Integer } df.to_sql(‘products’, engine, if_exists=’replace’, index=False, dtype=dtype_dict)
問題 5 応用

5万件のダミーデータを作成し、chunksize=5000で保存してください。

【解答】
import numpy as np import time large_df = pd.DataFrame({ ‘ID’: range(1, 50001), ‘名前’: [f’ユーザー{i}’ for i in range(1, 50001)], ‘年齢’: np.random.randint(20, 70, 50000) }) start = time.time() large_df.to_sql(‘users’, engine, if_exists=’replace’, index=False, chunksize=5000) print(f”保存完了: {time.time() – start:.2f}秒”)
問題 6 応用

SQLiteでUPSERTを実装してください。ID=1は更新、ID=3は新規挿入されるようにしてください。

# 初期データ: ID=1,2が存在 # UPSERTデータ: (1, ‘りんご’, 160), (3, ‘みかん’, 80)
【解答】
import sqlite3 conn = sqlite3.connect(‘upsert.db’) cur = conn.cursor() # テーブル作成と初期データ cur.execute(‘CREATE TABLE IF NOT EXISTS products (id INTEGER PRIMARY KEY, name TEXT, price INTEGER)’) cur.execute(‘INSERT OR REPLACE INTO products VALUES (1, “りんご”, 150)’) cur.execute(‘INSERT OR REPLACE INTO products VALUES (2, “バナナ”, 100)’) conn.commit() # UPSERT upsert_data = [(1, ‘りんご’, 160), (3, ‘みかん’, 80)] cur.executemany(‘INSERT OR REPLACE INTO products VALUES (?, ?, ?)’, upsert_data) conn.commit() # 確認 print(pd.read_sql(‘SELECT * FROM products’, conn)) conn.close()
問題 7 応用

トランザクションを使って、在庫を減らす処理と売上を記録する処理を同時に実行してください。

【解答】
import sqlite3 conn = sqlite3.connect(‘transaction.db’) cur = conn.cursor() # テーブル準備 cur.execute(‘CREATE TABLE IF NOT EXISTS products (id INTEGER PRIMARY KEY, stock INTEGER)’) cur.execute(‘CREATE TABLE IF NOT EXISTS sales (id INTEGER PRIMARY KEY AUTOINCREMENT, product_id INTEGER, amount INTEGER)’) cur.execute(‘DELETE FROM products’) cur.execute(‘DELETE FROM sales’) cur.execute(‘INSERT INTO products VALUES (1, 100)’) conn.commit() try: cur.execute(‘UPDATE products SET stock = stock – 10 WHERE id = 1’) cur.execute(‘INSERT INTO sales (product_id, amount) VALUES (1, 10)’) conn.commit() print(“✅ トランザクション成功”) except Exception as e: conn.rollback() print(f”❌ ロールバック: {e}”) conn.close()
問題 8 応用

環境変数から接続情報を取得してデータベースに接続する関数を作成してください。

【解答】
import os from sqlalchemy import create_engine def get_db_engine(db_type=’sqlite’): if db_type == ‘sqlite’: db_file = os.getenv(‘SQLITE_FILE’, ‘default.db’) return create_engine(f’sqlite:///{db_file}’) elif db_type == ‘postgresql’: user = os.getenv(‘DB_USER’) password = os.getenv(‘DB_PASS’) host = os.getenv(‘DB_HOST’, ‘localhost’) port = os.getenv(‘DB_PORT’, ‘5432’) dbname = os.getenv(‘DB_NAME’) return create_engine(f’postgresql://{user}:{password}@{host}:{port}/{dbname}’) # 使用例 engine = get_db_engine(‘sqlite’) print(“接続成功”)
問題 9 発展

エラーハンドリング付きの安全なデータロード関数を作成してください。

【解答】
def safe_load(df, table_name, engine, if_exists=’append’, chunksize=1000): “””安全にデータをロードする関数””” try: if df.empty: print(“⚠️ DataFrameが空です”) return False df.to_sql(table_name, engine, if_exists=if_exists, index=False, chunksize=chunksize) print(f”✅ {len(df)}件を{table_name}に保存”) return True except Exception as e: print(f”❌ エラー: {e}”) return False # 使用例 success = safe_load(df, ‘products’, engine)
問題 10 発展

複数のテーブルに同時にデータをロードし、1つでも失敗したら全てロールバックする処理を実装してください。

【解答】
import sqlite3 import pandas as pd def load_multiple_tables(data_dict, db_path): “””複数テーブルをトランザクションでロード””” conn = sqlite3.connect(db_path) try: for table_name, df in data_dict.items(): df.to_sql(table_name, conn, if_exists=’replace’, index=False) print(f”✅ {table_name}: {len(df)}件ロード”) conn.commit() print(“\n✅ 全テーブルのロード完了!”) return True except Exception as e: conn.rollback() print(f”\n❌ エラー: {e}”) print(“全てのテーブルをロールバックしました”) return False finally: conn.close() # 使用例 customers = pd.DataFrame({‘id’: [1, 2], ‘name’: [‘田中’, ‘佐藤’]}) orders = pd.DataFrame({‘id’: [1], ‘customer_id’: [1], ‘amount’: [1000]}) load_multiple_tables({‘customers’: customers, ‘orders’: orders}, ‘multi.db’)
📝

学習メモ

ETL・データパイプライン構築 - Step 14

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE