tkherox blog

データサイエンスおよびソフトウェア開発、たまに育児についての話を書いています

S3で5GB以上のファイルを操作する方法

はじめに

データ分析に関わる人であれば誰でも一度は AWS の S3 をストレージとして利用したことはあるかと思います.
今回は Jupyter Notebook より boto3 を介して S3 に5GB以上のデータを保存しようとした際に詰まった内容について,同様の問題で苦しむ人の一助になればという想いと自身の備忘録も兼ねてまとめていきたいと思います.

github.com

実行環境

まず初めに実行環境について記載しておきます.

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.15.7
BuildVersion:   19H1519

$ python -V
Python 3.9.1

発生事象

では,本題の詰まった事象についてです.

Jupyter Notebook から boto3 を介して S3 に 5 GB 以上のデータを保存しようとしたところ,「ClientError」によってアップロードが正常に行えないという事象に直面しました.具体的なコードは以下です.

import io
import os
import boto3
import random
import numpy as np
import pandas as pd

# faker
import faker
from faker import Faker

# boto3
from boto3 import Session
from botocore.session import get_session
from botocore.session import ClientError, ParamValidationError

class GenerateDummyData:
    """ダミーデータ生成クラス"""

    def __init__(self, seed:int=None):
        """
        Args:
             seed: ランダム生成かかわるシード値
        Returns:
        """
        self.fake = Faker()
        if seed: Faker.seed(seed)

    def create_rows(self, rows:int=1):
        """
        Args:
            rows: 生成するレコード数
        Returns:
            df: ダミーのデータフレーム
        """
        output = [{
            "name": self.fake.name(),
            "address": self.fake.address(),
            "email": self.fake.email(),
            "bs": self.fake.bs(),
            "city": self.fake.city(),
            "state": self.fake.state(),
            "date_time": self.fake.date_time(),
            "randomdata": random.randint(1000, 2000) } for x in range(rows)]

        df = pd.DataFrame(output)
        return df
    

def put_s3(dataframe: pd.DataFrame, bucket:str, key:str) -> bool:
    """データフレームをS3に保存する関数
    Args:
        dataframe: 保存するデータフレーム
        bucket: s3の保存するバケット名
        key: s3の保存するキー名
    Returns:
        bool: 保存実施の成否
    """
    try:
        session = get_session()
        autorefresh_session = Session(botocore_session=session)
        s3_client = autorefresh_session.client('s3')

        csv_buffer = io.StringIO()
        dataframe.to_csv(csv_buffer, index=False, header=True, encoding='utf-8')
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=csv_buffer.getvalue()
        )
        return True
    except ClientEror as e:
        return False

# ダミーデータ生成
#  5GB以上になるように大量のレコードを生成
df = generator.create_rows(10000000000)

# ストレージへの保存
put_s3(df, "sample-bucket", "storage/dataframe/df.csv")

上記のコードより発生したエラー内容は以下です.
エラー内容をみるとファイルサイズが最大値を超えているためにエラーが起きていることが分かります.

ClientError: An error occurred (EntityTooLarge) when calling the PutObject operation: Your proposed upload exceeds the maximum allowed size

原因と解決策

ClientErrorが発生した理由ですが,boto3のAPIで用意しているput_object メソッドでアップロードできるオブジェクトサイズは5GBという制限があるようです.
boto3の公式ドキュメントには明確な記載はありませんでしたが,AWS公式ドキュメントには以下のようにしっかりS3の仕様について言及がありました.

  • 1 回の PUT オペレーションでは、最大 5GB の単一のオブジェクトをアップロード可能
  • マルチパートアップロード API を使用すると、最大 5 TB のサイズの単一の大容量オブジェクトをアップロードできます

docs.aws.amazon.com

そのため,S3の仕様に則ってboto3側のAPIでも同様にPUTオペレーションでの制限がかかるのは当然のことだと思います.では,どのようにこの問題に対処するのかという部分ですが,大きく2つがあります. 1つはboto3のTransferAPIを利用する方法と,もう1つはpandasのto_csvメソッドを利用する方法です.
それぞれの実装方法を以下に示しながら解説していきます.

  1. boto3の S3 Transfers の利用

boto3の S3 Transfers を利用した実装方法は以下になります.
この S3 Transfer のモジュールの特徴は指定したファイルサイズを上回った際に、自動的にマルチパート転送に切り替わるという機能を持っています.これによってファイルサイズが5GB以上になった場合でも,マルチパートアップロードを利用して最大 5TB までのデータをアップロードすることが可能になります.

from tempfile import NamedTemporaryFile

from boto3 import Session
from botocore.session import get_session
from botocore.session import ClientError, ParamValidationError
from boto3.s3.transfer import S3Transfer
from boto3.s3.transfer import TransferConfig

def put_s3(dataframe: pd.DataFrame, bucket:str, key:str) -> bool:
    """データフレームをS3に保存する関数
    Args:
        dataframe: 保存するデータフレーム
        bucket: s3の保存するバケット名
        key: s3の保存するキー名
    Returns:
        bool: 保存実施の成否
    """
    try:
        session = get_session()
        autorefresh_session = Session(botocore_session=session)
        s3_client = autorefresh_session.client('s3')

        f = NamedTemporaryFile()
        dataframe.to_csv(f, index=False, header=True, encoding='utf-8')
 
        config = TransferConfig(
            multipart_threshold = 8 * 1024 * 1024,
            max_concurrency = 10,
            multipart_chunksize = 8388608,
            num_download_attempts = 10,
            max_io_queue = 100
        )
        transfer = S3Transfer(client=s3_client, config=config)
        transfer.upload_file(
            filename=f.name,
            bucket=bucket,
            key=key
        ) 
        return True
    except ClientEror as e:
        return False
 

実装もそこまで難しく S3Transfer クラスと TransferConfig クラスをインポートして利用するだけです.
閾値等の各種パラメータの設定を TransferConfig クラスにて行い,設定したオブジェクトを S3Transfer に渡してインスタンス化するだけでマルチパートアップロードに対応した処理が記載できます.注意点があるとすれば,この S3Transfer クラスのメソッドには download_fileupload_file のメソッドしか定義されていないため,バイナリやバッファを直接指定することができない点になります.そのため,上記の実装ではアップロードする対象のオブジェクトを一時ファイルとして出力して一時ファイルのパスを指定するとった処理で記載しています.

  • pandasの to_csv メソッド利用

続いてシンプルにpandasの to_csv のみで利用する方法です.
注意点としてPandasのバージョンが 0.20.1 以上でかつ,別途 s3fs のライブラリのインストールが必要になります.
s3fs はユーザ領域にS3をマウントしてファイルを保存するためAPIによる制約を受けなくなるということです.普段Pandasに慣れている人はこちらの方が可読性が高くて,学習コストもほとんどゼロに等しいため、バージョン制約や s3fs のインストール制約がない方はこちらの方法を利用する方が無難かと思われます.

def put_s3(dataframe: pd.DataFrame, bucket:str, key:str) -> bool:
    """データフレームをS3に保存する関数
    Args:
        dataframe: 保存するデータフレーム
        bucket: s3の保存するバケット名
        key: s3の保存するキー名
    Returns:
        bool: 保存実施の成否
    """
    try:
        dataframe.to_csv(f"s3://{bucket}/{key}", header=True, index=False)
        return True
    except:
        return False

リリースノートに Pandas の機能アップデート情報が記載されていますので興味があればそちらも是非参照してみてください.

まとめ

今回は備忘録も兼ねてクラウドサービスの活用という文脈でS3への大容量ファイルのアップロード方法についての記事を記載しました.
普段からライブラリの操作についてはドキュメントを割と目を通してから利用する方ですが,改めて公式ドキュメントをしっかり読むことの重要性を再認識しました.S3は開発でも機械学習でも利用頻度が高いクラウドサービスだと思うので参考になればと思います.

参考情報