S3で5GB以上のファイルを操作する方法
はじめに
データ分析に関わる人であれば誰でも一度は AWS の S3 をストレージとして利用したことはあるかと思います.
今回は Jupyter Notebook より boto3 を介して S3 に5GB以上のデータを保存しようとした際に詰まった内容について,同様の問題で苦しむ人の一助になればという想いと自身の備忘録も兼ねてまとめていきたいと思います.
実行環境
まず初めに実行環境について記載しておきます.
$ sw_vers ProductName: Mac OS X ProductVersion: 10.15.7 BuildVersion: 19H1519 $ python -V Python 3.9.1
発生事象
では,本題の詰まった事象についてです.
Jupyter Notebook から boto3 を介して S3 に 5 GB 以上のデータを保存しようとしたところ,「ClientError」によってアップロードが正常に行えないという事象に直面しました.具体的なコードは以下です.
import io import os import boto3 import random import numpy as np import pandas as pd # faker import faker from faker import Faker # boto3 from boto3 import Session from botocore.session import get_session from botocore.session import ClientError, ParamValidationError class GenerateDummyData: """ダミーデータ生成クラス""" def __init__(self, seed:int=None): """ Args: seed: ランダム生成かかわるシード値 Returns: """ self.fake = Faker() if seed: Faker.seed(seed) def create_rows(self, rows:int=1): """ Args: rows: 生成するレコード数 Returns: df: ダミーのデータフレーム """ output = [{ "name": self.fake.name(), "address": self.fake.address(), "email": self.fake.email(), "bs": self.fake.bs(), "city": self.fake.city(), "state": self.fake.state(), "date_time": self.fake.date_time(), "randomdata": random.randint(1000, 2000) } for x in range(rows)] df = pd.DataFrame(output) return df def put_s3(dataframe: pd.DataFrame, bucket:str, key:str) -> bool: """データフレームをS3に保存する関数 Args: dataframe: 保存するデータフレーム bucket: s3の保存するバケット名 key: s3の保存するキー名 Returns: bool: 保存実施の成否 """ try: session = get_session() autorefresh_session = Session(botocore_session=session) s3_client = autorefresh_session.client('s3') csv_buffer = io.StringIO() dataframe.to_csv(csv_buffer, index=False, header=True, encoding='utf-8') s3_client.put_object( Bucket=bucket, Key=key, Body=csv_buffer.getvalue() ) return True except ClientEror as e: return False # ダミーデータ生成 # 5GB以上になるように大量のレコードを生成 df = generator.create_rows(10000000000) # ストレージへの保存 put_s3(df, "sample-bucket", "storage/dataframe/df.csv")
上記のコードより発生したエラー内容は以下です.
エラー内容をみるとファイルサイズが最大値を超えているためにエラーが起きていることが分かります.
ClientError: An error occurred (EntityTooLarge) when calling the PutObject operation: Your proposed upload exceeds the maximum allowed size
原因と解決策
ClientErrorが発生した理由ですが,boto3のAPIで用意しているput_object
メソッドでアップロードできるオブジェクトサイズは5GBという制限があるようです.
boto3の公式ドキュメントには明確な記載はありませんでしたが,AWS公式ドキュメントには以下のようにしっかりS3の仕様について言及がありました.
- 1 回の PUT オペレーションでは、最大 5GB の単一のオブジェクトをアップロード可能
- マルチパートアップロード API を使用すると、最大 5 TB のサイズの単一の大容量オブジェクトをアップロードできます
そのため,S3の仕様に則ってboto3側のAPIでも同様にPUTオペレーションでの制限がかかるのは当然のことだと思います.では,どのようにこの問題に対処するのかという部分ですが,大きく2つがあります. 1つはboto3のTransferAPIを利用する方法と,もう1つはpandasのto_csvメソッドを利用する方法です.
それぞれの実装方法を以下に示しながら解説していきます.
- boto3の S3 Transfers の利用
boto3の S3 Transfers を利用した実装方法は以下になります.
この S3 Transfer のモジュールの特徴は指定したファイルサイズを上回った際に、自動的にマルチパート転送に切り替わるという機能を持っています.これによってファイルサイズが5GB以上になった場合でも,マルチパートアップロードを利用して最大 5TB までのデータをアップロードすることが可能になります.
from tempfile import NamedTemporaryFile from boto3 import Session from botocore.session import get_session from botocore.session import ClientError, ParamValidationError from boto3.s3.transfer import S3Transfer from boto3.s3.transfer import TransferConfig def put_s3(dataframe: pd.DataFrame, bucket:str, key:str) -> bool: """データフレームをS3に保存する関数 Args: dataframe: 保存するデータフレーム bucket: s3の保存するバケット名 key: s3の保存するキー名 Returns: bool: 保存実施の成否 """ try: session = get_session() autorefresh_session = Session(botocore_session=session) s3_client = autorefresh_session.client('s3') f = NamedTemporaryFile() dataframe.to_csv(f, index=False, header=True, encoding='utf-8') config = TransferConfig( multipart_threshold = 8 * 1024 * 1024, max_concurrency = 10, multipart_chunksize = 8388608, num_download_attempts = 10, max_io_queue = 100 ) transfer = S3Transfer(client=s3_client, config=config) transfer.upload_file( filename=f.name, bucket=bucket, key=key ) return True except ClientEror as e: return False
実装もそこまで難しく S3Transfer
クラスと TransferConfig
クラスをインポートして利用するだけです.
閾値等の各種パラメータの設定を TransferConfig
クラスにて行い,設定したオブジェクトを S3Transfer
に渡してインスタンス化するだけでマルチパートアップロードに対応した処理が記載できます.注意点があるとすれば,この S3Transfer
クラスのメソッドには download_file
と upload_file
のメソッドしか定義されていないため,バイナリやバッファを直接指定することができない点になります.そのため,上記の実装ではアップロードする対象のオブジェクトを一時ファイルとして出力して一時ファイルのパスを指定するとった処理で記載しています.
- pandasの to_csv メソッド利用
続いてシンプルにpandasの to_csv
のみで利用する方法です.
注意点としてPandasのバージョンが 0.20.1 以上でかつ,別途 s3fs
のライブラリのインストールが必要になります.
s3fs
はユーザ領域にS3をマウントしてファイルを保存するためAPIによる制約を受けなくなるということです.普段Pandasに慣れている人はこちらの方が可読性が高くて,学習コストもほとんどゼロに等しいため、バージョン制約や s3fs
のインストール制約がない方はこちらの方法を利用する方が無難かと思われます.
def put_s3(dataframe: pd.DataFrame, bucket:str, key:str) -> bool: """データフレームをS3に保存する関数 Args: dataframe: 保存するデータフレーム bucket: s3の保存するバケット名 key: s3の保存するキー名 Returns: bool: 保存実施の成否 """ try: dataframe.to_csv(f"s3://{bucket}/{key}", header=True, index=False) return True except: return False
リリースノートに Pandas の機能アップデート情報が記載されていますので興味があればそちらも是非参照してみてください.
まとめ
今回は備忘録も兼ねてクラウドサービスの活用という文脈でS3への大容量ファイルのアップロード方法についての記事を記載しました.
普段からライブラリの操作についてはドキュメントを割と目を通してから利用する方ですが,改めて公式ドキュメントをしっかり読むことの重要性を再認識しました.S3は開発でも機械学習でも利用頻度が高いクラウドサービスだと思うので参考になればと思います.