PyTorch通过提供大量强大的工具和技术,一直在推动计算机视觉和深度学习领域的发展。
在计算机视觉领域,基于深度学习的执行需要处理大量的图像数据集,因此需要一个加速的环境来加快执行过程以达到可接受的精度水平。
PyTorch通过XLA(加速线性代数)提供了这一特性,XLA是一种线性代数编译器,可以针对多种类型的硬件,包括GPU和TPU。PyTorch/XLA环境与Google云TPU集成,实现了更快的执行速度。
在本文中,我们将在PyTorch中使用TPU演示一种深卷积神经网络ResNet50的实现。
该模型将在PyTorch/XLA环境中进行训练和测试,以完成CIFAR10数据集的分类任务。我们还将检查在50个epoch训练所花费的时间。
ResNet50在Pytorch的实现
为了利用TPU的功能,这个实现是在Google Colab中完成的。首先,我们需要从Notebook设置下的硬件加速器中选择TPU。
选择TPU后,我们将使用下面的行验证环境代码:
import os
assert os.environ['COLAB_TPU_ADDR']
如果启用了TPU,它将成功执行,否则它将抛出‘KeyError: ‘COLAB_TPU_ADDR’’。你也可以通过打印TPU地址来检查TPU。
TPU_Path = 'grpc://'+os.environ['COLAB_TPU_ADDR']
print('TPU Address:', TPU_Path)
在下一步中,我们将安装XLA环境以加快执行过程。我们在上一篇文章中实现了卷积神经网络。
VERSION = "20200516"
!curl https://raw.githubusercontent.com/pytorch/xla/master/contrib/scripts/env-setup.py -o pytorch-xla-env-setup.py
!python pytorch-xla-env-setup.py --version $VERSION
现在,我们将在这里导入所有必需的库。
from matplotlib import pyplot as plt
import numpy as np
import os
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch_xla
import torch_xla.core.xla_model as xm
import torch_xla.debug.metrics as met
import torch_xla.distributed.parallel_loader as pl
import torch_xla.distributed.xla_multiprocessing as xmp
import torch_xla.utils.utils as xu
import torchvision
from torchvision import datasets, transforms
import time
from google.colab.patches import cv2_imshow
import cv2
导入库之后,我们将定义并初始化所需的参数。
# 定义参数
FLAGS = {}
FLAGS['data_dir'] = "/tmp/cifar"
FLAGS['batch_size'] = 128
FLAGS['num_workers'] = 4
FLAGS['learning_rate'] = 0.02
FLAGS['momentum'] = 0.9
FLAGS['num_epochs'] = 50
FLAGS['num_cores'] = 8
FLAGS['log_steps'] = 20
FLAGS['metrics_debug'] = False
在下一步中,我们将定义ResNet50模型。
class BasicBlock(nn.Module):
expansion = 1
def __init__(self, in_planes, planes, stride=1):
super(BasicBlock, self).__init__()
self.conv1 = nn.Conv2d(
in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(planes)
self.conv2 = nn.Conv2d(
planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(planes)
self.shortcut = nn.Sequential()
if stride != 1 or in_planes != self.expansion * planes:
self.shortcut = nn.Sequential(
nn.Conv2d(
in_planes,
self.expansion * planes,
kernel_size=1,
stride=stride,
bias=False), nn.BatchNorm2d(self.expansion * planes))
def forward(self, x):
out = F.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out))
out += self.shortcut(x)
out = F.relu(out)
return out
class ResNet(nn.Module):
def __init__(self, block, num_blocks, num_classes=10):
super(ResNet, self).__init__()
self.in_planes = 64
self.conv1 = nn.Conv2d(
3, 64, kernel_size=3, stride=1, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(64)
self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
self.linear = nn.Linear(512 * block.expansion, num_classes)
def _make_layer(self, block, planes, num_blocks, stride):
strides = [stride] + [1] * (num_blocks - 1)
layers = []
for stride in strides:
layers.append(block(self.in_planes, planes, stride))
self.in_planes = planes * block.expansion
return nn.Sequential(*layers)
def forward(self, x):
out = F.relu(self.bn1(self.conv1(x)))
out = self.layer1(out)
out = self.layer2(out)
out = self.layer3(out)
out = self.layer4(out)
out = F.avg_pool2d(out, 4)
out = torch.flatten(out, 1)
out = self.linear(out)
return F.log_softmax(out, dim=1)
def ResNet50():
return ResNet(BasicBlock, [3, 4, 6, 4, 3])
下面的代码片段将定义加载CIFAR10数据集、准备训练和测试数据集、训练过程和测试过程的函数。
SERIAL_EXEC = xmp.MpSerialExecutor()
# 只在内存中实例化一次模型权重。
WRAPPED_MODEL = xmp.MpModelWrapper(ResNet50())
def train_resnet50():
torch.manual_seed(1)
def get_dataset():
norm = transforms.Normalize(
mean=(0.4914, 0.4822, 0.4465), std=(0.2023, 0.1994, 0.2010))
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
norm,
])
transform_test = transforms.Compose([
transforms.ToTensor(),
norm,
])
train_dataset = datasets.CIFAR10(
root=FLAGS['data_dir'],
train=True,
download=True,
transform=transform_train)
test_dataset = datasets.CIFAR10(
root=FLAGS['data_dir'],
train=False,
download=True,
transform=transform_test)
return train_dataset, test_dataset
# 使用串行执行器可以避免多个进程
# 下载相同的数据。
train_dataset, test_dataset = SERIAL_EXEC.run(get_dataset)
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset,
num_replicas=xm.xrt_world_size(),
rank=xm.get_ordinal(),
shuffle=True)
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=FLAGS['batch_size'],
sampler=train_sampler,
num_workers=FLAGS['num_workers'],
drop_last=True)
test_loader = torch.utils.data.DataLoader(
test_dataset,
batch_size=FLAGS['batch_size'],
shuffle=False,
num_workers=FLAGS['num_workers'],
drop_last=True)
# 将学习率缩放
learning_rate = FLAGS['learning_rate'] * xm.xrt_world_size()
# 获取损失函数、优化器和模型
device = xm.xla_device()
model = WRAPPED_MODEL.to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate,
momentum=FLAGS['momentum'], weight_decay=5e-4)
loss_fn = nn.NLLLoss()
def train_loop_fn(loader):
tracker = xm.RateTracker()
model.train()
for x, (data, target) in enumerate(loader):
optimizer.zero_grad()
output = model(data)
loss = loss_fn(output, target)
loss.backward()
xm.optimizer_step(optimizer)
tracker.add(FLAGS['batch_size'])
if x % FLAGS['log_steps'] == 0:
print('[xla:{}]({}) Loss={:.2f} Time={}'.format(xm.get_ordinal(), x, loss.item(), time.asctime()), flush=True)
def test_loop_fn(loader):
total_samples = 0
correct = 0
model.eval()
data, pred, target = None, None, None
for data, target in loader:
output = model(data)
pred = output.max(1, keepdim=True)[1]
correct += pred.eq(target.view_as(pred)).sum().item()
total_samples += data.size()[0]
accuracy = 100.0 * correct / total_samples
print('[xla:{}] Accuracy={:.2f}%'.format(
xm.get_ordinal(), accuracy), flush=True)
return accuracy, data, pred, target
# 训练和评估的循环
accuracy = 0.0
data, pred, target = None, None, None
for epoch in range(1, FLAGS['num_epochs'] + 1):
para_loader = pl.ParallelLoader(train_loader, [device])
train_loop_fn(para_loader.per_device_loader(device))
xm.master_print("Finished training epoch {}".format(epoch))
para_loader = pl.ParallelLoader(test_loader, [device])
accuracy, data, pred, target = test_loop_fn(para_loader.per_device_loader(device))
if FLAGS['metrics_debug']:
xm.master_print(met.metrics_report(), flush=True)
return accuracy, data, pred, target
现在,我们将开始ResNet50的训练。训练将在我们在参数中定义的50个epoch内完成。训练开始前,我们会记录训练时间,训练结束后,我们将打印总时间。
start_time = time.time()
# 启动训练流程
def training(rank, flags):
global FLAGS
FLAGS = flags
torch.set_default_tensor_type('torch.FloatTensor')
accuracy, data, pred, target = train_resnet50()
if rank == 0:
# 检索TPU核心0上的张量并绘制。
plot_results(data.cpu(), pred.cpu(), target.cpu())
xmp.spawn(training, args=(FLAGS,), nprocs=FLAGS['num_cores'],
start_method='fork')
训练结束后,我们会打印训练过程所花费的时间。
最后,在训练过程中,我们将模型对样本测试数据的预测可视化。
end_time = time.time()
print("Time taken = ", end_time-start_time)
本文暂时没有评论,来添加一个吧(●'◡'●)