Commit 7e33fc7

run multiple workflows (closes #53); updated merge_sources plugin to look up sources with the dataset attribute; updated clean_source plugin to maintain the previous dataset_name; updated nvd_source plugin to include 2023 data

epicosy committed Jan 12, 2023
1 parent d152692 commit 7e33fc7
Showing 8 changed files with 117 additions and 145 deletions.
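The headline change is the pipeline format: the single top-level `workflow` list becomes a `workflows` mapping from names to layer lists, and `run()` now executes each named workflow in turn. A minimal sketch of the before/after shape; the workflow and layer names here (`ingest`, `full`, `collect`, `prepare`) are invented for illustration:

```python
import yaml  # PyYAML, already used by the controller

# Old shape: one anonymous workflow, a flat list of layers.
old = yaml.safe_load("workflow: [collect, prepare]")

# New shape: named workflows, each mapping to its own list of layers.
new = yaml.safe_load("""
workflows:
  ingest: [collect]
  full: [collect, prepare]
""")

# run() iterates these via pipeline.workflows.items()
for name, layers in new["workflows"].items():
    print(name, layers)
```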
48 changes: 25 additions & 23 deletions tenet/controllers/base.py
@@ -59,8 +59,8 @@ def run(self):
             self.app.workdir.mkdir(parents=True)
             self.app.log.info(f"Created workdir {self.app.workdir}")

-        dataset = self._parse_dataset()
-        pipeline = self._parse_pipeline()
+        dataset_path = self._parse_dataset_path()
+        pipeline_path = self._parse_pipeline_path()

         self.app.threads = self.app.pargs.threads if self.app.pargs.threads else self.app.get_config('local_threads')
@@ -74,61 +74,63 @@ def run(self):
         #if not os.path.exists(self.app.bind):
         #    self.app.log.error(f"Bind path {self.app.bind} is not a valid directory path.")
         #    exit(1)
+        self.app.extend('executed_edges', {})

-        with pipeline.open(mode="r") as stream:
+        with pipeline_path.open(mode="r") as stream:
             try:
-                self.app.log.info(f"Parsing pipeline file: {pipeline}")
+                self.app.log.info(f"Parsing pipeline file: {pipeline_path}")
                 pipeline = parse_pipeline(yaml.safe_load(stream))
-                self.app.extend('pipeline', pipeline)
             except schema.SchemaError as se:
-                self.app.log.error(str(se))
-                exit(1)
+                raise TenetError(str(se))

+        self.app.extend('pipeline', pipeline)
         workflow_handler = self.app.handler.get('handlers', 'workflow', setup=True)
-        workflow_handler.load(dataset)
-        workflow_handler(dataset)
+        for name, layers in pipeline.workflows.items():
+            # todo: check if this path is necessary
+            workflow_handler.load(name, dataset_path, layers)
+            dataset, dataset_path = workflow_handler(dataset_path)

-    def _parse_dataset(self) -> Path:
+    def _parse_dataset_path(self) -> Path:
         if self.app.pargs.dataset:
-            dataset = Path(self.app.pargs.dataset)
+            dataset_path = Path(self.app.pargs.dataset)
         else:
             dataset_files = [f for f in Path(self.app.pargs.workdir).iterdir() if f.suffix in ['.csv', '.tsv']]

             if len(dataset_files) == 0:
                 raise TenetError('No dataset file found in the specified working directory')
             elif len(dataset_files) == 1:
                 self.app.log.info(f"Using {dataset_files[0]} as dataset...")
-                dataset = Path(dataset_files[0])
+                dataset_path = Path(dataset_files[0])
             else:
                 option = [
                     inquirer.List('dataset', message="Select the dataset you want to use:", choices=dataset_files,
                                   ),
                 ]
                 answer = inquirer.prompt(option)
-                dataset = Path(answer["dataset"])
+                dataset_path = Path(answer["dataset"])

-        if not dataset.exists():
-            raise TenetError(f"Dataset {dataset} not found.")
+        if not dataset_path.exists():
+            raise TenetError(f"Dataset {dataset_path} not found.")

-        return dataset
+        return dataset_path

-    def _parse_pipeline(self) -> Path:
+    def _parse_pipeline_path(self) -> Path:
         if self.app.pargs.file:
-            pipeline = Path(self.app.pargs.file)
+            pipeline_path = Path(self.app.pargs.file)
         else:
             pipeline_files = [f for f in Path(self.app.pargs.workdir).iterdir() if f.suffix in ['.yml', '.yaml']]
             if len(pipeline_files) == 0:
                 raise TenetError('No pipeline file found in the specified working directory')
             elif len(pipeline_files) == 1:
                 self.app.log.info(f"Using {pipeline_files[0]} as pipeline...")
-                pipeline = Path(pipeline_files[0])
+                pipeline_path = Path(pipeline_files[0])
             else:
                 option = [inquirer.List('pipeline_files', message="Select the pipeline you want to run:",
                                         choices=pipeline_files)]
                 answer = inquirer.prompt(option)
-                pipeline = Path(answer["pipeline_files"])
+                pipeline_path = Path(answer["pipeline_files"])

-        if not pipeline.exists():
-            raise TenetError(f"File {pipeline} not found.")
+        if not pipeline_path.exists():
+            raise TenetError(f"File {pipeline_path} not found.")

-        return pipeline
+        return pipeline_path
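The renamed helpers keep the same resolution order: an explicit CLI argument wins, otherwise a lone matching file in the working directory is used, and only when several candidates exist is the user prompted. A standalone sketch of that fallback for the dataset path — `resolve_dataset_path` is a hypothetical helper for illustration, with the inquirer prompt omitted:

```python
from pathlib import Path
from typing import Optional

def resolve_dataset_path(workdir: str, dataset: Optional[str] = None) -> Path:
    # An explicit --dataset argument takes precedence.
    if dataset:
        path = Path(dataset)
    else:
        # Otherwise look for .csv/.tsv files in the working directory.
        candidates = [f for f in Path(workdir).iterdir() if f.suffix in ['.csv', '.tsv']]
        if not candidates:
            raise FileNotFoundError(f"No dataset file found in {workdir}")
        # The real controller prompts with inquirer when there are several;
        # this sketch simply takes the first candidate.
        path = candidates[0]
    if not path.exists():
        raise FileNotFoundError(f"Dataset {path} not found.")
    return path
```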
80 changes: 21 additions & 59 deletions tenet/data/schema.py
@@ -2,17 +2,12 @@

 from dataclasses import dataclass, field
 from pathlib import Path
-from typing import Dict, List, AnyStr, Any, Tuple, Union, Callable
+from typing import Dict, Any, Tuple, Callable
 from schema import Schema, And, Use, Optional, Or

 from tenet.core.exc import TenetError
 from tenet.utils.misc import random_id

-_container = Schema(
-    And({'name': str, 'image': str, Optional('output', default=None): str,
-         'cmds': Schema(And([str], Use(lambda cmds: [ContainerCommand(org=cmd) for cmd in cmds])))},
-        Use(lambda c: Container(**c))))
-
 _plugin = Schema(
     And({'name': str, Optional('kwargs', default={}): dict},
         Use(lambda p: Plugin(**p))))
@@ -28,11 +23,13 @@

 _nodes = Schema(
     And(
-        [Or({'container': _container}, {'plugin': _plugin})],
+        [{'plugin': _plugin}],
         Use(lambda n: {v.name: v for el in n for k, v in el.items()})
     )
 )

+_workflows = Schema(And({str: list}, Use(lambda w: {k: v for k, v in w.items()})))
+

 @dataclass
 class Connector:
@@ -155,44 +152,11 @@ def __str__(self):
         return self.parsed if self.parsed else self.org


-@dataclass
-class Container:
-    """
-        Docker container
-    """
-    name: str
-    image: str
-    cmds: List[ContainerCommand]
-    output: str
-
-    def find_placeholders(self, skip: bool = True):
-        """
-            Looks up for and returns the placeholders in the commands.
-        """
-        matches = []
-
-        def match_placeholder(string: str):
-            match = re.findall("\{(p\d+)\}", string)
-
-            if match:
-                matches.extend(match)
-
-            return match
-
-        for cmd in self.cmds:
-            if match_placeholder(cmd.org):
-                cmd.skip = skip
-
-        match_placeholder(str(self.output))
-
-        return set(matches)
-
-
 @dataclass
 class Layer:
     edges: Dict[str, Edge]

-    def traverse(self, nodes: Dict[str, Union[Plugin, Container]]):
+    def traverse(self, nodes: Dict[str, Plugin]):
         return [(nodes[edge.node], edge) for edge in self.edges.values()]
@@ -252,10 +216,10 @@ class Pipeline:
     """
     layers: dict
     nodes: dict
-    workflow: list
+    workflows: dict

-    def unpack(self):
-        return {name: edge for l_name, layer in self.layers.items() if l_name in self.workflow for name, edge in layer.edges.items()}
+    def unpack(self, workflow: str):
+        return {name: edge for l_name, layer in self.layers.items() if l_name in self.workflows[workflow] for name, edge in layer.edges.items()}

     def match(self, placeholders: list, sink_attrs: list):
         """
@@ -266,30 +230,28 @@ def match(self, placeholders: list, sink_attrs: list):
                 return False
         return True

-    def link(self, node_handlers: dict) -> Connectors:
+    def link(self, workflow: str, node_handlers: dict, connectors: Connectors = None) -> Connectors:
         """
             creates the respective connectors
         """
-        connectors = Connectors()
-        sources = {}
+        if connectors is None:
+            connectors = Connectors()
+            sources = {}
+        else:
+            sources = connectors.sources.copy()

-        for _, edge in self.unpack().items():
+        for _, edge in self.unpack(workflow).items():
             if edge.sources:
                 if edge.name not in connectors.sources:
                     connectors.sources[edge.name] = {}

                 # If node has attributes, init connector with the value of the attributes
-                sources[edge.name] = {attr: getattr(node_handlers[edge.name], attr, None) for attr in edge.sources}
+                if edge.name in sources:
+                    sources[edge.name] = {attr: getattr(node_handlers[edge.name], attr, None) for attr in edge.sources}

             for source, links in edge.sinks.items():
                 if source == edge.name:
-                    if isinstance(self.nodes[edge.node], Container):
-                        placeholders = self.nodes[edge.node].find_placeholders(skip=False)
-
-                        if not self.match(placeholders, list(links.values())):
-                            raise ValueError(f"Placeholders are not matching attributes in the container commands.")
-                    else:
-                        raise ValueError(f"Plugin {source} cannot reference itself.")
+                    raise ValueError(f"Plugin {source} cannot reference itself.")

                 if source not in sources:
                     raise TenetError(f"source {source} must be defined before sink {edge.name}")
@@ -305,14 +267,14 @@

         return connectors

-    def walk(self):
+    def walk(self, layers: list):
         """
             Walks the edges and returns list with the traversal. Initializes edge connectors.
         """
         # TODO: traversal include parallel execution
         traversal = []

-        for el in self.workflow:
+        for el in layers:
             if el in self.layers:
                 traversal.extend(self.layers[el].traverse(self.nodes))
             elif el in self.nodes:
@@ -331,5 +293,5 @@ def parse_pipeline(yaml: dict) -> Pipeline:
     :return: Pipeline object
     """

-    return Schema(And({'nodes': _nodes, 'layers': _layers, 'workflow': list},
+    return Schema(And({'nodes': _nodes, 'layers': _layers, 'workflows': _workflows},
                       Use(lambda pipe: Pipeline(**pipe)))).validate(yaml)
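The new `_workflows` schema is small enough to exercise on its own: any mapping from workflow name to a list validates, anything else raises `SchemaError`. A quick sketch using the same `schema` package; the schema line is verbatim from the diff, the sample workflow names are invented:

```python
from schema import And, Schema, SchemaError, Use

# Verbatim from the diff: workflow names map to lists of layer/node names.
_workflows = Schema(And({str: list}, Use(lambda w: {k: v for k, v in w.items()})))

print(_workflows.validate({'ingest': ['collect'], 'full': ['collect', 'prepare']}))

try:
    _workflows.validate({'ingest': 'collect'})  # value must be a list
except SchemaError as se:
    print(f"rejected: {se}")
```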
2 changes: 1 addition & 1 deletion tenet/handlers/github.py
@@ -330,7 +330,7 @@ def get_blocks_from_diff(self, diff_text: str, extensions: list = None) -> List[
         num_paths = len(diff_path_bound)
         diff_path_bound.append(len(lines))
         blocks = []
-
+        # TODO: we want this too be more flexible, adapt
         extensions = extensions if extensions else self.app.get_config('proj_ext')

         for path_id in range(num_paths):
5 changes: 4 additions & 1 deletion tenet/handlers/node.py
@@ -47,9 +47,12 @@ def get(self, attr: str, default: Any = None):
         try:
             return self.app.connectors[self.edge.name, attr]
         except KeyError as ke:
-            self.app.log.warning(ke)
+            self.app.log.warning(f"Key {ke} not found for edge {self.edge.name}")
             return default

+    def get_sinks(self):
+        return self.app.connectors.sinks[self.edge.name]
+
     def set(self, attr: str, value: Any, skip: bool = True):
         self.app.connectors[self.edge.name, attr] = value
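The reworded warning makes silent fallbacks debuggable: instead of logging the bare `KeyError`, it names the missing attribute and the edge it was requested for. A self-contained sketch of that behavior, with a plain dict standing in for the app's connectors and hypothetical edge/attribute names:

```python
import logging

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger("tenet")

# Hypothetical connector contents keyed by (edge name, attribute).
connectors = {("clean_source", "dataset"): "path/to/dataset.csv"}

def get(edge_name, attr, default=None):
    try:
        return connectors[edge_name, attr]
    except KeyError as ke:
        # Mirrors the new warning: names the missing key and the edge.
        log.warning(f"Key {ke} not found for edge {edge_name}")
        return default

print(get("clean_source", "dataset"))                      # hit
print(get("clean_source", "dataset_name", default="raw"))  # miss -> warning + default
```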
(Diffs for the remaining 4 changed files did not load.)
