📋 このステップで学ぶこと
- AWS S3へのアップロード(boto3の使い方)
- Google Cloud Storageへのアップロード
- ファイル構造の設計(パーティショニング)
- 実践演習:集計結果を日次で保存
- セキュリティとアクセス管理
⏱️ 学習時間の目安:2時間
📝 練習問題:10問(基礎4問・応用4問・発展2問)
🎯 1. クラウドストレージとは?
1-1. クラウドストレージの基本
クラウドストレージとは、インターネット上にデータを保存できるサービスです。
どこからでもアクセスでき、大容量のデータを保存できます。
📚 例え話:巨大なレンタル倉庫
クラウドストレージは、インターネット上の巨大なレンタル倉庫のようなものです。
・自分のPCの保存領域:自宅の棚(容量に限りがある)
・クラウドストレージ:レンタル倉庫(必要なだけ借りられる)
いつでもどこからでも荷物を出し入れでき、
火事や地震があっても荷物は安全に保管されています。
1-2. 主なクラウドストレージサービス
主要クラウドストレージの比較
| サービス |
提供元 |
特徴 |
| AWS S3 |
Amazon |
最も人気、豊富な連携サービス |
| GCS |
Google |
BigQueryと連携しやすい |
| Azure Blob |
Microsoft |
Microsoft製品と連携しやすい |
1-3. クラウドストレージを使うメリット
クラウドストレージのメリット
| メリット |
説明 |
| 大容量 |
ペタバイト級のデータも保存可能 |
| 高い可用性 |
99.99%以上の稼働率、障害に強い |
| スケーラブル |
必要に応じて容量を増やせる |
| コスト効率 |
使った分だけ課金、初期費用不要 |
| どこからでもアクセス |
インターネットがあれば世界中からアクセス可能 |
💡 実務でのよくある使い方
- 日次で集計した結果をS3に保存
- ログファイルをS3にアーカイブ
- 機械学習用の大量データを保存
- データレイクとして活用(生データを全て保存)
📦 2. AWS S3へのアップロード
2-1. boto3ライブラリのインストール
boto3は、PythonからAWSのサービスを操作するためのライブラリです。
pip install boto3
2-2. 認証情報の設定
S3にアクセスするには、アクセスキーとシークレットキーが必要です。
⚠️ セキュリティ上の重要な注意
- アクセスキーとシークレットキーは絶対にコードに直接書かない
- 環境変数に保存する
- GitHubなどにアップロードしない(.gitignoreに追加)
- 定期的に認証情報を更新する
# ===== 環境変数の設定 =====
# Linux/Mac
export AWS_ACCESS_KEY_ID=”your_access_key”
export AWS_SECRET_ACCESS_KEY=”your_secret_key”
export AWS_DEFAULT_REGION=”ap-northeast-1″
# Windows(コマンドプロンプト)
set AWS_ACCESS_KEY_ID=your_access_key
set AWS_SECRET_ACCESS_KEY=your_secret_key
set AWS_DEFAULT_REGION=ap-northeast-1
# Windows(PowerShell)
$env:AWS_ACCESS_KEY_ID=”your_access_key”
$env:AWS_SECRET_ACCESS_KEY=”your_secret_key”
$env:AWS_DEFAULT_REGION=”ap-northeast-1″
2-3. 基本的なS3アップロード
# ===== S3への基本的なアップロード =====
import boto3
import pandas as pd
# サンプルデータ
df = pd.DataFrame({
‘商品ID’: [1, 2, 3, 4, 5],
‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’, ‘ぶどう’, ‘いちご’],
‘価格’: [150, 100, 80, 300, 250],
‘在庫’: [50, 100, 80, 30, 40]
})
print(“=== アップロードするデータ ===”)
print(df)
# まずローカルにCSVファイルを作成
df.to_csv(‘products.csv’, index=False, encoding=’utf-8-sig’)
# S3クライアントを作成(環境変数から自動で認証情報を取得)
s3 = boto3.client(‘s3’)
# S3にアップロード
bucket_name = ‘my-data-bucket’ # あなたのバケット名
file_name = ‘products.csv’ # ローカルファイル名
s3_path = ‘data/products.csv’ # S3内のパス
s3.upload_file(file_name, bucket_name, s3_path)
print(f”\n✅ S3にアップロードしました!”)
print(f” パス: s3://{bucket_name}/{s3_path}”)
【実行結果】
=== アップロードするデータ ===
商品ID 商品名 価格 在庫
0 1 りんご 150 50
1 2 バナナ 100 100
2 3 みかん 80 80
3 4 ぶどう 300 30
4 5 いちご 250 40
✅ S3にアップロードしました!
パス: s3://my-data-bucket/data/products.csv
2-4. メモリ上のデータを直接アップロード
ファイルを作成せずに、メモリ上のデータを直接S3にアップロードできます。
# ===== メモリから直接S3にアップロード =====
import boto3
import io
import pandas as pd
# サンプルデータ
df = pd.DataFrame({
‘商品ID’: [1, 2, 3],
‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’],
‘価格’: [150, 100, 80]
})
# DataFrameをCSV文字列に変換
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
# S3にアップロード
s3 = boto3.client(‘s3′)
s3.put_object(
Bucket=’my-data-bucket’,
Key=’data/products_direct.csv’,
Body=csv_buffer.getvalue().encode(‘utf-8’)
)
print(“✅ メモリから直接S3にアップロードしました!”)
【実行結果】
✅ メモリから直接S3にアップロードしました!
🎯 アップロード方法の使い分け
- upload_file:ローカルファイルをアップロード
- put_object:メモリ上のデータを直接アップロード(ファイル不要)
メモリから直接アップロードする方がディスクI/Oが不要で効率的です。
2-5. S3からファイルをダウンロード
# ===== S3からファイルをダウンロード =====
import boto3
import pandas as pd
s3 = boto3.client(‘s3’)
# 方法1:ファイルに保存してから読み込み
s3.download_file(
‘my-data-bucket’, # バケット名
‘data/products.csv’, # S3内のパス
‘downloaded_products.csv’ # ローカルに保存するファイル名
)
df = pd.read_csv(‘downloaded_products.csv’)
print(“=== ダウンロードしたデータ ===”)
print(df)
【実行結果】
=== ダウンロードしたデータ ===
商品ID 商品名 価格 在庫
0 1 りんご 150 50
1 2 バナナ 100 100
2 3 みかん 80 80
3 4 ぶどう 300 30
4 5 いちご 250 40
2-6. S3内のファイル一覧を取得
# ===== S3内のファイル一覧を取得 =====
import boto3
s3 = boto3.client(‘s3′)
response = s3.list_objects_v2(
Bucket=’my-data-bucket’,
Prefix=’data/’ # このプレフィックスで始まるファイルのみ
)
if ‘Contents’ in response:
print(“=== S3内のファイル一覧 ===”)
for obj in response[‘Contents’]:
size_kb = obj[‘Size’] / 1024
print(f” {obj[‘Key’]} ({size_kb:.1f} KB)”)
else:
print(“ファイルが見つかりませんでした”)
【実行結果】
=== S3内のファイル一覧 ===
data/products.csv (0.2 KB)
data/products_direct.csv (0.1 KB)
data/sales_2024-01-01.csv (0.5 KB)
data/sales_2024-01-02.csv (0.5 KB)
☁️ 3. Google Cloud Storageへのアップロード
3-1. google-cloud-storageライブラリのインストール
pip install google-cloud-storage
3-2. 認証情報の設定
GCSを使うには、サービスアカウントキー(JSONファイル)が必要です。
# 環境変数で認証情報を設定
export GOOGLE_APPLICATION_CREDENTIALS=”/path/to/your-service-account-key.json”
3-3. 基本的なGCSアップロード
# ===== GCSへの基本的なアップロード =====
from google.cloud import storage
import pandas as pd
# サンプルデータ
df = pd.DataFrame({
‘商品ID’: [1, 2, 3],
‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’],
‘価格’: [150, 100, 80]
})
# ローカルにCSVを保存
df.to_csv(‘products.csv’, index=False, encoding=’utf-8-sig’)
# GCSクライアントを作成
client = storage.Client()
# バケットを取得
bucket = client.get_bucket(‘my-gcs-bucket’)
# ファイルをアップロード
blob = bucket.blob(‘data/products.csv’)
blob.upload_from_filename(‘products.csv’)
print(f”✅ GCSにアップロードしました!”)
print(f” パス: gs://my-gcs-bucket/data/products.csv”)
【実行結果】
✅ GCSにアップロードしました!
パス: gs://my-gcs-bucket/data/products.csv
3-4. メモリから直接アップロード
# ===== メモリから直接GCSにアップロード =====
from google.cloud import storage
import io
import pandas as pd
df = pd.DataFrame({
‘商品ID’: [1, 2, 3],
‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’],
‘価格’: [150, 100, 80]
})
# DataFrameをCSV文字列に変換
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
# GCSにアップロード
client = storage.Client()
bucket = client.get_bucket(‘my-gcs-bucket’)
blob = bucket.blob(‘data/products_direct.csv’)
blob.upload_from_string(csv_buffer.getvalue(), content_type=’text/csv’)
print(“✅ メモリから直接GCSにアップロードしました!”)
💡 S3とGCSの違い
| 操作 |
AWS S3 |
GCS |
| クライアント作成 |
boto3.client(‘s3’) |
storage.Client() |
| アップロード |
upload_file / put_object |
blob.upload_from_filename |
| ダウンロード |
download_file / get_object |
blob.download_to_filename |
📂 4. ファイル構造の設計(パーティショニング)
4-1. パーティショニングとは?
パーティショニングとは、データを特定のルールでフォルダ分けすることです。
日付や地域などで分けることで、必要なデータだけを高速に取得できます。
📁 よくあるパーティショニング例
s3://my-bucket/
└── sales/
├── year=2024/
│ ├── month=01/
│ │ ├── day=01/
│ │ │ └── data.csv
│ │ ├── day=02/
│ │ │ └── data.csv
│ │ └── …
│ └── month=02/
│ └── …
└── year=2025/
└── …
4-2. 日付でパーティショニング
# ===== 日付でパーティショニングしてアップロード =====
import boto3
import io
import pandas as pd
from datetime import datetime
def upload_with_date_partition(df, bucket_name, base_path, date=None):
“””
日付ごとにパーティショニングしてS3にアップロード
Parameters:
———–
df : DataFrame
アップロードするデータ
bucket_name : str
S3バケット名
base_path : str
ベースパス
date : datetime or None
日付(Noneの場合は今日)
Returns:
——–
str : S3パス
“””
if date is None:
date = datetime.now()
# パーティションパスを作成
year = date.strftime(‘%Y’)
month = date.strftime(‘%m’)
day = date.strftime(‘%d’)
s3_path = f”{base_path}/year={year}/month={month}/day={day}/data.csv”
# CSVに変換
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
# S3にアップロード
s3 = boto3.client(‘s3’)
s3.put_object(
Bucket=bucket_name,
Key=s3_path,
Body=csv_buffer.getvalue().encode(‘utf-8’)
)
print(f”✅ アップロード完了”)
print(f” パス: s3://{bucket_name}/{s3_path}”)
return s3_path
# 使用例
df = pd.DataFrame({
‘売上ID’: [1, 2, 3],
‘金額’: [1000, 2000, 1500]
})
upload_with_date_partition(df, ‘my-data-bucket’, ‘sales’, datetime(2024, 1, 15))
【実行結果】
✅ アップロード完了
パス: s3://my-data-bucket/sales/year=2024/month=01/day=15/data.csv
4-3. 複数の軸でパーティショニング
# ===== 日付と地域でパーティショニング =====
def upload_with_multi_partition(df, bucket_name, base_path, date, region):
“””日付と地域でパーティショニング”””
year = date.strftime(‘%Y’)
month = date.strftime(‘%m’)
day = date.strftime(‘%d’)
# パーティションパス(地域 → 年 → 月 → 日)
s3_path = f”{base_path}/region={region}/year={year}/month={month}/day={day}/data.csv”
# CSV変換してアップロード
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
s3 = boto3.client(‘s3’)
s3.put_object(
Bucket=bucket_name,
Key=s3_path,
Body=csv_buffer.getvalue().encode(‘utf-8’)
)
print(f”✅ {region}の{date.strftime(‘%Y-%m-%d’)}データをアップロード”)
return s3_path
# 使用例
from datetime import datetime
tokyo_df = pd.DataFrame({‘売上’: [1000, 2000]})
osaka_df = pd.DataFrame({‘売上’: [1500, 2500]})
today = datetime(2024, 1, 15)
upload_with_multi_partition(tokyo_df, ‘my-data-bucket’, ‘sales’, today, ‘tokyo’)
upload_with_multi_partition(osaka_df, ‘my-data-bucket’, ‘sales’, today, ‘osaka’)
【実行結果】
✅ tokyoの2024-01-15データをアップロード
✅ osakaの2024-01-15データをアップロード
🎯 パーティショニングのメリット
- 高速なデータ取得:必要な日付だけ読み込める
- コスト削減:不要なデータを読まない(課金対象外)
- 管理しやすい:ファイルが整理される
- 並列処理:パーティションごとに処理できる
💼 5. 実践演習:日次集計結果の保存
5-1. シナリオ
毎日の売上データを集計して、日付ごとにS3に保存するパイプラインを作ります。
# ===== 日次売上パイプライン =====
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import boto3
import io
def generate_daily_sales(date):
“””指定日の売上データを生成(ダミー)”””
return pd.DataFrame({
‘日付’: [date.strftime(‘%Y-%m-%d’)] * 10,
‘商品ID’: range(1, 11),
‘売上’: np.random.randint(1000, 10000, 10)
})
def aggregate_daily_sales(df):
“””日次売上を集計”””
return pd.DataFrame({
‘日付’: [df[‘日付’].iloc[0]],
‘合計売上’: [df[‘売上’].sum()],
‘平均売上’: [df[‘売上’].mean().round(0)],
‘最大売上’: [df[‘売上’].max()],
‘最小売上’: [df[‘売上’].min()],
‘件数’: [len(df)]
})
def save_to_s3_partitioned(df, bucket_name, base_path, date):
“””日付でパーティショニングしてS3に保存”””
year = date.strftime(‘%Y’)
month = date.strftime(‘%m’)
day = date.strftime(‘%d’)
s3_path = f”{base_path}/year={year}/month={month}/day={day}/summary.csv”
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
s3 = boto3.client(‘s3’)
s3.put_object(
Bucket=bucket_name,
Key=s3_path,
Body=csv_buffer.getvalue().encode(‘utf-8’)
)
return s3_path
def run_daily_sales_pipeline(start_date, end_date, bucket_name):
“””日次売上パイプラインを実行”””
current_date = start_date
print(“=” * 60)
print(“日次売上パイプライン開始”)
print(“=” * 60)
while current_date <= end_date:
# 1. 売上データ生成
sales_df = generate_daily_sales(current_date)
# 2. 集計
summary_df = aggregate_daily_sales(sales_df)
# 3. S3に保存
s3_path = save_to_s3_partitioned(
summary_df,
bucket_name,
'daily-sales-summary',
current_date
)
total_sales = summary_df['合計売上'].iloc[0]
print(f"✅ {current_date.strftime('%Y-%m-%d')}: 合計売上 ¥{total_sales:,}")
# 次の日へ
current_date += timedelta(days=1)
print("=" * 60)
print("パイプライン完了!")
# 実行例(2024年1月1日〜5日)
start = datetime(2024, 1, 1)
end = datetime(2024, 1, 5)
run_daily_sales_pipeline(start, end, 'my-data-bucket')
【実行結果】
============================================================
日次売上パイプライン開始
============================================================
✅ 2024-01-01: 合計売上 ¥45,678
✅ 2024-01-02: 合計売上 ¥52,345
✅ 2024-01-03: 合計売上 ¥48,901
✅ 2024-01-04: 合計売上 ¥61,234
✅ 2024-01-05: 合計売上 ¥55,789
============================================================
パイプライン完了!
🔐 6. セキュリティとアクセス管理
6-1. 認証情報の安全な管理
⚠️ やってはいけないこと
- コードに直接アクセスキーを書く
- GitHubにアクセスキーをアップロード
- 誰とでもアクセスキーを共有
- 権限が強すぎるキーを使う
✅ 安全な認証情報管理
- 環境変数を使う
- .envファイルを使う(.gitignoreに追加)
- IAMロールを使う(AWS EC2上で実行する場合)
- 最小権限の原則:必要最小限の権限だけ付与
6-2. .envファイルを使った認証情報管理
# ===== .envファイルの内容(Gitにはコミットしない!)=====
AWS_ACCESS_KEY_ID=your_access_key_here
AWS_SECRET_ACCESS_KEY=your_secret_key_here
AWS_DEFAULT_REGION=ap-northeast-1
S3_BUCKET_NAME=my-data-bucket
# ===== Pythonコード =====
import os
from dotenv import load_dotenv
import boto3
# .envファイルを読み込み
load_dotenv()
# 環境変数から取得
BUCKET_NAME = os.getenv(‘S3_BUCKET_NAME’)
# S3クライアント作成(自動的に環境変数から認証情報を取得)
s3 = boto3.client(‘s3’)
print(f”✅ バケット ‘{BUCKET_NAME}’ に接続しました”)
6-3. IAMポリシーの例(最小権限)
{
“Version”: “2012-10-17”,
“Statement”: [
{
“Effect”: “Allow”,
“Action”: [
“s3:PutObject”,
“s3:GetObject”,
“s3:ListBucket”
],
“Resource”: [
“arn:aws:s3:::my-data-bucket/*”,
“arn:aws:s3:::my-data-bucket”
]
}
]
}
💡 説明:このポリシーは、my-data-bucketバケットに対して、
ファイルのアップロード(PutObject)、取得(GetObject)、一覧表示(ListBucket)のみを許可します。
📝 STEP 16 のまとめ
✅ このステップで学んだこと
- AWS S3:boto3を使ったアップロード・ダウンロード
- Google Cloud Storage:google-cloud-storageの使い方
- パーティショニング:日付や地域でフォルダ分け
- 実践演習:日次集計結果を自動でS3に保存
- セキュリティ:認証情報の安全な管理方法
💡 重要ポイント
- アクセスキーは絶対にコードに書かない
- 環境変数や.envファイルを使う
- パーティショニングでデータを整理する
- 最小権限の原則を守る
- S3とGCSは使い方がほぼ同じ
🎯 次のステップの予告
次のSTEP 17では、「エラーハンドリングとロギング」を学びます。
- try-exceptの実践的な使い方
- loggingモジュールの活用
- リトライ戦略
📝 練習問題
問題 1
基礎
DataFrameをCSVに変換して、S3にアップロードしてください。
df = pd.DataFrame({
‘顧客ID’: [1, 2, 3],
‘名前’: [‘田中’, ‘佐藤’, ‘鈴木’],
‘購入金額’: [15000, 25000, 10000]
})
bucket_name = ‘my-data-bucket’
s3_path = ‘customers/data.csv’
# ここにコードを書いてください
【解答】
import boto3
import io
# DataFrameをCSVに変換
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
# S3にアップロード
s3 = boto3.client(‘s3’)
s3.put_object(
Bucket=bucket_name,
Key=s3_path,
Body=csv_buffer.getvalue().encode(‘utf-8’)
)
print(f”✅ アップロード完了: s3://{bucket_name}/{s3_path}”)
問題 2
基礎
S3からファイルをダウンロードして、DataFrameとして読み込んでください。
【解答】
import boto3
import pandas as pd
import io
s3 = boto3.client(‘s3′)
# S3からオブジェクトを取得
response = s3.get_object(
Bucket=’my-data-bucket’,
Key=’customers/data.csv’
)
# メモリ上で直接DataFrameに変換
df = pd.read_csv(io.BytesIO(response[‘Body’].read()))
print(df)
問題 3
基礎
S3内の特定プレフィックス配下のファイル一覧を取得してください。
【解答】
import boto3
s3 = boto3.client(‘s3′)
response = s3.list_objects_v2(
Bucket=’my-data-bucket’,
Prefix=’data/’
)
if ‘Contents’ in response:
for obj in response[‘Contents’]:
print(f”{obj[‘Key’]} ({obj[‘Size’]} bytes)”)
else:
print(“ファイルが見つかりませんでした”)
問題 4
基礎
GCSにDataFrameをアップロードしてください。
【解答】
from google.cloud import storage
import io
import pandas as pd
df = pd.DataFrame({‘ID’: [1, 2, 3], ‘値’: [100, 200, 300]})
# CSV変換
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
# GCSにアップロード
client = storage.Client()
bucket = client.get_bucket(‘my-gcs-bucket’)
blob = bucket.blob(‘data/test.csv’)
blob.upload_from_string(csv_buffer.getvalue(), content_type=’text/csv’)
print(“✅ GCSにアップロードしました”)
問題 5
応用
日付でパーティショニングしてS3にアップロードする関数を作成してください。
【解答】
import boto3
import io
from datetime import datetime
def upload_with_partition(df, bucket_name, base_path, date=None):
“””日付でパーティショニングしてS3にアップロード”””
if date is None:
date = datetime.now()
year = date.strftime(‘%Y’)
month = date.strftime(‘%m’)
day = date.strftime(‘%d’)
s3_path = f”{base_path}/year={year}/month={month}/day={day}/data.csv”
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
s3 = boto3.client(‘s3’)
s3.put_object(
Bucket=bucket_name,
Key=s3_path,
Body=csv_buffer.getvalue().encode(‘utf-8’)
)
print(f”✅ アップロード: s3://{bucket_name}/{s3_path}”)
return s3_path
# 使用例
df = pd.DataFrame({‘売上’: [1000, 2000, 1500]})
upload_with_partition(df, ‘my-bucket’, ‘sales’, datetime(2024, 1, 15))
問題 6
応用
S3内の特定プレフィックス配下のファイルを全てダウンロードする関数を作成してください。
【解答】
import boto3
import os
def download_s3_folder(bucket_name, prefix, local_dir):
“””S3フォルダ配下のファイルを全てダウンロード”””
s3 = boto3.client(‘s3’)
os.makedirs(local_dir, exist_ok=True)
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
if ‘Contents’ not in response:
print(“ファイルが見つかりませんでした”)
return
for obj in response[‘Contents’]:
s3_key = obj[‘Key’]
file_name = os.path.basename(s3_key)
local_path = os.path.join(local_dir, file_name)
s3.download_file(bucket_name, s3_key, local_path)
print(f”✅ ダウンロード: {s3_key}”)
print(f”\n全{len(response[‘Contents’])}件完了”)
# 使用例
download_s3_folder(‘my-bucket’, ‘sales/year=2024/’, ‘./downloads’)
問題 7
応用
複数日分のデータを生成して、日付ごとにパーティショニングしてS3にアップロードしてください。
【解答】
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import boto3
import io
def upload_multi_day_data(start_date, end_date, bucket_name, base_path):
“””複数日分のデータを日付ごとにアップロード”””
current_date = start_date
s3 = boto3.client(‘s3’)
while current_date <= end_date:
# ダミーデータ生成
df = pd.DataFrame({
'日付': [current_date.strftime('%Y-%m-%d')] * 10,
'売上': np.random.randint(1000, 10000, 10)
})
# パーティションパス
year = current_date.strftime('%Y')
month = current_date.strftime('%m')
day = current_date.strftime('%d')
s3_path = f"{base_path}/year={year}/month={month}/day={day}/data.csv"
# アップロード
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
s3.put_object(
Bucket=bucket_name,
Key=s3_path,
Body=csv_buffer.getvalue().encode('utf-8')
)
print(f"✅ {current_date.strftime('%Y-%m-%d')}")
current_date += timedelta(days=1)
# 使用例
start = datetime(2024, 1, 1)
end = datetime(2024, 1, 5)
upload_multi_day_data(start, end, 'my-bucket', 'sales')
問題 8
応用
環境変数を使った安全なS3アップロード関数を作成してください。
【解答】
import os
import boto3
import io
from dotenv import load_dotenv
def safe_upload_to_s3(df, s3_key):
“””環境変数を使った安全なS3アップロード”””
load_dotenv()
# 環境変数から取得
bucket_name = os.getenv(‘S3_BUCKET_NAME’)
if not bucket_name:
raise ValueError(“S3_BUCKET_NAMEが設定されていません”)
try:
s3 = boto3.client(‘s3’)
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
s3.put_object(
Bucket=bucket_name,
Key=s3_key,
Body=csv_buffer.getvalue().encode(‘utf-8’)
)
print(f”✅ アップロード成功: s3://{bucket_name}/{s3_key}”)
return True
except Exception as e:
print(f”❌ アップロード失敗: {e}”)
return False
# 使用例
df = pd.DataFrame({‘ID’: [1, 2, 3]})
safe_upload_to_s3(df, ‘data/test.csv’)
問題 9
発展
日次集計パイプラインを作成してください。売上データを生成→集計→S3に保存する処理を実装してください。
【解答】
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import boto3
import io
class DailySalesPipeline:
def __init__(self, bucket_name, base_path):
self.bucket_name = bucket_name
self.base_path = base_path
self.s3 = boto3.client(‘s3’)
def generate_sales(self, date):
“””売上データ生成”””
return pd.DataFrame({
‘日付’: [date.strftime(‘%Y-%m-%d’)] * 10,
‘商品ID’: range(1, 11),
‘売上’: np.random.randint(1000, 10000, 10)
})
def aggregate(self, df):
“””集計”””
return pd.DataFrame({
‘日付’: [df[‘日付’].iloc[0]],
‘合計’: [df[‘売上’].sum()],
‘平均’: [df[‘売上’].mean().round(0)],
‘件数’: [len(df)]
})
def upload(self, df, date):
“””S3にアップロード”””
year, month, day = date.strftime(‘%Y’), date.strftime(‘%m’), date.strftime(‘%d’)
s3_path = f”{self.base_path}/year={year}/month={month}/day={day}/summary.csv”
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
self.s3.put_object(
Bucket=self.bucket_name,
Key=s3_path,
Body=csv_buffer.getvalue().encode(‘utf-8’)
)
return s3_path
def run(self, start_date, end_date):
“””パイプライン実行”””
current = start_date
while current <= end_date:
sales = self.generate_sales(current)
summary = self.aggregate(sales)
s3_path = self.upload(summary, current)
print(f"✅ {current.strftime('%Y-%m-%d')}: 合計 ¥{summary['合計'].iloc[0]:,}")
current += timedelta(days=1)
# 使用例
pipeline = DailySalesPipeline('my-bucket', 'daily-summary')
pipeline.run(datetime(2024, 1, 1), datetime(2024, 1, 5))
問題 10
発展
S3とGCSの両方に同時にアップロードする関数を作成してください。
【解答】
import boto3
from google.cloud import storage
import io
import pandas as pd
def upload_to_both_clouds(df, s3_bucket, s3_key, gcs_bucket, gcs_blob):
“””S3とGCSの両方にアップロード”””
results = {‘s3’: False, ‘gcs’: False}
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
csv_content = csv_buffer.getvalue()
# S3にアップロード
try:
s3 = boto3.client(‘s3’)
s3.put_object(
Bucket=s3_bucket,
Key=s3_key,
Body=csv_content.encode(‘utf-8’)
)
results[‘s3′] = True
print(f”✅ S3: s3://{s3_bucket}/{s3_key}”)
except Exception as e:
print(f”❌ S3エラー: {e}”)
# GCSにアップロード
try:
client = storage.Client()
bucket = client.get_bucket(gcs_bucket)
blob = bucket.blob(gcs_blob)
blob.upload_from_string(csv_content, content_type=’text/csv’)
results[‘gcs’] = True
print(f”✅ GCS: gs://{gcs_bucket}/{gcs_blob}”)
except Exception as e:
print(f”❌ GCSエラー: {e}”)
return results
# 使用例
df = pd.DataFrame({‘ID’: [1, 2, 3], ‘値’: [100, 200, 300]})
upload_to_both_clouds(
df,
s3_bucket=’my-s3-bucket’,
s3_key=’data/test.csv’,
gcs_bucket=’my-gcs-bucket’,
gcs_blob=’data/test.csv’
)