# Copyright (c) 2018-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
#

import hashlib
import os
import pathlib
import shutil
import sys
import time

import cv2
import numpy as np
import torch


def add_path():
    Alphapose_path = os.path.abspath('joints_detectors/Alphapose')
    hrnet_path = os.path.abspath('joints_detectors/hrnet')
    trackers_path = os.path.abspath('pose_trackers')

    paths = filter(lambda p: p not in sys.path, [Alphapose_path, hrnet_path, trackers_path])
    sys.path.extend(paths)


def wrap(func, *args, unsqueeze=False):
    """
    Wrap a torch function so it can be called with NumPy arrays.
    Input and return types are seamlessly converted.
    """
    # Convert input types where applicable
    args = list(args)
    for i, arg in enumerate(args):
        if isinstance(arg, np.ndarray):
            args[i] = torch.from_numpy(arg)
            if unsqueeze:
                args[i] = args[i].unsqueeze(0)

    result = func(*args)

    # Convert output types where applicable
    if isinstance(result, tuple):
        result = list(result)
        for i, res in enumerate(result):
            if isinstance(res, torch.Tensor):
                if unsqueeze:
                    res = res.squeeze(0)
                result[i] = res.numpy()
        return tuple(result)
    elif isinstance(result, torch.Tensor):
        if unsqueeze:
            result = result.squeeze(0)
        return result.numpy()
    else:
        return result
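
# Example (illustrative, not part of the original file): call a torch op
# directly on a NumPy array; `unsqueeze=True` adds a batch dimension before
# the call and strips it afterwards.
#
#   kpts = np.random.randn(17, 2).astype('float32')
#   activated = wrap(torch.nn.functional.relu, kpts, unsqueeze=True)  # returns a NumPy array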


def deterministic_random(min_value, max_value, data):
    digest = hashlib.sha256(data.encode()).digest()
    raw_value = int.from_bytes(digest[:4], byteorder='little', signed=False)
    return int(raw_value / (2 ** 32 - 1) * (max_value - min_value)) + min_value
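
# Example (illustrative): derive a reproducible offset from a string key, e.g.
# when subsampling a sequence. `n_frames`, `clip_len`, `subject` and `action`
# are hypothetical names, not defined in this file.
#
#   offset = deterministic_random(0, n_frames - clip_len, f'{subject}_{action}')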


def alpha_map(prediction):
    # Linearly rescale the predictions so that [min, max] maps onto [-0.8, 0.8].
    p_min, p_max = prediction.min(), prediction.max()

    k = 1.6 / (p_max - p_min)
    b = 0.8 - k * p_max

    prediction = k * prediction + b
    return prediction


def change_score(prediction, detectron_detection_path):
    # Replace the confidence scores in `prediction` with the ones from a saved
    # Detectron 2D detection file. Note that the subject/action keys are hardcoded.
    detectron_predictions = np.load(detectron_detection_path, allow_pickle=True)['positions_2d'].item()
    pose = detectron_predictions['S1']['Directions 1']
    prediction[..., 2] = pose[..., 2]
    return prediction


class Timer:
    def __init__(self, message, show=True):
        self.message = message
        self.elapsed = 0
        self.show = show

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.elapsed = time.perf_counter() - self.start
        if self.show:
            print(f'{self.message} --- elapsed time: {self.elapsed} s')
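
# Example (illustrative):
#
#   with Timer('2D keypoint detection') as t:
#       detect_keypoints(frames)   # hypothetical callable being timed
#   # t.elapsed now holds the wall-clock time in seconds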


def calculate_area(data):
    """
    Get the area of the bounding rectangle of the keypoints.
    :param data: AlphaPose json keypoint format ([x, y, score, ... , x, y, score]) or AlphaPose result keypoint format ([[x, y], ..., [x, y]])
    :return: bounding rectangle area
    """
    data = np.array(data)

    if len(data.shape) == 1:
        data = np.reshape(data, (-1, 3))

    width = max(data[:, 0]) - min(data[:, 0])
    height = max(data[:, 1]) - min(data[:, 1])

    return width * height
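
# Example (illustrative): bounding-box area of 17 keypoints given in the flat
# AlphaPose format [x, y, score, x, y, score, ...].
#
#   flat_kpts = np.random.rand(17 * 3)
#   bbox_area = calculate_area(flat_kpts)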


def read_video(filename, fps=None, skip=0, limit=-1):
    # `fps` is accepted for API compatibility but is currently unused.
    stream = cv2.VideoCapture(filename)

    i = 0
    while True:
        grabbed, frame = stream.read()
        # if the `grabbed` boolean is `False`, then we have
        # reached the end of the video file
        if not grabbed:
            print('===========================> This video has ' + str(i) + ' frames in total.')
            sys.stdout.flush()
            break

        i += 1
        if i > skip:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            yield np.array(frame)

        if i == limit:
            break
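
# Example (illustrative): iterate over RGB frames, skipping the first 10 and
# stopping after frame 100. The path and callback are hypothetical.
#
#   for frame in read_video('outputs/kobe.mp4', skip=10, limit=100):
#       process_frame(frame)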


def split_video(video_path):
    stream = cv2.VideoCapture(video_path)

    output_dir = os.path.dirname(video_path)
    video_name = os.path.basename(video_path)
    video_name = video_name[:video_name.rfind('.')]

    save_folder = pathlib.Path(f'./{output_dir}/alpha_pose_{video_name}/split_image/')
    shutil.rmtree(str(save_folder), ignore_errors=True)
    save_folder.mkdir(parents=True, exist_ok=True)

    total_frames = int(stream.get(cv2.CAP_PROP_FRAME_COUNT))
    length = len(str(total_frames)) + 1

    i = 1
    while True:
        grabbed, frame = stream.read()
        if not grabbed:
            # `i` is incremented after each save, so `i - 1` images were written.
            print(f'Split {i - 1} images from the video in total.')
            break

        save_path = f'{save_folder}/output{str(i).zfill(length)}.png'
        cv2.imwrite(save_path, frame)

        i += 1

    saved_path = str(save_folder)
    print(f'Split images saved in {saved_path}')

    return saved_path


def evaluate(test_generator, model_pos, action=None, return_predictions=False):
    """
    Infer 3D joint positions from 2D keypoints.
    :type test_generator: UnchunkedGenerator
    :param test_generator: generator that yields batches of 2D keypoints
    :param model_pos: 3D pose model
    :param return_predictions: return the predictions if True
    :return: predicted 3D positions when `return_predictions` is True, otherwise None
    """
    joints_left, joints_right = [4, 5, 6, 11, 12, 13], [1, 2, 3, 14, 15, 16]

    with torch.no_grad():
        model_pos.eval()
        N = 0
        for _, batch, batch_2d in test_generator.next_epoch():
            inputs_2d = torch.from_numpy(batch_2d.astype('float32'))
            if torch.cuda.is_available():
                inputs_2d = inputs_2d.cuda()

            # Positional model
            predicted_3d_pos = model_pos(inputs_2d)

            if test_generator.augment_enabled():
                # Undo flipping and take average with non-flipped version
                predicted_3d_pos[1, :, :, 0] *= -1
                predicted_3d_pos[1, :, joints_left + joints_right] = predicted_3d_pos[1, :, joints_right + joints_left]
                predicted_3d_pos = torch.mean(predicted_3d_pos, dim=0, keepdim=True)

            if return_predictions:
                return predicted_3d_pos.squeeze(0).cpu().numpy()
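
# Example (illustrative sketch): run the model over a single unchunked sequence.
# Assumes the UnchunkedGenerator from VideoPose3D's common.generators and a
# trained `model_pos`; `input_keypoints`, `pad` and `causal_shift` are
# hypothetical names, not defined in this file.
#
#   gen = UnchunkedGenerator(None, None, [input_keypoints],
#                            pad=pad, causal_shift=causal_shift, augment=False)
#   prediction = evaluate(gen, model_pos, return_predictions=True)  # (frames, joints, 3)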


if __name__ == '__main__':
    os.chdir('..')

    split_video('outputs/kobe.mp4')