DETR 源码笔记_CSDN(二)

RocheL
Sep 4, 2022
Last edited: 2022-9-4
type
Post
status
Published
date
Sep 4, 2022
slug
detr_csdn_2
summary
DETR参考CSDNblog的代码注释,主要transformer构建和后处理。DETR源码笔记(一)_在努力的松鼠的博客
tags
engineer
category
技术分享
icon
password
Property
Sep 4, 2022 01:39 AM
URL
DETR参考CSDNblog的代码注释,主要transformer构建和后处理

CSDN

来源:

搭建 Transformer

def build(args): num_classes = 2 if args.dataset_file != 'coco' else 2 if args.dataset_file == "coco_panoptic": num_classes = 2 device = torch.device(args.device) #搭建主干网络 backbone = build_backbone(args) #搭建transformer transformer = build_transformer(args) model = DETR( backbone, transformer, num_classes=num_classes, num_queries=args.num_queries, aux_loss=args.aux_loss, )
看 build_transformer(args):
def build_transformer(args): return Transformer( d_model=args.hidden_dim, dropout=args.dropout, nhead=args.nheads, dim_feedforward=args.dim_feedforward, num_encoder_layers=args.enc_layers, num_decoder_layers=args.dec_layers, normalize_before=args.pre_norm, return_intermediate_dec=True, )
实质调用的 Transformer(),d_model:  transformer 输入通道数, nhead: 多头注意力头数, num_encoder_layer: encoder 层数,num_decoder_layer: decoder 层数, dim_feedforward:前馈网络层输入通道数。
class Transformer(nn.Module): def __init__(self, d_model=512, nhead=8, num_encoder_layers=6, num_decoder_layers=6, dim_feedforward=2048, dropout=0.1, activation="relu", normalize_before=False, return_intermediate_dec=False): super().__init__() #d_model transformer模型输入的维数 #编码器一般由多个层组成,层数由num_encoder_layers设置,其中每一层通过TransformerEncoderLayer()设置 #dim_feedforward 前馈网络层的维数 #构建encoder其中一层 encoder_layer = TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout, activation, normalize_before) #normalize_before:在多头注意力后和FFN前还是后进行归一化 #在之前做那么最后layer还需要在FFN输出后接一个归一化 encoder_norm = nn.LayerNorm(d_model) if normalize_before else None #构建decoder num_encoder_layers:decoder层数 self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm) #解码器 num_decoder_layers:解码器层数 decoder_layer = TransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout, activation, normalize_before) decoder_norm = nn.LayerNorm(d_model) #构建解码器 self.decoder = TransformerDecoder(decoder_layer, num_decoder_layers, decoder_norm, return_intermediate=return_intermediate_dec) #初始化模型参数 self._reset_parameters() self.d_model = d_model self.nhead = nhead #重置初始化参数操作 def _reset_parameters(self): for p in self.parameters(): if p.dim() > 1: nn.init.xavier_uniform_(p) def forward(self, src, mask, query_embed, pos_embed): # src: transformer输入,mask:图像掩码, query_embed:decoder预测输入embed, pos_embed:位置编码 # flatten NxCxHxW to HWxNxC bs, c, h, w = src.shape #获取encoder输入 src = src.flatten(2).permute(2, 0, 1) #获取位置编码 pos_embed = pos_embed.flatten(2).permute(2, 0, 1) query_embed = query_embed.unsqueeze(1).repeat(1, bs, 1) #获取输入掩码 mask = mask.flatten(1) # torch.zeros_like:生成和括号内变量维度维度一致的全是零的内容。 # tgt初始化,意义为初始化需要预测的目标。因为一开始不清楚需要什么样的目标,所以初始化为0,它会在decoder中 # 不断被refine,但真正在学习的是query embedding,学习到的是整个数据集中目标物体的统计特征。而tgt在每一个epoch都会初始化。 #tgt 也可以理解为上一层解码器的解码输出 shape=(100,N,256) 第一层的tgt=torch.zeros_like(query_embed) 为零矩阵, # query_pos 是可学习输出位置向量, 个人理解 解码器中的这个参数 全局共享 提供全局注意力 query_pos=(100,N,256) tgt = torch.zeros_like(query_embed) #获取encoder输出 memory = self.encoder(src, src_key_padding_mask=mask, pos=pos_embed) #获取decoder输出,return_intermediate_dec为true时,得到decoder每一层的输出 hs = self.decoder(tgt, memory, memory_key_padding_mask=mask, pos=pos_embed, query_pos=query_embed) return hs.transpose(1, 2), memory.permute(1, 2, 0).view(bs, c, h, w)

Encoder

首先用 TransformerEncoderLayer()forward 建立 encoder 中的其中一层,因为每层都是相同的,后面直接复制就行。根据归一化的前后顺序不同有两种构建方式:
class TransformerEncoderLayer(nn.Module): def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation="relu", normalize_before=False): super().__init__() #多头自注意力层,其中包含了embeding,在第一层的多头注意力部分的输入是前面backbone和位置编码器的输出,而在后面其他层,输入则是前一层的输出 #d_model: transformer的特征数,输入的channel数 #nhead 理解为多头注意力的头数,一般为8 self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout) # Implementation of Feedforward model: FFN #nn.Linear(in_features, out_features)全连接层 out_features也是该全连接层的神经元个数 self.linear1 = nn.Linear(d_model, dim_feedforward)#输出维度为dim_feedforward #nn.Dropout(dropout) 在训练的过程中,让神经元以dropout设置的概率随机失去活性,即将神经元的参数变为0,一般用在全连接层,通常是为了防止或减轻过拟合 #注意nn.Dropout(dropout) 会让没有置为0的参数以1/(1-dropout)的scale缩放 self.dropout = nn.Dropout(dropout) self.linear2 = nn.Linear(dim_feedforward, d_model)#再接一个全连接层将输出维度变为d_model #在transformer中,归一化一般使用层归一化nn.LayerNorm(normalized_shape, eps=1e-05, elementwise_affine=True, device=None, dtype=None) # normalized_shape:归一化的维度 eps:加在方差上的数字,避免分母为0 self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.dropout1 = nn.Dropout(dropout) self.dropout2 = nn.Dropout(dropout) #激活函数 由relu ,gelu ,glu3种,默认使用relu self.activation = _get_activation_fn(activation) #在多头注意力后和FFN前还是后进行归一化 self.normalize_before = normalize_before #加上位置编码 def with_pos_embed(self, tensor, pos: Optional[Tensor]): return tensor if pos is None else tensor + pos #在多头注意力后和FFN后进行归一化 def forward_post(self, src, src_mask: Optional[Tensor] = None, src_key_padding_mask: Optional[Tensor] = None, pos: Optional[Tensor] = None): # 对query和key加上位置编码,value不需要 q = k = self.with_pos_embed(src, pos) # 自注意力层 src2 = self.self_attn(q, k, value=src, attn_mask=src_mask, key_padding_mask=src_key_padding_mask)[0] src = src + self.dropout1(src2) # 自注意力层后归一化 src = self.norm1(src) # FFN src2 = self.linear2(self.dropout(self.activation(self.linear1(src)))) src = src + self.dropout2(src2) # FFN后归一化 src = self.norm2(src) return src # 在多头注意力后和FFN前进行归一化 def forward_pre(self, src, src_mask: Optional[Tensor] = None, src_key_padding_mask: Optional[Tensor] = None, pos: Optional[Tensor] = None): #自注意力层前归一化 src2 = self.norm1(src) #加上位置编码 q = k = self.with_pos_embed(src2, pos) #自注意力层 src2 = self.self_attn(q, k, value=src2, attn_mask=src_mask, key_padding_mask=src_key_padding_mask)[0] src = src + self.dropout1(src2) #FFN前归一化 src2 = self.norm2(src) #FFN src2 = self.linear2(self.dropout(self.activation(self.linear1(src2)))) src = src + self.dropout2(src2) return src def forward(self, src, src_mask: Optional[Tensor] = None, src_key_padding_mask: Optional[Tensor] = None, pos: Optional[Tensor] = None): # 判断在多头注意力后和FFN前还是后进行归一化 if self.normalize_before: return self.forward_pre(src, src_mask, src_key_padding_mask, pos) return self.forward_post(src, src_mask, src_key_padding_mask, pos)
根据前面建立的encoder层来搭建encoder。 self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm)
class TransformerEncoder(nn.Module): def __init__(self, encoder_layer, num_layers, norm=None): super().__init__() #复制encoder_layer直到有num_layers层 encoder_layer #因为encoder每层都是相同的,复制就行了 self.layers = _get_clones(encoder_layer, num_layers) self.num_layers = num_layers self.norm = norm def forward(self, src, mask: Optional[Tensor] = None, src_key_padding_mask: Optional[Tensor] = None, pos: Optional[Tensor] = None): output = src #获得encoder输出 for layer in self.layers: output = layer(output, src_mask=mask, src_key_padding_mask=src_key_padding_mask, pos=pos) if self.norm is not None: output = self.norm(output) return output

Decoder

回到 Transformer(), 接着 decoder 就是和 encoder 类似的操作了
#解码器 num_decoder_layers:解码器层数 decoder_layer = TransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout, activation, normalize_before) decoder_norm = nn.LayerNorm(d_model) self.decoder = TransformerDecoder(decoder_layer, num_decoder_layers, decoder_norm, return_intermediate=return_intermediate_dec)
但需要注意的是,decoder 会多一个输入是 query embedding, query_pos 是可学习输出位置向量, 解码器中的这个参数全局共享,提供全局注意力
我理解它为我们预测的输出,一开始预测就初始化为 0, 需要在 transformer decoder 中不断 refine 它,具体的了解推荐一个源码解析目标检测的跨界之星 DETR(四)、Detection with Transformer - 简书

DETR 搭建

Transformer 搭建完后又回到最上面的 build(args) 函数,紧接着是搭建 DETR 模型,将 Backbone 和 transformer 搭在一起。
model = DETR( backbone, transformer, num_classes=num_classes, num_queries=args.num_queries, aux_loss=args.aux_loss, ) #masks: 是否有分割任务,暂时不考虑 if args.masks: model = DETRsegm(model)
class DETR(nn.Module): """ This is the DETR module that performs object detection """ def __init__(self, backbone, transformer, num_classes, num_queries, aux_loss=False): """ Initializes the model. Parameters: backbone: torch module of the backbone to be used. See backbone.py transformer: torch module of the transformer architecture. See transformer.py num_classes: number of object classes num_queries: number of object queries, ie detection slot. This is the maximal number of objects DETR can detect in a single image. For COCO, we recommend 100 queries. aux_loss: True if auxiliary decoding losses (loss at each decoder layer) are to be used. """ super().__init__() self.num_queries = num_queries self.transformer = transformer hidden_dim = transformer.d_model#transformer输出channel #decoder后再接一个全连接,输出分类结果 self.class_embed = nn.Linear(hidden_dim, num_classes + 1) #利用MLP对框进行回归 self.bbox_embed = MLP(hidden_dim, hidden_dim, 4, 3) #decoder预测输入,每帧预测num_queries个目标 self.query_embed = nn.Embedding(num_queries, hidden_dim) #transformer输入前处理,backbone得到的是num_channels(2048)维度的输出,需要1*1的卷积降维到hidden_dim self.input_proj = nn.Conv2d(backbone.num_channels, hidden_dim, kernel_size=1) self.backbone = backbone self.aux_loss = aux_loss def forward(self, samples: NestedTensor): """ The forward expects a NestedTensor, which consists of: - samples.tensor: batched images, of shape [batch_size x 3 x H x W] - samples.mask: a binary mask of shape [batch_size x H x W], containing 1 on padded pixels It returns a dict with the following elements: - "pred_logits": the classification logits (including no-object) for all queries. Shape= [batch_size x num_queries x (num_classes + 1)] - "pred_boxes": The normalized boxes coordinates for all queries, represented as (center_x, center_y, height, width). These values are normalized in [0, 1], relative to the size of each individual image (disregarding possible padding). See PostProcess for information on how to retrieve the unnormalized bounding box. - "aux_outputs": Optional, only returned when auxilary losses are activated. It is a list of dictionnaries containing the two above keys for each decoder layer. """ if not isinstance(samples, NestedTensor): samples = NestedTensor.from_tensor_list(samples) #backbone得到输出特征图和位置编码 features, pos = self.backbone(samples) #将特征图与掩码分开 src, mask = features[-1].decompose() #得到transformer输出 hs = self.transformer(self.input_proj(src), mask, self.query_embed.weight, pos[-1])[0] #得到分类结果 outputs_class = self.class_embed(hs) #得到目标框位置 outputs_coord = self.bbox_embed(hs).sigmoid() out = {'pred_logits': outputs_class[-1], 'pred_boxes': outputs_coord[-1]} #得到loss if self.aux_loss: out['aux_outputs'] = [{'pred_logits': a, 'pred_boxes': b} for a, b in zip(outputs_class[:-1], outputs_coord[:-1])] return out
#将预测结果与GT进行匹配的匈牙利算法 matcher = build_matcher(args) #loss主要由分类loss,box回归loss,和giou loss组成 #各个loss的权重 weight_dict = {'loss_ce': 1, 'loss_bbox': args.bbox_loss_coef} #5 weight_dict['loss_giou'] = args.giou_loss_coef #2 if args.masks: weight_dict["loss_mask"] = args.mask_loss_coef weight_dict["loss_dice"] = args.dice_loss_coef # TODO this is a hack #当设置了aux_loss,代表需要计算解码器中间层预测结果对应的loss,则也要设置对应的权重 if args.aux_loss: aux_weight_dict = {} for i in range(args.dec_layers - 1): aux_weight_dict.update({k + f'_{i}': v for k, v in weight_dict.items()}) weight_dict.update(aux_weight_dict) losses = ['labels', 'boxes', 'cardinality'] if args.masks: losses += ["masks"] # SetCriterion()构建loss函数 criterion = SetCriterion(num_classes, matcher=matcher, weight_dict=weight_dict, eos_coef=args.eos_coef, losses=losses) criterion.to(device)

LOSS 计算和 GT 匈牙利匹配

DETR 整个模型搭建完成后,回到 build() 中,因为它的预测结果是无序的,是以集合的形式输出,需要准备模型的预测结果与 GT 的匹配函数,来判断 GT 是否被检测分类成功,以进行 loss 计算。匹配函数使用的是匈牙利匹配,一个二分图的最大匹配算法,可以尝试用一下进化版的 KM 匹配算法试一试。
这篇博客讲的挺有趣的
class SetCriterion(nn.Module): """ This class computes the loss for DETR. The process happens in two steps: 1) we compute hungarian assignment between ground truth boxes and the outputs of the model 2) we supervise each pair of matched ground-truth / prediction (supervise class and box) """ def __init__(self, num_classes, matcher, weight_dict, eos_coef, losses): """ Create the criterion. Parameters: num_classes: number of object categories, omitting the special no-object category matcher: module able to compute a matching between targets and proposals weight_dict: dict containing as key the names of the losses and as values their relative weight. eos_coef: relative classification weight applied to the no-object category losses: list of all the losses to be applied. See get_loss for list of available losses. """ super().__init__() self.num_classes = num_classes #种类数 self.matcher = matcher #匹配函数,选择了匈牙利匹配 self.weight_dict = weight_dict #各个loss的权重 self.eos_coef = eos_coef #针对背景分类的loss权重 self.losses = losses #loss结果字典 #设置目标的分类的权重为1,背景的权重为eos_coef empty_weight = torch.ones(self.num_classes + 1) empty_weight[-1] = self.eos_coef #empty_weight存入buffer self.register_buffer('empty_weight', empty_weight) def forward(self, outputs, targets): """ This performs the loss computation. Parameters: outputs: dict of tensors, see the output specification of the model for the format targets: list of dicts, such that len(targets) == batch_size. The expected keys in each dict depends on the losses applied, see each loss' doc """ # targets:GT列表 # [{‘boxes’:...,'labels':...,...},{...},...] # outouts: detr模型输出 # {‘pred_logits’: (b,num_queries, num_classes), # 'pred_boxes': (b,num_queries,4), # 'aux_outputs': [{‘pred_logits’:...,'pred_boxes': ..},{...},...]} # 只取出decoder最后一层的结果 outputs_without_aux = {k: v for k, v in outputs.items() if k != 'aux_outputs'} # Retrieve the matching between the outputs of the last layer and the targets #进行匈牙利匹配得到(单张图片中所有query对应的索引,GT索引)的元组列表indices #ex:[([58,75,92],[1,0,2]),(...),(...),(...)] indices = self.matcher(outputs_without_aux, targets) # Compute the average number of target boxes accross all nodes, for normalization purposes #计算一个batch图像中目标的数量 num_boxes = sum(len(t["labels"]) for t in targets) num_boxes = torch.as_tensor([num_boxes], dtype=torch.float, device=next(iter(outputs.values())).device) #分布式训练时使用 if is_dist_avail_and_initialized(): torch.distributed.all_reduce(num_boxes) # clamp(input,min,max,out)函数的功能将输入input张量每个元素的值压缩到区间[min, max],并返回结果到一个新张量。 num_boxes = torch.clamp(num_boxes / get_world_size(), min=1).item() # Compute all the requested losses #通过get_loss()计算各个loss losses = {} for loss in self.losses: losses.update(self.get_loss(loss, outputs, targets, indices, num_boxes)) # In case of auxiliary losses, we repeat this process with the output of each intermediate layer. # 是否计算中间层多层loss if 'aux_outputs' in outputs: for i, aux_outputs in enumerate(outputs['aux_outputs']): indices = self.matcher(aux_outputs, targets) for loss in self.losses: if loss == 'masks': # Intermediate masks losses are too costly to compute, we ignore them. continue kwargs = {} if loss == 'labels': # Logging is enabled only for the last layer kwargs = {'log': False} l_dict = self.get_loss(loss, aux_outputs, targets, indices, num_boxes, **kwargs) l_dict = {k + f'_{i}': v for k, v in l_dict.items()} losses.update(l_dict) return losses
SetCriterion()计算各个 loss 后返回,最后只对分类损失部分的源码贴了注释,其他的也是类似操作,可以自己看一下。
#分类loss, 用的交叉熵 def loss_labels(self, outputs, targets, indices, num_boxes, log=True): """Classification loss (NLL) targets dicts must contain the key "labels" containing a tensor of dim [nb_target_boxes] """ assert 'pred_logits' in outputs #取出detr输出这的分类结果,key为pred_logits, size为 4*100*6 src_logits = outputs['pred_logits'] #获得batch索引和单帧目标索引 (tensor([0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3]), tensor([58, 75, 92, 75, 92, 58, 75, 92, 58, 75, 92])) 2*11 idx = self._get_src_permutation_idx(indices) #获取对应每个目标的label按indices顺序拼接 tensor([2, 2, 1, 2, 1, 2, 2, 1, 0, 0, 1], device='cuda:0') ,11 target_classes_o = torch.cat([t["labels"][J] for t, (_, J) in zip(targets, indices)]) # 4*100 初始化为背景 target_classes = torch.full(src_logits.shape[:2], self.num_classes, dtype=torch.int64, device=src_logits.device) target_classes[idx] = target_classes_o #交叉熵 loss_ce = F.cross_entropy(src_logits.transpose(1, 2), target_classes, self.empty_weight) losses = {'loss_ce': loss_ce} if log: # TODO this should probably be a separate loss, not hacked in this one here losses['class_error'] = 100 - accuracy(src_logits[idx], target_classes_o)[0] return losses def _get_src_permutation_idx(self, indices): # permute predictions following indices # torch.full_like(input, value),就是将input的形状作为返回结果tensor的形状,值全为value。 #获取对应的目标在batch中的索引tensor([0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3]) batch_idx = torch.cat([torch.full_like(src, i) for i, (src, _) in enumerate(indices)]) #目标在单张图像所有目标中的索引 tensor([58, 75, 92, 75, 92, 58, 75, 92, 58, 75, 92]) src_idx = torch.cat([src for (src, _) in indices]) return batch_idx, src_idx
分类 LOSS 计算
#loss结果字典 losses = ['labels', 'boxes', 'cardinality'] if args.masks: losses += ["masks"] # SetCriterion()构建loss函数 criterion = SetCriterion(num_classes, matcher=matcher, weight_dict=weight_dict, eos_coef=args.eos_coef, losses=losses)#eos_coef 0.1 criterion.to(device) #模型输出后处理,将detr模型输出转为coco格式 postprocessors = {'bbox': PostProcess()} if args.masks: postprocessors['segm'] = PostProcessSegm() if args.dataset_file == "coco_panoptic": is_thing_map = {i: i <= 90 for i in range(201)} postprocessors["panoptic"] = PostProcessPanoptic(is_thing_map, True, threshold=0.85) #返回detr模型,损失计算方法,模型后处理方法 return model, criterion, postprocessors

PostProcess 后处理

再回到 build() 中,处理完 Loss 计算函数,就是准备 detr 模型输出的后处理方法了。
class PostProcess(nn.Module): """ This module converts the model's output into the format expected by the coco api""" # 后处理是为了将模型输出也转化为coco格式。 # 通常模型的后处理是做NMS,但因为我们detr是做匈牙利匹配,每个目标也就只有一个框匹配上,所以不需要nms。 @torch.no_grad() def forward(self, outputs, target_sizes): """ Perform the computation Parameters: outputs: raw outputs of the model target_sizes: tensor of dimension [batch_size x 2] containing the size of each images of the batch For evaluation, this must be the original image size (before any data augmentation) For visualization, this should be the image size after data augment, but before padding """ # 获取模型输出,分类和回归框 out_logits, out_bbox = outputs['pred_logits'], outputs['pred_boxes'] assert len(out_logits) == len(target_sizes) assert target_sizes.shape[1] == 2 #对分类结果进行归一化获得各类的概率 prob = F.softmax(out_logits, -1) #排除掉最后的背景类,因为coco格式里是没有背景类的,然后找出剩下的最大概率值和对应的类别 scores, labels = prob[..., :-1].max(-1) # convert to [x0, y0, x1, y1] format #将检测框中心点、宽高的表达形式转成左上点、右下点的表达形式 boxes = box_ops.box_cxcywh_to_xyxy(out_bbox) # and from relative [0, 1] to absolute [0, height] coordinates #模型在回归部分的输出是归一化的值,需要根据图像尺寸来还原 img_h, img_w = target_sizes.unbind(1) scale_fct = torch.stack([img_w, img_h, img_w, img_h], dim=1) boxes = boxes * scale_fct[:, None, :] #得到最终的coco格式的输出结果 results = [{'scores': s, 'labels': l, 'boxes': b} for s, l, b in zip(scores, labels, boxes)] return results
看 PostProcess,masks 语义模型的暂不探究。
#训练选择AdamW优化器 optimizer = torch.optim.AdamW(param_dicts, lr=args.lr, weight_decay=args.weight_decay) #训练的学习率策略选择StepLR lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, args.lr_drop) #创建训练、验证数据集 dataset_train = build_dataset(image_set='train', args=args) dataset_val = build_dataset(image_set='val', args=args) if args.distributed: sampler_train = DistributedSampler(dataset_train) sampler_val = DistributedSampler(dataset_val, shuffle=False) else: #对于训练集进行随机抽样,对于验证集按顺序验证 sampler_train = torch.utils.data.RandomSampler(dataset_train) sampler_val = torch.utils.data.SequentialSampler(dataset_val) #批次数据训练设置,batch_size为每个batch的图像数量 batch_sampler_train = torch.utils.data.BatchSampler( sampler_train, args.batch_size, drop_last=True) data_loader_train = DataLoader(dataset_train, batch_sampler=batch_sampler_train, collate_fn=utils.collate_fn, num_workers=args.num_workers) data_loader_val = DataLoader(dataset_val, args.batch_size, sampler=sampler_val, drop_last=False, collate_fn=utils.collate_fn, num_workers=args.num_workers) if args.dataset_file == "coco_panoptic": # We also evaluate AP during panoptic training, on original coco DS coco_val = datasets.coco.build("val", args) base_ds = get_coco_api_from_dataset(coco_val) else: base_ds = get_coco_api_from_dataset(dataset_val) if args.frozen_weights is not None: checkpoint = torch.load(args.frozen_weights, map_location='cpu') model_without_ddp.detr.load_state_dict(checkpoint['model']) #模型参数保存地址 output_dir = Path(args.output_dir) #如果是有预训练模型加载 if args.resume: if args.resume.startswith('https'): checkpoint = torch.hub.load_state_dict_from_url( args.resume, map_location='cpu', check_hash=True) else: #加载预训练模型参数 checkpoint = torch.load(args.resume, map_location='cpu') model_without_ddp.load_state_dict(checkpoint['model']) #如果预训练模型里有优化器等设置,按照它的设置进行继续训练 if not args.eval and 'optimizer' in checkpoint and 'lr_scheduler' in checkpoint and 'epoch' in checkpoint: optimizer.load_state_dict(checkpoint['optimizer']) lr_scheduler.load_state_dict(checkpoint['lr_scheduler']) #开始的训练轮次为预训练的轮次+1 args.start_epoch = checkpoint['epoch'] + 1 #如果只是进行eval验证操作,验证之后会return结束不会进行之后的训练 if args.eval: test_stats, coco_evaluator = evaluate(model, criterion, postprocessors, data_loader_val, base_ds, device, args.output_dir) if args.output_dir: #保存用来验证的模型参数 utils.save_on_master(coco_evaluator.coco_eval["bbox"].eval, output_dir / "eval.pth") return #进行训练 print("Start training") start_time = time.time() #epochs为结束的训练轮数 for epoch in range(args.start_epoch, args.epochs): if args.distributed: sampler_train.set_epoch(epoch) #train_one_epoch:一个epoch的训练逻辑 train_stats = train_one_epoch( model, criterion, data_loader_train, optimizer, device, epoch, args.clip_max_norm) lr_scheduler.step() if args.output_dir: checkpoint_paths = [output_dir / 'checkpoint.pth'] # extra checkpoint before LR drop and every 100 epochs #如果训练轮数是lr_drop的或100的倍数保存一个新的模型权重参数文件 if (epoch + 1) % args.lr_drop == 0 or (epoch + 1) % 100 == 0: checkpoint_paths.append(output_dir / f'checkpoint{epoch:04}.pth') #每次的epoch后都要更新checkpoint.pth为最新的训练后更新的权重 for checkpoint_path in checkpoint_paths: utils.save_on_master({ 'model': model_without_ddp.state_dict(), 'optimizer': optimizer.state_dict(), 'lr_scheduler': lr_scheduler.state_dict(), 'epoch': epoch, 'args': args, }, checkpoint_path) #每次epoch后都要验证 test_stats, coco_evaluator = evaluate( model, criterion, postprocessors, data_loader_val, base_ds, device, args.output_dir ) #训练log保存 log_stats = {**{f'train_{k}': v for k, v in train_stats.items()}, **{f'test_{k}': v for k, v in test_stats.items()}, 'epoch': epoch, 'n_parameters': n_parameters} if args.output_dir and utils.is_main_process(): with (output_dir / "log.txt").open("a") as f: f.write(json.dumps(log_stats) + "\n") # for evaluation logs if coco_evaluator is not None: (output_dir / 'eval').mkdir(exist_ok=True) if "bbox" in coco_evaluator.coco_eval: filenames = ['latest.pth'] if epoch % 50 == 0: filenames.append(f'{epoch:03}.pth') for name in filenames: torch.save(coco_evaluator.coco_eval["bbox"].eval, output_dir / "eval" / name) total_time = time.time() - start_time total_time_str = str(datetime.timedelta(seconds=int(total_time))) #打印训练时间 print('Training time {}'.format(total_time_str))

main(三) 构建数据集、训练验证操作

模型搭建讲完后,就回到我们的 main()函数,该轮到构建数据集、模型训练和验证了。大体操作和以前的模型操作大体相同,已经有很多人说了,就不多叙述了,但还是有中文注释供参考。
def train_one_epoch(model: torch.nn.Module, criterion: torch.nn.Module, data_loader: Iterable, optimizer: torch.optim.Optimizer, device: torch.device, epoch: int, max_norm: float = 0): model.train() criterion.train() metric_logger = utils.MetricLogger(delimiter=" ") metric_logger.add_meter('lr', utils.SmoothedValue(window_size=1, fmt='{value:.6f}')) metric_logger.add_meter('class_error', utils.SmoothedValue(window_size=1, fmt='{value:.2f}')) header = 'Epoch: [{}]'.format(epoch) print_freq = 10 for samples, targets in metric_logger.log_every(data_loader, print_freq, header): samples = samples.to(device) targets = [{k: v.to(device) for k, v in t.items()} for t in targets] #获得模型输出 outputs = model(samples) #计算Loss loss_dict = criterion(outputs, targets) #得到各个loss的权重 weight_dict = criterion.weight_dict #得到加权loss和 losses = sum(loss_dict[k] * weight_dict[k] for k in loss_dict.keys() if k in weight_dict) # reduce losses over all GPUs for logging purposes loss_dict_reduced = utils.reduce_dict(loss_dict) loss_dict_reduced_unscaled = {f'{k}_unscaled': v for k, v in loss_dict_reduced.items()} loss_dict_reduced_scaled = {k: v * weight_dict[k] for k, v in loss_dict_reduced.items() if k in weight_dict} losses_reduced_scaled = sum(loss_dict_reduced_scaled.values()) loss_value = losses_reduced_scaled.item() #是否loss不为无穷数,否则训练有误,停止训练 if not math.isfinite(loss_value): print("Loss is {}, stopping training".format(loss_value)) print(loss_dict_reduced) sys.exit(1) #反向梯度传播 optimizer.zero_grad() losses.backward() if max_norm > 0: torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm) optimizer.step() metric_logger.update(loss=loss_value, **loss_dict_reduced_scaled, **loss_dict_reduced_unscaled) metric_logger.update(class_error=loss_dict_reduced['class_error']) metric_logger.update(lr=optimizer.param_groups[0]["lr"]) # gather the stats from all processes #打印训练情况 metric_logger.synchronize_between_processes() print("Averaged stats:", metric_logger) return {k: meter.global_avg for k, meter in metric_logger.meters.items()}
训练的主要部分就在train_one_epoch()中:
def train_one_epoch(model: torch.nn.Module, criterion: torch.nn.Module, data_loader: Iterable, optimizer: torch.optim.Optimizer, device: torch.device, epoch: int, max_norm: float = 0): model.train() criterion.train() metric_logger = utils.MetricLogger(delimiter=" ") metric_logger.add_meter('lr', utils.SmoothedValue(window_size=1, fmt='{value:.6f}')) metric_logger.add_meter('class_error', utils.SmoothedValue(window_size=1, fmt='{value:.2f}')) header = 'Epoch: [{}]'.format(epoch) print_freq = 10 for samples, targets in metric_logger.log_every(data_loader, print_freq, header): samples = samples.to(device) targets = [{k: v.to(device) for k, v in t.items()} for t in targets] #获得模型输出 outputs = model(samples) #计算Loss loss_dict = criterion(outputs, targets) #得到各个loss的权重 weight_dict = criterion.weight_dict #得到加权loss和 losses = sum(loss_dict[k] * weight_dict[k] for k in loss_dict.keys() if k in weight_dict) # reduce losses over all GPUs for logging purposes loss_dict_reduced = utils.reduce_dict(loss_dict) loss_dict_reduced_unscaled = {f'{k}_unscaled': v for k, v in loss_dict_reduced.items()} loss_dict_reduced_scaled = {k: v * weight_dict[k] for k, v in loss_dict_reduced.items() if k in weight_dict} losses_reduced_scaled = sum(loss_dict_reduced_scaled.values()) loss_value = losses_reduced_scaled.item() #是否loss不为无穷数,否则训练有误,停止训练 if not math.isfinite(loss_value): print("Loss is {}, stopping training".format(loss_value)) print(loss_dict_reduced) sys.exit(1) #反向梯度传播 optimizer.zero_grad() losses.backward() if max_norm > 0: torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm) optimizer.step() metric_logger.update(loss=loss_value, **loss_dict_reduced_scaled, **loss_dict_reduced_unscaled) metric_logger.update(class_error=loss_dict_reduced['class_error']) metric_logger.update(lr=optimizer.param_groups[0]["lr"]) # gather the stats from all processes #打印训练情况 metric_logger.synchronize_between_processes() print("Averaged stats:", metric_logger) return {k: meter.global_avg for k, meter in metric_logger.meters.items()}
自此,主体部分就差不多完成了,撒花。。后面会写一个训练自己的数据集的 DETR 的 blog,欢迎评论探讨!
(带注释的代码可以在 https://gitee.com/fgy120/DETR 自取) > 本文由简悦 SimpRead 转码
DETR 源码笔记_CSDN(一)论文笔记