DETR 源码笔记_CSDN(一)
RocheL
Aug 31, 2022
Last edited: 2022-9-4
type
Post
status
Published
date
Aug 31, 2022
slug
detr_csdn_1
summary
DETR参考CSDNblog的代码注释,主要是进transformer之前的数据处理。DETR源码笔记(一)_在努力的松鼠的博客
tags
engineer
category
技术分享
icon
password
Property
Aug 31, 2022 04:53 AM
DETR参考CSDNblog的代码注释,主要是进transformer之前的数据处理。
CSDN
来源:
源码获取:https://gitee.com/fgy120/DETR
首先对 DETR 做个简单介绍
上图即为 DETR 的流程 pipeline,相比以前的 RCNN 系列、YOLO 系列等,最特别的在于加入了 Transformer。
目录
main 函数(一)参数设置
直接看源码,从 train.py 的主函数开始。
if __name__ == '__main__': parser = argparse.ArgumentParser('DETR training and evaluation script', parents=[get_args_parser()]) args = parser.parse_args() if args.output_dir: Path(args.output_dir).mkdir(parents=True, exist_ok=True) #以output_dir创建Path对象并执行mkdir创建文件夹操作 main(args)
首先是常规的参数解析操作,利用的 argparse 库,主要通过解析命令行输入的参数来设置模型训练的超参数或其他设置。第一步创建解析对象 parser,运行 parser.parse_args 方法得到解析后的各个参数 args,默认为解析运行代码的命令行。如果其中包含 output_dir 参数且 output_dir 不存在,利用 Pathlib 中的 Path 库的 mkdir 方法创建 output_dir 的路径文件夹。
Path(args.output_dir).mkdir(parents=True, exist_ok=True)
parents:如果父目录不存在,是否创建父目录。
exist_ok:只有在目录不存在时创建目录,目录已存在时不会抛出异常。
argparse 具体介绍可以看这篇。argparse 解析器
更新后新的参数设置如下:
def get_args_parser(): parser = argparse.ArgumentParser('Set transformer detector', add_help=False) parser.add_argument('--lr', default=1e-4, type=float) parser.add_argument('--lr_backbone', default=1e-5, type=float) parser.add_argument('--batch_size', default=2, type=int) parser.add_argument('--weight_decay', default=1e-4, type=float)#正则项 parser.add_argument('--epochs', default=300, type=int) parser.add_argument('--lr_drop', default=200, type=int)#每隔200个epoch,学习率减为原十分之一 parser.add_argument('--clip_max_norm', default=0.1, type=float, help='gradient clipping max norm') # Model parameters parser.add_argument('--frozen_weights', type=str, default=None, help="Path to the pretrained model. If set, only the mask head will be trained") #是否冻结backbone权重 # * Backbone parser.add_argument('--backbone', default='resnet50', type=str, help="Name of the convolutional backbone to use") parser.add_argument('--dilation', action='store_true', help="If true, we replace stride with dilation in the last convolutional block (DC5)") #卷积的最后一步是否使用空洞卷积 #关于action='store_true',其等同于default=false,即命令中有"-xxx"时有效,没有时跳过,store_false相反 parser.add_argument('--position_embedding', default='sine', type=str, choices=('sine', 'learned'), help="Type of positional embedding to use on top of the image features") #位置编码采用可学习的还是正余弦 # * Transformer parser.add_argument('--enc_layers', default=6, type=int, help="Number of encoding layers in the transformer") parser.add_argument('--dec_layers', default=6, type=int, help="Number of decoding layers in the transformer") #编码解码器层数 parser.add_argument('--dim_feedforward', default=2048, type=int, help="Intermediate size of the feedforward layers in the transformer blocks") #backbone输出后的维度 parser.add_argument('--hidden_dim', default=256, type=int, help="Size of the embeddings (dimension of the transformer)") #transformer中两层mlp的神经元(隐藏层)节点个数 parser.add_argument('--dropout', default=0.1, type=float, help="Dropout applied in the transformer")#drop_out比率 parser.add_argument('--nheads', default=8, type=int, help="Number of attention heads inside the transformer's attentions")#多头注意力的头数 parser.add_argument('--num_queries', default=100, type=int, help="Number of query slots") #重点,输出多少框 parser.add_argument('--pre_norm', action='store_true') # * Segmentation parser.add_argument('--masks', action='store_true', help="Train segmentation head if the flag is provided")#分割头 # Loss parser.add_argument('--no_aux_loss', dest='aux_loss', action='store_false', help="Disables auxiliary decoding losses (loss at each layer)") # * Matcher parser.add_argument('--set_cost_class', default=1, type=float, help="Class coefficient in the matching cost") parser.add_argument('--set_cost_bbox', default=5, type=float, help="L1 box coefficient in the matching cost") parser.add_argument('--set_cost_giou', default=2, type=float, help="giou box coefficient in the matching cost") #匹配器的权重,选框的时候用的 # * Loss coefficients parser.add_argument('--mask_loss_coef', default=1, type=float) parser.add_argument('--dice_loss_coef', default=1, type=float) parser.add_argument('--bbox_loss_coef', default=5, type=float) parser.add_argument('--giou_loss_coef', default=2, type=float) parser.add_argument('--eos_coef', default=0.1, type=float, help="Relative classification weight of the no-object class") #选完框之后真正做loss下降的时候的权重 # dataset parameters parser.add_argument('--dataset_file', default='coco') parser.add_argument('--coco_path', type=str) parser.add_argument('--coco_panoptic_path', type=str) parser.add_argument('--remove_difficult', action='store_true') parser.add_argument('--output_dir', default='', help='path where to save, empty for no saving') parser.add_argument('--device', default='cuda', help='device to use for training / testing') parser.add_argument('--seed', default=42, type=int) parser.add_argument('--resume', default='', help='resume from checkpoint') parser.add_argument('--start_epoch', default=0, type=int, metavar='N', help='start epoch') parser.add_argument('--eval', action='store_true') parser.add_argument('--num_workers', default=2, type=int) # distributed training parameters parser.add_argument('--world_size', default=1, type=int, help='number of distributed processes') parser.add_argument('--dist_url', default='env://', help='url used to set up distributed training') return parser
接着进入 main() 函数
def main(args): utils.init_distributed_mode(args)#分布式训练初始化,关闭 print("git:\n {}\n".format(utils.get_sha()))#获得git 状态 if args.frozen_weights is not None: assert args.masks, "Frozen training is meant for segmentation only" #冻结训练只使用于分割 print(args) device = torch.device(args.device) #选择cuda或者cpu,tensor分配到的设备 # fix the seed for reproducibility #相同的随机种子seed将模型在初始化过程中所用到的“随机数”全部固定下来,以保证每次重新训练模型需要初始化模型参数的时候能够得到相同的初始化参数,从而达到稳定复现训练结果的目的 #get_rank得到多节点训练时的节点序号,所以可以得到对应的seed seed = args.seed + utils.get_rank() #utils.get_rank()当分布式训练时,需要多个seed torch.manual_seed(seed) np.random.seed(seed) random.seed(seed)
utils.init_distributed_mode(args):判断是否进行分布式训练,根据你的电脑的环境配置中是否有相关配置来判断或设置,一般单卡单机的话都是执行到 else 语句就 return 了。
def init_distributed_mode(args): if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ: #os.environ: 获取环境变量作为字典,例:通过os.environ.get(“HOME”),就可以获取环境变量HOME的值 args.rank = int(os.environ["RANK"]) args.world_size = int(os.environ['WORLD_SIZE']) args.gpu = int(os.environ['LOCAL_RANK']) elif 'SLURM_PROCID' in os.environ: args.rank = int(os.environ['SLURM_PROCID']) args.gpu = args.rank % torch.cuda.device_count() else: print('Not using distributed mode') args.distributed = False return args.distributed = True torch.cuda.set_device(args.gpu) args.dist_backend = 'nccl' print('| distributed init (rank {}): {}'.format( args.rank, args.dist_url), flush=True) torch.distributed.init_process_group(backend=args.dist_backend, init_method=args.dist_url, world_size=args.world_size, rank=args.rank) torch.distributed.barrier() setup_for_distributed(args.rank == 0)
上述第一个if是用来做多节点的,rank是进程序号,world_wise是进程数量,local_rank是当前进程占用的gpu,第二个if是用slurm做多机协同多卡,最后使用torch.distributed部署,该接口比torch.nn.DataParallel事实证明要好,详细参考PyTorch 多进程分布式训练实战 | 拾荒志 或者PyTorch 多进程分布式训练实战 | 拾荒志 (murphypei.github.io)
utils.get_sha():通过命令行获得git 的commitID和git status以及所在的branch。
#获得git 状态 def get_sha(): cwd = os.path.dirname(os.path.abspath(__file__)) #os.path.dirname去掉文件名返回目录 def _run(command): return subprocess.check_output(command, cwd=cwd).decode('ascii').strip() sha = 'N/A' diff = "clean" branch = 'N/A' try: sha = _run(['git', 'rev-parse', 'HEAD']) #在命令行中cmd路径下输入git rev-parse HEAD获得git commit id #subprocess模块允许我们启动一个新线程,并连接到它们的输入输出error通道,从而获取返回值 subprocess.check_output(['git', 'diff'], cwd=cwd) diff = _run(['git', 'diff-index', 'HEAD']) diff = "has uncommited changes" if diff else "clean" branch = _run(['git', 'rev-parse', '--abbrev-ref', 'HEAD']) except Exception: pass message = f"sha: {sha}, status: {diff}, branch: {branch}" return message
subprocess.check_output(command, cwd=cwd) subprocess库的check_output方法通过在cwd打开cmd,然后输入commend,并返回cmd的输出
device = torch.device(args.device)#选择cuda或者cpu,通过解析得到device的参数来决定tensor分配到的设备是GPU还是CPU
seed = args.seed + utils.get_rank() #utils.get_rank()当分布式训练时,需要多个seed torch.manual_seed(seed) np.random.seed(seed) random.seed(seed) seed会决定上面三种取随机数方法的值,相同的随机种子seed将模型在初始化过程中所用到的“随机数”全部固定下来,即每次初始化都是一样的,以保证每次重新训练模型需要初始化模型参数的时候能够得到相同的初始化参数,从而达到稳定复现训练结果的目的。
main 函数(二)搭建模型
pytorch遍历参数修改属性
- model.named_parameters()
return
: 返回model的所有参数的(name, tensor)的键值对。可以修改参数的requires_grad
属性。用法
: 常用于对网络的参数进行一些特殊的处理(比如 fine-tuning)。
no_decay = ['bias', 'LayerNorm.weight'] optimizer_grouped_parameters = [ {'params': [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)], 'weight_decay': 1e-2}, {'params': [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)], 'weight_decay': 0.0} ] """ model.named_parameters返回键值对,赋给n,p。经过后面的判断条件,// 即所有name不在no_decay列表中的n对应的p都对应params,并设置权重衰减系数 后面一个dict反之 """ # AdamW是实现了权重衰减的优化器 optimizer = AdamW(optimizer_grouped_parameters, lr=1e-5)
- model.parameters()
return
: 返回model的所有参数的tensor。可以修改参数的requires_grad属性。用法
: 主要提供给optimizer。
optimizer = torch.optim.Adam(model.parameters(), args.learning_rate, betas=(args.momentum, 0.999))
- model.state_dict()
return
: 返回model的参数的(name, tensor)的键值对字典,参数的requires_grad=false,不可以修改参数的requires_grad属性。用法
: 常用于保存模型和加载模型的时候使用。
torch.save(model.state_dict(),"model.path") # 保存参数 model = model() # 代码中创建网络结构 params = torch.load("model.path") # 加载参数 model.load_state_dict(params) # 应用到网络结构中
detr模型main中搭建调用、优化器、数据集等设置
model, criterion, postprocessors = build_model(args)#构建model model.to(device) model_without_ddp = model#ddp:分布式处理 if args.distributed: model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu]) model_without_ddp = model.module n_parameters = sum(p.numel() for p in model.parameters() if p.requires_grad) print('number of params:', n_parameters) #计算可学参数总数 param_dicts = [ {"params": [p for n, p in model_without_ddp.named_parameters() if "backbone" not in n and p.requires_grad]}, { "params": [p for n, p in model_without_ddp.named_parameters() if "backbone" in n and p.requires_grad], "lr": args.lr_backbone, }, ] optimizer = torch.optim.AdamW(param_dicts, lr=args.lr, weight_decay=args.weight_decay) lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, args.lr_drop) #隔args.lr_drop个epoch学习率减为原1/10 dataset_train = build_dataset(image_set='train', args=args) dataset_val = build_dataset(image_set='val', args=args) #分布式训练样本切分 if args.distributed: sampler_train = DistributedSampler(dataset_train) sampler_val = DistributedSampler(dataset_val, shuffle=False) else: sampler_train = torch.utils.data.RandomSampler(dataset_train) sampler_val = torch.utils.data.SequentialSampler(dataset_val) batch_sampler_train = torch.utils.data.BatchSampler( sampler_train, args.batch_size, drop_last=True) #dataloader data_loader_train = DataLoader(dataset_train, batch_sampler=batch_sampler_train, collate_fn=utils.collate_fn, num_workers=args.num_workers) data_loader_val = DataLoader(dataset_val, args.batch_size, sampler=sampler_val, drop_last=False, collate_fn=utils.collate_fn, num_workers=args.num_workers) #coco的api if args.dataset_file == "coco_panoptic": # We also evaluate AP during panoptic training, on original coco DS coco_val = datasets.coco.build("val", args) base_ds = get_coco_api_from_dataset(coco_val) else: base_ds = get_coco_api_from_dataset(dataset_val) if args.frozen_weights is not None: checkpoint = torch.load(args.frozen_weights, map_location='cpu') model_without_ddp.detr.load_state_dict(checkpoint['model']) #中间结果和恢复训练resume output_dir = Path(args.output_dir) if args.resume: if args.resume.startswith('https'): checkpoint = torch.hub.load_state_dict_from_url( args.resume, map_location='cpu', check_hash=True) else: checkpoint = torch.load(args.resume, map_location='cpu') model_without_ddp.load_state_dict(checkpoint['model']) if not args.eval and 'optimizer' in checkpoint and 'lr_scheduler' in checkpoint and 'epoch' in checkpoint: optimizer.load_state_dict(checkpoint['optimizer']) lr_scheduler.load_state_dict(checkpoint['lr_scheduler']) args.start_epoch = checkpoint['epoch'] + 1 if args.eval: test_stats, coco_evaluator = evaluate(model, criterion, postprocessors, data_loader_val, base_ds, device, args.output_dir) if args.output_dir: utils.save_on_master(coco_evaluator.coco_eval["bbox"].eval, output_dir / "eval.pth") return #一个epoch print("Start training") start_time = time.time() for epoch in range(args.start_epoch, args.epochs): if args.distributed: sampler_train.set_epoch(epoch) train_stats = train_one_epoch( model, criterion, data_loader_train, optimizer, device, epoch, args.clip_max_norm) lr_scheduler.step() if args.output_dir: checkpoint_paths = [output_dir / 'checkpoint.pth'] # extra checkpoint before LR drop and every 100 epochs if (epoch + 1) % args.lr_drop == 0 or (epoch + 1) % 100 == 0: checkpoint_paths.append(output_dir / f'checkpoint{epoch:04}.pth') for checkpoint_path in checkpoint_paths: utils.save_on_master({ 'model': model_without_ddp.state_dict(), 'optimizer': optimizer.state_dict(), 'lr_scheduler': lr_scheduler.state_dict(), 'epoch': epoch, 'args': args, }, checkpoint_path) test_stats, coco_evaluator = evaluate( model, criterion, postprocessors, data_loader_val, base_ds, device, args.output_dir ) log_stats = {**{f'train_{k}': v for k, v in train_stats.items()}, **{f'test_{k}': v for k, v in test_stats.items()}, 'epoch': epoch, 'n_parameters': n_parameters} #f'{}作用相当于''.format() if args.output_dir and utils.is_main_process(): with (output_dir / "log.txt").open("a") as f: f.write(json.dumps(log_stats) + "\n") # for evaluation logs if coco_evaluator is not None: (output_dir / 'eval').mkdir(exist_ok=True) if "bbox" in coco_evaluator.coco_eval: filenames = ['latest.pth'] if epoch % 50 == 0: filenames.append(f'{epoch:03}.pth') for name in filenames: torch.save(coco_evaluator.coco_eval["bbox"].eval, output_dir / "eval" / name) total_time = time.time() - start_time total_time_str = str(datetime.timedelta(seconds=int(total_time))) print('Training time {}'.format(total_time_str))
网络搭建函数细节
model, criterion, postprocessors = build_model(args)构建网络模型
build(args):
def build_model(args): return build(args) #构建模型 def build(args): #设置识别目标类型,可根据自己的数据集修改 num_classes = 2 if args.dataset_file != 'coco' else 2 if args.dataset_file == "coco_panoptic": num_classes = 2 #设置cpu或者GPU device = torch.device(args.device) #搭建主干网络 backbone = build_backbone(args) #搭建transformer transformer = build_transformer(args) #搭建DETR模型 model = DETR( backbone, transformer, num_classes=num_classes, num_queries=args.num_queries, aux_loss=args.aux_loss, ) #分割任务用 if args.masks: model = DETRsegm(model) matcher = build_matcher(args) weight_dict = {'loss_ce': 1, 'loss_bbox': args.bbox_loss_coef} weight_dict['loss_giou'] = args.giou_loss_coef if args.masks: weight_dict["loss_mask"] = args.mask_loss_coef weight_dict["loss_dice"] = args.dice_loss_coef # TODO this is a hack if args.aux_loss: aux_weight_dict = {} for i in range(args.dec_layers - 1): aux_weight_dict.update({k + f'_{i}': v for k, v in weight_dict.items()}) weight_dict.update(aux_weight_dict) losses = ['labels', 'boxes', 'cardinality'] if args.masks: losses += ["masks"] criterion = SetCriterion(num_classes, matcher=matcher, weight_dict=weight_dict, eos_coef=args.eos_coef, losses=losses) criterion.to(device) postprocessors = {'bbox': PostProcess()} if args.masks: postprocessors['segm'] = PostProcessSegm() if args.dataset_file == "coco_panoptic": is_thing_map = {i: i <= 90 for i in range(201)} postprocessors["panoptic"] = PostProcessPanoptic(is_thing_map, True, threshold=0.85) return model, criterion, postprocessors
build_backbone():包括构建位置编码器以及 backbone
def build_backbone(args): #搭建位置编码器 position_embedding = build_position_encoding(args) train_backbone = args.lr_backbone > 0 #是否需要记录backbone的每层输出 return_interm_layers = args.masks backbone = Backbone(args.backbone, train_backbone, return_interm_layers, args.dilation) #将backbone和位置编码器集合在一起放在一个model里 model = Joiner(backbone, position_embedding) #设置model的输出通道数 model.num_channels = backbone.num_channels return model
build_position_encoding(args): 构建位置编码器,有两种方式,一种是使用正、余弦函数来对各位置的奇、偶维度进行编码,不需要额外的参数进行学习,DETR默认使用的就是这种正余弦编码。还有一种是可学习的。下面主要讲解正余弦编码
def build_position_encoding(args): N_steps = args.hidden_dim // 2 #args.hidden_dim transformer的输入张量的channel数,位置编码和backbone的featuremap结合后需要输入到transformer中 #余弦编码方式,文章说采用正余弦函数,是根据归纳偏置和经验做出的选择 if args.position_embedding in ('v2', 'sine'): # TODO find a better way of exposing other arguments position_embedding = PositionEmbeddingSine(N_steps, normalize=True) #可学习的编码方式 elif args.position_embedding in ('v3', 'learned'): position_embedding = PositionEmbeddingLearned(N_steps) else: raise ValueError(f"not supported {args.position_embedding}") return position_embedding
PositionEmbeddingSine(N_steps, normalize=True):正余弦编码方式,这种方式是将各个位置的各个维度映射到角度上,因此有个scale,默认是2pi。下面的是编码公式
class PositionEmbeddingSine(nn.Module): """ This is a more standard version of the position embedding, very similar to the one used by the Attention is all you need paper, generalized to work on images. """ def __init__(self, num_pos_feats=64, temperature=10000, normalize=False, scale=None): super().__init__() self.num_pos_feats = num_pos_feats#transformer输入张量的channel大小//2 self.temperature = temperature self.normalize = normalize if scale is not None and normalize is False: raise ValueError("normalize should be True if scale is passed") if scale is None: scale = 2 * math.pi self.scale = scale def forward(self, tensor_list): #(batch,channel,height,width)注意,height 和 width 是图像经过backbone后的featuremap的高宽,如果用resnet50作为backbone则height=图像Height//32,width=图像Width//32 #这里假设为res50,mask为(2,24,24) x = tensor_list.tensors #(batch,height,width) mask是为了指示那些位置是padding而来的,mask中值为true的部分就是padding的部分 mask = tensor_list.mask #(batch,height,width)取反后not_mask中值为true的部分即为非padding的部分,真实有效 not_mask = ~mask #cumsum()方法在列和行分别进行累加 #沿着列方向累加,并转为float型得到y_embed(batch,height,width) # 示例:[[[1,1,1,...,1], # [2,2,2,...,2], # ... # [h,h,h,...,h]],...] y_embed = not_mask.cumsum(1, dtype=torch.float32) #在行方向累加,并转为float型得到x_embed(batch,height,width) # 示例:[[[1,2,3,...,w], # [1,2,3,...,w], # ... # [1,2,3,...,w]],...] x_embed = not_mask.cumsum(2, dtype=torch.float32) # 进行归一化 if self.normalize: eps = 1e-6 #y_embed[:, -1:, :]取每一个batch的最后一列全部元素组成新的矩阵(batch,1,width) # 示例:[[[h,h,h,...,h]], # ... # [h,h,h,...,h]],...] #对batch中每一个分别进行角度归一化 #得到公式中的pos y_embed = y_embed / (y_embed[:, -1:, :] + eps) * self.scale x_embed = x_embed / (x_embed[:, :, -1:] + eps) * self.scale #x_embed ,y_embed 都为(2,24,24) #torch.arange(start=0, end, step=1, *, out=None, dtype=None, layout=torch.strided, device=None, requires_grad=False) #返回一个一维向量,其大小为(end-start)/step,取值区间为[start, end) ,从start开始,以step为步长增加,直到end结束(不包括end) #创建0到(num_pos_feats-1)=127的步长为1的float一维张量 # tensor([0., 1., 2., 3., 4., 5., 6., 7., 8., 9., 10., 11., # 12., 13., 14., 15., 16., 17., 18., 19., 20., 21., 22., 23., # ... # 120., 121., 122., 123., 124., 125., 126., 127.], device='cuda:0') dim_t = torch.arange(self.num_pos_feats, dtype=torch.float32, device=x.device) #dim_t // 2得到有重复值的0到63的一维张量 # tensor([0., 0., 1., 1., 2., 2., 3., 3., 4., 4., 5., 5., 6., 6., # 7., 7., 8., 8., 9., 9., 10., 10., 11., 11., 12., 12., 13., 13., # ... # 56., 56., 57., 57., 58., 58., 59., 59., 60., 60., 61., 61., 62., 62., # 63., 63.], device='cuda:0') #(2 * (dim_t // 2) / self.num_pos_feats)得到 # tensor([0.0000, 0.0000, 0.0156, 0.0156, 0.0312, 0.0312, 0.0469, 0.0469, 0.0625, # 0.0625, 0.0781, 0.0781, 0.0938, 0.0938, 0.1094, 0.1094, 0.1250, 0.1250, # ... # 0.9062, 0.9219, 0.9219, 0.9375, 0.9375, 0.9531, 0.9531, 0.9688, 0.9688, # 0.9844, 0.9844], device='cuda:0') #最后得到dim_t # tensor([1.0000e+00, 1.0000e+00, 1.1548e+00, 1.1548e+00, 1.3335e+00, 1.3335e+00, # 1.5399e+00, 1.5399e+00, 1.7783e+00, 1.7783e+00, 2.0535e+00, 2.0535e+00, # 2.3714e+00, 2.3714e+00, 2.7384e+00, 2.7384e+00, 3.1623e+00, 3.1623e+00, # ... # 5.6234e+03, 5.6234e+03, 6.4938e+03, 6.4938e+03, 7.4989e+03, 7.4989e+03, # 8.6596e+03, 8.6596e+03], device='cuda:0') #pow(10000,2i/d),2i需要在num_pos_feats范围内,因此i为dim_t // 2,且这样就在一个张量里有了两个相同的张量分别表示奇数行列和偶数行列,方便后面操作 #得到公式中的分母1000^(2i/d),i = dim_t//2 dim_t = self.temperature ** (2 * (dim_t // 2) / self.num_pos_feats) #此时dim_t还是一个128维的向量 #pos_x(b,h,w,num_post_feats) 得到公式中的pos/1000^(2i/d),因为图像是2D的,所以pos有横列两种 pos_x = x_embed[:, :, :, None] / dim_t pos_y = y_embed[:, :, :, None] / dim_t #pos_x,pos_y 变成了(b,h,w,num_post_feats) #torch.stack 沿着一个新维度进行堆叠拼接outputs = torch.stack(inputs, dim=?) #inputs : 待连接的张量序列。dim : 新的维度, 必须在0到len(outputs)之间。len(outputs)=len(inputs)+1 #torch.sin() 会将输入值作为弧度而不是角度计算sin值,cos()类似 #0::2双冒号表示从0开始步长为2取值到最后,使用这个是为了将奇数行列用cos编码,偶数行列用sin编码 #torch.flatten(input, start_dim=0, end_dim=-1) start_dim:平铺的起始维度。end_dim: 平铺的结束维度。 #.flatten(3) 从第三维开始到最后一维进行平铺到一个维度上 #(batch,height,width,num_post_feats)得到公式的sin(pos/1000^(2i/d)) 和 cos(pos/1000^(2i/d))并放在一起 pos_x = torch.stack((pos_x[:, :, :, 0::2].sin(), pos_x[:, :, :, 1::2].cos()), dim=4).flatten(3) pos_y = torch.stack((pos_y[:, :, :, 0::2].sin(), pos_y[:, :, :, 1::2].cos()), dim=4).flatten(3) #torch.cat()将多给矩阵连接 outputs = torch.cat(inputs, dim=?) inputs : 待连接的张量序列;dim : 选择的扩维, 必须在0到len(inputs[0])之间,沿着此维连接张量序列。 #permute()将tensor的维度进行交换 #(batch,2*num_post_feats,height,width)将一个像素的位置可以用对应的横向编码和纵向编码值表示 pos = torch.cat((pos_y, pos_x), dim=3).permute(0, 3, 1, 2) return pos
得到位置编码 pos,pos 是一个(batch, 2* num_post_feats, height, width)的 tensor,每个 batch 中的每一个前 num_post_feats 的(height, width)tensor 表示 y 方向的位置编码 pos_y, 后 num_post_feats 则表示 x 方向的位置编码,合并使用则可以得到类似(pos_y,pos_x)的效果来对 2D 张量进行位置编码
build_position 得到位置编码后回到接下来回到 build_backbone 函数
def build_backbone(args): #搭建位置编码器 position_embedding = build_position_encoding(args) train_backbone = args.lr_backbone > 0 #是否需要记录backbone的每层输出 return_interm_layers = args.masks backbone = Backbone(args.backbone, train_backbone, return_interm_layers, args.dilation) #将backbone和位置编码器集合在一起放在一个model里 model = Joiner(backbone, position_embedding) #设置model的输出通道数 model.num_channels = backbone.num_channels return model
args.lr_backbone 默认为 1e-5,则 train_backbone 默认为 true,通过设置 backbone 的 lr 来设置是否训练网络时接收 backbone 的梯度从而让 backbone 也训练。return_interm_layers 在后面解释。进入到 Backbone() 函数,args.backbone 默认为 resnet50,args.dilatyion 默认为 false。
class Backbone(BackboneBase): """ResNet backbone with frozen BatchNorm.""" def __init__(self, name: str, train_backbone: bool, return_interm_layers: bool, dilation: bool): #torchvision.models是pytorch的一个重要的包,包含了各种常用的网络结构,并且提供了预训练模型 #getattr(obj,name)获取obj中命名为name的组成。可以理解为获取obj.name #获取torchvision.models中实现的resnet50网络结构 backbone = getattr(torchvision.models, name)( replace_stride_with_dilation=[False, False, dilation], pretrained=True, norm_layer=FrozenBatchNorm2d) #replace_stride_with_dilation 决定是否使用膨胀卷积;pretrained 是否使用预训练模型;norm_layer 使用FrozenBatchNorm2d归一化方式 num_channels = 512 if name in ('resnet18', 'resnet34') else 2048 super().__init__(backbone, train_backbone, num_channels, return_interm_layers)
获得 resnet50 网络结构,并设置输出 channels 为 2048,所以我们的 backbone 的输出则是 (batch,2048,H//32,W//32),在父类 BackboneBase.__init__中进行初始化。
class BackboneBase(nn.Module): def __init__(self, backbone: nn.Module, train_backbone: bool, num_channels: int, return_interm_layers: bool): super().__init__() #调用nn.Module.__init__(),创建Backbone框架 for name, parameter in backbone.named_parameters():#对resnet50架构中的设置 #(python中优先级 not>or>and) # 如果train_backbone为false,或者layer2,3,4都不在backbone中,backbone是用字典表示的,把backbone冻结,不进行梯度回传训练 if not train_backbone or 'layer2' not in name and 'layer3' not in name and 'layer4' not in name: parameter.requires_grad_(False) if return_interm_layers:#设置你想要能够获得输出的层 return_layers = {"layer1": "0", "layer2": "1", "layer3": "2", "layer4": "3"} else: return_layers = {'layer4': 0} #IntermediateLayerGetter(Model)获取一个Model中你指定要获取的哪些层的输出,然后这些层的输出会在一个有序的字典中 self.body = IntermediateLayerGetter(backbone, return_layers=return_layers) self.num_channels = num_channels def forward(self, tensor_list): xs = self.body(tensor_list.tensors)#输入的tensor list 经过backbone后得到featuremap out: Dict[str, NestedTensor] = {}#按顺序遍历layer,在return_layers中的layer输出则会放到out中 #typing包导入的Dict,介绍:https://www.cnblogs.com/lzc978/p/11207170.html for name, x in xs.items(): #将mask插值带与输出特征图尺寸一致 mask = F.interpolate(tensor_list.mask[None].float(), size=x.shape[-2:]).bool()[0] out[name] = NestedTensor(x, mask)#将图像张量与mask封装到一起 return out
到这,位置编码和 backbone 都搭建完毕,回到 build_backbone
def build_backbone(args): #搭建位置编码器 position_embedding = build_position_encoding(args) train_backbone = args.lr_backbone > 0 #是否需要记录backbone的每层输出 return_interm_layers = args.masks backbone = Backbone(args.backbone, train_backbone, return_interm_layers, args.dilation) #将backbone和位置编码器集合在一起放在一个model里 model = Joiner(backbone, position_embedding) #设置model的输出通道数为backbone的输出通道数,resnet50为2048 model.num_channels = backbone.num_channels return model
接着在 Joiner()中,将 backbone 和位置编码器用 nn.Sequential() 按顺序结合,forward 可结合前面的一起来看,过一遍操作
class Joiner(nn.Sequential): def __init__(self, backbone, position_embedding): super().__init__(backbone, position_embedding) #self[0]是backbone,self[1]是position_embedding def forward(self, tensor_list): #对backbone的输出进行位置编码,最终返回backbone的输出及对应的位置编码结果 xs = self[0](tensor_list)#tensor_list经过backbone后得到xs序列,其中每一个包括mask(batch, W/32,H/32)和featuremap(batch, 2042, W/32,H/32) out = [] pos = [] for name, x in xs.items(): out.append(x) #把mask和featuremap添加到out中 # position encoding pos.append(self[1](x).to(x.tensors.dtype))#把x作为输入给到位置编码器,得到的输出添加到pos中 return out, pos
backbone 搭建完成后,回到 build(args),接下来是搭建 transformer,就在 DETR 源码笔记(二)吧 。 > 本文由简悦 SimpRead 转码
Mask的解释
按照训练流程首先介绍 backbone 以及数据进入 encoder 之前的部分
当训练时,使用 torch.manual_seed(seed) 函数,只要网络参数不变的情况下,网络的参数初始化每次都是固定的;如果没有设置,每次训练时的初始化都是随机的,导致结果不确定。
例如:要训练自己的数据集通常需要对 num_classes 进行设置。(其中 num_classes 的设置根据自己数据集类别数量 + 1,也就是说,假设 coco 的数据集中总共有 90 个类,此时的 num_classes 就是 91)
假设刚开始设置的 num_classes=5,那么只要训练过程中网络的参数不变,那么网络的初始化参数都是一样的;如果下次训练时 num_classes=6,最直观的变化就是数据集中图像的读取顺序发生了变化,并且由于训练时有对图像进行随机拉伸,也就导致参数变化后,同一张图像在上一次训练时的尺寸和当前训练尺寸不一。
backbone 调用的是 torchvision 中定义的 resnet50,这里的 backbone 也就是送入 transformer 之前用来提取图像特征图的骨架,所以张量在经过 resnet50 的卷积后得到的特征图通道数由原来的 3 通道变为 2048,WH = W/32 H/32,具体来说,假设一开始输入的张量是由 batch size 为 2,长宽都为 768 组成的 3 通道图像,即 [b, c, h, w] = [2,3,768,768],经过 resnet50 后,shape 变为 [2,2048,24,24]。
具体的代码如下:
class ResNet(nn.Module): def __init__( self, block: Type[Union[BasicBlock, Bottleneck]], layers: List[int], num_classes: int = 1000, zero_init_residual: bool = False, groups: int = 1, width_per_group: int = 64, replace_stride_with_dilation: Optional[List[bool]] = None, norm_layer: Optional[Callable[..., nn.Module]] = None ) -> None: super(ResNet, self).__init__() if norm_layer is None: norm_layer = nn.BatchNorm2d self._norm_layer = norm_layer self.inplanes = 64 self.dilation = 1 if replace_stride_with_dilation is None: # each element in the tuple indicates if we should replace # the 2x2 stride with a dilated convolution instead replace_stride_with_dilation = [False, False, False] if len(replace_stride_with_dilation) != 3: raise ValueError("replace_stride_with_dilation should be None " "or a 3-element tuple, got {}".format(replace_stride_with_dilation)) self.groups = groups self.base_width = width_per_group # 输入的W * H = W / 2 * H / 2 # 特征图分辨率降低为1/2,通道数从3升为64 self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False) self.bn1 = norm_layer(self.inplanes) self.relu = nn.ReLU(inplace=True) # W * H = W / 2 * H / 2 # 特征图分辨率降低为1/4,通道数仍然为64 self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1) # stride为1,不改变分辨率,依然为1/4,通道数从64升为256 self.layer1 = self._make_layer(block, 64, layers[0]) # W * H = W / 2 * H / 2 # stride为2,特征图分辨率降低为1/8,通道数从256升为512 self.layer2 = self._make_layer(block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0]) # W * H = W / 2 * H / 2 # stride为2,特征图分辨率降低为1/16,通道数从512升为1024 self.layer3 = self._make_layer(block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1]) # W * H = W / 2 * H / 2 # stride为2,特征图分辨率降低为1/32,通道数从512升为2048 self.layer4 = self._make_layer(block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2]) self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) self.fc = nn.Linear(512 * block.expansion, num_classes) for m in self.modules(): if isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) # Zero-initialize the last BN in each residual branch, # so that the residual branch starts with zeros, and each residual block behaves like an identity. # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677 if zero_init_residual: for m in self.modules(): if isinstance(m, Bottleneck): nn.init.constant_(m.bn3.weight, 0) # type: ignore[arg-type] elif isinstance(m, BasicBlock): nn.init.constant_(m.bn2.weight, 0) # type: ignore[arg-type]
还是假设输入为 [b, c, h, w] = [2,3,768,768] 的张量(没有特殊声明的话后面提到的张量也是基于 [2,3,768,768] 的计算得来),通过 resnet50 得到 [2,2048,24,24] 的张量(这个张量也称为 Feature Map)后需要对原始输入中的 mask 进行相应的 reshape,mask 其实是在 dataloader 生成时,原始输入中原始图像位置的映射。
怎么解释这个 “原始输入中原始图像位置的映射” 呢?
由于生成数据时对数据集中图像进行随机 load,再加上对图像进行随机裁剪,所以同一 batch 的数据尺寸存在差异,但是同一 batch 输入 resnet 的大小需要保持一致,就需要对图像进行 padding(全 0)操作以保证同一 batch 的尺寸相同。具体来说就是找到该 batch 下最大的 W 和最大的 H,然后 batch 下所有的图像根据这个最大的 W*H 进行 padding。
而 mask 就是为了记录未 padding 前的原始图像在 padding 后的图像中的位置。举例来说就是假设 batch 为 2,其中一张图像的 wh=768768,另一张图像的 wh 为 576580,这个最大的 W 和 H 就是 768。生成大小为 [768,786] 全 0 的张量,而较小的图像填充在这全 0 张量的左上角,也就是说张量中 [576:768] 以及 [580:768] 的部分都为 0,反应在 mask 上就是 [0:580,0:576] 的部分为 False,表示未被 padding 部分,[576:768]以及 [580:768] 的部分为 True,表示被 padding 部分。效果如下图:
具体操作见如下代码:
def nested_tensor_from_tensor_list(tensor_list: List[Tensor]): # TODO make this more general if tensor_list[0].ndim == 3: if torchvision._is_tracing(): # nested_tensor_from_tensor_list() does not export well to ONNX # call _onnx_nested_tensor_from_tensor_list() instead return _onnx_nested_tensor_from_tensor_list(tensor_list) # TODO make it support different-sized images max_size = _max_by_axis([list(img.shape) for img in tensor_list]) # min_size = tuple(min(s) for s in zip(*[img.shape for img in tensor_list])) batch_shape = [len(tensor_list)] + max_size b, c, h, w = batch_shape dtype = tensor_list[0].dtype device = tensor_list[0].device tensor = torch.zeros(batch_shape, dtype=dtype, device=device) mask = torch.ones((b, h, w), dtype=torch.bool, device=device) for img, pad_img, m in zip(tensor_list, tensor, mask): # 根据同一batch中最大的W和H对所有余图像进行padding pad_img[: img.shape[0], : img.shape[1], : img.shape[2]].copy_(img) # 有padding的部分设为True m[: img.shape[1], :img.shape[2]] = False else: raise ValueError('not supported') return NestedTensor(tensor, mask)
mask 按照 Feature Map 的 h 和 w 进行 reshape,即原始输入中的 mask 为 [2,768,768],将其 shape 变为 [2,24,24],最终输出的 out 为 {mask,[2,24,24],tensor_list,[2,2048,24,24]}。得到 out 之后,就需要根据 Transformer 所需要的数据结构,将 out 转化为能够被 Transformer Encoder 处理的序列化数据。如位置编码,降维,shape 转换。
首先是位置编码,也就是 position encoding(PE),这里的位置编码是基于 out 中的 mask,也就是 [2,24,24] 进行的。算法类似于最初在《Attention is all you need》中提出的 PE,这里只是将位置编码作用于图像中。最后输出编码后的位置信息,shape 为[2,256,24,24]。
class BackboneBase(nn.Module): def __init__(self, backbone: nn.Module, train_backbone: bool, num_channels: int, return_interm_layers: bool): super().__init__() for name, parameter in backbone.named_parameters(): if not train_backbone or 'layer2' not in name and 'layer3' not in name and 'layer4' not in name: parameter.requires_grad_(False) if return_interm_layers: return_layers = {"layer1": "0", "layer2": "1", "layer3": "2", "layer4": "3"} else: return_layers = {'layer4': "0"} self.body = IntermediateLayerGetter(backbone, return_layers=return_layers) self.num_channels = num_channels def forward(self, tensor_list: NestedTensor): # 通过self.body即resnet50获取最后一层卷积得到的张量[2,3,768,768]->[2,2048,24,24] xs = self.body(tensor_list.tensors) out: Dict[str, NestedTensor] = {} for name, x in xs.items(): # 获取原始输入的mask[2,768,768] m = tensor_list.mask assert m is not None # 根据resnet50输出的wh维度进行reshape即[2,768,768]->[2,24,24] mask = F.interpolate(m[None].float(), size=x.shape[-2:]).to(torch.bool)[0] out[name] = NestedTensor(x, mask) # 此时的输出为{mask,[2,24,24],tensor_list,[2,2048,24,24]} return out class Backbone(BackboneBase): """ResNet backbone with frozen BatchNorm.""" def __init__(self, name: str, train_backbone: bool, return_interm_layers: bool, dilation: bool): backbone = getattr(torchvision.models, name)( replace_stride_with_dilation=[False, False, dilation], pretrained=is_main_process(), norm_layer=FrozenBatchNorm2d) num_channels = 512 if name in ('resnet18', 'resnet34') else 2048 super().__init__(backbone, train_backbone, num_channels, return_interm_layers) class Joiner(nn.Sequential): def __init__(self, backbone, position_embedding): super().__init__(backbone, position_embedding) def forward(self, tensor_list: NestedTensor): # xs为{mask,[2,24,24],tensor_list,[2,2048,24,24]},self[0]即为Backbone中resnet50输出结果 xs = self[0](tensor_list) out: List[NestedTensor] = [] pos = [] for name, x in xs.items(): out.append(x) # position encoding # 位置编码使用的是PositionEmbeddingSine() pos.append(self[1](x).to(x.tensors.dtype)) # 其中out为{mask,[2,24,24],tensor_list,[2,2048,24,24]},pos为根据mask得到的PE,shape为[2,256,24,24] return out, pos
再者是降维,在数据送入 transformer 之前,即:
class DETR(nn.Module): """ This is the DETR module that performs object detection """ def __init__(self, backbone, transformer, num_classes, num_queries, aux_loss=False): """ Initializes the model. Parameters: backbone: torch module of the backbone to be used. See backbone.py transformer: torch module of the transformer architecture. See transformer.py num_classes: number of object classes num_queries: number of object queries, ie detection slot. This is the maximal number of objects DETR can detect in a single image. For COCO, we recommend 100 queries. aux_loss: True if auxiliary decoding losses (loss at each decoder layer) are to be used. """ super().__init__() self.num_queries = num_queries self.transformer = transformer hidden_dim = transformer.d_model self.class_embed = nn.Linear(hidden_dim, num_classes + 1) self.bbox_embed = MLP(hidden_dim, hidden_dim, 4, 3) self.query_embed = nn.Embedding(num_queries, hidden_dim) self.input_proj = nn.Conv2d(backbone.num_channels, hidden_dim, kernel_size=1) self.backbone = backbone self.aux_loss = aux_loss def forward(self, samples: NestedTensor): """ The forward expects a NestedTensor, which consists of: - samples.tensor: batched images, of shape [batch_size x 3 x H x W] - samples.mask: a binary mask of shape [batch_size x H x W], containing 1 on padded pixels It returns a dict with the following elements: - "pred_logits": the classification logits (including no-object) for all queries. Shape= [batch_size x num_queries x (num_classes + 1)] - "pred_boxes": The normalized boxes coordinates for all queries, represented as (center_x, center_y, height, width). These values are normalized in [0, 1], relative to the size of each individual image (disregarding possible padding). See PostProcess for information on how to retrieve the unnormalized bounding box. - "aux_outputs": Optional, only returned when auxilary losses are activated. It is a list of dictionnaries containing the two above keys for each decoder layer. """ if isinstance(samples, (list, torch.Tensor)): samples = nested_tensor_from_tensor_list(samples) # features:{mask,[2,24,24],tensor_list,[2,2048,24,24]},pos:[2,256,24,24] features, pos = self.backbone(samples) # src:[2,2048,24,24], mask:[2,24,24] src, mask = features[-1].decompose() assert mask is not None # 将数据送入transformer # self.input_proj() 将src降维:[2,2048,24,24] -> [2,256,24,24] # query_embed由nn.Embedding初始化,shape[100,256] hs = self.transformer(self.input_proj(src), mask, self.query_embed.weight, pos[-1])[0] outputs_class = self.class_embed(hs) outputs_coord = self.bbox_embed(hs).sigmoid() out = {'pred_logits': outputs_class[-1], 'pred_boxes': outputs_coord[-1]} if self.aux_loss: out['aux_outputs'] = self._set_aux_loss(outputs_class, outputs_coord) return out
需要将 backbone 网络输出的 Feature Map 使用 1x1 的线性层降维,得到与 mask 相同的 channel,即 [2,256,24,24]
class Transformer(nn.Module): def __init__(self, d_model=512, nhead=8, num_encoder_layers=6, num_decoder_layers=6, dim_feedforward=2048, dropout=0.1, activation="relu", normalize_before=False, return_intermediate_dec=False): super().__init__() encoder_layer = TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout, activation, normalize_before) encoder_norm = nn.LayerNorm(d_model) if normalize_before else None self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm) decoder_layer = TransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout, activation, normalize_before) decoder_norm = nn.LayerNorm(d_model) self.decoder = TransformerDecoder(decoder_layer, num_decoder_layers, decoder_norm, return_intermediate=return_intermediate_dec) self._reset_parameters() self.d_model = d_model self.nhead = nhead def _reset_parameters(self): for p in self.parameters(): if p.dim() > 1: nn.init.xavier_uniform_(p) def forward(self, src, mask, query_embed, pos_embed): # flatten NxCxHxW to HWxNxC bs, c, h, w = src.shape # 将降维后的src转换维度[NxCxHxW]->[HWxNxC],即[2,256,24,24]->[576,2,256] src = src.flatten(2).permute(2, 0, 1) # 将位置编码转换维度[NxCxHxW]->[HWxNxC],即[2,256,24,24]->[576,2,256] pos_embed = pos_embed.flatten(2).permute(2, 0, 1) # 词嵌入向量由[num_embeddings, embedding_dim]->[num_embeddings, N, embedding_dim] # 即[100,256]->[100,2,256] query_embed = query_embed.unsqueeze(1).repeat(1, bs, 1) # 将mask[2,24,24]->[2,576] mask = mask.flatten(1) tgt = torch.zeros_like(query_embed) memory = self.encoder(src, src_key_padding_mask=mask, pos=pos_embed) hs = self.decoder(tgt, memory, memory_key_padding_mask=mask, pos=pos_embed, query_pos=query_embed) return hs.transpose(1, 2), memory.permute(1, 2, 0).view(bs, c, h, w)
最后是 reshape,将降维后的 H 和 W 维度合并,然后进行维度转化 [NxCxHxW]->[HWxNxC], 即 [2,256,24,24]->[576,2,256],此时输入 transformer 的 Feature Map 的 shape 转换为 [576,2,256],同时由 mask 生成的位置编码(pos)维度也转化为 [576,2,256]。词嵌入向量由 [num_embeddings, embedding_dim]->[num_embeddings, N, embedding_dim],即 [100,256]->[100,2,256]( 对于 torch.nn.Embedding 的理解可以看这篇文章),mask 的也将 H 和 W 维度合并,shape 由 [2,24,24] 转化为[2,576]。
到这里大致把输入 transformer 之前的数据处理过程理清,接下来是 transformer 部分,不了解 transformer 的可以看一下我的另一篇文章 transformer 学习笔记。与 NLP 中的 transformer 有一定的区别,具体可见 DETR 代码学习笔记(二)。 > 本文由简悦 SimpRead 转码
- Catalog
- About
0%