📋 このステップで学ぶこと
データベース接続の基本的な仕組みと流れ
psycopg2を使ったPostgreSQLへの接続方法
sqlite3を使ったSQLiteへの接続方法
PandasとDBの連携によるデータ取得
接続プールの概念と必要性
エラーハンドリングの実装方法
🎯 1. データベース接続の基本
1-1. なぜPythonからDBに接続するのか?
STEP 2で学んだように、企業の重要なデータの多くはデータベース に保存されています。
ETLパイプラインでは、このデータベースからプログラムで自動的にデータを取得 する必要があります。
Pythonを使えば、データベースに接続してSQLを実行し、結果を取得して加工することができます。
これがETLの「E」(Extract:抽出)の具体的な実装方法です。
🏧 例え話:ATMでお金を引き出す
ATMでお金を引き出す流れを思い出してください。データベース接続も同じような流れです。
【ATMとデータベース接続の対応】
ATMでの操作 データベース接続
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
① カードを挿入する → データベースに接続する
② 暗証番号を入力する → 認証情報(ユーザー名/パスワード)を提供する
③ 「引き出し」を選ぶ → SQLクエリを実行する
④ 金額を入力する → 取得するデータを指定する
⑤ お金を受け取る → 結果データを取得する
⑥ カードを取り出す → 接続を切断する
1-2. データベース接続の5つのステップ
Pythonからデータベースに接続してデータを取得する流れは、5つのステップ に分けられます。
【DB接続の5ステップ】
Step 1: 接続(Connect)
↓ データベースサーバーとの通信路を確立する
Step 2: カーソル作成(Cursor)
↓ SQLを実行するための「窓口」を作る
Step 3: SQL実行(Execute)
↓ SELECT文やINSERT文を実行する
Step 4: データ取得(Fetch)
↓ 実行結果をPython変数に取り込む
Step 5: 切断(Close)
↓ 接続を閉じてリソースを解放する
1-3. 重要な用語を理解する
データベース接続で使われる用語を整理しましょう。
用語
意味
例え
接続(Connection)
データベースとの通信路
電話をかけて繋がった状態
カーソル(Cursor)
SQLを実行するためのオブジェクト
注文を伝える受話器
コミット(Commit)
変更を確定する
「この注文で決定します」
ロールバック(Rollback)
変更を取り消す
「やっぱりキャンセルします」
フェッチ(Fetch)
結果データを取り出す
料理を受け取る
💡 接続とカーソルの関係
接続(Connection) は、データベースとの「電話回線」のようなものです。
カーソル(Cursor) は、その回線を使って「具体的な操作を行う窓口」です。
1つの接続から複数のカーソル を作ることができますが、
通常は1つの接続に対して1つのカーソルを使うことが多いです。
🐘 2. PostgreSQLへの接続(psycopg2)
2-1. psycopg2とは?
psycopg2 (サイコピージー・ツー)は、PythonからPostgreSQLに接続するための最も人気のあるライブラリ です。
「psyco」はPostgreSQLを意味し、「pg」もPostgreSQLの略称です。
PostgreSQLを使う多くの企業やプロジェクトで標準的に使用されています。
2-2. インストール方法
まず、psycopg2をインストールします。
# pipでインストール(通常の環境)
pip install psycopg2-binary
# Google Colabの場合
!pip install psycopg2-binary
📝 psycopg2-binary について
psycopg2-binaryは、コンパイル済みのバージョンです。
通常のpsycopg2はインストール時にコンパイルが必要で、
追加の開発ツールが必要になる場合があります。
学習や開発環境ではpsycopg2-binaryで十分です。
本番環境ではpsycopg2を使うことが推奨されています。
2-3. 基本的な接続方法(ステップバイステップ)
PostgreSQLへの接続を、1ステップずつ詳しく見ていきましょう。
ステップ1:ライブラリのインポート
# psycopg2ライブラリを読み込む
import psycopg2
📝 この行の意味
import psycopg2は、psycopg2ライブラリを使えるようにする命令です。
このライブラリに含まれる関数やクラスを使って、PostgreSQLに接続します。
ステップ2:データベースに接続
# データベースに接続
conn = psycopg2.connect(
host=”localhost”, # サーバーのアドレス
port=5432, # ポート番号
database=”mydb”, # データベース名
user=”postgres”, # ユーザー名
password=”password123″ # パスワード
)
📝 各パラメータの説明
host:データベースサーバーのアドレス
ローカル環境なら "localhost" または "127.0.0.1"
リモートサーバーなら "db.example.com" のようなアドレス
port:PostgreSQLが待ち受けているポート番号(デフォルトは 5432)
database:接続先のデータベース名
user:ログインするユーザー名
password:そのユーザーのパスワード
ステップ3:カーソルを作成
# カーソルを作成
cursor = conn.cursor()
📝 この行の意味
conn.cursor()は、接続オブジェクト(conn)から「カーソル」を作成します。
カーソルは、SQLを実行するための「操作窓口」です。
すべてのSQL実行は、このカーソルを通じて行います。
ステップ4:SQLを実行
# SQLを実行
cursor.execute(“SELECT * FROM users”)
📝 この行の意味
cursor.execute("SQL文")は、指定したSQLをデータベースで実行します。
この時点では、SQLが「実行された」だけで、結果はまだPython側に取り込まれていません。
結果を取り込むには、次のfetchを使います。
ステップ5:結果を取得
# すべての結果を取得
rows = cursor.fetchall()
# 結果を1行ずつ表示
for row in rows:
print(row)
📝 この行の意味
cursor.fetchall()は、SQLの実行結果をすべて 取得してリストとして返します。
各行はタプル(変更不可のリスト)として格納されます。
例:[(1, '山田太郎', 'yamada@example.com'), (2, '佐藤花子', 'sato@example.com')]
ステップ6:接続を閉じる
# カーソルを閉じる
cursor.close()
# 接続を閉じる
conn.close()
⚠️ 接続を閉じることが重要な理由
データベース接続はリソースを消費 します。
接続を閉じないと、以下の問題が発生する可能性があります:
データベースの接続数上限 に達する
メモリ が無駄に消費される
他のプログラムが接続できなくなる
完成コード:基本的な接続
# ===== PostgreSQL接続の完成コード =====
import psycopg2
# ステップ1: データベースに接続
conn = psycopg2.connect(
host=”localhost”,
port=5432,
database=”mydb”,
user=”postgres”,
password=”password123″
)
# ステップ2: カーソルを作成
cursor = conn.cursor()
# ステップ3: SQLを実行
cursor.execute(“SELECT * FROM users”)
# ステップ4: 結果を取得
rows = cursor.fetchall()
# ステップ5: 結果を表示
for row in rows:
print(row)
# ステップ6: 接続を閉じる
cursor.close()
conn.close()
2-4. データ取得の3つの方法
SQLの実行結果を取得する方法は3つ あります。データ量に応じて使い分けます。
メソッド
動作
使いどころ
fetchone()
1行だけ取得
1件だけ取得したいとき ループで1行ずつ処理したいとき
fetchall()
すべての行を取得
データ量が少ないとき すべてのデータが必要なとき
fetchmany(n)
n行だけ取得
大量データを分割処理するとき メモリを節約したいとき
# fetchone() – 1行だけ取得
cursor.execute(“SELECT * FROM users WHERE user_id = 1”)
row = cursor.fetchone()
print(row) # (1, ‘山田太郎’, ‘yamada@example.com’)
# fetchall() – すべての行を取得
cursor.execute(“SELECT * FROM users”)
rows = cursor.fetchall()
for row in rows:
print(row)
# fetchmany(5) – 5行だけ取得
cursor.execute(“SELECT * FROM users”)
rows = cursor.fetchmany(5) # 最初の5行
for row in rows:
print(row)
💡 大量データの場合はfetchmanyを使おう
100万件のデータをfetchall()で取得すると、すべてがメモリに載る ため、
メモリ不足でプログラムが落ちる可能性があります。
大量データの場合は、fetchmany(1000)のように分割して取得 するか、
後述するPandas を使うのがおすすめです。
2-5. Pandasと組み合わせる(推奨)
実務では、取得したデータをPandas DataFrame として扱うことが多いです。
Pandasには、SQLの結果を直接DataFrameに変換する便利な関数があります。
# ===== PandasでDB接続(推奨パターン)=====
import psycopg2
import pandas as pd
# データベースに接続
conn = psycopg2.connect(
host=”localhost”,
database=”mydb”,
user=”postgres”,
password=”password123″
)
# SQLを実行して、結果を直接DataFrameに変換
df = pd.read_sql_query(“SELECT * FROM users”, conn)
# 接続を閉じる
conn.close()
# DataFrameを表示
print(df.head())
✅ pd.read_sql_query()のメリット
カーソルを作る必要がない (コードがシンプル)
結果がDataFrame で取得できる(すぐに分析できる)
列名が自動的に設定される
大量データも効率的に処理 してくれる
2-6. データの挿入・更新・削除
SELECT(取得)だけでなく、INSERT(挿入)、UPDATE(更新)、DELETE(削除)も実行できます。
ただし、これらの操作にはcommitが必要です。
データの挿入(INSERT)
# データを挿入する
cursor.execute(“””
INSERT INTO users (name, email)
VALUES (%s, %s)
“””, (‘佐藤花子’, ‘sato@example.com’))
# 変更を確定(重要!)
conn.commit()
📝 %s はプレースホルダー
%sは「ここに値が入る」という目印(プレースホルダー)です。
実際の値は、execute()の第2引数でタプルとして渡します。
なぜプレースホルダーを使うのか?
SQL文に直接値を埋め込むと、SQLインジェクション という
セキュリティ攻撃を受ける危険があります。
プレースホルダーを使うと、自動的にエスケープ処理されて安全です。
データの更新(UPDATE)
# データを更新する
cursor.execute(“””
UPDATE users
SET email = %s
WHERE user_id = %s
“””, (‘new_email@example.com’, 1))
# 変更を確定
conn.commit()
データの削除(DELETE)
# データを削除する
cursor.execute(“””
DELETE FROM users
WHERE user_id = %s
“””, (5,)) # タプルにするため、カンマが必要
# 変更を確定
conn.commit()
⚠️ 重要:commitを忘れずに!
INSERT、UPDATE、DELETEを実行した後は、必ずconn.commit() を呼び出してください。
commit()しないと、変更が保存されません !
SELECT の場合は、データを読み取るだけなのでcommit()は不要です。
🗄️ 3. SQLiteへの接続
3-1. SQLiteとは?
SQLite (エスキューライト)は、ファイルベースの軽量データベース です。
サーバーを起動する必要がなく、1つのファイルでデータベース全体を管理できます。
✅ SQLiteのメリット
追加インストール不要 (Python標準搭載)
サーバーのセットアップ不要
ファイル1つで管理できる
学習・テスト に最適
アプリに組み込みやすい
⚠️ SQLiteのデメリット
大規模データには不向き
複数ユーザーの同時書き込みが苦手
ネットワーク経由でアクセスできない
一部のSQL機能が使えない
📝 SQLiteの使いどころ
適している場面: 学習、プロトタイプ開発、小規模アプリ、モバイルアプリ
適していない場面: 大規模Webサービス、複数ユーザーが同時にアクセスするシステム
3-2. SQLiteの基本的な使い方
SQLiteはPythonに標準搭載されているので、インストール不要 ですぐに使えます。
ステップ1:ライブラリのインポートと接続
# sqlite3はPython標準ライブラリなので、追加インストール不要
import sqlite3
# データベースに接続(ファイルがなければ自動作成される)
conn = sqlite3.connect(‘mydb.db’)
📝 PostgreSQLとの違い
PostgreSQLでは、ホスト名やポート番号を指定しましたが、
SQLiteではファイル名だけ を指定します。
指定したファイルが存在しない場合は、自動的に新しいファイルが作成 されます。
ステップ2:テーブルの作成
# カーソルを作成
cursor = conn.cursor()
# テーブルを作成するSQL
cursor.execute(”’
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE
)
”’)
📝 SQLの説明
CREATE TABLE IF NOT EXISTS:テーブルがなければ作成する
INTEGER PRIMARY KEY AUTOINCREMENT:自動で連番が振られる主キー
TEXT NOT NULL:文字列型で、空は許可しない
UNIQUE:重複した値は許可しない
ステップ3:データの挿入
# データを挿入
cursor.execute(”’
INSERT INTO users (name, email)
VALUES (?, ?)
”’, (‘山田太郎’, ‘yamada@example.com’))
# 変更を確定
conn.commit()
⚠️ プレースホルダーの違い
PostgreSQL(psycopg2) :%s を使う
SQLite(sqlite3) :? を使う
ライブラリによってプレースホルダーの書き方が異なるので注意してください。
完成コード:SQLiteの基本操作
# ===== SQLite基本操作の完成コード =====
import sqlite3
# データベースに接続(ファイルがなければ自動作成)
conn = sqlite3.connect(‘mydb.db’)
cursor = conn.cursor()
# テーブルを作成
cursor.execute(”’
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE
)
”’)
# データを挿入
cursor.execute(‘INSERT INTO users (name, email) VALUES (?, ?)’,
(‘山田太郎’, ‘yamada@example.com’))
cursor.execute(‘INSERT INTO users (name, email) VALUES (?, ?)’,
(‘佐藤花子’, ‘sato@example.com’))
# 変更を確定
conn.commit()
# データを取得
cursor.execute(‘SELECT * FROM users’)
rows = cursor.fetchall()
# 結果を表示
for row in rows:
print(row)
# 接続を閉じる
conn.close()
3-3. PandasとSQLiteの組み合わせ
PandasとSQLiteを組み合わせると、DataFrameをそのままデータベースに保存 できます。
データの一時保存や、分析結果の永続化に便利です。
DataFrameをSQLiteに保存する
# ===== DataFrameをSQLiteに保存 =====
import sqlite3
import pandas as pd
# 接続
conn = sqlite3.connect(‘mydb.db’)
# DataFrameを作成
df = pd.DataFrame({
‘name’: [‘山田太郎’, ‘佐藤花子’, ‘鈴木一郎’],
‘email’: [‘yamada@ex.com’, ‘sato@ex.com’, ‘suzuki@ex.com’],
‘age’: [30, 25, 35]
})
# DataFrameをSQLiteに保存
# to_sql()でDataFrameをそのままテーブルとして保存
df.to_sql(‘users’, conn, if_exists=’replace’, index=False)
# 接続を閉じる
conn.close()
print(“保存完了!”)
📝 to_sql()のパラメータ
'users':保存先のテーブル名
conn:データベース接続オブジェクト
if_exists:テーブルが既に存在する場合の動作
'replace':テーブルを削除して作り直す
'append':既存テーブルにデータを追加
'fail':エラーを発生させる(デフォルト)
index=False:DataFrameのインデックスは保存しない
SQLiteからDataFrameに読み込む
# ===== SQLiteからDataFrameに読み込み =====
import sqlite3
import pandas as pd
# 接続
conn = sqlite3.connect(‘mydb.db’)
# SQLiteからDataFrameに読み込み
df = pd.read_sql_query(‘SELECT * FROM users’, conn)
# 接続を閉じる
conn.close()
# 結果を表示
print(df)
【実行結果】
name email age
0 山田太郎 yamada@ex.com 30
1 佐藤花子 sato@ex.com 25
2 鈴木一郎 suzuki@ex.com 35
🏊 4. 接続プールの基礎
4-1. 接続プールとは?
接続プール(Connection Pool) とは、データベース接続を使い回す仕組み です。
通常、データベースに接続するたびに「接続の確立」と「切断」が行われますが、
これには時間とリソースがかかります。接続プールを使うと、この問題を解決できます。
🚗 例え話:レンタカーと自家用車
接続プールなし :毎回、車を買って、使い終わったら廃車にする
→ 購入手続きに時間がかかる、無駄が多い!
接続プールあり :レンタカー会社が車を何台か用意しておいて、必要な人に貸し出す
→ すぐに借りられる、使い終わったら返却して次の人が使う、効率的!
【接続プールの動作イメージ】
■ 接続プールなしの場合
リクエスト1 → 接続確立(0.5秒) → SQL実行 → 切断
リクエスト2 → 接続確立(0.5秒) → SQL実行 → 切断
リクエスト3 → 接続確立(0.5秒) → SQL実行 → 切断
→ 毎回0.5秒のオーバーヘッドが発生
■ 接続プールありの場合
【プール】接続1、接続2、接続3(事前に用意)
リクエスト1 → 接続1を借りる → SQL実行 → 返却
リクエスト2 → 接続2を借りる → SQL実行 → 返却
リクエスト3 → 接続1を借りる → SQL実行 → 返却(接続1が空いたので再利用)
→ 接続確立のオーバーヘッドがない!
4-2. なぜ接続プールが必要なのか?
問題
接続プールなし
接続プールあり
接続時間
毎回接続確立に時間がかかる
既存の接続を使うので高速
接続数
制御できず、上限に達する可能性
プールサイズで制御可能
リソース
無駄な接続が発生しやすい
効率的に再利用される
安定性
高負荷時に不安定
安定したパフォーマンス
4-3. psycopg2での接続プール
psycopg2には、接続プールを作成するための機能が用意されています。
# ===== psycopg2の接続プール =====
from psycopg2 import pool
# 接続プールを作成
connection_pool = pool.SimpleConnectionPool(
minconn=1, # 最小接続数(常にこの数の接続を維持)
maxconn=10, # 最大接続数(これ以上は作らない)
host=”localhost”,
database=”mydb”,
user=”postgres”,
password=”password123″
)
# プールから接続を「借りる」
conn = connection_pool.getconn()
# SQLを実行
cursor = conn.cursor()
cursor.execute(“SELECT * FROM users”)
rows = cursor.fetchall()
cursor.close()
# 接続を「返却」(閉じない!)
connection_pool.putconn(conn)
# プログラム終了時にプール全体を閉じる
connection_pool.closeall()
📝 接続プールの重要なポイント
getconn():プールから接続を借りる
putconn(conn):プールに接続を返す (close()ではない!)
closeall():プログラム終了時にプール全体を閉じる
minconn:常に維持する最小接続数
maxconn:作成する最大接続数
重要: 接続をclose()ではなくputconn()で返却します。
close()すると接続が破棄されてしまい、再利用できなくなります。
4-4. SQLAlchemyでの接続プール
SQLAlchemy は、より高度なデータベース操作ライブラリです。
接続プールを自動で管理 してくれるため、実務でよく使われています。
# ===== SQLAlchemyの接続プール =====
from sqlalchemy import create_engine
import pandas as pd
# エンジンを作成(接続プールも自動で作られる)
engine = create_engine(
‘postgresql://postgres:password123@localhost:5432/mydb’,
pool_size=10, # プールサイズ
max_overflow=20 # プールが満杯の時の追加接続数
)
# Pandasと組み合わせて使う
df = pd.read_sql_query(‘SELECT * FROM users’, engine)
# 処理が終わったらエンジンを破棄
engine.dispose()
print(df)
✅ SQLAlchemyのメリット
接続プールを自動管理 してくれる
PostgreSQL、MySQL、SQLiteなど様々なDBに対応
ORM(オブジェクト関係マッピング)も使える
Pandasとの相性が良い
📝 接続プールを使うべき場面
使うべき場面:
Webアプリケーション、APIサーバー、頻繁にDBアクセスするバッチ処理
使わなくても良い場面:
1回だけ実行するスクリプト、学習・テスト環境、アクセス頻度が低い処理
⚠️ 5. エラーハンドリング
5-1. なぜエラーハンドリングが重要なのか?
データベース接続では、様々なエラー が発生する可能性があります。
サーバーが起動していない
ネットワークが切断された
パスワードが間違っている
テーブルが存在しない
SQLの文法エラー
エラーが発生した時に適切に対処しないと、プログラムが突然停止 したり、
接続が閉じられずにリソースが漏れる 問題が発生します。
5-2. try-exceptでエラーをキャッチ
Pythonのtry-except構文を使って、エラーを適切に処理します。
# ===== 基本的なエラーハンドリング =====
import psycopg2
try:
# データベースに接続(エラーが起きる可能性がある処理)
conn = psycopg2.connect(
host=”localhost”,
database=”mydb”,
user=”postgres”,
password=”password123″
)
cursor = conn.cursor()
cursor.execute(“SELECT * FROM users”)
rows = cursor.fetchall()
for row in rows:
print(row)
cursor.close()
conn.close()
except psycopg2.Error as e:
# psycopg2のエラーが発生した場合
print(f”データベースエラー: {e}”)
except Exception as e:
# その他の予期しないエラーが発生した場合
print(f”予期しないエラー: {e}”)
📝 try-exceptの構造
try: ブロック:エラーが起きる可能性のある処理を書く
except エラー型 as e::特定のエラーをキャッチして処理
e:エラーの詳細情報が入っている変数
5-3. finallyで確実にクリーンアップ
finallyブロックは、エラーが起きても起きなくても必ず実行 されます。
接続を閉じる処理に使うと、リソース漏れを防げます。
# ===== finallyを使った安全な接続管理 =====
import psycopg2
conn = None # 初期化しておく
cursor = None
try:
# 接続
conn = psycopg2.connect(
host=”localhost”,
database=”mydb”,
user=”postgres”,
password=”password123″
)
cursor = conn.cursor()
cursor.execute(“SELECT * FROM users”)
rows = cursor.fetchall()
for row in rows:
print(row)
except psycopg2.Error as e:
print(f”データベースエラー: {e}”)
finally:
# エラーが起きても起きなくても、必ず実行される
if cursor:
cursor.close()
print(“カーソルを閉じました”)
if conn:
conn.close()
print(“接続を閉じました”)
📝 finallyが必要な理由
tryブロック内でエラーが発生すると、その後のコードは実行されません。
例えば、SQLの実行でエラーが起きると、conn.close()が実行されずに
接続が開いたままになってしまいます。
finallyを使うと、どんな場合でも確実に 接続を閉じることができます。
5-4. with文で自動クリーンアップ(推奨)
with文を使うと、自動でリソースが解放 されます。
これが最も安全でスマートな方法です。
# ===== with文を使った最もスマートな方法(推奨)=====
import psycopg2
try:
# withを使うと、ブロックを抜けるときに自動でcloseされる
with psycopg2.connect(
host=”localhost”,
database=”mydb”,
user=”postgres”,
password=”password123″
) as conn:
# カーソルもwithで管理
with conn.cursor() as cursor:
cursor.execute(“SELECT * FROM users”)
rows = cursor.fetchall()
for row in rows:
print(row)
# withブロックを抜けると自動でcloseされる
print(“接続は自動で閉じられました”)
except psycopg2.Error as e:
print(f”データベースエラー: {e}”)
✅ with文のメリット
エラーが起きても自動でclose される
コードがシンプル になる
finallyを書く必要がない
リソース漏れを防げる
実務では、with文を使うのがベストプラクティスです!
5-5. よくあるエラーと対処法
エラーメッセージ
原因
対処法
connection refused
サーバーが起動していない またはポートが間違っている
PostgreSQLを起動する ポート番号を確認
authentication failed
ユーザー名またはパスワードが間違っている
認証情報を確認
database does not exist
指定したデータベースが存在しない
データベース名を確認 または作成する
relation does not exist
テーブルが存在しない
テーブル名を確認 スキーマ名を確認
syntax error
SQLの文法エラー
SQL文を見直す
timeout expired
接続がタイムアウトした
ネットワークを確認 タイムアウト値を調整
📝 STEP 3 のまとめ
✅ このステップで学んだこと
DB接続の5ステップ :接続 → カーソル作成 → SQL実行 → データ取得 → 切断
psycopg2 :PostgreSQLに接続するためのライブラリ(プレースホルダーは%s)
sqlite3 :軽量なファイルベースDB、Python標準搭載(プレースホルダーは?)
Pandas連携 :pd.read_sql_query()で簡単にDataFrame化
接続プール :接続を使い回してパフォーマンス向上
commit :INSERT/UPDATE/DELETEの後は必ずcommitする
with文 :自動でリソースを解放する最も安全な方法
💡 実務でのポイント
1. 接続情報を直接コードに書かない
パスワードなどの機密情報は、環境変数や設定ファイルで管理します(後のステップで学習)。
2. with文を使う
リソース漏れを防ぐため、常にwith文を使うことを推奨します。
3. 適切なfetchメソッドを選ぶ
大量データの場合はfetchmany()やPandasを使います。
🎯 次のステップの予告
次のSTEP 4では、「SQLによるデータ抽出」 を学びます。
SELECT文の基本と応用
WHERE句による条件指定
JOINを使った複数テーブルの結合
増分抽出の考え方
ETLの「E」(Extract)をより実践的に学んでいきましょう!
📝 練習問題
問題 1
基礎
SQLiteで新しいデータベースファイルを作成し、productsテーブルを作成してください。
テーブル構成:product_id (INTEGER PRIMARY KEY), name (TEXT), price (INTEGER)
解答を見る
【解答例】
import sqlite3
# データベースに接続(ファイルが作成される)
conn = sqlite3.connect(‘shop.db’)
cursor = conn.cursor()
# テーブルを作成
cursor.execute(”’
CREATE TABLE IF NOT EXISTS products (
product_id INTEGER PRIMARY KEY,
name TEXT,
price INTEGER
)
”’)
# 変更を確定
conn.commit()
# 接続を閉じる
conn.close()
print(“テーブル作成完了!”)
問題 2
基礎
問題1で作成したproductsテーブルに、3つの商品データを挿入してください。
商品例:ノートPC(89800円)、マウス(1980円)、キーボード(4500円)
解答を見る
【解答例】
import sqlite3
conn = sqlite3.connect(‘shop.db’)
cursor = conn.cursor()
# 3つの商品を挿入
products = [
(‘ノートPC’, 89800),
(‘マウス’, 1980),
(‘キーボード’, 4500)
]
for name, price in products:
cursor.execute(”’
INSERT INTO products (name, price)
VALUES (?, ?)
”’, (name, price))
# 変更を確定
conn.commit()
# 確認のため取得
cursor.execute(‘SELECT * FROM products’)
for row in cursor.fetchall():
print(row)
conn.close()
問題 3
基礎
SQLiteからデータを取得して、PandasのDataFrameに変換してください。
解答を見る
【解答例】
import sqlite3
import pandas as pd
# 接続
conn = sqlite3.connect(‘shop.db’)
# DataFrameに変換
df = pd.read_sql_query(‘SELECT * FROM products’, conn)
# 接続を閉じる
conn.close()
# 表示
print(df)
print()
print(f”商品数: {len(df)}件”)
print(f”平均価格: {df[‘price’].mean():.0f}円”)
問題 4
応用
try-except-finallyを使って、エラーが起きても確実に接続を閉じるコードを書いてください。
productsテーブルから価格が5000円以上の商品を取得してください。
解答を見る
【解答例】
import sqlite3
conn = None
try:
# 接続
conn = sqlite3.connect(‘shop.db’)
cursor = conn.cursor()
# 5000円以上の商品を取得
cursor.execute(‘SELECT * FROM products WHERE price >= 5000’)
rows = cursor.fetchall()
print(“5000円以上の商品:”)
for row in rows:
print(f” {row[1]}: {row[2]}円”)
except sqlite3.Error as e:
print(f”データベースエラー: {e}”)
finally:
# 必ず接続を閉じる
if conn:
conn.close()
print(“\n接続を閉じました”)
問題 5
応用
with文を使って、同じ処理(5000円以上の商品取得)を書き直してください。
解答を見る
【解答例】
import sqlite3
try:
# with文で自動クローズ
with sqlite3.connect(‘shop.db’) as conn:
cursor = conn.cursor()
# 5000円以上の商品を取得
cursor.execute(‘SELECT * FROM products WHERE price >= 5000’)
rows = cursor.fetchall()
print(“5000円以上の商品:”)
for row in rows:
print(f” {row[1]}: {row[2]}円”)
# withブロックを抜けると自動で閉じられる
print(“\n接続は自動で閉じられました”)
except sqlite3.Error as e:
print(f”データベースエラー: {e}”)
問題 6
応用
PandasのDataFrameを作成し、それをSQLiteに保存してから、再度読み込んでください。
社員データ:名前(山田、佐藤、鈴木)、部署(営業、開発、人事)、年齢(30、25、35)
解答を見る
【解答例】
import sqlite3
import pandas as pd
# DataFrameを作成
df = pd.DataFrame({
‘name’: [‘山田’, ‘佐藤’, ‘鈴木’],
‘department’: [‘営業’, ‘開発’, ‘人事’],
‘age’: [30, 25, 35]
})
print(“作成したDataFrame:”)
print(df)
print()
# SQLiteに保存
with sqlite3.connect(‘company.db’) as conn:
df.to_sql(‘employees’, conn, if_exists=’replace’, index=False)
print(“SQLiteに保存しました”)
# 再度読み込み
with sqlite3.connect(‘company.db’) as conn:
df_loaded = pd.read_sql_query(‘SELECT * FROM employees’, conn)
print(“\nSQLiteから読み込んだDataFrame:”)
print(df_loaded)
問題 7
発展
データベースに接続してデータを取得する関数を作成してください。
関数名:get_products_by_price_range(min_price, max_price)
機能:指定した価格範囲の商品を取得してDataFrameで返す
解答を見る
【解答例】
import sqlite3
import pandas as pd
def get_products_by_price_range(min_price, max_price):
“””
指定した価格範囲の商品を取得する
Parameters:
min_price: 最低価格
max_price: 最高価格
Returns:
pandas DataFrame(該当商品)
“””
try:
with sqlite3.connect(‘shop.db’) as conn:
query = ”’
SELECT * FROM products
WHERE price >= ? AND price <= ?
ORDER BY price
'''
df = pd.read_sql_query(query, conn, params=(min_price, max_price))
return df
except sqlite3.Error as e:
print(f"エラー: {e}")
return pd.DataFrame() # 空のDataFrameを返す
# 使用例
result = get_products_by_price_range(1000, 10000)
print("1000円〜10000円の商品:")
print(result)
問題 8
発展
商品データを追加・更新・削除する関数をそれぞれ作成してください。
add_product(name, price)、update_price(product_id, new_price)、delete_product(product_id)
解答を見る
【解答例】
import sqlite3
def add_product(name, price):
“””商品を追加する”””
try:
with sqlite3.connect(‘shop.db’) as conn:
cursor = conn.cursor()
cursor.execute(
‘INSERT INTO products (name, price) VALUES (?, ?)’,
(name, price)
)
conn.commit()
print(f”追加しました: {name} ({price}円)”)
return True
except sqlite3.Error as e:
print(f”エラー: {e}”)
return False
def update_price(product_id, new_price):
“””価格を更新する”””
try:
with sqlite3.connect(‘shop.db’) as conn:
cursor = conn.cursor()
cursor.execute(
‘UPDATE products SET price = ? WHERE product_id = ?’,
(new_price, product_id)
)
conn.commit()
if cursor.rowcount > 0:
print(f”更新しました: ID={product_id} → {new_price}円”)
return True
else:
print(f”該当する商品がありません: ID={product_id}”)
return False
except sqlite3.Error as e:
print(f”エラー: {e}”)
return False
def delete_product(product_id):
“””商品を削除する”””
try:
with sqlite3.connect(‘shop.db’) as conn:
cursor = conn.cursor()
cursor.execute(
‘DELETE FROM products WHERE product_id = ?’,
(product_id,)
)
conn.commit()
if cursor.rowcount > 0:
print(f”削除しました: ID={product_id}”)
return True
else:
print(f”該当する商品がありません: ID={product_id}”)
return False
except sqlite3.Error as e:
print(f”エラー: {e}”)
return False
# 使用例
add_product(“ヘッドフォン”, 12800)
update_price(1, 79800)
delete_product(999) # 存在しないID