Resnet复现

RocheL
Feb 10, 2022
Last edited: 2022-8-11
type
Post
status
Published
date
Feb 10, 2022
slug
paper&code
summary
复现15年Resnet论文
tags
PaperLib
category
学习思考
icon
password
Property
Aug 2, 2022 02:15 AM
URL
💡
本文备份此前做过的Resnet复现代码,含注释
 

Resnet

综述

简介

残差神经网络(ResNet)是由微软研究院的何恺明、张祥雨、任少卿、孙剑等人提出的。ResNet 在2015 年的ILSVRC(ImageNet Large Scale Visual Recognition Challenge)中取得了冠军。
残差神经网络的主要贡献是发现了“退化现象(Degradation)”,并针对退化现象发明了 “快捷连接(Shortcut connection)”,极大的消除了深度过大的神经网络训练困难问题。神经网络的“深度”首次突破了100层、最大的神经网络甚至超过了1000层。
论文网址:

复现代码

import numpy as np import pandas as pd from sklearn.metrics import accuracy_score, confusion_matrix, classification_report import matplotlib.pyplot as plt import seaborn as sns import copy import time import torch import torch.nn as nn from torch.optim import Adam import torch.utils.data as Data from torchvision import transforms from torchvision.datasets import FashionMNIST # 处理训练集数据 def train_data_process(): # 加载FashionMNIST数据集 train_data = FashionMNIST(root="../data/fashionmnist", # 数据路径 train=True, # 只使用训练数据集 transform=transforms.Compose([transforms.Resize(size=224), transforms.ToTensor()]), # 把PIL.Image或者numpy.array数据类型转变为torch.FloatTensor类型 # 尺寸为Channel * Height * Width,数值范围缩小为[0.0, 1.0] download=True, # 若本身没有下载相应的数据集,则选择True ) train_loader = Data.DataLoader(dataset=train_data, # 传入的数据集 batch_size=64, # 每个Batch中含有的样本数量 shuffle=False, # 不对数据集重新排序 num_workers=0, # 加载数据所开启的进程数量 ) print("The number of batch in train_loader:", len(train_loader)) # 一共有938个batch,每个batch含有64个训练样本 # 获得一个Batch的数据 for step, (b_x, b_y) in enumerate(train_loader): if step > 0: break batch_x = b_x.squeeze().numpy() # 将四维张量移除第1维,并转换成Numpy数组 # squeeze方法用于删除shape为1的维度 batch_y = b_y.numpy() # 将张量转换成Numpy数组 class_label = train_data.classes # 训练集的标签 class_label[0] = "T-shirt" print("the size of batch in train data:", batch_x.shape) # plt可视化一个Batch的图像 plt.figure(figsize=(12, 5)) #figure(num=None, figsize=None, dpi=None, facecolor=None, edgecolor=None, frameon=True) #num:图像编号或名称,数字为编号 ,字符串为名称 figsize:指定figure的宽和高,单位为英寸; #dpi参数指定绘图对象的分辨率,即每英寸多少个像素,缺省值为80 facecolor:背景颜色 #edgecolor:边框颜色 frameon:是否显示边框 for ii in np.arange(len(batch_y)): plt.subplot(4, 16, ii + 1) #subplot(nrows行数,ncols列数,xy偏移量sharex,sharey,subplot_kw,**fig_kw) #eg.plt.subplot(221) plt.imshow(batch_x[ii, :, :], cmap=plt.cm.gray) plt.title(class_label[batch_y[ii]], size=9) plt.axis("off") plt.subplots_adjust(wspace=0.05) plt.show() return train_loader, class_label # 处理测试集数据 def test_data_process(): test_data = FashionMNIST(root="../data/fashionmnist", # 数据路径 train=False, # 不使用训练数据集 transform=transforms.Compose([transforms.Resize(size=224), transforms.ToTensor()]), # 把PIL.Image或者numpy.array数据类型转变为torch.FloatTensor类型 # 尺寸为Channel * Height * Width,数值范围缩小为[0.0, 1.0] download=False, # 如果前面数据已经下载,这里不再需要重复下载 ) test_loader = Data.DataLoader(dataset=test_data, # 传入的数据集 batch_size=1, # 每个Batch中含有的样本数量 shuffle=True, # 不对数据集重新排序 num_workers=0, # 加载数据所开启的进程数量 ) # 获得一个Batch的数据 for step, (b_x, b_y) in enumerate(test_loader): if step > 0: break batch_x = b_x.squeeze().numpy() # 将四维张量移除第1维,并转换成Numpy数组 batch_y = b_y.numpy() # 将张量转换成Numpy数组 print("The size of batch in test data:", batch_x.shape) return test_loader class Residual(nn.Module): def __init__(self, in_channels, out_channels, use_1x1conv=False, stride=1): # 输入通道数,输出通道数,跨层使能1x1卷积,步长 super(Residual, self).__init__() #继承初始化 self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride) # 定义第一个卷积块 self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1) # 定义第二个卷积块 # 定义1x1卷积块 if use_1x1conv: self.conv3 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride) else: self.conv3 = None # Batch归一化 self.bn1 = nn.BatchNorm2d(out_channels) self.bn2 = nn.BatchNorm2d(out_channels) # 定义前向传播路径 def forward(self, x): y = nn.functional.relu(self.bn1(self.conv1(x))) y = self.bn2(self.conv2(y)) if self.conv3: x = self.conv3(x) #残差加和 return nn.functional.relu(y + x) def resnet_block(in_channels, out_channels, num_residuals, first_block=False): if first_block: assert in_channels == out_channels # 第一个模块的通道数同输入通道数一致 blk = [] for i in range(num_residuals): if i == 0 and not first_block: blk.append(Residual(in_channels, out_channels, use_1x1conv=True, stride=2)) else: blk.append(Residual(out_channels, out_channels)) return nn.Sequential(*blk) # 定义一个全局平均池化层 class GlobalAvgPool2d(nn.Module): def __init__(self): super(GlobalAvgPool2d, self).__init__() def forward(self, x): return nn.functional.avg_pool2d(x, kernel_size=x.size()[2:]) # 池化窗口形状等于输入图像的形状 # 定义ResNet网络结构 def ResNet(): net = nn.Sequential( nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3), nn.BatchNorm2d(64), nn.ReLU(), nn.MaxPool2d(kernel_size=3, stride=2, padding=1)) net.add_module("resnet_block1", resnet_block(64, 64, 2, first_block=True)) net.add_module("resnet_block2", resnet_block(64, 128, 2)) net.add_module("resnet_block3", resnet_block(128, 256, 2)) net.add_module("resnet_block4", resnet_block(256, 512, 2)) net.add_module("global_avg_pool", GlobalAvgPool2d()) # GlobalAvgPool2d的输出: (Batch, 512, 1, 1) net.add_module("fc", nn.Sequential(nn.Flatten(), nn.Linear(512, 10))) return net # 定义网络的训练过程 def train_model(model, traindataloader, train_rate, criterion, device, optimizer, num_epochs=25): ''' :param model: 网络模型 :param traindataloader: 训练数据集,会切分为训练集和验证集 :param train_rate: 训练集batch_size的百分比 :param criterion: 损失函数 :param device: 运行设备 :param optimizer: 优化方法 :param num_epochs: 训练的轮数 ''' batch_num = len(traindataloader) # batch数量 train_batch_num = round(batch_num * train_rate) # 将80%的batch用于训练,round()函数四舍五入 best_model_wts = copy.deepcopy(model.state_dict()) # 复制当前模型的参数 # 初始化参数 best_acc = 0.0 # 最高准确度 train_loss_all = [] # 训练集损失函数列表 train_acc_all = [] # 训练集准确度列表 val_loss_all = [] # 验证集损失函数列表 val_acc_all = [] # 验证集准确度列表 since = time.time() # 当前时间 # 进行迭代训练模型 for epoch in range(num_epochs): print('Epoch {}/{}'.format(epoch, num_epochs - 1)) print('-' * 10) # 初始化参数 train_loss = 0.0 # 训练集损失函数 train_corrects = 0 # 训练集准确度 train_num = 0 # 训练集样本数量 val_loss = 0.0 # 验证集损失函数 val_corrects = 0 # 验证集准确度 val_num = 0 # 验证集样本数量 # 对每一个mini-batch训练和计算 for step, (b_x, b_y) in enumerate(traindataloader): b_x = b_x.to(device) b_y = b_y.to(device) if step < train_batch_num: # 使用数据集的80%用于训练 model.train() # 设置模型为训练模式,启用Batch Normalization和Dropout output = model(b_x) # 前向传播过程,输入为一个batch,输出为一个batch中对应的预测 pre_lab = torch.argmax(output, 1) # 查找每一行中最大值对应的行标 loss = criterion(output, b_y) # 计算每一个batch的损失函数 optimizer.zero_grad() # 将梯度初始化为0 loss.backward() # 反向传播计算 optimizer.step() # 根据网络反向传播的梯度信息来更新网络的参数,以起到降低loss函数计算值的作用 train_loss += loss.item() * b_x.size(0) # 对损失函数进行累加 train_corrects += torch.sum(pre_lab == b_y.data) # 如果预测正确,则准确度train_corrects加1 train_num += b_x.size(0) # 当前用于训练的样本数量 else: # 使用数据集的20%用于验证 model.eval() # 设置模型为评估模式,不启用Batch Normalization和Dropout output = model(b_x) # 前向传播过程,输入为一个batch,输出为一个batch中对应的预测 pre_lab = torch.argmax(output, 1) # 查找每一行中最大值对应的行标 loss = criterion(output, b_y) # 计算每一个batch中64个样本的平均损失函数 val_loss += loss.item() * b_x.size(0) # 将验证集中每一个batch的损失函数进行累加 val_corrects += torch.sum(pre_lab == b_y.data) # 如果预测正确,则准确度val_corrects加1 val_num += b_x.size(0) # 当前用于验证的样本数量 # 计算并保存每一次迭代的成本函数和准确率 train_loss_all.append(train_loss / train_num) # 计算并保存训练集的成本函数 train_acc_all.append(train_corrects.double().item() / train_num) # 计算并保存训练集的准确率 val_loss_all.append(val_loss / val_num) # 计算并保存验证集的成本函数 val_acc_all.append(val_corrects.double().item() / val_num) # 计算并保存验证集的准确率 print('{} Train Loss: {:.4f} Train Acc: {:.4f}'.format(epoch, train_loss_all[-1], train_acc_all[-1])) print('{} Val Loss: {:.4f} Val Acc: {:.4f}'.format(epoch, val_loss_all[-1], val_acc_all[-1])) # 寻找最高准确度 if val_acc_all[-1] > best_acc: best_acc = val_acc_all[-1] # 保存当前的最高准确度 best_model_wts = copy.deepcopy(model.state_dict()) # 保存当前最高准确度下的模型参数 time_use = time.time() - since # 计算耗费时间 print("Train and val complete in {:.0f}m {:.0f}s".format(time_use // 60, time_use % 60)) # 选择最优参数 model.load_state_dict(best_model_wts) # 加载最高准确度下的模型参数 train_process = pd.DataFrame(data={"epoch": range(num_epochs), "train_loss_all": train_loss_all, "val_loss_all": val_loss_all, "train_acc_all": train_acc_all, "val_acc_all": val_acc_all} ) # 将每一代的损失函数和准确度保存为DataFrame格式 # 显示每一次迭代后的训练集和验证集的损失函数和准确率 plt.figure(figsize=(12, 4)) plt.subplot(1, 2, 1) plt.plot(train_process['epoch'], train_process.train_loss_all, "ro-", label="Train loss") plt.plot(train_process['epoch'], train_process.val_loss_all, "bs-", label="Val loss") plt.legend() plt.xlabel("epoch") plt.ylabel("Loss") plt.subplot(1, 2, 2) plt.plot(train_process['epoch'], train_process.train_acc_all, "ro-", label="Train acc") plt.plot(train_process['epoch'], train_process.val_acc_all, "bs-", label="Val acc") plt.xlabel("epoch") plt.ylabel("acc") plt.legend() plt.show() return model, train_process # 测试模型 def test_model(model, testdataloader, device): ''' :param model: 网络模型 :param testdataloader: 测试数据集 :param device: 运行设备 ''' # 初始化参数 test_corrects = 0.0 test_num = 0 test_acc = 0.0 # 只进行前向传播计算,不计算梯度,从而节省内存,加快运行速度 with torch.no_grad(): for test_data_x, test_data_y in testdataloader: test_data_x = test_data_x.to(device) test_data_y = test_data_y.to(device) model.eval() # 设置模型为评估模式,不启用Batch Normalization和Dropout output = model(test_data_x) # 前向传播过程,输入为测试数据集,输出为对每个样本的预测 pre_lab = torch.argmax(output, 1) # 查找每一行中最大值对应的行标 test_corrects += torch.sum(pre_lab == test_data_y.data) # 如果预测正确,则准确度val_corrects加1 test_num += test_data_x.size(0) # 当前用于训练的样本数量 test_acc = test_corrects.double().item() / test_num # 计算在测试集上的分类准确率 print("test accuracy:", test_acc) # 模型的训练和测试 def train_model_process(myconvnet): optimizer = torch.optim.Adam(myconvnet.parameters(), lr=0.001) # 使用Adam优化器,学习率为0.001 criterion = nn.CrossEntropyLoss() # 损失函数为交叉熵函数 device = 'cuda' if torch.cuda.is_available() else 'cpu' # GPU加速 train_loader, class_label = train_data_process() # 加载训练集 test_loader = test_data_process() # 加载测试集 myconvnet = myconvnet.to(device) myconvnet, train_process = train_model(myconvnet, train_loader, 0.8, criterion, device, optimizer, num_epochs=25) # 开始训练模型 test_model(myconvnet, test_loader, device) # 使用测试集进行评估 if __name__ == '__main__': model = ResNet() train_model_process(model)
 

参考链接

  • 分步实现resnet 其中【一】含有dataloader编写
  • 整合的文章,一篇介绍完
  • 预处理三部分 比较完备
pytorch数据操作---dataset,dataloader,transform_xys430381_1的博客-CSDN博客
pytorch输入数据PipeLine一般遵循一个"三步走"的策略,一般pytorch 的数据加载到模型的操作顺序是这样的: ① 创建一个 Dataset 对象。必须实现__len__()、 getitem()这两个方法,这里面会用到transform对数据集进行扩充。 ② 创建一个 DataLoader 对象。它是对DataSet对象进行迭代的,一般不需要事先里面的其他方法了。 ③ 循环遍历这个 DataLoader 对象。将img, label加载到模型中进行训练 注意这三个类均在torch.utils.data 中 (第一篇)pytorch数据预处理三剑客之--Dataset,DataLoader 这一篇章已经很明确的说明了如何使用DataSet类和DataLoader类, (第二篇)pytorch数据预处理三剑客之--DataLoader的重要参数collate_fn,sampler 第二篇文章中详细介绍了DataLoader类中的几个重要的常用的参数,如sampler参数、collate_fn参数 (第三篇)pytorch数据预处理三剑客之--Transform 在数据与处理的过程中,还会遇到数据增强、数据裁剪等各种操作,当然这些操作我们可以预先自己来实现,但是pytorch提供了强大的处理工具来对图像进行预处理,这也是本文的重点,详细介绍 torchvision中的transform操作。 多种组合变换有一定的先后顺序,处理PILImage的变换方法(大多数方法)都需要放在ToTensor方法之前, 而处理tensor的方法(比如Normalize方法)就要放在ToTensor方法之后。 接下来介绍transforms中的函数:(从比较重要的开始) ToTensor:convert a PIL image to tensor (HWC) in range [ 0 , 255 ], and then to a torch.Tensor(CHW) in the range [ 0.0 , 1.0 ] Normalize:Normalized an tensor image with mean and standard deviation。 即:用给定的均值和标准差分别对每个通道的数据进行正则化。 具体来说,给定均值(M1,...,Mn),给定标准差(S1,...,Sn),其中n是通道数(一般是3) 自制transform---- 如果自制transform类,则该类的_call函数的输入输出都是PIL格式的图像。 PIL是HWC顺序,并且三个通道为RGB(不是cv2的BGR)。 特别注意, 如果过程中要采用cv2处理图像,只需将PIL转换为np.array形式:np.array(PIL),无需将RGB转换为BGR。- 如果一定要转换颜色通道,也务必在cv2操作完之后,将BGR转回RGB。- 否则,会导致通道顺序与预训练网络见过的通道顺序不一致,性能降低。 实际上,最终tranform返回的,可以显式转换: img = Image.fromarray(img.astype('uint8')).convert('RGB') 也可以只需是HWC,RGB,取值范围在0-55之间的np.array即可。框架会自动转换。 pytorch数据预处理:如何自定义transforms方法 PyTorch 学习笔记(一):让PyTorch读取你的数据集 主要内容: 重点看 getitem函数,getitem接收一个index,然后返回图片数据和标签,这个index通常指的是一个list的index, 这个list的每个元素就包含了图片数据的路径和标签信息。 然而,如何制作这个list呢,通常的方法是将图片的路径和标签信息存储在一个txt中,然后从该txt中读取。 那么读取自己数据的基本流程就是: - 制作存储了图片的路径和标签信息的txt - 将这些信息转化为list,该list每一个元素对应一个样本 - 通过getitem函数,读取数据和标签,并返回数据和标签 pytorch中transform函数详解
pytorch数据操作---dataset,dataloader,transform_xys430381_1的博客-CSDN博客
 
2019FastDepth论文复现(tensorflow)读文献工作流