STEP 3:PythonでのDB接続基礎

🔌 STEP 3: PythonでのDB接続基礎

Pythonからデータベースに接続してデータを取得する方法を学びます

📋 このステップで学ぶこと

  • データベース接続の基本的な仕組みと流れ
  • psycopg2を使ったPostgreSQLへの接続方法
  • sqlite3を使ったSQLiteへの接続方法
  • PandasとDBの連携によるデータ取得
  • 接続プールの概念と必要性
  • エラーハンドリングの実装方法

🎯 1. データベース接続の基本

1-1. なぜPythonからDBに接続するのか?

STEP 2で学んだように、企業の重要なデータの多くはデータベースに保存されています。
ETLパイプラインでは、このデータベースからプログラムで自動的にデータを取得する必要があります。

Pythonを使えば、データベースに接続してSQLを実行し、結果を取得して加工することができます。
これがETLの「E」(Extract:抽出)の具体的な実装方法です。

🏧 例え話:ATMでお金を引き出す

ATMでお金を引き出す流れを思い出してください。データベース接続も同じような流れです。

【ATMとデータベース接続の対応】 ATMでの操作 データベース接続 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ ① カードを挿入する → データベースに接続する ② 暗証番号を入力する → 認証情報(ユーザー名/パスワード)を提供する ③ 「引き出し」を選ぶ → SQLクエリを実行する ④ 金額を入力する → 取得するデータを指定する ⑤ お金を受け取る → 結果データを取得する ⑥ カードを取り出す → 接続を切断する

1-2. データベース接続の5つのステップ

Pythonからデータベースに接続してデータを取得する流れは、5つのステップに分けられます。

【DB接続の5ステップ】 Step 1: 接続(Connect) ↓ データベースサーバーとの通信路を確立する Step 2: カーソル作成(Cursor) ↓ SQLを実行するための「窓口」を作る Step 3: SQL実行(Execute) ↓ SELECT文やINSERT文を実行する Step 4: データ取得(Fetch) ↓ 実行結果をPython変数に取り込む Step 5: 切断(Close) ↓ 接続を閉じてリソースを解放する

1-3. 重要な用語を理解する

データベース接続で使われる用語を整理しましょう。

用語 意味 例え
接続(Connection) データベースとの通信路 電話をかけて繋がった状態
カーソル(Cursor) SQLを実行するためのオブジェクト 注文を伝える受話器
コミット(Commit) 変更を確定する 「この注文で決定します」
ロールバック(Rollback) 変更を取り消す 「やっぱりキャンセルします」
フェッチ(Fetch) 結果データを取り出す 料理を受け取る
💡 接続とカーソルの関係

接続(Connection)は、データベースとの「電話回線」のようなものです。
カーソル(Cursor)は、その回線を使って「具体的な操作を行う窓口」です。

1つの接続から複数のカーソルを作ることができますが、
通常は1つの接続に対して1つのカーソルを使うことが多いです。

🐘 2. PostgreSQLへの接続(psycopg2)

2-1. psycopg2とは?

psycopg2(サイコピージー・ツー)は、PythonからPostgreSQLに接続するための最も人気のあるライブラリです。

「psyco」はPostgreSQLを意味し、「pg」もPostgreSQLの略称です。
PostgreSQLを使う多くの企業やプロジェクトで標準的に使用されています。

2-2. インストール方法

まず、psycopg2をインストールします。

# pipでインストール(通常の環境) pip install psycopg2-binary # Google Colabの場合 !pip install psycopg2-binary
📝 psycopg2-binary について

psycopg2-binaryは、コンパイル済みのバージョンです。
通常のpsycopg2はインストール時にコンパイルが必要で、
追加の開発ツールが必要になる場合があります。

学習や開発環境ではpsycopg2-binaryで十分です。
本番環境ではpsycopg2を使うことが推奨されています。

2-3. 基本的な接続方法(ステップバイステップ)

PostgreSQLへの接続を、1ステップずつ詳しく見ていきましょう。

ステップ1:ライブラリのインポート

# psycopg2ライブラリを読み込む import psycopg2
📝 この行の意味

import psycopg2は、psycopg2ライブラリを使えるようにする命令です。
このライブラリに含まれる関数やクラスを使って、PostgreSQLに接続します。

ステップ2:データベースに接続

# データベースに接続 conn = psycopg2.connect( host=”localhost”, # サーバーのアドレス port=5432, # ポート番号 database=”mydb”, # データベース名 user=”postgres”, # ユーザー名 password=”password123″ # パスワード )
📝 各パラメータの説明
  • host:データベースサーバーのアドレス
    • ローカル環境なら "localhost" または "127.0.0.1"
    • リモートサーバーなら "db.example.com" のようなアドレス
  • port:PostgreSQLが待ち受けているポート番号(デフォルトは 5432
  • database:接続先のデータベース名
  • user:ログインするユーザー名
  • password:そのユーザーのパスワード

ステップ3:カーソルを作成

# カーソルを作成 cursor = conn.cursor()
📝 この行の意味

conn.cursor()は、接続オブジェクト(conn)から「カーソル」を作成します。
カーソルは、SQLを実行するための「操作窓口」です。
すべてのSQL実行は、このカーソルを通じて行います。

ステップ4:SQLを実行

# SQLを実行 cursor.execute(“SELECT * FROM users”)
📝 この行の意味

cursor.execute("SQL文")は、指定したSQLをデータベースで実行します。
この時点では、SQLが「実行された」だけで、結果はまだPython側に取り込まれていません。
結果を取り込むには、次のfetchを使います。

ステップ5:結果を取得

# すべての結果を取得 rows = cursor.fetchall() # 結果を1行ずつ表示 for row in rows: print(row)
📝 この行の意味

cursor.fetchall()は、SQLの実行結果をすべて取得してリストとして返します。
各行はタプル(変更不可のリスト)として格納されます。
例:[(1, '山田太郎', 'yamada@example.com'), (2, '佐藤花子', 'sato@example.com')]

ステップ6:接続を閉じる

# カーソルを閉じる cursor.close() # 接続を閉じる conn.close()
⚠️ 接続を閉じることが重要な理由

データベース接続はリソースを消費します。
接続を閉じないと、以下の問題が発生する可能性があります:

  • データベースの接続数上限に達する
  • メモリが無駄に消費される
  • 他のプログラムが接続できなくなる

完成コード:基本的な接続

# ===== PostgreSQL接続の完成コード ===== import psycopg2 # ステップ1: データベースに接続 conn = psycopg2.connect( host=”localhost”, port=5432, database=”mydb”, user=”postgres”, password=”password123″ ) # ステップ2: カーソルを作成 cursor = conn.cursor() # ステップ3: SQLを実行 cursor.execute(“SELECT * FROM users”) # ステップ4: 結果を取得 rows = cursor.fetchall() # ステップ5: 結果を表示 for row in rows: print(row) # ステップ6: 接続を閉じる cursor.close() conn.close()

2-4. データ取得の3つの方法

SQLの実行結果を取得する方法は3つあります。データ量に応じて使い分けます。

メソッド 動作 使いどころ
fetchone() 1行だけ取得 1件だけ取得したいとき
ループで1行ずつ処理したいとき
fetchall() すべての行を取得 データ量が少ないとき
すべてのデータが必要なとき
fetchmany(n) n行だけ取得 大量データを分割処理するとき
メモリを節約したいとき
# fetchone() – 1行だけ取得 cursor.execute(“SELECT * FROM users WHERE user_id = 1”) row = cursor.fetchone() print(row) # (1, ‘山田太郎’, ‘yamada@example.com’) # fetchall() – すべての行を取得 cursor.execute(“SELECT * FROM users”) rows = cursor.fetchall() for row in rows: print(row) # fetchmany(5) – 5行だけ取得 cursor.execute(“SELECT * FROM users”) rows = cursor.fetchmany(5) # 最初の5行 for row in rows: print(row)
💡 大量データの場合はfetchmanyを使おう

100万件のデータをfetchall()で取得すると、すべてがメモリに載るため、
メモリ不足でプログラムが落ちる可能性があります。

大量データの場合は、fetchmany(1000)のように分割して取得するか、
後述するPandasを使うのがおすすめです。

2-5. Pandasと組み合わせる(推奨)

実務では、取得したデータをPandas DataFrameとして扱うことが多いです。
Pandasには、SQLの結果を直接DataFrameに変換する便利な関数があります。

# ===== PandasでDB接続(推奨パターン)===== import psycopg2 import pandas as pd # データベースに接続 conn = psycopg2.connect( host=”localhost”, database=”mydb”, user=”postgres”, password=”password123″ ) # SQLを実行して、結果を直接DataFrameに変換 df = pd.read_sql_query(“SELECT * FROM users”, conn) # 接続を閉じる conn.close() # DataFrameを表示 print(df.head())
✅ pd.read_sql_query()のメリット
  • カーソルを作る必要がない(コードがシンプル)
  • 結果がDataFrameで取得できる(すぐに分析できる)
  • 列名が自動的に設定される
  • 大量データも効率的に処理してくれる

2-6. データの挿入・更新・削除

SELECT(取得)だけでなく、INSERT(挿入)、UPDATE(更新)、DELETE(削除)も実行できます。
ただし、これらの操作にはcommitが必要です。

データの挿入(INSERT)

# データを挿入する cursor.execute(“”” INSERT INTO users (name, email) VALUES (%s, %s) “””, (‘佐藤花子’, ‘sato@example.com’)) # 変更を確定(重要!) conn.commit()
📝 %s はプレースホルダー

%sは「ここに値が入る」という目印(プレースホルダー)です。
実際の値は、execute()の第2引数でタプルとして渡します。

なぜプレースホルダーを使うのか?
SQL文に直接値を埋め込むと、SQLインジェクションという
セキュリティ攻撃を受ける危険があります。
プレースホルダーを使うと、自動的にエスケープ処理されて安全です。

データの更新(UPDATE)

# データを更新する cursor.execute(“”” UPDATE users SET email = %s WHERE user_id = %s “””, (‘new_email@example.com’, 1)) # 変更を確定 conn.commit()

データの削除(DELETE)

# データを削除する cursor.execute(“”” DELETE FROM users WHERE user_id = %s “””, (5,)) # タプルにするため、カンマが必要 # 変更を確定 conn.commit()
⚠️ 重要:commitを忘れずに!

INSERT、UPDATE、DELETEを実行した後は、必ずconn.commit()を呼び出してください。
commit()しないと、変更が保存されません

SELECTの場合は、データを読み取るだけなのでcommit()は不要です。

🗄️ 3. SQLiteへの接続

3-1. SQLiteとは?

SQLite(エスキューライト)は、ファイルベースの軽量データベースです。
サーバーを起動する必要がなく、1つのファイルでデータベース全体を管理できます。

✅ SQLiteのメリット
  • 追加インストール不要(Python標準搭載)
  • サーバーのセットアップ不要
  • ファイル1つで管理できる
  • 学習・テストに最適
  • アプリに組み込みやすい
⚠️ SQLiteのデメリット
  • 大規模データには不向き
  • 複数ユーザーの同時書き込みが苦手
  • ネットワーク経由でアクセスできない
  • 一部のSQL機能が使えない
📝 SQLiteの使いどころ

適している場面:学習、プロトタイプ開発、小規模アプリ、モバイルアプリ
適していない場面:大規模Webサービス、複数ユーザーが同時にアクセスするシステム

3-2. SQLiteの基本的な使い方

SQLiteはPythonに標準搭載されているので、インストール不要ですぐに使えます。

ステップ1:ライブラリのインポートと接続

# sqlite3はPython標準ライブラリなので、追加インストール不要 import sqlite3 # データベースに接続(ファイルがなければ自動作成される) conn = sqlite3.connect(‘mydb.db’)
📝 PostgreSQLとの違い

PostgreSQLでは、ホスト名やポート番号を指定しましたが、
SQLiteではファイル名だけを指定します。

指定したファイルが存在しない場合は、自動的に新しいファイルが作成されます。

ステップ2:テーブルの作成

# カーソルを作成 cursor = conn.cursor() # テーブルを作成するSQL cursor.execute(”’ CREATE TABLE IF NOT EXISTS users ( user_id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, email TEXT UNIQUE ) ”’)
📝 SQLの説明
  • CREATE TABLE IF NOT EXISTS:テーブルがなければ作成する
  • INTEGER PRIMARY KEY AUTOINCREMENT:自動で連番が振られる主キー
  • TEXT NOT NULL:文字列型で、空は許可しない
  • UNIQUE:重複した値は許可しない

ステップ3:データの挿入

# データを挿入 cursor.execute(”’ INSERT INTO users (name, email) VALUES (?, ?) ”’, (‘山田太郎’, ‘yamada@example.com’)) # 変更を確定 conn.commit()
⚠️ プレースホルダーの違い

PostgreSQL(psycopg2)%s を使う
SQLite(sqlite3)? を使う

ライブラリによってプレースホルダーの書き方が異なるので注意してください。

完成コード:SQLiteの基本操作

# ===== SQLite基本操作の完成コード ===== import sqlite3 # データベースに接続(ファイルがなければ自動作成) conn = sqlite3.connect(‘mydb.db’) cursor = conn.cursor() # テーブルを作成 cursor.execute(”’ CREATE TABLE IF NOT EXISTS users ( user_id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, email TEXT UNIQUE ) ”’) # データを挿入 cursor.execute(‘INSERT INTO users (name, email) VALUES (?, ?)’, (‘山田太郎’, ‘yamada@example.com’)) cursor.execute(‘INSERT INTO users (name, email) VALUES (?, ?)’, (‘佐藤花子’, ‘sato@example.com’)) # 変更を確定 conn.commit() # データを取得 cursor.execute(‘SELECT * FROM users’) rows = cursor.fetchall() # 結果を表示 for row in rows: print(row) # 接続を閉じる conn.close()

3-3. PandasとSQLiteの組み合わせ

PandasとSQLiteを組み合わせると、DataFrameをそのままデータベースに保存できます。
データの一時保存や、分析結果の永続化に便利です。

DataFrameをSQLiteに保存する

# ===== DataFrameをSQLiteに保存 ===== import sqlite3 import pandas as pd # 接続 conn = sqlite3.connect(‘mydb.db’) # DataFrameを作成 df = pd.DataFrame({ ‘name’: [‘山田太郎’, ‘佐藤花子’, ‘鈴木一郎’], ‘email’: [‘yamada@ex.com’, ‘sato@ex.com’, ‘suzuki@ex.com’], ‘age’: [30, 25, 35] }) # DataFrameをSQLiteに保存 # to_sql()でDataFrameをそのままテーブルとして保存 df.to_sql(‘users’, conn, if_exists=’replace’, index=False) # 接続を閉じる conn.close() print(“保存完了!”)
📝 to_sql()のパラメータ
  • 'users':保存先のテーブル名
  • conn:データベース接続オブジェクト
  • if_exists:テーブルが既に存在する場合の動作
    • 'replace':テーブルを削除して作り直す
    • 'append':既存テーブルにデータを追加
    • 'fail':エラーを発生させる(デフォルト)
  • index=False:DataFrameのインデックスは保存しない

SQLiteからDataFrameに読み込む

# ===== SQLiteからDataFrameに読み込み ===== import sqlite3 import pandas as pd # 接続 conn = sqlite3.connect(‘mydb.db’) # SQLiteからDataFrameに読み込み df = pd.read_sql_query(‘SELECT * FROM users’, conn) # 接続を閉じる conn.close() # 結果を表示 print(df)
【実行結果】 name email age 0 山田太郎 yamada@ex.com 30 1 佐藤花子 sato@ex.com 25 2 鈴木一郎 suzuki@ex.com 35

🏊 4. 接続プールの基礎

4-1. 接続プールとは?

接続プール(Connection Pool)とは、データベース接続を使い回す仕組みです。

通常、データベースに接続するたびに「接続の確立」と「切断」が行われますが、
これには時間とリソースがかかります。接続プールを使うと、この問題を解決できます。

🚗 例え話:レンタカーと自家用車

接続プールなし:毎回、車を買って、使い終わったら廃車にする
→ 購入手続きに時間がかかる、無駄が多い!

接続プールあり:レンタカー会社が車を何台か用意しておいて、必要な人に貸し出す
→ すぐに借りられる、使い終わったら返却して次の人が使う、効率的!

【接続プールの動作イメージ】 ■ 接続プールなしの場合 リクエスト1 → 接続確立(0.5秒) → SQL実行 → 切断 リクエスト2 → 接続確立(0.5秒) → SQL実行 → 切断 リクエスト3 → 接続確立(0.5秒) → SQL実行 → 切断 → 毎回0.5秒のオーバーヘッドが発生 ■ 接続プールありの場合 【プール】接続1、接続2、接続3(事前に用意) リクエスト1 → 接続1を借りる → SQL実行 → 返却 リクエスト2 → 接続2を借りる → SQL実行 → 返却 リクエスト3 → 接続1を借りる → SQL実行 → 返却(接続1が空いたので再利用) → 接続確立のオーバーヘッドがない!

4-2. なぜ接続プールが必要なのか?

問題 接続プールなし 接続プールあり
接続時間 毎回接続確立に時間がかかる 既存の接続を使うので高速
接続数 制御できず、上限に達する可能性 プールサイズで制御可能
リソース 無駄な接続が発生しやすい 効率的に再利用される
安定性 高負荷時に不安定 安定したパフォーマンス

4-3. psycopg2での接続プール

psycopg2には、接続プールを作成するための機能が用意されています。

# ===== psycopg2の接続プール ===== from psycopg2 import pool # 接続プールを作成 connection_pool = pool.SimpleConnectionPool( minconn=1, # 最小接続数(常にこの数の接続を維持) maxconn=10, # 最大接続数(これ以上は作らない) host=”localhost”, database=”mydb”, user=”postgres”, password=”password123″ ) # プールから接続を「借りる」 conn = connection_pool.getconn() # SQLを実行 cursor = conn.cursor() cursor.execute(“SELECT * FROM users”) rows = cursor.fetchall() cursor.close() # 接続を「返却」(閉じない!) connection_pool.putconn(conn) # プログラム終了時にプール全体を閉じる connection_pool.closeall()
📝 接続プールの重要なポイント
  • getconn():プールから接続を借りる
  • putconn(conn):プールに接続を返すclose()ではない!)
  • closeall():プログラム終了時にプール全体を閉じる
  • minconn:常に維持する最小接続数
  • maxconn:作成する最大接続数

重要:接続をclose()ではなくputconn()で返却します。
close()すると接続が破棄されてしまい、再利用できなくなります。

4-4. SQLAlchemyでの接続プール

SQLAlchemyは、より高度なデータベース操作ライブラリです。
接続プールを自動で管理してくれるため、実務でよく使われています。

# ===== SQLAlchemyの接続プール ===== from sqlalchemy import create_engine import pandas as pd # エンジンを作成(接続プールも自動で作られる) engine = create_engine( ‘postgresql://postgres:password123@localhost:5432/mydb’, pool_size=10, # プールサイズ max_overflow=20 # プールが満杯の時の追加接続数 ) # Pandasと組み合わせて使う df = pd.read_sql_query(‘SELECT * FROM users’, engine) # 処理が終わったらエンジンを破棄 engine.dispose() print(df)
✅ SQLAlchemyのメリット
  • 接続プールを自動管理してくれる
  • PostgreSQL、MySQL、SQLiteなど様々なDBに対応
  • ORM(オブジェクト関係マッピング)も使える
  • Pandasとの相性が良い
📝 接続プールを使うべき場面

使うべき場面:
Webアプリケーション、APIサーバー、頻繁にDBアクセスするバッチ処理

使わなくても良い場面:
1回だけ実行するスクリプト、学習・テスト環境、アクセス頻度が低い処理

⚠️ 5. エラーハンドリング

5-1. なぜエラーハンドリングが重要なのか?

データベース接続では、様々なエラーが発生する可能性があります。

  • サーバーが起動していない
  • ネットワークが切断された
  • パスワードが間違っている
  • テーブルが存在しない
  • SQLの文法エラー

エラーが発生した時に適切に対処しないと、プログラムが突然停止したり、
接続が閉じられずにリソースが漏れる問題が発生します。

5-2. try-exceptでエラーをキャッチ

Pythonのtry-except構文を使って、エラーを適切に処理します。

# ===== 基本的なエラーハンドリング ===== import psycopg2 try: # データベースに接続(エラーが起きる可能性がある処理) conn = psycopg2.connect( host=”localhost”, database=”mydb”, user=”postgres”, password=”password123″ ) cursor = conn.cursor() cursor.execute(“SELECT * FROM users”) rows = cursor.fetchall() for row in rows: print(row) cursor.close() conn.close() except psycopg2.Error as e: # psycopg2のエラーが発生した場合 print(f”データベースエラー: {e}”) except Exception as e: # その他の予期しないエラーが発生した場合 print(f”予期しないエラー: {e}”)
📝 try-exceptの構造
  • try: ブロック:エラーが起きる可能性のある処理を書く
  • except エラー型 as e::特定のエラーをキャッチして処理
  • e:エラーの詳細情報が入っている変数

5-3. finallyで確実にクリーンアップ

finallyブロックは、エラーが起きても起きなくても必ず実行されます。
接続を閉じる処理に使うと、リソース漏れを防げます。

# ===== finallyを使った安全な接続管理 ===== import psycopg2 conn = None # 初期化しておく cursor = None try: # 接続 conn = psycopg2.connect( host=”localhost”, database=”mydb”, user=”postgres”, password=”password123″ ) cursor = conn.cursor() cursor.execute(“SELECT * FROM users”) rows = cursor.fetchall() for row in rows: print(row) except psycopg2.Error as e: print(f”データベースエラー: {e}”) finally: # エラーが起きても起きなくても、必ず実行される if cursor: cursor.close() print(“カーソルを閉じました”) if conn: conn.close() print(“接続を閉じました”)
📝 finallyが必要な理由

tryブロック内でエラーが発生すると、その後のコードは実行されません。
例えば、SQLの実行でエラーが起きると、conn.close()が実行されずに
接続が開いたままになってしまいます。

finallyを使うと、どんな場合でも確実に接続を閉じることができます。

5-4. with文で自動クリーンアップ(推奨)

with文を使うと、自動でリソースが解放されます。
これが最も安全でスマートな方法です。

# ===== with文を使った最もスマートな方法(推奨)===== import psycopg2 try: # withを使うと、ブロックを抜けるときに自動でcloseされる with psycopg2.connect( host=”localhost”, database=”mydb”, user=”postgres”, password=”password123″ ) as conn: # カーソルもwithで管理 with conn.cursor() as cursor: cursor.execute(“SELECT * FROM users”) rows = cursor.fetchall() for row in rows: print(row) # withブロックを抜けると自動でcloseされる print(“接続は自動で閉じられました”) except psycopg2.Error as e: print(f”データベースエラー: {e}”)
✅ with文のメリット
  • エラーが起きても自動でcloseされる
  • コードがシンプルになる
  • finallyを書く必要がない
  • リソース漏れを防げる

実務では、with文を使うのがベストプラクティスです!

5-5. よくあるエラーと対処法

エラーメッセージ 原因 対処法
connection refused サーバーが起動していない
またはポートが間違っている
PostgreSQLを起動する
ポート番号を確認
authentication failed ユーザー名またはパスワードが間違っている 認証情報を確認
database does not exist 指定したデータベースが存在しない データベース名を確認
または作成する
relation does not exist テーブルが存在しない テーブル名を確認
スキーマ名を確認
syntax error SQLの文法エラー SQL文を見直す
timeout expired 接続がタイムアウトした ネットワークを確認
タイムアウト値を調整

📝 STEP 3 のまとめ

✅ このステップで学んだこと
  • DB接続の5ステップ:接続 → カーソル作成 → SQL実行 → データ取得 → 切断
  • psycopg2:PostgreSQLに接続するためのライブラリ(プレースホルダーは%s
  • sqlite3:軽量なファイルベースDB、Python標準搭載(プレースホルダーは?
  • Pandas連携pd.read_sql_query()で簡単にDataFrame化
  • 接続プール:接続を使い回してパフォーマンス向上
  • commit:INSERT/UPDATE/DELETEの後は必ずcommitする
  • with文:自動でリソースを解放する最も安全な方法
💡 実務でのポイント

1. 接続情報を直接コードに書かない
パスワードなどの機密情報は、環境変数や設定ファイルで管理します(後のステップで学習)。

2. with文を使う
リソース漏れを防ぐため、常にwith文を使うことを推奨します。

3. 適切なfetchメソッドを選ぶ
大量データの場合はfetchmany()やPandasを使います。

🎯 次のステップの予告

次のSTEP 4では、「SQLによるデータ抽出」を学びます。

  • SELECT文の基本と応用
  • WHERE句による条件指定
  • JOINを使った複数テーブルの結合
  • 増分抽出の考え方

ETLの「E」(Extract)をより実践的に学んでいきましょう!

📝 練習問題

問題 1 基礎

SQLiteで新しいデータベースファイルを作成し、productsテーブルを作成してください。

テーブル構成:product_id (INTEGER PRIMARY KEY), name (TEXT), price (INTEGER)

【解答例】
import sqlite3 # データベースに接続(ファイルが作成される) conn = sqlite3.connect(‘shop.db’) cursor = conn.cursor() # テーブルを作成 cursor.execute(”’ CREATE TABLE IF NOT EXISTS products ( product_id INTEGER PRIMARY KEY, name TEXT, price INTEGER ) ”’) # 変更を確定 conn.commit() # 接続を閉じる conn.close() print(“テーブル作成完了!”)
問題 2 基礎

問題1で作成したproductsテーブルに、3つの商品データを挿入してください。

商品例:ノートPC(89800円)、マウス(1980円)、キーボード(4500円)

【解答例】
import sqlite3 conn = sqlite3.connect(‘shop.db’) cursor = conn.cursor() # 3つの商品を挿入 products = [ (‘ノートPC’, 89800), (‘マウス’, 1980), (‘キーボード’, 4500) ] for name, price in products: cursor.execute(”’ INSERT INTO products (name, price) VALUES (?, ?) ”’, (name, price)) # 変更を確定 conn.commit() # 確認のため取得 cursor.execute(‘SELECT * FROM products’) for row in cursor.fetchall(): print(row) conn.close()
問題 3 基礎

SQLiteからデータを取得して、PandasのDataFrameに変換してください。

【解答例】
import sqlite3 import pandas as pd # 接続 conn = sqlite3.connect(‘shop.db’) # DataFrameに変換 df = pd.read_sql_query(‘SELECT * FROM products’, conn) # 接続を閉じる conn.close() # 表示 print(df) print() print(f”商品数: {len(df)}件”) print(f”平均価格: {df[‘price’].mean():.0f}円”)
問題 4 応用

try-except-finallyを使って、エラーが起きても確実に接続を閉じるコードを書いてください。

productsテーブルから価格が5000円以上の商品を取得してください。

【解答例】
import sqlite3 conn = None try: # 接続 conn = sqlite3.connect(‘shop.db’) cursor = conn.cursor() # 5000円以上の商品を取得 cursor.execute(‘SELECT * FROM products WHERE price >= 5000’) rows = cursor.fetchall() print(“5000円以上の商品:”) for row in rows: print(f” {row[1]}: {row[2]}円”) except sqlite3.Error as e: print(f”データベースエラー: {e}”) finally: # 必ず接続を閉じる if conn: conn.close() print(“\n接続を閉じました”)
問題 5 応用

with文を使って、同じ処理(5000円以上の商品取得)を書き直してください。

【解答例】
import sqlite3 try: # with文で自動クローズ with sqlite3.connect(‘shop.db’) as conn: cursor = conn.cursor() # 5000円以上の商品を取得 cursor.execute(‘SELECT * FROM products WHERE price >= 5000’) rows = cursor.fetchall() print(“5000円以上の商品:”) for row in rows: print(f” {row[1]}: {row[2]}円”) # withブロックを抜けると自動で閉じられる print(“\n接続は自動で閉じられました”) except sqlite3.Error as e: print(f”データベースエラー: {e}”)
問題 6 応用

PandasのDataFrameを作成し、それをSQLiteに保存してから、再度読み込んでください。

社員データ:名前(山田、佐藤、鈴木)、部署(営業、開発、人事)、年齢(30、25、35)

【解答例】
import sqlite3 import pandas as pd # DataFrameを作成 df = pd.DataFrame({ ‘name’: [‘山田’, ‘佐藤’, ‘鈴木’], ‘department’: [‘営業’, ‘開発’, ‘人事’], ‘age’: [30, 25, 35] }) print(“作成したDataFrame:”) print(df) print() # SQLiteに保存 with sqlite3.connect(‘company.db’) as conn: df.to_sql(‘employees’, conn, if_exists=’replace’, index=False) print(“SQLiteに保存しました”) # 再度読み込み with sqlite3.connect(‘company.db’) as conn: df_loaded = pd.read_sql_query(‘SELECT * FROM employees’, conn) print(“\nSQLiteから読み込んだDataFrame:”) print(df_loaded)
問題 7 発展

データベースに接続してデータを取得する関数を作成してください。

関数名:get_products_by_price_range(min_price, max_price)
機能:指定した価格範囲の商品を取得してDataFrameで返す

【解答例】
import sqlite3 import pandas as pd def get_products_by_price_range(min_price, max_price): “”” 指定した価格範囲の商品を取得する Parameters: min_price: 最低価格 max_price: 最高価格 Returns: pandas DataFrame(該当商品) “”” try: with sqlite3.connect(‘shop.db’) as conn: query = ”’ SELECT * FROM products WHERE price >= ? AND price <= ? ORDER BY price ''' df = pd.read_sql_query(query, conn, params=(min_price, max_price)) return df except sqlite3.Error as e: print(f"エラー: {e}") return pd.DataFrame() # 空のDataFrameを返す # 使用例 result = get_products_by_price_range(1000, 10000) print("1000円〜10000円の商品:") print(result)
問題 8 発展

商品データを追加・更新・削除する関数をそれぞれ作成してください。

add_product(name, price)、update_price(product_id, new_price)、delete_product(product_id)

【解答例】
import sqlite3 def add_product(name, price): “””商品を追加する””” try: with sqlite3.connect(‘shop.db’) as conn: cursor = conn.cursor() cursor.execute( ‘INSERT INTO products (name, price) VALUES (?, ?)’, (name, price) ) conn.commit() print(f”追加しました: {name} ({price}円)”) return True except sqlite3.Error as e: print(f”エラー: {e}”) return False def update_price(product_id, new_price): “””価格を更新する””” try: with sqlite3.connect(‘shop.db’) as conn: cursor = conn.cursor() cursor.execute( ‘UPDATE products SET price = ? WHERE product_id = ?’, (new_price, product_id) ) conn.commit() if cursor.rowcount > 0: print(f”更新しました: ID={product_id} → {new_price}円”) return True else: print(f”該当する商品がありません: ID={product_id}”) return False except sqlite3.Error as e: print(f”エラー: {e}”) return False def delete_product(product_id): “””商品を削除する””” try: with sqlite3.connect(‘shop.db’) as conn: cursor = conn.cursor() cursor.execute( ‘DELETE FROM products WHERE product_id = ?’, (product_id,) ) conn.commit() if cursor.rowcount > 0: print(f”削除しました: ID={product_id}”) return True else: print(f”該当する商品がありません: ID={product_id}”) return False except sqlite3.Error as e: print(f”エラー: {e}”) return False # 使用例 add_product(“ヘッドフォン”, 12800) update_price(1, 79800) delete_product(999) # 存在しないID
📝

学習メモ

ETL・データパイプライン構築 - Step 3

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE