This repository has been archived by the owner on Aug 27, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
89 lines (70 loc) · 2.82 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from src.converter import LinkMLConverter
from src.model.benchmark import Benchmark
import os
import argparse
from src.validation import Validator, ValidationError
from src.workflow.snakemake.snakemake import SnakemakeEngine
##
## This script is only used for testing the omni_workflow module.
## Snakemake file generation itself happens in Snakefile and snakemake.py.
##
def main(benchmark_file):
    """Load a benchmark, print a summary of its structure and run its workflow.

    The summary covers the benchmark definition, then for every stage its
    modules, implicit/explicit inputs and outputs, and for every module its
    repository, excludes and parameters. Afterwards the nodes, execution
    paths and sorted output paths are printed, the benchmark graph is
    plotted, and the workflow is run through ``SnakemakeEngine``.

    Args:
        benchmark_file: Passed unchanged to ``Benchmark``; the command-line
            entry point supplies the location of the benchmark file.
    """
    benchmark = Benchmark(benchmark_file)
    converter = benchmark.get_converter()
    print(benchmark.get_definition())

    stages = converter.get_stages()
    for stage_id in stages:
        stage = stages[stage_id]
        stage_name = stage["name"]
        # Fall back to the id when the stage has no (truthy) name.
        print("Stage", stage_name or stage_id)

        modules_in_stage = converter.get_modules_by_stage(stage)
        print(" ", stage_name, "with modules", modules_in_stage.keys(), "\n")

        # Look the implicit inputs up once and reuse them for both the
        # implicit and the explicit listing.
        implicit_inputs = converter.get_stage_implicit_inputs(stage)
        print(" Implicit inputs:\n", implicit_inputs)
        print(
            " Explicit inputs:\n",
            [converter.get_explicit_inputs(i) for i in implicit_inputs],
        )
        print(" Outputs\n", converter.get_stage_outputs(stage))
        print("------")

        for module_id in modules_in_stage:
            module = modules_in_stage[module_id]
            module_name = module["name"]
            print(" Module", module_name or module_id)
            print(" Repo:", converter.get_module_repository(module))
            print(" Excludes:", converter.get_module_excludes(module))
            print(" Params:", converter.get_module_parameters(module))
        print("------")
    print("------")

    nodes = benchmark.get_nodes()
    print("All nodes:", nodes)

    execution_paths = benchmark.get_execution_paths()
    print("All execution paths:", execution_paths)

    outputs_paths = sorted(benchmark.get_output_paths())
    print("All output paths:", outputs_paths)

    benchmark.plot_benchmark_graph()

    # Serialize workflow to Snakefile
    workflow = SnakemakeEngine()
    workflow.run_workflow(benchmark)
    # To run a single node instead of the whole benchmark:
    # workflow.run_node_workflow(nodes[2], input_dir="out/data/D1/default", dataset="D1")
if __name__ == "__main__":
    # Command-line entry point: parse the benchmark file location, check that
    # it exists, then run main() and report validation errors on stdout.
    parser = argparse.ArgumentParser(description="Test OmniWorkflow converter.")
    parser.add_argument(
        "--benchmark_file",
        type=str,
        default="data/Benchmark_001.yaml",
        help="Location of the benchmark file",
    )
    benchmark_file = parser.parse_args().benchmark_file

    # Bail out before doing any work when the path is missing.
    if not os.path.exists(benchmark_file):
        raise RuntimeError(f"Benchmark file {benchmark_file} does not exist.")

    try:
        main(benchmark_file)
    except ValidationError as validation_error:
        # Validation problems are listed one per line instead of a traceback.
        print("Validation failed: \n")
        for problem in validation_error.errors:
            print(problem)