1. 概要
PyTorch Lightningを使って転移学習による画像分類を実装していきたいと思います。
今回はVGG16を使ってモデルを実装していきます。
チュートリアルを見るといろいろな設定ができますが、すでにあるPyTorchの全体感を損ねないように学習時のループ処理をPyTorch Lightningに置き換える形で実装しました。
PyTorch Lightningを使用しないで転移学習を実装した記事もあるので、良ければ参考にしながら見ていただければと思います。
2. モデル化の流れ
PyTorchは次の流れでモデル化していけば大きく間違えることはないかと思います。
- 準備
- データ準備
- 前処理
- Datasetの作成
- DataLoaderの作成
- Lightningモジュールの定義
- ネットワークの定義
- 損失関数の定義
- 最適化手法の定義
- 学習・評価
- 学習処理の設定
- 学習と予測の実行
では流れに沿って実装していきたいと思います。
3. 準備
3.1. データ準備
今回使用するデータは、犬のデータセットを使用して画像分類を試みます。
まずは下記のサイトにアクセスして画像データをダウンロードします。
http://vision.stanford.edu/aditya86/ImageNetDogs/
「Images」のリンクをクリックすれば、画像データのダウンロードが開始されます。
ダウンロードしたデータを解凍すると、120種類の犬種がディレクトリ別に格納されています。
すべてのデータを使うと計算リソースを必要とするので、
次の5犬種に絞ってモデルを構築していきます。
・チワワ(Chihuahua)
・シーズー(Shih-Tzu)
・ボルゾイ(borzoi)
・パグ(pug)
・グレートデン(Great-Dane)
ディレクトリ構成は下記のようにしてあります。
├── Images # 画像データ │ ├── n02085620-Chihuahua │ ├── n02086240-Shih-Tzu │ ├── n02090622-borzoi │ ├── n02109047-Great_Dane │ └── n02110958-pug └── PyTorchによる転移学習の実装.ipynb # 実装スクリプト
最後に今回使用するライブラリです。
# ライブラリの読み込み import os from PIL import Image import torch import torch.utils.data as data import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchvision import models, transforms import pytorch_lightning as pl from pytorch_lightning.metrics.functional import accuracy import matplotlib.pyplot as plt %matplotlib inline
3.2. 前処理
Datasetにデータを渡す準備として、前処理をしていきます。
まずは、用意したデータセットを学習データと検証データに分割します。
格納されているデータ数は各犬種ごとに異なるので、それぞれ学習データを80%、検証データを20%にします。
そして、学習データ、検証データそれぞれのファイルパスを格納したリストを用意します。
def make_filepath_list(): """ 学習データ、検証データそれぞれのファイルへのパスを格納したリストを返す Returns ------- train_file_list: list 学習データファイルへのパスを格納したリスト valid_file_list: list 検証データファイルへのパスを格納したリスト """ train_file_list = [] valid_file_list = [] for top_dir in os.listdir('./Images/'): file_dir = os.path.join('./Images/', top_dir) file_list = os.listdir(file_dir) # 各犬種ごとに8割を学習データ、2割を検証データとする num_data = len(file_list) num_split = int(num_data * 0.8) train_file_list += [os.path.join('./Images', top_dir, file).replace('\\', '/') for file in file_list[:num_split]] valid_file_list += [os.path.join('./Images', top_dir, file).replace('\\', '/') for file in file_list[num_split:]] return train_file_list, valid_file_list # 画像データへのファイルパスを格納したリストを取得する train_file_list, valid_file_list = make_filepath_list() print('学習データ数 : ', len(train_file_list)) # 先頭3件だけ表示 print(train_file_list[:3]) print('検証データ数 : ', len(valid_file_list)) # 先頭3件だけ表示 print(valid_file_list[:3])
# Output 学習データ数 : 696 ['./Images/n02085620-Chihuahua/n02085620_10074.jpg', './Images/n02085620-Chihuahua/n02085620_10131.jpg', './Images/n02085620-Chihuahua/n02085620_10621.jpg'] 検証データ数 : 177 ['./Images/n02085620-Chihuahua/n02085620_588.jpg', './Images/n02085620-Chihuahua/n02085620_5927.jpg', './Images/n02085620-Chihuahua/n02085620_6295.jpg']
次に画像データに対する前処理(resizeなど)の処理を記述したクラスを作成します。
このクラスに画像データを通せば、指定した前処理を施したデータがえられます。
{train: ~, valid: ~}としている理由は、学習時と推論時で実施する前処理を変えるためです。
学習時にはモデルの性能を高めるために、データオーグメンテーション用の前処理を加えていますが、推論時にはデータオーグメンテーションする必要がないため、画像のサイズを整えるなどの処理だけにとどめています。
class ImageTransform(object): """ 入力画像の前処理クラス 画像のサイズをリサイズする Attributes ---------- resize: int リサイズ先の画像の大きさ mean: (R, G, B) 各色チャンネルの平均値 std: (R, G, B) 各色チャンネルの標準偏差 """ def __init__(self, resize, mean, std): self.data_trasnform = { 'train': transforms.Compose([ # データオーグメンテーション transforms.RandomHorizontalFlip(), # 画像をresize×resizeの大きさに統一する transforms.Resize((resize, resize)), # Tensor型に変換する transforms.ToTensor(), # 色情報の標準化をする transforms.Normalize(mean, std) ]), 'valid': transforms.Compose([ # 画像をresize×resizeの大きさに統一する transforms.Resize((resize, resize)), # Tensor型に変換する transforms.ToTensor(), # 色情報の標準化をする transforms.Normalize(mean, std) ]) } def __call__(self, img, phase='train'): return self.data_trasnform[phase](img) # 動作確認 img = Image.open('./Images/n02085620-Chihuahua/n02085620_199.jpg') # リサイズ先の画像サイズ resize = 300 # 今回は簡易的に(0.5, 0.5, 0.5)で標準化 mean = (0.5, 0.5, 0.5) std = (0.5, 0.5, 0.5) transform = ImageTransform(resize, mean, std) img_transformed = transform(img, 'train') plt.imshow(img) plt.show() plt.imshow(img_transformed.numpy().transpose((1, 2, 0))) plt.show()
実際にImageTransformクラスを実行してみると、画像がリサイズされ色も標準化されていることがわかります。
加えて、phaseパラメータをtrainに設定して何回か実行すると、画像が左右反転したデータも出力されることがわかります。
3.3. Datasetの作成
PyTorchのDatasetクラスを継承させたクラスを作ります。
処理を記述する箇所は「__len__」「__getitem__」の二つです。
「__len__」にはDatasetに含まれるデータ数を返す処理を記述します。
「__getitem__」にはindex番号を引数にとり、学習データとラベルデータを返す処理を記述します。
class DogDataset(data.Dataset): """ 犬種のDataseクラス。 PyTorchのDatasetクラスを継承させる。 Attrbutes --------- file_list: list 画像のファイルパスを格納したリスト classes: list 犬種のラベル名 transform: object 前処理クラスのインスタンス phase: 'train' or 'valid' 学習か検証化を設定 """ def __init__(self, file_list, classes, transform=None, phase='train'): self.file_list = file_list self.transform = transform self.classes = classes self.phase = phase def __len__(self): """ 画像の枚数を返す """ return len(self.file_list) def __getitem__(self, index): """ 前処理した画像データのTensor形式のデータとラベルを取得 """ # 指定したindexの画像を読み込む img_path = self.file_list[index] img = Image.open(img_path) # 画像の前処理を実施 img_transformed = self.transform(img, self.phase) # 画像ラベルをファイル名から抜き出す label = self.file_list[index].split('/')[2][10:] # ラベル名を数値に変換 label = self.classes.index(label) return img_transformed, label # 動作確認 # クラス名 dog_classes = [ 'Chihuahua', 'Shih-Tzu', 'borzoi', 'Great_Dane', 'pug' ] # リサイズ先の画像サイズ resize = 300 # 今回は簡易的に(0.5, 0.5, 0.5)で標準化 mean = (0.5, 0.5, 0.5) std = (0.5, 0.5, 0.5) # Datasetの作成 train_dataset = DogDataset( file_list=train_file_list, classes=dog_classes, transform=ImageTransform(resize, mean, std), phase='train' ) valid_dataset = DogDataset( file_list=valid_file_list, classes=dog_classes, transform=ImageTransform(resize, mean, std), phase='valid' ) index = 0 print(train_dataset.__getitem__(index)[0].size()) print(train_dataset.__getitem__(index)[1])
# Output torch.Size([3, 300, 300]) 0
3.4. DataLoaderの作成
バッチ処理を適用するためにDataLoaderを作成して、データセットをバッチ単位で取り出せるようにします。
のちの学習のために、学習用のDataLoaderと検証用のDataLoaderを用意しています。
# バッチサイズの指定 batch_size = 64 # DataLoaderを作成 train_dataloader = data.DataLoader( train_dataset, batch_size=batch_size, shuffle=True) valid_dataloader = data.DataLoader( valid_dataset, batch_size=32, shuffle=False) # 動作確認 # イテレータに変換 batch_iterator = iter(train_dataloader) # 1番目の要素を取り出す inputs, labels = next(batch_iterator) print(inputs.size()) print(labels)
# Output torch.Size([32, 3, 300, 300]) tensor([2, 0, 2, 4, 4, 3, 4, 4, 3, 3, 1, 1, 3, 4, 4, 0, 1, 4, 0, 4, 3, 1, 1, 0, 4, 4, 3, 0, 1, 1, 1, 3])
4. Lighgningモジュールの定義
Lightningモジュールを定義するためには、pl.LightningModule
を継承させたクラスを作成します。
このLightningモジュールの中にネットワーク、損失関数、最適化関数などを定義していきます。
class Net(pl.LightningModule): # この中にネットワークの定義を記述していく # ~
4.1. ネットワークの定義
「__init__」の中に親クラスのコンストラクタの呼び出しと使用する層を記述して、「forward」の中に順伝播処理を記述していきます。
class Net(pl.LightningModule): # ネットワークで使用する層を記述する def __init__(self): super().__init__() vgg16 = models.vgg16(pretrained=True) # 出力層を今回のタスクに適応できるように変更する vgg16.classifier[6] = nn.Linear(in_features=4096, out_features=5) self.vgg16 = vgg16 # 学習させるパラメータ名 update_param_names = ['classifier.6.weight', 'classifier.6.bias'] # 学習させるパラメータ以外は勾配計算をなくし、変化しないように設定 for name, param in self.vgg16.named_parameters(): if name in update_param_names: param.requires_grad = True else: param.requires_grad = False # 順伝播処理を記述する def forward(self, x): x = self.vgg16(x) return x net = Net() print(net)
# Output Net( (vgg16): VGG( (features): Sequential( (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (1): ReLU(inplace=True) (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (3): ReLU(inplace=True) (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False) (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (6): ReLU(inplace=True) (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (8): ReLU(inplace=True) (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False) (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (11): ReLU(inplace=True) (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (13): ReLU(inplace=True) (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (15): ReLU(inplace=True) (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False) (17): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (18): ReLU(inplace=True) (19): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (20): ReLU(inplace=True) (21): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (22): ReLU(inplace=True) (23): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False) (24): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (25): ReLU(inplace=True) (26): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (27): ReLU(inplace=True) (28): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) (29): ReLU(inplace=True) (30): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False) ) (avgpool): AdaptiveAvgPool2d(output_size=(7, 7)) (classifier): Sequential( (0): Linear(in_features=25088, out_features=4096, bias=True) (1): ReLU(inplace=True) (2): Dropout(p=0.5, inplace=False) (3): Linear(in_features=4096, out_features=4096, bias=True) (4): ReLU(inplace=True) (5): Dropout(p=0.5, inplace=False) (6): Linear(in_features=4096, out_features=5, bias=True) ) ) )
「__init__」の中を少し詳しく解説します。
まず最初に、学習済みモデルを今回のタスクに適応させるために最後の出力層を書き換えています。
モデルは内部的に、(features)、(avgpool)、(classifier)と分かれているので、リストを操作する感じで操作できます。
書き換える際は (classifier) の (6) の nn.Linear(in_features=4096, out_features=1000, bias=True)
をnn.Linear(in_features=4096, out_features=5, bias=True)
に書き換えます。
実際に変更前と変更後を確認すると次のようになります。
print('変更前 : ', net.vgg16.classifier[6]) net.vgg16.classifier[6] = nn.Linear(in_features=4096, out_features=5) print('変更前 : ', net.vgg16.classifier[6])
# Output 変更前 : Linear(in_features=4096, out_features=1000, bias=True) 変更前 : Linear(in_features=4096, out_features=5, bias=True)
次に、今回は転移学習を実装するので、更新するモデルの重みを指定していきます。
転移学習の場合、最後の出力層のみ重みを更新させ、それ以外の層は更新させないようにします。
PyTorchの場合、各層の重みに対して更新させるかどうかを「requires_grad」で指定します。
Trueであれば更新、Falseであれば固定となります。
まずは重みがどのように格納されているか確認します。
list(net.vgg16.classifier[6].named_parameters())
# Output [('weight', Parameter containing: tensor([[-0.0094, 0.0072, 0.0016, ..., 0.0025, -0.0076, -0.0001], [ 0.0144, -0.0069, 0.0046, ..., -0.0141, 0.0110, 0.0079], [ 0.0110, -0.0037, -0.0114, ..., -0.0063, 0.0045, 0.0155], [-0.0033, -0.0076, -0.0147, ..., 0.0011, -0.0067, -0.0145], [ 0.0147, 0.0143, 0.0108, ..., 0.0137, -0.0088, -0.0138]], requires_grad=True)), ('bias', Parameter containing: tensor([ 0.0076, -0.0093, -0.0136, -0.0016, -0.0076], requires_grad=True))]
出力から確認できるようにweight、biasが格納されているので、この二つの変数に対して requires_grad を設定していきます。
net.named_parameters()と実行すると、各層の名前と重みが得られます。
# 出力内容の確認 for name, param in net.vgg16.named_parameters(): print('name : ', name)
# Output name : features.0.weight name : features.0.bias name : features.2.weight name : features.2.bias ~~~~~~ name : classifier.3.weight name : classifier.3.bias name : classifier.6.weight name : classifier.6.bias
これを利用して、指定した層以外のrequires_grad
をFalseにしています。
# 学習させるパラメータ名 update_param_names = ['classifier.6.weight', 'classifier.6.bias'] # 学習させるパラメータ以外は勾配計算をなくし、変化しないように設定 for name, param in net.vgg16.named_parameters(): if name in update_param_names: param.requires_grad = True else: param.requires_grad = False
これでネットワークの定義が完了です。
4.2. 損失関数の定義
Lightningモジュールの中のtraining_step関数
に損失関数を定義していきます。
これは各バッチ内での処理に相当します。
各epochの終了時に精度を確認したいので、training_epoch_end関数
も同時に定義しています。
損失関数はクロスエントロピー誤差を使用します。
class Net(nn.Module): # ネットワークで使用する層を記述する def __init__(self): # ~~ # 順伝播処理を記述する def forward(self, x): # ~~ # 学習の際に使用する損失関数を記述している def training_step(self, batch, batch_idx): x, y = batch y_hat = self(x) loss = F.cross_entropy(y_hat, y) return {'loss': loss, 'y_hat': y_hat, 'y': y, 'batch_loss': loss.item() * x.size(0)} # 各エポック終了時の処理を記述している def training_epoch_end(self, train_step_outputs): y_hat = torch.cat([val['y_hat'] for val in train_step_outputs], dim=0) y = torch.cat([val['y'] for val in train_step_outputs], dim=0) epoch_loss = sum([val['batch_loss'] for val in train_step_outputs]) / y_hat.size(0) preds = torch.argmax(y_hat, dim=1) acc = accuracy(preds, y) self.log('train_loss', epoch_loss, prog_bar=True, on_epoch=True) self.log('train_acc', acc, prog_bar=True, on_epoch=True) print('-------- Current Epoch {} --------'.format(self.current_epoch + 1)) print('train Loss: {:.4f} train Acc: {:.4f}'.format(epoch_loss, acc)) def validation_step(self, batch, batch_idx): x, y = batch y_hat = self(x) loss = F.cross_entropy(y_hat, y) return {'y_hat': y_hat, 'y': y, 'batch_loss': loss.item() * x.size(0)} def validation_epoch_end(self, val_step_outputs): # x_hatを一つにまとめる y_hat = torch.cat([val['y_hat'] for val in val_step_outputs], dim=0) y = torch.cat([val['y'] for val in val_step_outputs], dim=0) epoch_loss = sum([val['batch_loss'] for val in val_step_outputs]) / y_hat.size(0) preds = torch.argmax(y_hat, dim=1) acc = accuracy(preds, y) self.log('val_loss', epoch_loss, prog_bar=True, on_epoch=True) self.log('val_acc', acc, prog_bar=True, on_epoch=True) print('valid Loss: {:.4f} valid Acc: {:.4f}'.format(epoch_loss, acc))
関数の内容を少し詳しく解説します。
元のPyTorchのコードに照らしあわせると、それぞれ関数は次の位置に当たります。
training_step関数
は各バッチ単位に相当し、training_epoch_end関数
は各エポック単位に相当します。
model = Model() model.train() torch.set_grad_enabled(True) for epoch in epochs: outputs = [] for batch in data: x, y = batch # traninig_step y_hat = model(x) # traninig_step loss = loss(y_hat, x) # traninig_step outputs.append({'loss': loss, 'y_hat', y_hat, 'y': y}) # traninig_step # 学習処理が続く... total_loss = outputs.mean() # training_epoch_end
training_step関数
では損失関数を定義し、lossを戻り値に指定しています。
この際にloss単体を戻り値に指定するか、lossというKeyを持つ辞書を返す必要があるようです。
つまりは、↓のように書くか、
def training_step(self, batch, batch_idx): # ~~ return loss
または、↓のような感じにする必要があります。
def training_step(self, batch, batch_idx): # ~~ return {'loss': loss, 'x_hat': x_hat, 'y': y, 'batch_loss': loss.item() * x.size(0)}
各エポック終了時に、精度を測るためにtraining_step関数
の戻り値に正解ラベルと予測結果を格納しています。
training_epoch_end関数
では、train_step_outputsに各バッチでの結果がリスト形式に格納されているので、ループを回して精度を計算します。
lossの計算ときにPyTorchの仕様上各バッチ内での平均のlossが計算されます。
loss.item() * x.size()
をすることで、lossにデータ数を掛けてlossの平均から合計に変換しています。
training_epoch_end関数
内で「全データの損失 ÷ データ数」とすることで、モデル全体の損失和を計算しなおす処理を加えています。
validation_step関数
とvalidation_epoch_end関数
は、評価時に使用する関数で基本的にはtraining_step関数
とtraining_epoch_end関数
とやっていることは同じです。
4.3. 最適化手法の定義
Lightningモジュールの中のconfigure_optimizers関数
に最適化手法を定義していきます。
Lightningモジュール自体にパラメータを持っているので、self.parameters()
で最適関数に渡します。
class Net(nn.LightningModule): # ネットワークで使用する層を記述する def __init__(self): # ~~ # 順伝播処理を記述する def forward(self, x): # ~~ # 学習の際に使用する損失関数を記述している def training_step(self, batch, batch_idx): # ~~ # 各エポック終了時の処理を記述している def training_epoch_end(self, train_step_outputs): # ~~ def validation_step(self, batch, batch_idx): # ~~ def validation_epoch_end(self, val_step_outputs): # ~~ # 最適化手法を記述する def configure_optimizers(self): optimizer = optim.SGD(self.parameters(), lr=0.01) return optimizer
5. 学習と予測
ここからがPyTorchであれば、あれこれ学習に関連する処理を書くのですが、PyTorch Lightningであれば簡単にできてしまいます。
5.1. 学習処理の設定
Trainer
に学習に必要なパラメータを指定します。
これだけで学習の設定が可能になります。
# ネットワーク作成 net = Net() # EarlyStoppingの設定 es = pl.callbacks.EarlyStopping(monitor='val_loss') trainer = pl.Trainer( max_epochs=30, callbacks=[es], # GPUを使用する場合 # gpus=2 )
5.2. 学習と予測の実行
学習の実行はtrainer
に必要な変数を渡すだけです。
trainer.fit( net, # ネットワーク train_dataloader=train_dataloader, # 学習データ val_dataloaders=valid_dataloader, # 検証データ )
# Output -------- Current Epoch 1 -------- train Loss: 0.7981 train Acc: 0.7989 valid Loss: 0.3647 valid Acc: 0.9831 -------- Current Epoch 2 -------- train Loss: 0.2802 train Acc: 0.9756 valid Loss: 0.2459 valid Acc: 0.9831 -------- Current Epoch 3 -------- train Loss: 0.2130 train Acc: 0.9799 valid Loss: 0.2019 valid Acc: 0.9718 -------- Current Epoch 4 -------- train Loss: 0.1733 train Acc: 0.9828 valid Loss: 0.1743 valid Acc: 0.9774 -------- Current Epoch 5 -------- train Loss: 0.1512 train Acc: 0.9756 valid Loss: 0.1543 valid Acc: 0.9887 -------- Current Epoch 6 -------- train Loss: 0.1376 train Acc: 0.9784 valid Loss: 0.1420 valid Acc: 0.9831 -------- Current Epoch 7 -------- train Loss: 0.1219 train Acc: 0.9842 valid Loss: 0.1327 valid Acc: 0.9774 -------- Current Epoch 8 -------- train Loss: 0.1126 train Acc: 0.9885 valid Loss: 0.1247 valid Acc: 0.9831 -------- Current Epoch 9 -------- train Loss: 0.1013 train Acc: 0.9871 valid Loss: 0.1180 valid Acc: 0.9831 -------- Current Epoch 10 -------- train Loss: 0.0990 train Acc: 0.9813 valid Loss: 0.1129 valid Acc: 0.9831 -------- Current Epoch 11 -------- train Loss: 0.0847 train Acc: 0.9914 valid Loss: 0.1100 valid Acc: 0.9831 -------- Current Epoch 12 -------- train Loss: 0.0905 train Acc: 0.9871 valid Loss: 0.1071 valid Acc: 0.9774 ~~~~~
以上でPyTorchでのモデル化の流れが完了です。
Lightningモジュールは機能は追加されてはいますが、基本的には同じPyTorchのtorch.nn.Moduleなので、同じように操作することができます。
つまり、予測時はPyTorchと変わらない操作で予測することができます。
# 予測用のダミーデータ x = torch.randn(3, 3, 300, 300) # 予測の実行 preds = net(x)
以上でPyTorchでのモデル化の流れが完了です。
今回は転移学習を使用して画像分類モデルを実装しました。
かなり高い精度で予測ができているようです。
今回は30回程学習させましたが、11回目のループくらいから若干ですが過学習しているように見受けられます。
ループを回す回数は調整が必要かと思われます。
6. 全体のコード
最後に全体のコードをまとめて載せておきます。
# ライブラリの読み込み import os from PIL import Image import matplotlib.pyplot as plt %matplotlib inline import torch import torch.nn as nn import torch.optim as optim import torch.utils.data as data import torchvision from torchvision import models, transforms # 学習データ、検証データへの分割 def make_filepath_list(): """ 学習データ、検証データそれぞれのファイルへのパスを格納したリストを返す Returns ------- train_file_list: list 学習データファイルへのパスを格納したリスト valid_file_list: list 検証データファイルへのパスを格納したリスト """ train_file_list = [] valid_file_list = [] for top_dir in os.listdir('./Images/'): file_dir = os.path.join('./Images/', top_dir) file_list = os.listdir(file_dir) # 各犬種ごとに8割を学習データ、2割を検証データとする num_data = len(file_list) num_split = int(num_data * 0.8) train_file_list += [os.path.join('./Images', top_dir, file).replace('\\', '/') for file in file_list[:num_split]] valid_file_list += [os.path.join('./Images', top_dir, file).replace('\\', '/') for file in file_list[num_split:]] return train_file_list, valid_file_list # 前処理クラス class ImageTransform(object): """ 入力画像の前処理クラス 画像のサイズをリサイズする Attributes ---------- resize: int リサイズ先の画像の大きさ mean: (R, G, B) 各色チャンネルの平均値 std: (R, G, B) 各色チャンネルの標準偏差 """ def __init__(self, resize, mean, std): self.data_trasnform = { 'train': transforms.Compose([ # データオーグメンテーション transforms.RandomHorizontalFlip(), # 画像をresize×resizeの大きさに統一する transforms.Resize((resize, resize)), # Tensor型に変換する transforms.ToTensor(), # 色情報の標準化をする transforms.Normalize(mean, std) ]), 'valid': transforms.Compose([ # 画像をresize×resizeの大きさに統一する transforms.Resize((resize, resize)), # Tensor型に変換する transforms.ToTensor(), # 色情報の標準化をする transforms.Normalize(mean, std) ]) } def __call__(self, img, phase='train'): return self.data_trasnform[phase](img) # Datasetクラス class DogDataset(data.Dataset): """ 犬種のDataseクラス。 PyTorchのDatasetクラスを継承させる。 Attrbutes --------- file_list: list 画像のファイルパスを格納したリスト classes: list 犬種のラベル名 transform: object 前処理クラスのインスタンス phase: 'train' or 'valid' 学習か検証化を設定 """ def __init__(self, file_list, classes, transform=None, phase='train'): self.file_list = file_list self.transform = transform self.classes = classes self.phase = phase def __len__(self): """ 画像の枚数を返す """ return len(self.file_list) def __getitem__(self, index): """ 前処理した画像データのTensor形式のデータとラベルを取得 """ # 指定したindexの画像を読み込む img_path = self.file_list[index] img = Image.open(img_path) # 画像の前処理を実施 img_transformed = self.transform(img, self.phase) # 画像ラベルをファイル名から抜き出す label = self.file_list[index].split('/')[2][10:] # ラベル名を数値に変換 label = self.classes.index(label) return img_transformed, label # Lightningモジュール class Net(pl.LightningModule): # ネットワークで使用する層を記述する def __init__(self): super().__init__() vgg16 = models.vgg16(pretrained=True) # 出力層を今回のタスクに適応できるように変更する vgg16.classifier[6] = nn.Linear(in_features=4096, out_features=5) self.vgg16 = vgg16 # 学習させるパラメータ名 update_param_names = ['classifier.6.weight', 'classifier.6.bias'] # 学習させるパラメータ以外は勾配計算をなくし、変化しないように設定 for name, param in self.vgg16.named_parameters(): if name in update_param_names: param.requires_grad = True else: param.requires_grad = False # 順伝播処理を記述する def forward(self, x): x = self.vgg16(x) return x # 学習の際に使用する損失関数を記述している def training_step(self, batch, batch_idx): x, y = batch y_hat = self(x) loss = F.cross_entropy(y_hat, y) return {'loss': loss, 'y_hat': y_hat, 'y': y, 'batch_loss': loss.item() * x.size(0)} # 各エポック終了時の処理を記述している def training_epoch_end(self, train_step_outputs): y_hat = torch.cat([val['y_hat'] for val in train_step_outputs], dim=0) y = torch.cat([val['y'] for val in train_step_outputs], dim=0) epoch_loss = sum([val['batch_loss'] for val in train_step_outputs]) / y_hat.size(0) preds = torch.argmax(y_hat, dim=1) acc = accuracy(preds, y) self.log('train_loss', epoch_loss, prog_bar=True, on_epoch=True) self.log('train_acc', acc, prog_bar=True, on_epoch=True) print('-------- Current Epoch {} --------'.format(self.current_epoch + 1)) print('train Loss: {:.4f} train Acc: {:.4f}'.format(epoch_loss, acc)) def validation_step(self, batch, batch_idx): x, y = batch y_hat = self(x) loss = F.cross_entropy(y_hat, y) return {'y_hat': y_hat, 'y': y, 'batch_loss': loss.item() * x.size(0)} def validation_epoch_end(self, val_step_outputs): # x_hatを一つにまとめる y_hat = torch.cat([val['y_hat'] for val in val_step_outputs], dim=0) y = torch.cat([val['y'] for val in val_step_outputs], dim=0) epoch_loss = sum([val['batch_loss'] for val in val_step_outputs]) / y_hat.size(0) preds = torch.argmax(y_hat, dim=1) acc = accuracy(preds, y) self.log('val_loss', epoch_loss, prog_bar=True, on_epoch=True) self.log('val_acc', acc, prog_bar=True, on_epoch=True) print('valid Loss: {:.4f} valid Acc: {:.4f}'.format(epoch_loss, acc)) # 最適化手法を記述する def configure_optimizers(self): optimizer = optim.SGD(self.parameters(), lr=0.01) return optimizer # 各種パラメータの用意 # クラス名 dog_classes = [ 'Chihuahua', 'Shih-Tzu', 'borzoi', 'Great_Dane', 'pug' ] # リサイズ先の画像サイズ resize = 300 # 今回は簡易的に(0.5, 0.5, 0.5)で標準化 mean = (0.5, 0.5, 0.5) std = (0.5, 0.5, 0.5) # バッチサイズの指定 batch_size = 64 # エポック数 num_epochs = 30 # 前処理 # 学習データ、検証データのファイルパスを格納したリストを取得する train_file_list, valid_file_list = make_filepath_list() # Datasetの作成 train_dataset = DogDataset( file_list=train_file_list, classes=dog_classes, transform=ImageTransform(resize, mean, std), phase='train' ) valid_dataset = DogDataset( file_list=valid_file_list, classes=dog_classes, transform=ImageTransform(resize, mean, std), phase='valid' ) # DataLoaderの作成 train_dataloader = data.DataLoader( train_dataset, batch_size=batch_size, shuffle=True) valid_dataloader = data.DataLoader( valid_dataset, batch_size=32, shuffle=False) # Lightningモジュールの作成 net = Net() # 学習処理の設定 # EarlyStoppingの設定 es = pl.callbacks.EarlyStopping(monitor='val_loss') trainer = pl.Trainer( max_epochs=30, callbacks=[es], # GPUを使用する場合 # gpus=2 ) # 学習の実行 trainer.fit( net, # ネットワーク train_dataloader=train_dataloader, # 学習データ val_dataloaders=valid_dataloader, # 検証データ )