tkherox blog

データサイエンスおよびソフトウェア開発、たまに育児についての話を書いています

TorchServe入門

TorchServeとは

TorchServeはPytorchで構築したモデルをサービングするためのモデルサービングライブラリです.

AWSFacebookが連携して開発しているため,Pytorchのコミュニティと共に今後の発展が期待できます.また,GithubのスターもTensorflow Servingの4515と比べると見劣りしますが,2020年12月時点で1464と様々な人から注目されていることが分かります.
また,AWSとの親和性も高く,TorchServeによる推論サーバをSageMakerやEKS上に構築して,スケールアウトする事例を公式のブログにて公開しています.このようにプラットフォーム側がサポート・推奨しているライブラリを利用することは安心感がありますし,本番環境までを意識したモデル構築を実現できるTorchServeの存在は非常に大きいと思います. 私自身もこれまではPytorchで作成したモデルをデプロイする際には自身でAPIサーバの設計と実装,ログ設計,テストなど多くのことを検討・実施しておりましたが,TorchServeの登場によって多くのプロセスを簡略化することができるため非常に有効なツールだと感じています.

github.com

pytorch.org

さて,以降ではTorchServeの紹介と独自で作成したオリジナルモデルをTorchServeにデプロイしてサービングを実践していきます.

アーキテクチャ

Pytorchのアーキテクチャは以下のようになっております.

TorchServeをデプロイするといくつかのコンポーネントが立ち上がります.Frontendと呼ばれるコンポーネントがInference APIを提供して推論用のAPIエンドポイントを提供します.モデルごとにエンドポイントが提供されるため,複数のモデルを同時に扱うことができます.
また,Process Orchestrationの部分ではManagement APIと呼ばれるモデルの登録やステータスを確認するためのエンドポイントも提供しています.モデル新規登録からバックエンドで動作するワーカーのスケールなどの制御を行うことができます.

f:id:takaherox:20200726152512p:plain
architecture

  • 構成要素
    • Frontend(フロントエンド)
      • リクエストとレスポンスを扱うTorchServeのコンポーネントで,クライアントからのリクエストとレスポンスの両方を処理してモデルのライフサイクルを管理します
    • Model Worker (モデルワーカー)
      • ワーカーはモデル推論を実際に実行する責任があり,実際に実行されるモデルのインスタンスを指します.
    • Model(モデル)
      • モデルはstate_dictsなどの他のモデルアーティファクトと共に,データに対してカスタマイズされた前処理や後処理を提供することができます.また,モデルはクラウドストレージまたはローカルホストからロードできます.
    • Plugins(プラグイン)
      • TorchServeの起動時に利用者によってカスタマイズされたエンドポイントや認証認可,バッチ処理を指定できます.
    • Model Store(モデルストア)
      • 読み込みが可能なモデルが存在するディレクトリを指します.

また,TorchServeで提供されるAPIはOpenAPIの標準仕様に則って作成されているため,APIインターフェース仕様を容易に理解できるようにもなっております.API仕様を説明したJson形式ドキュメントを介して以下に示すようなSwaggerUIも確認できるのでそちらも理解を手助けしてくれます.ドキュメントが手厚いのは利用者側からすると組織やグループで導入する際のハードルを下げる重要な要素になってくるので非常にありがたいです.

f:id:takaherox:20200725224319p:plain
swagger ui

メリット

次にTorchServeのメリットを記述します.
やはりTorchServeのメリットは以下になると思います.

  • 機械学習モデルを活用した機能のサービスへの導入高速化
    機械学習で作成したモデルをTorchServeによってAPI提供することができるため,サービス導入までのリードタイムを短縮することが可能となります

  • モデル構築部分へのリソース注力
    実際のビジネス現場ではモデルを構築して終わりではなく,それを活用してオペーレーションを最適化したり,ユーザに価値を届けたりします.そのため,モデル以外にもソフトウェア開発等にリソースを割く必要があるのですが.TorchServeによってその一部が簡略化できるためモデル部分にリソースを集中することができるようになります

  • オペレーションにおけるモデル運用の効率化
    複数のモデルを統一されたプラットフォームでサービングやモデル管理ができるため,運用時のオペレーションを効率化することが可能になります

  • アナリストによるソフトウェア開発への介入
    データサイエンティストがソフトウェア開発の領域に介入することを容易にするため,データサイエンティストがよりシームレスな顧客へのデリバリを可能にすることができます

このように大きなメリットがあるので小さいデメリットがいくつかあったとしても導入を検討する価値はあると思います.モデルをサービングする必要がある場合の実現方法の1つとして覚えておいて損はないはずです.

事前準備

TorchServeによるサービングを実践する前にモデル作成に必要なデータとTorchServeのインストール方法について記載します.

利用データ

今回利用するデータはUCI machine learning repositoryで公開されているadultデータセットになります.
このデータセットはユーザ属性と当該ユーザの年収が50Kを超えるかどうかを示すラベルのデータ内容から構成されています.そのため,分類タスクのデータセットとして一般的に用いられてます.これを使って予測モデルを以降で構築していこうと思います.

archive.ics.uci.edu

adultデータセットに含まれる属性値とその説明について簡単に以下にまとめておきます.

カラム名 説明 変数 補足
age 年齢 連続値 -
workclass 労働階級 カテゴリ変数 8種類
fnlwgt 国勢調査の人口重み 連続値 -
education 学歴 カテゴリ変数 16種類
education-num 教育期間 連続値 -
marital-status 世帯 カテゴリ変数 7種類
occupation 職業 カテゴリ変数 14種類
relationship 続柄 カテゴリ変数 6種類
race 人種 カテゴリ変数 5種類
sex 性別 カテゴリ変数 2種類
capital-gain キャピタルゲイン 連続値 -
capital-loss キャピタルロス 連続値 -
hours-per-week 週当たりの労働時間 連続値 -
native-country 母国 カテゴリ変数 41種類
target 年収ラベル '>50K' = 1, '<=50' = 0 -

インストール

実行環境は以下のようになっています.

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G6042

$ python -V
Python 3.6.9

TorchServeのインストールは以下の pip コマンドでインストールします.

# pytorch インストール
$ pip install torch torchvision

# torchserve インストール
$ pip install torchserve torch-model-archiver

以上で環境構築は終わりです.
次からはモデル学習してTorchServeによる推論サーバをデプロイしていきましょう.

TorchServe実践

ここからが本題のモデルのデプロイ部分になります.
前置きが長くなりましたが,早速実際にモデルをデプロイしていきましょう.

モデル作成

まずデプロイするためのモデルの作成です.
ユーザ属性から年収が500ドルより高い収入を得るか得ないかを2値分類する予測モデルを構築していきます.

  • Pythonライブラリのインポート
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from sklearn.preprocessing import StandardScaler
from sklearn.metrics import auc, roc_curve
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
  • リスト定義
# データフレームに設定するカラムのリスト
cols = [
    'age', 'workclass', 'fnlwgt', 'education', 'education-num', 
    'marital-status', 'occupation', 'relationship', 'race', 'sex',
    'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 
    'target'
]

# 標準化を適応するカラムのリスト
scholar_cols = [
    'age', 'fnlwgt',  'education-num', 'capital-gain',
    'capital-loss', 'hours-per-week',
]

# ダミー化を実施するカラムのリスト
category_cols = [
    'workclass', 'education', 'marital-status', 'occupation',
    'relationship', 'race', 'sex', 'native-country'
]

カラム名のリスト,標準化を実施するカラム名のリスト,ダミー化を実施するカラム名のリストを事前に定義しておきます.後ほど,前処理の工程で利用します.

  • データの読み込み
# UCIのデータセットを読み込み
df = pd.read_csv('../data/adult.data', names=cols, header=None)
# 不完全なデータをNanに置換
df = df.applymap(lambda d: np.nan if d==" ?" else d)
# Nanのレコードを除外
df = df.dropna()

UCIのデータセットを読み込んだ後にデータに含まれている不完全なレコードを除去します.UCIのadultデータセットには欠損データが含まれており.『 ?』のレコードをNanに変換した後に dropna でレコード毎除外しています.
この時点でデータ数が 30162 になっているはずです.

  • 前処理の実施
# targetカラム数値に変換
_, y = np.unique(np.array(df.target), return_inverse=True)
df.loc[:, 'target'] = y

# 連続値を標準化
sc = StandardScaler()
features = sc.fit_transform(df.loc[:, scholar_cols].values)
features
df.loc[:, scholar_cols] = features

# カテゴリ値をダミー化
df = pd.get_dummies(df, columns=category_cols)

処理としては大きく3つの前処理を実施しています.
1つは target 列を学習で扱えるように0と1のラベルに変換します.
そして残りの2つは連続値の標準化とカテゴリ変数のダミー化です.

  • パラメータ定義とデータ分割
# 各種定数
SEED = 42             # シード値
BATCH_SIZE = 32       # バッチサイズ
epochs = 20           # エポック数
learning_rate = 1e-3  # 学習率
device = 'cuda' if torch.cuda.is_available() else 'cpu'
target = 'target'
predictors = [col for col in df.columns if col not in target]
input_num = len(predictors)

# データを学習用とテスト用に分割
X_train, X_test, y_train, y_test = train_test_split(df[predictors], df[target], test_size=0.2, random_state=SEED)

学習で利用するパラメータを定義します.
バッチサイズは32でエポック数は20としています.また,全データのうちの2割をテストデータとして分割します.

  • DatasetクラスとModuleクラスの定義
# 独自データセットの定義
class MyDataset(Dataset):
    def __init__(self, df, target):
        self.dataset = torch.Tensor(df.values)
        self.target = torch.Tensor(target.values).long()
        self.datanum = len(self.dataset)

    def __len__(self):
        return self.datanum
    
    def __getitem__(self, idx):
        out_dataset = self.dataset[idx]
        out_target = self.target[idx]
        return out_dataset, out_target

# ネットワークの定義
class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_num, 64),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, 8),
            nn.ReLU(),
            nn.Linear(8, 2)
        )
    
    def forward(self, x):
        x = self.fc(x)
        return x

Pytorchで学習するためにデータセットクラスを定義します.
利用するために必要なPytorchで独自データセットを定義する方法についてはこちらを参照ください.
ネットワークは単純な5層のMLPを定義して,最終層で2つのパラメータを出力するようにしています.

  • 学習
# ハイパーパラメータ初期化
criterion = nn.CrossEntropyLoss()
model = MyModel().to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)

# データローダーの作成
train_dataset = MyDataset(X_train, y_train)
test_dataset = MyDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# 学習
train_loss = []
for epoch in range(0, epochs+1):
    running_loss = 0.0
    running_corrects = 0
    model.train()
    for idx, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == targets.data)

    epoch_loss = running_loss / len(train_loader.dataset)
    epoch_acc = running_corrects.double() / len(train_loader.dataset)
    
    print('{}/{} Loss: {:.4f} Acc: {:.4f}'.format(epoch, epochs, epoch_loss, epoch_acc))

# テストデータによる評価データ取得
y_tests = []
y_preds = []
model.eval()
with torch.no_grad():
    for idx, (inputs, targets) in enumerate(test_loader):
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        outputs = F.softmax(outputs, dim=1)
        y_preds.append(outputs[:, 1].detach().cpu().numpy())
        y_tests.append(targets.detach().cpu().numpy())

先ほど定義した MyDataset クラスを用いてPytorchでデータを呼び出すためのデータローダーを定義します.これを用いてモデルの学習とテストデータによる評価データの取得を実施します.

  • 評価とモデル保存
# numpy arrayの結合
y_preds = np.concatenate(y_preds)
y_tests = np.concatenate(y_tests)

# ROC Curveの表示
fpr, tpr, thresholds = roc_curve(y_tests, y_preds)
auc = auc(fpr, tpr)
fig = plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label='ROC curve (area = %.3f)' % auc)
plt.xlabel('FPR: False positive rate')
plt.ylabel('TPR: True positive rate')
plt.title('ROC Curve')
plt.legend()
plt.grid()
plt.show()

# Confusion Matrixの表示
matrix_data = confusion_matrix(y_tests, np.round(y_preds), labels=[0, 1])
plt.figure(figsize = (10,7))
sns.heatmap(matrix_data, annot=True, cmap='Blues', fmt='4g')
plt.show()

# モデル保存
input_tensor = torch.rand(1, input_num)
export_model = torch.jit.trace(model, input_tensor)
export_model.save('../models/model.pth')

テストデータによって取得した評価データを用いてAUCと混合分布は以下の通りになりました. AUCは91.3%とかなり高精度のモデルになっていることを確認できました.
また,混合分布からもTrue PositiveとFalse Negativeに大半は分類できていることが見て取れます.
モデル保存は torch.save を用いてモデル全体の情報を保持する形でデータ出力をしています.こちらに関しては別の記事にて詳細にまとめていこうと思います.

f:id:takaherox:20201230160507p:plainf:id:takaherox:20201230160512p:plain
モデル評価のためのグラフ

次からはここで作成したモデルをTorchServeを用いてデプロイしていきましょう.

デプロイ

本題のTorchServeにてモデルのデプロイをします.
大きな流れは以下の通りです.

  • handlerの記述
  • 作成した学習済みモデルのアーカイブ
  • TorchServeでのデプロイ

まず始めにhandlerと呼ばれる推論ロジックを定義するための custom_handler.py ファイルを作成します.このファイルではモデルの読み込みやリクエストで受け取ったデータの前処理等を記述することができます.TorchServeではこのhandlerがエントリポイントになるため実行時にここで記述された内容が実行されます.デフォルトでいくつかのhandlerが用意されていますが,独自で作成したモデルには適応できないためカスタムhandlerを利用します.

今回作成したファイルは以下の通りです.
カスタムhandlerではBaseHandlerクラスを継承して独自の処理内容を記述したファイルを作成します.この際に initializehandle メソッドは必ず定義する必要があるので注意してください. 以下ではpreprocess メソッドでは前処理を postprocess 後処理の内容をオーバーライドして記述しています.

import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from ts.torch_handler.base_handler import BaseHandler

class ModelHandler(BaseHandler):
    def __init__(self):
        self.manifest = None
        self._context = None
        self.initialized = False
        self.model = None
        self.device = None

    def initialize(self, context):
        self.manifest = context.manifest
        properties = context.system_properties
        model_dir = properties.get("model_dir")
        self.device = torch.device("cuda:" + str(properties.get("gpu_id")) if torch.cuda.is_available() else "cpu")

        # Read model serialize/pt file
        serialized_file = self.manifest['model']['serializedFile']
        model_pt_path = os.path.join(model_dir, serialized_file)
        if not os.path.isfile(model_pt_path):
            raise RuntimeError("Missing the model.pt file")
        
        self.model = torch.load(model_pt_path)
        self.initialized = True

    def preprocess(self, data):
        preprocessed_data = data[0]['body']['data']
        if preprocessed_data is None:
            preprocessed_data = data[0].get("data")
        return torch.FloatTensor([preprocessed_data])

    def inference(self, model_input):
        # Do some inference call to engine here and return output
        model_output = self.model.forward(model_input)
        return model_output

    def postprocess(self, inference_output):
        probs = F.softmax(inference_output, dim=1)
        results = [
            {
                "<=50K":  prob[0],
                ">50K": prob[1]
            } 
            for prob in probs.tolist()
        ]
        return results
    
    def handle(self, data, context):
        model_input = self.preprocess(data)
        model_output = self.inference(model_input)
        return self.postprocess(model_output)

続いて torch-model-archiver を利用してモデルをTorchServeで扱う .mar 形式に変換します.handler オプションで先ほど作成した custom_handler.py を指定してアーカイブします.アーカイブされた .mar ファイルは export-path で指定したディレクトリに<model-name>.mar のファイル名で出力されます.以下のコマンドではmodel_storeディレクトリ配下にsevremodel.marファイルが出力されているはずです.
オプションの細かい説明は公式ドキュメントをご覧ください.

$ mkdir model_store
$ torch-model-archiver --model-name servemodel \
                       --version 1.0 \
                       --serialized-file models/model.pth \
                       --handler handlers/custom_handler.py \
                       --export-path model_store

最後に torchserve コマンドによってデプロイを実行します.
先ほどアーカイブした servemodel.mar を指定してください. これでデプロイは完了です.

$ torchserve --start \
             --ncs \
             --model-store model_store \
             --models servemodel.mar

動作確認はヘルスチェック用のエンドポイント /ping に対してリクエストを投げてレスポンス内容を確認してください.statusに Healthy が返って来れば正常に動作しています.

$ curl curl http://127.0.0.1:8080/ping
{
  "status": "Healthy"
}

推論リクエス

さて,TorchServeによってデプロイしたモデルに対して推論を実施してみましょう.

TorchServeのデフォルトの設定では2つのAPIサービスが立ち上がり,ポート番号8080ではInference APIが,ポート番号8081ではManagement APIが利用できるようになります.Inference APIで推論を行うためのエンドポイントは /predictions/<model name>/<version>となるため,今回デプロイしたモデル名を指定して /predictions/servemodel/1.0/ に対してPOSTリクエストを投げて結果を確認してみます.

$ curl -X POST -H 'Content-Type: application/json' -d '{"data": [0.030671, -1.063611, 1.134739, 0.148453, -0.21666, -0.035429, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]}' http://127.0.0.1:8080/predictions/servemodel/1.0/

{
  "<=50K": 0.8627053499221802,
  ">50K": 0.13729462027549744
}

無事に推論が行えて正常にモデルがデプロイできていますね.
詳細な設定によるカスタマイズやバッチ推論等も行えるようなので自身の利用用途に合わせて柔軟に対応させることができると思います.是非皆様も活用してみてください.

まとめ

今回はPytorchで構築したモデルをサービングするためのTorchServeについてご紹介しました.開発が盛んなPytorchコミュニティですので今後の発展が楽しみですね.
また,まだPytorchに関してそんなに詳しくないという方については少し内容が難しかったかと思います.そう言った方には以下のような書籍で学習することをおすすめします.こちらは私も日々参考に利用している書籍で,様々なアルゴリズムの実装例ととフレームワークの詳細な説明があるため1冊で網羅的にPytorchに関する学習をすることが可能です.