STEP 16:クラウドストレージへのロード

☁️ STEP 16: クラウドストレージへのロード

AWS S3やGoogle Cloud Storageにデータをアップロードしよう

📋 このステップで学ぶこと

  • AWS S3へのアップロード(boto3の使い方)
  • Google Cloud Storageへのアップロード
  • ファイル構造の設計(パーティショニング)
  • 実践演習:集計結果を日次で保存
  • セキュリティとアクセス管理

⏱️ 学習時間の目安:2時間

📝 練習問題:10問(基礎4問・応用4問・発展2問)

🎯 1. クラウドストレージとは?

1-1. クラウドストレージの基本

クラウドストレージとは、インターネット上にデータを保存できるサービスです。
どこからでもアクセスでき、大容量のデータを保存できます。

📚 例え話:巨大なレンタル倉庫

クラウドストレージは、インターネット上の巨大なレンタル倉庫のようなものです。

自分のPCの保存領域:自宅の棚(容量に限りがある)
クラウドストレージ:レンタル倉庫(必要なだけ借りられる)

いつでもどこからでも荷物を出し入れでき、
火事や地震があっても荷物は安全に保管されています。

1-2. 主なクラウドストレージサービス

主要クラウドストレージの比較
サービス 提供元 特徴
AWS S3 Amazon 最も人気、豊富な連携サービス
GCS Google BigQueryと連携しやすい
Azure Blob Microsoft Microsoft製品と連携しやすい

1-3. クラウドストレージを使うメリット

クラウドストレージのメリット
メリット 説明
大容量 ペタバイト級のデータも保存可能
高い可用性 99.99%以上の稼働率、障害に強い
スケーラブル 必要に応じて容量を増やせる
コスト効率 使った分だけ課金、初期費用不要
どこからでもアクセス インターネットがあれば世界中からアクセス可能
💡 実務でのよくある使い方
  • 日次で集計した結果をS3に保存
  • ログファイルをS3にアーカイブ
  • 機械学習用の大量データを保存
  • データレイクとして活用(生データを全て保存)

📦 2. AWS S3へのアップロード

2-1. boto3ライブラリのインストール

boto3は、PythonからAWSのサービスを操作するためのライブラリです。

pip install boto3

2-2. 認証情報の設定

S3にアクセスするには、アクセスキーシークレットキーが必要です。

⚠️ セキュリティ上の重要な注意
  • アクセスキーとシークレットキーは絶対にコードに直接書かない
  • 環境変数に保存する
  • GitHubなどにアップロードしない(.gitignoreに追加)
  • 定期的に認証情報を更新する
# ===== 環境変数の設定 ===== # Linux/Mac export AWS_ACCESS_KEY_ID=”your_access_key” export AWS_SECRET_ACCESS_KEY=”your_secret_key” export AWS_DEFAULT_REGION=”ap-northeast-1″ # Windows(コマンドプロンプト) set AWS_ACCESS_KEY_ID=your_access_key set AWS_SECRET_ACCESS_KEY=your_secret_key set AWS_DEFAULT_REGION=ap-northeast-1 # Windows(PowerShell) $env:AWS_ACCESS_KEY_ID=”your_access_key” $env:AWS_SECRET_ACCESS_KEY=”your_secret_key” $env:AWS_DEFAULT_REGION=”ap-northeast-1″

2-3. 基本的なS3アップロード

# ===== S3への基本的なアップロード ===== import boto3 import pandas as pd # サンプルデータ df = pd.DataFrame({ ‘商品ID’: [1, 2, 3, 4, 5], ‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’, ‘ぶどう’, ‘いちご’], ‘価格’: [150, 100, 80, 300, 250], ‘在庫’: [50, 100, 80, 30, 40] }) print(“=== アップロードするデータ ===”) print(df) # まずローカルにCSVファイルを作成 df.to_csv(‘products.csv’, index=False, encoding=’utf-8-sig’) # S3クライアントを作成(環境変数から自動で認証情報を取得) s3 = boto3.client(‘s3’) # S3にアップロード bucket_name = ‘my-data-bucket’ # あなたのバケット名 file_name = ‘products.csv’ # ローカルファイル名 s3_path = ‘data/products.csv’ # S3内のパス s3.upload_file(file_name, bucket_name, s3_path) print(f”\n✅ S3にアップロードしました!”) print(f” パス: s3://{bucket_name}/{s3_path}”)
【実行結果】 === アップロードするデータ === 商品ID 商品名 価格 在庫 0 1 りんご 150 50 1 2 バナナ 100 100 2 3 みかん 80 80 3 4 ぶどう 300 30 4 5 いちご 250 40 ✅ S3にアップロードしました! パス: s3://my-data-bucket/data/products.csv

2-4. メモリ上のデータを直接アップロード

ファイルを作成せずに、メモリ上のデータを直接S3にアップロードできます。

# ===== メモリから直接S3にアップロード ===== import boto3 import io import pandas as pd # サンプルデータ df = pd.DataFrame({ ‘商品ID’: [1, 2, 3], ‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’], ‘価格’: [150, 100, 80] }) # DataFrameをCSV文字列に変換 csv_buffer = io.StringIO() df.to_csv(csv_buffer, index=False) # S3にアップロード s3 = boto3.client(‘s3′) s3.put_object( Bucket=’my-data-bucket’, Key=’data/products_direct.csv’, Body=csv_buffer.getvalue().encode(‘utf-8’) ) print(“✅ メモリから直接S3にアップロードしました!”)
【実行結果】 ✅ メモリから直接S3にアップロードしました!
🎯 アップロード方法の使い分け
  • upload_file:ローカルファイルをアップロード
  • put_object:メモリ上のデータを直接アップロード(ファイル不要)

メモリから直接アップロードする方がディスクI/Oが不要で効率的です。

2-5. S3からファイルをダウンロード

# ===== S3からファイルをダウンロード ===== import boto3 import pandas as pd s3 = boto3.client(‘s3’) # 方法1:ファイルに保存してから読み込み s3.download_file( ‘my-data-bucket’, # バケット名 ‘data/products.csv’, # S3内のパス ‘downloaded_products.csv’ # ローカルに保存するファイル名 ) df = pd.read_csv(‘downloaded_products.csv’) print(“=== ダウンロードしたデータ ===”) print(df)
【実行結果】 === ダウンロードしたデータ === 商品ID 商品名 価格 在庫 0 1 りんご 150 50 1 2 バナナ 100 100 2 3 みかん 80 80 3 4 ぶどう 300 30 4 5 いちご 250 40

2-6. S3内のファイル一覧を取得

# ===== S3内のファイル一覧を取得 ===== import boto3 s3 = boto3.client(‘s3′) response = s3.list_objects_v2( Bucket=’my-data-bucket’, Prefix=’data/’ # このプレフィックスで始まるファイルのみ ) if ‘Contents’ in response: print(“=== S3内のファイル一覧 ===”) for obj in response[‘Contents’]: size_kb = obj[‘Size’] / 1024 print(f” {obj[‘Key’]} ({size_kb:.1f} KB)”) else: print(“ファイルが見つかりませんでした”)
【実行結果】 === S3内のファイル一覧 === data/products.csv (0.2 KB) data/products_direct.csv (0.1 KB) data/sales_2024-01-01.csv (0.5 KB) data/sales_2024-01-02.csv (0.5 KB)

☁️ 3. Google Cloud Storageへのアップロード

3-1. google-cloud-storageライブラリのインストール

pip install google-cloud-storage

3-2. 認証情報の設定

GCSを使うには、サービスアカウントキー(JSONファイル)が必要です。

# 環境変数で認証情報を設定 export GOOGLE_APPLICATION_CREDENTIALS=”/path/to/your-service-account-key.json”

3-3. 基本的なGCSアップロード

# ===== GCSへの基本的なアップロード ===== from google.cloud import storage import pandas as pd # サンプルデータ df = pd.DataFrame({ ‘商品ID’: [1, 2, 3], ‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’], ‘価格’: [150, 100, 80] }) # ローカルにCSVを保存 df.to_csv(‘products.csv’, index=False, encoding=’utf-8-sig’) # GCSクライアントを作成 client = storage.Client() # バケットを取得 bucket = client.get_bucket(‘my-gcs-bucket’) # ファイルをアップロード blob = bucket.blob(‘data/products.csv’) blob.upload_from_filename(‘products.csv’) print(f”✅ GCSにアップロードしました!”) print(f” パス: gs://my-gcs-bucket/data/products.csv”)
【実行結果】 ✅ GCSにアップロードしました! パス: gs://my-gcs-bucket/data/products.csv

3-4. メモリから直接アップロード

# ===== メモリから直接GCSにアップロード ===== from google.cloud import storage import io import pandas as pd df = pd.DataFrame({ ‘商品ID’: [1, 2, 3], ‘商品名’: [‘りんご’, ‘バナナ’, ‘みかん’], ‘価格’: [150, 100, 80] }) # DataFrameをCSV文字列に変換 csv_buffer = io.StringIO() df.to_csv(csv_buffer, index=False) # GCSにアップロード client = storage.Client() bucket = client.get_bucket(‘my-gcs-bucket’) blob = bucket.blob(‘data/products_direct.csv’) blob.upload_from_string(csv_buffer.getvalue(), content_type=’text/csv’) print(“✅ メモリから直接GCSにアップロードしました!”)
💡 S3とGCSの違い
操作 AWS S3 GCS
クライアント作成 boto3.client(‘s3’) storage.Client()
アップロード upload_file / put_object blob.upload_from_filename
ダウンロード download_file / get_object blob.download_to_filename

📂 4. ファイル構造の設計(パーティショニング)

4-1. パーティショニングとは?

パーティショニングとは、データを特定のルールでフォルダ分けすることです。
日付や地域などで分けることで、必要なデータだけを高速に取得できます。

📁 よくあるパーティショニング例
s3://my-bucket/ └── sales/ ├── year=2024/ │ ├── month=01/ │ │ ├── day=01/ │ │ │ └── data.csv │ │ ├── day=02/ │ │ │ └── data.csv │ │ └── … │ └── month=02/ │ └── … └── year=2025/ └── …

4-2. 日付でパーティショニング

# ===== 日付でパーティショニングしてアップロード ===== import boto3 import io import pandas as pd from datetime import datetime def upload_with_date_partition(df, bucket_name, base_path, date=None): “”” 日付ごとにパーティショニングしてS3にアップロード Parameters: ———– df : DataFrame アップロードするデータ bucket_name : str S3バケット名 base_path : str ベースパス date : datetime or None 日付(Noneの場合は今日) Returns: ——– str : S3パス “”” if date is None: date = datetime.now() # パーティションパスを作成 year = date.strftime(‘%Y’) month = date.strftime(‘%m’) day = date.strftime(‘%d’) s3_path = f”{base_path}/year={year}/month={month}/day={day}/data.csv” # CSVに変換 csv_buffer = io.StringIO() df.to_csv(csv_buffer, index=False) # S3にアップロード s3 = boto3.client(‘s3’) s3.put_object( Bucket=bucket_name, Key=s3_path, Body=csv_buffer.getvalue().encode(‘utf-8’) ) print(f”✅ アップロード完了”) print(f” パス: s3://{bucket_name}/{s3_path}”) return s3_path # 使用例 df = pd.DataFrame({ ‘売上ID’: [1, 2, 3], ‘金額’: [1000, 2000, 1500] }) upload_with_date_partition(df, ‘my-data-bucket’, ‘sales’, datetime(2024, 1, 15))
【実行結果】 ✅ アップロード完了 パス: s3://my-data-bucket/sales/year=2024/month=01/day=15/data.csv

4-3. 複数の軸でパーティショニング

# ===== 日付と地域でパーティショニング ===== def upload_with_multi_partition(df, bucket_name, base_path, date, region): “””日付と地域でパーティショニング””” year = date.strftime(‘%Y’) month = date.strftime(‘%m’) day = date.strftime(‘%d’) # パーティションパス(地域 → 年 → 月 → 日) s3_path = f”{base_path}/region={region}/year={year}/month={month}/day={day}/data.csv” # CSV変換してアップロード csv_buffer = io.StringIO() df.to_csv(csv_buffer, index=False) s3 = boto3.client(‘s3’) s3.put_object( Bucket=bucket_name, Key=s3_path, Body=csv_buffer.getvalue().encode(‘utf-8’) ) print(f”✅ {region}の{date.strftime(‘%Y-%m-%d’)}データをアップロード”) return s3_path # 使用例 from datetime import datetime tokyo_df = pd.DataFrame({‘売上’: [1000, 2000]}) osaka_df = pd.DataFrame({‘売上’: [1500, 2500]}) today = datetime(2024, 1, 15) upload_with_multi_partition(tokyo_df, ‘my-data-bucket’, ‘sales’, today, ‘tokyo’) upload_with_multi_partition(osaka_df, ‘my-data-bucket’, ‘sales’, today, ‘osaka’)
【実行結果】 ✅ tokyoの2024-01-15データをアップロード ✅ osakaの2024-01-15データをアップロード
🎯 パーティショニングのメリット
  • 高速なデータ取得:必要な日付だけ読み込める
  • コスト削減:不要なデータを読まない(課金対象外)
  • 管理しやすい:ファイルが整理される
  • 並列処理:パーティションごとに処理できる

💼 5. 実践演習:日次集計結果の保存

5-1. シナリオ

毎日の売上データを集計して、日付ごとにS3に保存するパイプラインを作ります。

# ===== 日次売上パイプライン ===== import pandas as pd import numpy as np from datetime import datetime, timedelta import boto3 import io def generate_daily_sales(date): “””指定日の売上データを生成(ダミー)””” return pd.DataFrame({ ‘日付’: [date.strftime(‘%Y-%m-%d’)] * 10, ‘商品ID’: range(1, 11), ‘売上’: np.random.randint(1000, 10000, 10) }) def aggregate_daily_sales(df): “””日次売上を集計””” return pd.DataFrame({ ‘日付’: [df[‘日付’].iloc[0]], ‘合計売上’: [df[‘売上’].sum()], ‘平均売上’: [df[‘売上’].mean().round(0)], ‘最大売上’: [df[‘売上’].max()], ‘最小売上’: [df[‘売上’].min()], ‘件数’: [len(df)] }) def save_to_s3_partitioned(df, bucket_name, base_path, date): “””日付でパーティショニングしてS3に保存””” year = date.strftime(‘%Y’) month = date.strftime(‘%m’) day = date.strftime(‘%d’) s3_path = f”{base_path}/year={year}/month={month}/day={day}/summary.csv” csv_buffer = io.StringIO() df.to_csv(csv_buffer, index=False) s3 = boto3.client(‘s3’) s3.put_object( Bucket=bucket_name, Key=s3_path, Body=csv_buffer.getvalue().encode(‘utf-8’) ) return s3_path def run_daily_sales_pipeline(start_date, end_date, bucket_name): “””日次売上パイプラインを実行””” current_date = start_date print(“=” * 60) print(“日次売上パイプライン開始”) print(“=” * 60) while current_date <= end_date: # 1. 売上データ生成 sales_df = generate_daily_sales(current_date) # 2. 集計 summary_df = aggregate_daily_sales(sales_df) # 3. S3に保存 s3_path = save_to_s3_partitioned( summary_df, bucket_name, 'daily-sales-summary', current_date ) total_sales = summary_df['合計売上'].iloc[0] print(f"✅ {current_date.strftime('%Y-%m-%d')}: 合計売上 ¥{total_sales:,}") # 次の日へ current_date += timedelta(days=1) print("=" * 60) print("パイプライン完了!") # 実行例(2024年1月1日〜5日) start = datetime(2024, 1, 1) end = datetime(2024, 1, 5) run_daily_sales_pipeline(start, end, 'my-data-bucket')
【実行結果】 ============================================================ 日次売上パイプライン開始 ============================================================ ✅ 2024-01-01: 合計売上 ¥45,678 ✅ 2024-01-02: 合計売上 ¥52,345 ✅ 2024-01-03: 合計売上 ¥48,901 ✅ 2024-01-04: 合計売上 ¥61,234 ✅ 2024-01-05: 合計売上 ¥55,789 ============================================================ パイプライン完了!

🔐 6. セキュリティとアクセス管理

6-1. 認証情報の安全な管理

⚠️ やってはいけないこと
  • コードに直接アクセスキーを書く
  • GitHubにアクセスキーをアップロード
  • 誰とでもアクセスキーを共有
  • 権限が強すぎるキーを使う
✅ 安全な認証情報管理
  1. 環境変数を使う
  2. .envファイルを使う(.gitignoreに追加)
  3. IAMロールを使う(AWS EC2上で実行する場合)
  4. 最小権限の原則:必要最小限の権限だけ付与

6-2. .envファイルを使った認証情報管理

# ===== .envファイルの内容(Gitにはコミットしない!)===== AWS_ACCESS_KEY_ID=your_access_key_here AWS_SECRET_ACCESS_KEY=your_secret_key_here AWS_DEFAULT_REGION=ap-northeast-1 S3_BUCKET_NAME=my-data-bucket
# ===== Pythonコード ===== import os from dotenv import load_dotenv import boto3 # .envファイルを読み込み load_dotenv() # 環境変数から取得 BUCKET_NAME = os.getenv(‘S3_BUCKET_NAME’) # S3クライアント作成(自動的に環境変数から認証情報を取得) s3 = boto3.client(‘s3’) print(f”✅ バケット ‘{BUCKET_NAME}’ に接続しました”)

6-3. IAMポリシーの例(最小権限)

{ “Version”: “2012-10-17”, “Statement”: [ { “Effect”: “Allow”, “Action”: [ “s3:PutObject”, “s3:GetObject”, “s3:ListBucket” ], “Resource”: [ “arn:aws:s3:::my-data-bucket/*”, “arn:aws:s3:::my-data-bucket” ] } ] }

💡 説明:このポリシーは、my-data-bucketバケットに対して、
ファイルのアップロード(PutObject)、取得(GetObject)、一覧表示(ListBucket)のみを許可します。

📝 STEP 16 のまとめ

✅ このステップで学んだこと
  • AWS S3:boto3を使ったアップロード・ダウンロード
  • Google Cloud Storage:google-cloud-storageの使い方
  • パーティショニング:日付や地域でフォルダ分け
  • 実践演習:日次集計結果を自動でS3に保存
  • セキュリティ:認証情報の安全な管理方法
💡 重要ポイント
  • アクセスキーは絶対にコードに書かない
  • 環境変数.envファイルを使う
  • パーティショニングでデータを整理する
  • 最小権限の原則を守る
  • S3とGCSは使い方がほぼ同じ
🎯 次のステップの予告

次のSTEP 17では、「エラーハンドリングとロギング」を学びます。

  • try-exceptの実践的な使い方
  • loggingモジュールの活用
  • リトライ戦略

📝 練習問題

問題 1 基礎

DataFrameをCSVに変換して、S3にアップロードしてください。

df = pd.DataFrame({ ‘顧客ID’: [1, 2, 3], ‘名前’: [‘田中’, ‘佐藤’, ‘鈴木’], ‘購入金額’: [15000, 25000, 10000] }) bucket_name = ‘my-data-bucket’ s3_path = ‘customers/data.csv’ # ここにコードを書いてください
【解答】
import boto3 import io # DataFrameをCSVに変換 csv_buffer = io.StringIO() df.to_csv(csv_buffer, index=False) # S3にアップロード s3 = boto3.client(‘s3’) s3.put_object( Bucket=bucket_name, Key=s3_path, Body=csv_buffer.getvalue().encode(‘utf-8’) ) print(f”✅ アップロード完了: s3://{bucket_name}/{s3_path}”)
問題 2 基礎

S3からファイルをダウンロードして、DataFrameとして読み込んでください。

【解答】
import boto3 import pandas as pd import io s3 = boto3.client(‘s3′) # S3からオブジェクトを取得 response = s3.get_object( Bucket=’my-data-bucket’, Key=’customers/data.csv’ ) # メモリ上で直接DataFrameに変換 df = pd.read_csv(io.BytesIO(response[‘Body’].read())) print(df)
問題 3 基礎

S3内の特定プレフィックス配下のファイル一覧を取得してください。

【解答】
import boto3 s3 = boto3.client(‘s3′) response = s3.list_objects_v2( Bucket=’my-data-bucket’, Prefix=’data/’ ) if ‘Contents’ in response: for obj in response[‘Contents’]: print(f”{obj[‘Key’]} ({obj[‘Size’]} bytes)”) else: print(“ファイルが見つかりませんでした”)
問題 4 基礎

GCSにDataFrameをアップロードしてください。

【解答】
from google.cloud import storage import io import pandas as pd df = pd.DataFrame({‘ID’: [1, 2, 3], ‘値’: [100, 200, 300]}) # CSV変換 csv_buffer = io.StringIO() df.to_csv(csv_buffer, index=False) # GCSにアップロード client = storage.Client() bucket = client.get_bucket(‘my-gcs-bucket’) blob = bucket.blob(‘data/test.csv’) blob.upload_from_string(csv_buffer.getvalue(), content_type=’text/csv’) print(“✅ GCSにアップロードしました”)
問題 5 応用

日付でパーティショニングしてS3にアップロードする関数を作成してください。

【解答】
import boto3 import io from datetime import datetime def upload_with_partition(df, bucket_name, base_path, date=None): “””日付でパーティショニングしてS3にアップロード””” if date is None: date = datetime.now() year = date.strftime(‘%Y’) month = date.strftime(‘%m’) day = date.strftime(‘%d’) s3_path = f”{base_path}/year={year}/month={month}/day={day}/data.csv” csv_buffer = io.StringIO() df.to_csv(csv_buffer, index=False) s3 = boto3.client(‘s3’) s3.put_object( Bucket=bucket_name, Key=s3_path, Body=csv_buffer.getvalue().encode(‘utf-8’) ) print(f”✅ アップロード: s3://{bucket_name}/{s3_path}”) return s3_path # 使用例 df = pd.DataFrame({‘売上’: [1000, 2000, 1500]}) upload_with_partition(df, ‘my-bucket’, ‘sales’, datetime(2024, 1, 15))
問題 6 応用

S3内の特定プレフィックス配下のファイルを全てダウンロードする関数を作成してください。

【解答】
import boto3 import os def download_s3_folder(bucket_name, prefix, local_dir): “””S3フォルダ配下のファイルを全てダウンロード””” s3 = boto3.client(‘s3’) os.makedirs(local_dir, exist_ok=True) response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix) if ‘Contents’ not in response: print(“ファイルが見つかりませんでした”) return for obj in response[‘Contents’]: s3_key = obj[‘Key’] file_name = os.path.basename(s3_key) local_path = os.path.join(local_dir, file_name) s3.download_file(bucket_name, s3_key, local_path) print(f”✅ ダウンロード: {s3_key}”) print(f”\n全{len(response[‘Contents’])}件完了”) # 使用例 download_s3_folder(‘my-bucket’, ‘sales/year=2024/’, ‘./downloads’)
問題 7 応用

複数日分のデータを生成して、日付ごとにパーティショニングしてS3にアップロードしてください。

【解答】
import pandas as pd import numpy as np from datetime import datetime, timedelta import boto3 import io def upload_multi_day_data(start_date, end_date, bucket_name, base_path): “””複数日分のデータを日付ごとにアップロード””” current_date = start_date s3 = boto3.client(‘s3’) while current_date <= end_date: # ダミーデータ生成 df = pd.DataFrame({ '日付': [current_date.strftime('%Y-%m-%d')] * 10, '売上': np.random.randint(1000, 10000, 10) }) # パーティションパス year = current_date.strftime('%Y') month = current_date.strftime('%m') day = current_date.strftime('%d') s3_path = f"{base_path}/year={year}/month={month}/day={day}/data.csv" # アップロード csv_buffer = io.StringIO() df.to_csv(csv_buffer, index=False) s3.put_object( Bucket=bucket_name, Key=s3_path, Body=csv_buffer.getvalue().encode('utf-8') ) print(f"✅ {current_date.strftime('%Y-%m-%d')}") current_date += timedelta(days=1) # 使用例 start = datetime(2024, 1, 1) end = datetime(2024, 1, 5) upload_multi_day_data(start, end, 'my-bucket', 'sales')
問題 8 応用

環境変数を使った安全なS3アップロード関数を作成してください。

【解答】
import os import boto3 import io from dotenv import load_dotenv def safe_upload_to_s3(df, s3_key): “””環境変数を使った安全なS3アップロード””” load_dotenv() # 環境変数から取得 bucket_name = os.getenv(‘S3_BUCKET_NAME’) if not bucket_name: raise ValueError(“S3_BUCKET_NAMEが設定されていません”) try: s3 = boto3.client(‘s3’) csv_buffer = io.StringIO() df.to_csv(csv_buffer, index=False) s3.put_object( Bucket=bucket_name, Key=s3_key, Body=csv_buffer.getvalue().encode(‘utf-8’) ) print(f”✅ アップロード成功: s3://{bucket_name}/{s3_key}”) return True except Exception as e: print(f”❌ アップロード失敗: {e}”) return False # 使用例 df = pd.DataFrame({‘ID’: [1, 2, 3]}) safe_upload_to_s3(df, ‘data/test.csv’)
問題 9 発展

日次集計パイプラインを作成してください。売上データを生成→集計→S3に保存する処理を実装してください。

【解答】
import pandas as pd import numpy as np from datetime import datetime, timedelta import boto3 import io class DailySalesPipeline: def __init__(self, bucket_name, base_path): self.bucket_name = bucket_name self.base_path = base_path self.s3 = boto3.client(‘s3’) def generate_sales(self, date): “””売上データ生成””” return pd.DataFrame({ ‘日付’: [date.strftime(‘%Y-%m-%d’)] * 10, ‘商品ID’: range(1, 11), ‘売上’: np.random.randint(1000, 10000, 10) }) def aggregate(self, df): “””集計””” return pd.DataFrame({ ‘日付’: [df[‘日付’].iloc[0]], ‘合計’: [df[‘売上’].sum()], ‘平均’: [df[‘売上’].mean().round(0)], ‘件数’: [len(df)] }) def upload(self, df, date): “””S3にアップロード””” year, month, day = date.strftime(‘%Y’), date.strftime(‘%m’), date.strftime(‘%d’) s3_path = f”{self.base_path}/year={year}/month={month}/day={day}/summary.csv” csv_buffer = io.StringIO() df.to_csv(csv_buffer, index=False) self.s3.put_object( Bucket=self.bucket_name, Key=s3_path, Body=csv_buffer.getvalue().encode(‘utf-8’) ) return s3_path def run(self, start_date, end_date): “””パイプライン実行””” current = start_date while current <= end_date: sales = self.generate_sales(current) summary = self.aggregate(sales) s3_path = self.upload(summary, current) print(f"✅ {current.strftime('%Y-%m-%d')}: 合計 ¥{summary['合計'].iloc[0]:,}") current += timedelta(days=1) # 使用例 pipeline = DailySalesPipeline('my-bucket', 'daily-summary') pipeline.run(datetime(2024, 1, 1), datetime(2024, 1, 5))
問題 10 発展

S3とGCSの両方に同時にアップロードする関数を作成してください。

【解答】
import boto3 from google.cloud import storage import io import pandas as pd def upload_to_both_clouds(df, s3_bucket, s3_key, gcs_bucket, gcs_blob): “””S3とGCSの両方にアップロード””” results = {‘s3’: False, ‘gcs’: False} csv_buffer = io.StringIO() df.to_csv(csv_buffer, index=False) csv_content = csv_buffer.getvalue() # S3にアップロード try: s3 = boto3.client(‘s3’) s3.put_object( Bucket=s3_bucket, Key=s3_key, Body=csv_content.encode(‘utf-8’) ) results[‘s3′] = True print(f”✅ S3: s3://{s3_bucket}/{s3_key}”) except Exception as e: print(f”❌ S3エラー: {e}”) # GCSにアップロード try: client = storage.Client() bucket = client.get_bucket(gcs_bucket) blob = bucket.blob(gcs_blob) blob.upload_from_string(csv_content, content_type=’text/csv’) results[‘gcs’] = True print(f”✅ GCS: gs://{gcs_bucket}/{gcs_blob}”) except Exception as e: print(f”❌ GCSエラー: {e}”) return results # 使用例 df = pd.DataFrame({‘ID’: [1, 2, 3], ‘値’: [100, 200, 300]}) upload_to_both_clouds( df, s3_bucket=’my-s3-bucket’, s3_key=’data/test.csv’, gcs_bucket=’my-gcs-bucket’, gcs_blob=’data/test.csv’ )
📝

学習メモ

ETL・データパイプライン構築 - Step 16

📋 過去のメモ一覧
#artnasekai #学習メモ
LINE