# main.py
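"""Entry point that composes and runs the processing pipeline (data fetching, sampling,
person detection, splitting, optimization, overlay, audio adding).

Example invocation (a sketch only - the config path shown is just the argparse default
below, and setting ENVIRONMENT=test enables DEBUG mode, which skips clearing ./data/):

    ENVIRONMENT=test python main.py --config_file_path ./configs/config.hjson
"""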
import os
from argparse import ArgumentParser
from pathlib import Path
from configs.make_config import get_config
from optimizer.optimize import optimize
from person_box_detector.inference_od import detect_persons
from pipeline.lib.defs import Job
from pipeline.lib.defs import Pipeline
from post_processor.audio_adder import add_audio
from post_processor.overlay import overlay
from pre_processor.data_fetcher import fetch_data
from pre_processor.sampler import sample
from splitter.splitter import split

# from person_box_detector.inference_bkp import detect_persons

if os.getenv("ENVIRONMENT") == "test":
    DEBUG = True
else:
    DEBUG = False
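

# wrapper_function gives multiprocessing.Pool.map a plain, module-level (and therefore
# picklable) callable with which to invoke each Pipeline - see the commented-out
# parallel-execution example at the bottom of this file.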
def wrapper_function(pipeline: Pipeline) -> None:
    pipeline()


def clear_files():
    """Remove intermediate artifacts (.json, .mp4, .csv, feather and avi files, plus any
    nested sub-directories) from every folder under ./data/."""
    data_path = Path("./data/")
    for folder in data_path.iterdir():
        if not folder.is_dir():
            continue
        for stuff in folder.iterdir():
            if (
                stuff.name.endswith(".json")
                or stuff.name.endswith(".mp4")
                or stuff.name.endswith(".csv")
                or stuff.name.endswith("feather")
                or stuff.name.endswith("avi")
            ):
                stuff.unlink(missing_ok=True)
            if stuff.is_dir():
                for ele in stuff.iterdir():
                    if ele.is_dir():
                        for i in ele.iterdir():
                            i.unlink(missing_ok=True)
                        ele.rmdir()
                    else:
                        ele.unlink(missing_ok=True)
                stuff.rmdir()
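
# NOTE: the nested deletion in clear_files() could also be sketched with shutil. This is
# an illustrative alternative, not the project's code, and the suffix checks are only an
# approximation of the endswith() tests above:
#
#     import shutil
#
#     for folder in Path("./data/").iterdir():
#         if not folder.is_dir():
#             continue
#         for stuff in folder.iterdir():
#             if stuff.is_dir():
#                 shutil.rmtree(stuff, ignore_errors=True)
#             elif stuff.suffix in {".json", ".mp4", ".csv", ".feather", ".avi"}:
#                 stuff.unlink(missing_ok=True)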


if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument(
        "--config_file_path", type=str, default="./configs/config.hjson"
    )
    args = parser.parse_args()

    """The basic procedure for composing a pipeline is:
    1. Read the config from the file under './configs/*.hjson' and make a dict of Config objects.
    2. Create the steps as Job objects - each Job requires a function and a Config object.
    3. Put the Job objects in an iterable or collection (like a list or tuple) in the desired order.
    4. Make a Pipeline object from the list of Jobs created in Step 3."""
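
    """For orientation only: a minimal sketch of the Job/Pipeline interface this script
    assumes. The real definitions live in pipeline/lib/defs.py and may well differ (for
    example in how results are handed from one step to the next, or in what
    Pipeline.clear() does), so treat this as an illustration, not the actual API."""
    # class Job:
    #     def __init__(self, func, conf):
    #         self.func = func
    #         self.conf = conf
    #
    #     def __call__(self):
    #         return self.func(self.conf)
    #
    #
    # class Pipeline:
    #     def __init__(self, list_of_steps=None, start_step=None):
    #         self.steps = list(list_of_steps) if list_of_steps else [start_step]
    #
    #     def add_job(self, job):
    #         self.steps.append(job)
    #
    #     def __call__(self):
    #         for job in self.steps:
    #             job()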

    if not DEBUG:
        clear_files()

    # Step-1: get all details from config file
    dict_of_configs = get_config(path_to_config=args.config_file_path)

    # Step-2: declare jobs
    fetch_data_step = Job(func=fetch_data, conf=dict_of_configs["fetch_data"])
    sample_step = Job(func=sample, conf=dict_of_configs["sample"])
    detect_persons_step = Job(
        func=detect_persons, conf=dict_of_configs["detect_persons"]
    )
    split_step = Job(func=split, conf=dict_of_configs["split"])
    optimization_step = Job(func=optimize, conf=dict_of_configs["optimization"])
    overlay_step = Job(func=overlay, conf=dict_of_configs["overlay"])
    audio_adder_step = Job(func=add_audio, conf=dict_of_configs["audio_adder"])
    # upload_step = Job(func=upload_video, conf=collection_of_configs['upload'])

    # Step-3: the jobs below are put in a certain order for the pipeline
    list_of_jobs = (
        fetch_data_step,
        sample_step,
        detect_persons_step,
        split_step,
        optimization_step,
        overlay_step,
        audio_adder_step,
        # upload_step
    )

    # Step-4: instantiate a pipeline object
    pipeline_1 = Pipeline(list_of_steps=list_of_jobs)

    # execute pipeline
    pipeline_1()

    # clear intermediate data created by the pipeline
    if not DEBUG:
        pipeline_1.clear()
"""below we see an example of how we can instantiate a pipeline with just a first step"""
# pipeline_1 = Pipeline(start_step=fetch_data_step)
# # now we could also add steps to the pipeline individually
# pipeline_1.add_job(sample_step)
# pipeline.add_job(detect_persons_step)
# pipeline.add_job(split_step)
# pipeline.add_job(optimization_step)
# pipeline.add_job(overlay_step)
# pipeline.add_job(upload_step)
# # execute pipeline
# pipeline_1()
"""Below we make another pipeline following the exact same steps described before.
We do this to check for parallel execution - ideally a new pipeline means a new config"""
# collection_of_configs_2 = get_config(path_to_config="./configs/config.json")
# # declare jobs
# fetch_data_step_2 = Job(func=fetch_data, conf=collection_of_configs_2['fetch_data'])
# sample_step_2 = Job(func=sample, conf=collection_of_configs_2['sample'])
# detect_persons_step_2 = Job(func=detect_persons, conf=collection_of_configs_2['detect_persons'])
# split_step_2 = Job(func=split, conf=collection_of_configs_2['split'])
# # the jobs below are put in a certain order for the pipeline
# list_of_jobs_2 = [
# fetch_data_step_2,
# sample_step_2,
# detect_persons_step_2,
# ]
# # declare another pipeline
# pipeline_2 = Pipeline(list_of_steps=list_of_jobs_2)
# # bundle the pipelines together
# collection_of_pipelines = [pipeline_1, pipeline_2]
# # run the pipelines in parallel
# from multiprocessing import Pool
# with Pool() as p:
# res = p.map(wrapper_function, collection_of_pipelines)