-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_ours.py
143 lines (123 loc) · 4.99 KB
/
test_ours.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import argparse
import os
import os.path as osp
import mmcv
from mmcv import Config, DictAction
from mmcv.cnn import fuse_conv_bn
from mmcv.parallel import MMDataParallel, MMDistributedDataParallel
from mmcv.runner import get_dist_info, init_dist, load_checkpoint
from capeformer import * # noqa
from capeformer.datasets import build_dataset
from mmpose.apis import multi_gpu_test, single_gpu_test
from mmpose.core import wrap_fp16_model
from mmpose.datasets import build_dataloader
from mmpose.models import build_posenet
import warnings
from mpformer import *
warnings.filterwarnings('ignore')
def parse_args():
parser = argparse.ArgumentParser(description='mmpose test model')
parser.add_argument('config', default='mpformer/cfg/s1.py', help='test config file path')
parser.add_argument('checkpoint', default=None, help='checkpoint file')
parser.add_argument('--work-dir', default='output_meta_test', help='the dir to save logs and models')
parser.add_argument('--out', help='output result file')
parser.add_argument('--fuse-conv-bn', action='store_true',
help='Whether to fuse conv and bn, this will slightly increase'
'the inference speed')
parser.add_argument(
'--eval',
default=None,
nargs='+',
help='evaluation metric, which depends on the dataset,'
' e.g., "mAP" for MSCOCO')
parser.add_argument(
'--gpu_collect',
action='store_true',
help='whether to use gpu to collect results')
parser.add_argument('--tmpdir', help='tmp dir for writing some results')
parser.add_argument(
'--cfg-options',
nargs='+',
action=DictAction,
default={},
help='override some settings in the used config, the key-value pair '
'in xxx=yyy format will be merged into config file. For example, '
"'--cfg-options model.backbone.depth=18 model.backbone.with_cp=True'")
parser.add_argument(
'--launcher',
choices=['none', 'pytorch', 'slurm', 'mpi'],
default='pytorch',
help='job launcher')
parser.add_argument('--local_rank', type=int, default=0)
args = parser.parse_args()
if 'LOCAL_RANK' not in os.environ:
os.environ['LOCAL_RANK'] = str(args.local_rank)
return args
def merge_configs(cfg1, cfg2):
# Merge cfg2 into cfg1
# Overwrite cfg1 if repeated, ignore if value is None.
cfg1 = {} if cfg1 is None else cfg1.copy()
cfg2 = {} if cfg2 is None else cfg2
for k, v in cfg2.items():
if v:
cfg1[k] = v
return cfg1
def main():
args = parse_args()
cfg = Config.fromfile(args.config)
if args.cfg_options is not None:
cfg.merge_from_dict(args.cfg_options)
# set cudnn_benchmark
if cfg.get('cudnn_benchmark', False):
torch.backends.cudnn.benchmark = True
cfg.model.pretrained = None
cfg.data.test.test_mode = True
args.work_dir = osp.join(f'./{args.work_dir}', osp.splitext(osp.basename(args.config))[0])
mmcv.mkdir_or_exist(osp.abspath(args.work_dir))
# init distributed env first, since logger depends on the dist info.
if args.launcher == 'none':
distributed = False
else:
distributed = True
init_dist(args.launcher, **cfg.dist_params)
# build the dataloader
dataset = build_dataset(cfg.data.test, dict(test_mode=True))
dataloader_setting = dict(
samples_per_gpu=1,
workers_per_gpu=cfg.data.get('workers_per_gpu', 1),
dist=distributed,
shuffle=False,
drop_last=False)
dataloader_setting = dict(dataloader_setting,
**cfg.data.get('test_dataloader', {}))
data_loader = build_dataloader(dataset, **dataloader_setting)
# build the model and load checkpoint
model = build_posenet(cfg.model)
fp16_cfg = cfg.get('fp16', None)
if fp16_cfg is not None:
wrap_fp16_model(model)
if os.path.exists(args.checkpoint):
load_checkpoint(model, args.checkpoint, map_location='cpu')
else:
print(f'No param is loaded!')
if args.fuse_conv_bn:
model = fuse_conv_bn(model)
if not distributed:
model = MMDataParallel(model, device_ids=[0])
outputs = single_gpu_test(model, data_loader)
else:
model = MMDistributedDataParallel(
model.cuda(),
device_ids=[torch.cuda.current_device()],
broadcast_buffers=False)
outputs = multi_gpu_test(model, data_loader, args.tmpdir, args.gpu_collect)
rank, _ = get_dist_info()
eval_config = cfg.get('evaluation', {})
eval_config = merge_configs(eval_config, dict(metric=args.eval))
if rank == 0:
results = dataset.evaluate([o['major'] for o in outputs], **eval_config)
print()
print(f'\tmPCK: {results["mPCK"]:.2%}')
print()
if __name__ == '__main__':
main()