diff --git a/pyproject.toml b/pyproject.toml index 1145d4c..a2f9e2f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,7 +53,8 @@ dependencies = [ "kaleido==0.2.1", "google-cloud-storage", "plotly~=5.11.0", - "inquirer~=3.1.2" + "inquirer~=3.1.2", + "networkx~=3.0" ] [project.optional-dependencies] diff --git a/tenet/data/plugin.py b/tenet/data/plugin.py new file mode 100644 index 0000000..c1b8c98 --- /dev/null +++ b/tenet/data/plugin.py @@ -0,0 +1,58 @@ +from dataclasses import dataclass, field + + +@dataclass +class Sources: + assets: dict = field(default_factory=lambda: {}) + modified: dict = field(default_factory=lambda: {}) + + def is_init(self): + return all(self.modified.values()) + + def __len__(self): + return len(self.assets) + + def __getitem__(self, key: str): + return self.assets[key] + + def __setitem__(self, key: str, value): + self.modified[key] = True + self.assets[key] = value + + def __iter__(self): + return iter(self.assets) + + def keys(self): + return self.assets.keys() + + def items(self): + return self.assets.items() + + def values(self): + return self.assets.values() + + +@dataclass +class Sinks: + assets: dict = field(default_factory=lambda: {}) + + def __len__(self): + return len(self.assets) + + def __getitem__(self, key: str): + return self.assets[key] + + def __setitem__(self, key: str, value): + self.assets[key] = value + + def __iter__(self): + return iter(self.assets) + + def keys(self): + return self.assets.keys() + + def items(self): + return self.assets.items() + + def values(self): + return self.assets.values() diff --git a/tenet/data/schema.py b/tenet/data/schema.py index 149df10..1913be9 100644 --- a/tenet/data/schema.py +++ b/tenet/data/schema.py @@ -1,40 +1,23 @@ -import re +import networkx as nx from dataclasses import dataclass, field from pathlib import Path -from typing import Dict, Any, Tuple, Callable -from schema import Schema, And, Use, Optional, Or +from typing import Dict, Any, Tuple, Callable, List, Union, OrderedDict -from tenet.core.exc import TenetError -from tenet.utils.misc import random_id - -_plugin = Schema( - And({'name': str, Optional('kwargs', default={}): dict}, - Use(lambda p: Plugin(**p)))) +from schema import Schema, And, Optional, Or -_edges = Schema( - And([{str: {'node': str, Optional('sinks', default={}): dict, Optional('sources', default=[]): list, - Optional('placeholders', default={}): dict, Optional('kwargs', default={}): dict}}], - Use(lambda els: {k: Edge(name=k, sinks=v['sinks'], sources=v['sources'], kwargs=v['kwargs'], node=v['node'], - placeholders={t: Placeholder(tag=t, value=p, node=k) for t, p in v['placeholders'].items()}) for el in els for k, v in el.items()})) -) - -_layers = Schema(And({str: _edges}, Use(lambda l: {k: Layer(edges=v) for k, v in l.items()}))) +from tenet.utils.misc import random_id -_nodes = Schema( - And( - [{'plugin': _plugin}], - Use(lambda n: {v.name: v for el in n for k, v in el.items()}) - ) -) -_workflows = Schema(And({str: list}, Use(lambda w: {k: v for k, v in w.items()}))) +_nodes = Schema(And([{str: {'plugin': str, Optional('kwargs', default={}): dict}}])) +_edges = Schema(And([{str: {'at': str, 'to': str, Optional('links', default={}): dict}}])) +_links = Schema(And({str: {str: {str: [str]}}})) @dataclass class Connector: """ - Data class representing edge connections + Data class representing node connections """ source: str sink: str @@ -42,32 +25,20 @@ class Connector: links: Dict[str, str] = field(default_factory=lambda: {}) attrs: Dict[str, Any] = 
field(default_factory=lambda: {})

-    def map_placeholders(self):
-        """
-        Looks for placeholders in the attributes and returns them with their associated values.
-        """
-        matches = {}
-
-        for source_attr, sink_attr in self.links.items():
-            if re.search("(p\d+)", sink_attr):
-                matches[sink_attr] = self.attrs[source_attr]
-
-        return matches
-
     def __getitem__(self, key: str):
         for source_attr, sink_attr in self.links.items():
-            if key == sink_attr:
-                return self.attrs[source_attr]
+            if key == sink_attr:
+                return self.attrs[sink_attr]

             # in case the key is used inside the same plugin
             if key == source_attr:
                 return self.attrs[source_attr]

-        raise ValueError(f'Key {key} for sink {self.sink} not found in links. Set it in {self.source} source.')
+        raise ValueError(f"Key '{key}' for sink '{self.sink}' not found in links. Set it in '{self.source}' source.")

     def __setitem__(self, key: str, value: Any):
-        if key in self.attrs:
-            self.attrs[key] = value
+        # TODO: check if 'if' is necessary
+        # if key in self.attrs:
+        self.attrs[key] = value

     def is_init(self) -> bool:
         """
@@ -94,70 +65,181 @@ def match(self, attr: str, kind: Any = None):

 @dataclass
-class Edge:
+class ContainerCommand:
+    """
+    Data object representing the container command.
+    """
+    org: str
+    parsed: str = None
+    skip: bool = True
+    parse_fn: Callable = None
+    tag: str = None
+
+    def __str__(self):
+        return self.parsed if self.parsed else self.org
+
+
+@dataclass
+class Node:
     """
-    Data object representing connections between nodes
+    Data object representing a plugin node in the pipeline
     """
     name: str
-    node: str
-    sinks: dict = field(default_factory=lambda: {})
-    sources: list = field(default_factory=lambda: [])
-    placeholders: dict = field(default_factory=lambda: {})
+    plugin: str
+    layer: str = None
     kwargs: dict = field(default_factory=lambda: {})

-    @property
-    def connectors(self):
-        cons = {}
-        cons.update(self.sinks)
-        # cons.update({l: k for k, v in self.sources.items() for l in v})
-
-        return cons
-

 @dataclass
-class Plugin:
+class Edge:
     """
-    Code extensions
+    Data object representing connections between nodes
     """
     name: str
-    kwargs: dict = field(default_factory=lambda: {})
+    at: Node
+    to: Node
+    links: dict = field(default_factory=lambda: {})
+
+    def to_list(self) -> list:
+        return [self.at, self.to]


 @dataclass
-class Placeholder:
-    tag: str
-    node: str
-    value: Any
+class Layer:
+    name: str
+    edges: Dict[str, Edge]

-    def __str__(self):
-        return f"{self.node} - {self.tag}={self.value}"
+    def to_list(self):
+        return [(e.at, e.to) for e in self.edges.values()]
+
+    def get_nodes(self):
+        return [n for e in self.edges.values() for n in [e.at, e.to]]
+
+    def set_nodes_layer(self, at_node: bool = True, to_node: bool = True):
+        for edge in self.edges.values():
+            if at_node and not edge.at.layer:
+                edge.at.layer = self.name
+            if to_node and not edge.to.layer:
+                edge.to.layer = self.name


 @dataclass
-class ContainerCommand:
-    """
-    Data object representing the container command.
- """ - org: str - parsed: str = None - skip: bool = True - placeholders: Dict[str, Placeholder] = field(default_factory=lambda: {}) - parse_fn: Callable = None - tag: str = None +class ParallelLayer: + name: str + tracks: Dict[str, List[Edge]] - def get_placeholders(self): - return {t: p.value for t, p in self.placeholders.items()} + def to_list(self) -> list: + return [(e.at, e.to) for t, v in self.tracks.items() for e in v] + + def set_nodes_layer(self, at_node: bool = True, to_node: bool = False): + for track in self.tracks.values(): + for edge in track: + # init layer for 'at nodes' only + if to_node and not edge.to.layer: + edge.to.layer = self.name + if at_node and not edge.at.layer: + edge.at.layer = self.name + + def get_nodes(self): + nodes = [] + for t, v in self.tracks.items(): + for e in v: + if e.at not in nodes: + nodes.append(e.at) + if e.to not in nodes: + nodes.append(e.to) + + return nodes + + +@dataclass +class DAGPath: + nodes: List[Union[Node, ParallelLayer]] + + def flatten(self) -> List[Node]: + res = [] + last_node = None + for el in self.nodes: + if isinstance(el, ParallelLayer): + res.extend([n for n in el.get_nodes() if n != last_node]) + else: + last_node = el + res.append(el) + + return res def __str__(self): - return self.parsed if self.parsed else self.org + return f"{' -> '.join([el.name for el in self.nodes])}" @dataclass -class Layer: - edges: Dict[str, Edge] +class Workflow: + name: str + layers: OrderedDict[str, Union[Layer, ParallelLayer]] + _nodes: Dict[str, Node] = field(default_factory=lambda: {}) + + def __str__(self): + return f"{' -> '.join([el for el in self.layers])}" + + @property + def nodes(self): + if not self._nodes: + last_layer = self.layers[next(reversed(self.layers))] + + if isinstance(last_layer, ParallelLayer): + last_layer.set_nodes_layer(at_node=False, to_node=True) + + for i, (l_name, layer) in enumerate(self.layers.items()): + layer.set_nodes_layer() + for node in layer.get_nodes(): + if node.name not in self._nodes: + self._nodes[node.name] = node + + return self._nodes + + def get_graph(self) -> nx.DiGraph: + print(f"Building graph for layers {list(self.layers.keys())}") + # set nodes + _ = self.nodes + graph = nx.DiGraph() + + for i, (l_name, layer) in enumerate(self.layers.items()): + if isinstance(layer, ParallelLayer): + graph.add_node(l_name, type='layer', layer=l_name) + for track in layer.tracks.values(): + if graph.has_node(track[-1].at.name): + graph.add_edge(track[-1].at.name, l_name) + + if i == 0 and track[-1].to.layer != l_name and not graph.has_node(track[-1].to.name): + graph.add_node(track[-1].to.name, type='node', layer=track[-1].to.layer) + graph.add_edge(l_name, track[-1].to.name) + else: + for e_name, edge in layer.edges.items(): + graph.add_node(edge.at.name, type='node', layer=l_name) + graph.add_node(edge.to.name, type='node', layer=l_name) + graph.add_edge(edge.at.name, edge.to.name) + + return graph + + def get_all_paths(self, graph: nx.DiGraph) -> List[DAGPath]: + if len(graph.edges) == 0: + for n, data in graph.nodes(data=True): + if data['type'] == 'layer': + return [DAGPath([self.layers[n]])] + else: + return [DAGPath([self.nodes[n]])] + + nodes = {k: v['type'] for k, v in graph.nodes(data=True)} + roots = [node for node in graph.nodes if graph.in_degree(node) == 0] + leaves = [node for node in graph.nodes if graph.out_degree(node) == 0] + paths = [] - def traverse(self, nodes: Dict[str, Plugin]): - return [(nodes[edge.node], edge) for edge in self.edges.values()] + for root in roots: + for leaf in 
leaves: + for path in nx.all_simple_paths(graph, root, leaf): + paths.append(DAGPath([self.nodes[n] if nodes[n] == 'node' else self.layers[n] for n in path])) + + return paths @dataclass @@ -168,22 +250,41 @@ class Connectors: sinks: Dict[str, Dict[str, Connector]] = field(default_factory=lambda: {}) sources: Dict[str, Dict[str, Connector]] = field(default_factory=lambda: {}) + def init_sinks(self, name: str): + if name not in self.sinks: + self.sinks[name] = {} + + def init_sources(self, name: str): + if name not in self.sources: + self.sources[name] = {} + + def init_links(self, at_node: str, to_node: str, links: dict): + connector = Connector(source=at_node, sink=to_node, attrs={}, links=links) + self.sources[at_node][to_node] = connector + self.sources[to_node][at_node] = connector + def __getitem__(self, sink_key: Tuple[str, str]): sink, key = sink_key + # todo: check this code if is used for connector in self.sinks[sink].values(): - if key in connector.links.values(): - return connector[key] + if connector.sink == sink: + for k, v in connector.links.items(): + if connector.sink == sink and key == v: + return connector[v] for connector in self.sources[sink].values(): - if key in connector.links.keys(): - return connector[key] + if connector.sink == sink: + for k, v in connector.links.items(): + if key == v: + return connector[v] def __setitem__(self, source_attr: Tuple[str, str], value: Any): source, attr = source_attr for name, connector in self.sources[source].items(): - connector[attr] = value + if attr in connector.links: + connector[connector.links[attr]] = value def has_values(self, source: str) -> bool: """ @@ -206,86 +307,10 @@ def has_source(self, node: str, attr: str = None): if attr in connector.attrs: return True return False - return node in self.sources - - -@dataclass -class Pipeline: - """ - Data object representing the pipeline - """ - layers: dict - nodes: dict - workflows: dict - - def unpack(self, workflow: str): - return {name: edge for l_name, layer in self.layers.items() if l_name in self.workflows[workflow] for name, edge in layer.edges.items()} - - def match(self, placeholders: list, sink_attrs: list): - """ - Checks whether the placeholders match sink attributes. 
- """ - for attr in sink_attrs: - if attr not in placeholders: - return False - return True - - def link(self, workflow: str, node_handlers: dict, connectors: Connectors = None) -> Connectors: - """ - creates the respective connectors - """ - if connectors is None: - connectors = Connectors() - sources = {} - else: - sources = connectors.sources.copy() - - for _, edge in self.unpack(workflow).items(): - if edge.sources: - if edge.name not in connectors.sources: - connectors.sources[edge.name] = {} - - # If node has attributes, init connector with the value of the attributes - if edge.name in sources: - sources[edge.name] = {attr: getattr(node_handlers[edge.name], attr, None) for attr in edge.sources} - - for source, links in edge.sinks.items(): - if source == edge.name: - raise ValueError(f"Plugin {source} cannot reference itself.") - - if source not in sources: - raise TenetError(f"source {source} must be defined before sink {edge.name}") - - if source in sources: - connector = Connector(source=source, sink=edge.name, attrs=sources[source], links=links) - connectors.sources[source][edge.name] = connector - - if edge.name not in connectors.sinks: - connectors.sinks[edge.name] = {} - - connectors.sinks[edge.name][source] = connector - - return connectors - - def walk(self, layers: list): - """ - Walks the edges and returns list with the traversal. Initializes edge connectors. - """ - # TODO: traversal include parallel execution - traversal = [] - - for el in layers: - if el in self.layers: - traversal.extend(self.layers[el].traverse(self.nodes)) - elif el in self.nodes: - traversal.append((self.nodes[el], None)) - else: - raise ValueError(f"{el} not found.") - - return traversal + return node in self.sources and self.sources[node] -def parse_pipeline(yaml: dict) -> Pipeline: +def parse_pipeline(yaml: dict): """ Parse the yaml file into piepline @@ -293,5 +318,6 @@ def parse_pipeline(yaml: dict) -> Pipeline: :return: Pipeline object """ - return Schema(And({'nodes': _nodes, 'layers': _layers, 'workflows': _workflows}, - Use(lambda pipe: Pipeline(**pipe)))).validate(yaml) + return Schema(And({'nodes': _nodes, 'edges': _edges, 'links': _links, + 'layers': Schema(And({str: Or(list, [list])})), + 'workflows': Schema(And({str: list}))})).validate(yaml) diff --git a/tenet/handlers/container.py b/tenet/handlers/container.py index 563f06a..9655028 100644 --- a/tenet/handlers/container.py +++ b/tenet/handlers/container.py @@ -82,7 +82,8 @@ def execute_cmd(self, cmd: ContainerCommand) -> bool: cmd.skip = True return True - def run_cmds(self, container_id: str, cmds: List[ContainerCommand]) -> Tuple[bool, List[CommandData]]: + def run_cmds(self, container_id: str, cmds: List[ContainerCommand], supress_err: bool = False) \ + -> Tuple[bool, List[CommandData]]: """ Run commands inside the specified container. 
""" @@ -104,7 +105,7 @@ def run_cmds(self, container_id: str, cmds: List[ContainerCommand]) -> Tuple[boo if cmd.parse_fn is not None: cmd_data.parsed_output = cmd.parse_fn(cmd_data.output) - if cmd_data.error or cmd_data.return_code != 0: + if (cmd_data.error or cmd_data.return_code != 0) and (not supress_err): return False, cmds_data return True, cmds_data diff --git a/tenet/handlers/github.py b/tenet/handlers/github.py index 83369aa..3aa16f9 100644 --- a/tenet/handlers/github.py +++ b/tenet/handlers/github.py @@ -185,6 +185,10 @@ def has_rate_available(self): @property def git_api(self): with self.lock: + if not self._tokens: + tokens = self.app.pargs.tokens.split(',') + self._tokens = deque(tokens, maxlen=len(tokens)) + if not self._git_api: self._git_api = Github(self._tokens[0]) self._tokens.rotate(-1) @@ -204,18 +208,6 @@ def git_api(self): with self.lock: self._git_api = None - @property - def tokens(self): - self._tokens.rotate(-1) - return self._tokens[0] - - @tokens.setter - def tokens(self, value: Union[str, list]): - if isinstance(value, str): - value = [value] - - self._tokens = deque(value, maxlen=len(value)) - @staticmethod def parse_commit_sha(commit_sha: Union[list, str]) -> list: commit_hashes = [] @@ -418,10 +410,15 @@ def get_commit_parents(commit: Commit) -> set: def get_commit_comments(self, commit: Commit, raise_err: bool = False) -> dict: comments, count = {}, 1 err_msg = None - commit_comments = [] try: - commit_comments = commit.get_comments() + for comment in commit.get_comments(): + comments[f'com_{count}'] = { + 'author': comment.user.login, + 'datetime': comment.created_at.strftime("%m/%d/%Y, %H:%M:%S"), + 'body': comment.body.strip() + } + count += 1 except RateLimitExceededException as rle: err_msg = f"Rate limit exhausted: {rle}" #except Exception: @@ -432,14 +429,6 @@ def get_commit_comments(self, commit: Commit, raise_err: bool = False) -> dict: raise TenetError(err_msg) self.app.log.error(err_msg) - # TODO: add check for rate limit - for comment in commit_comments: - comments[f'com_{count}'] = { - 'author': comment.user.login, - 'datetime': comment.created_at.strftime("%m/%d/%Y, %H:%M:%S"), - 'body': comment.body.strip() - } - count += 1 return comments diff --git a/tenet/handlers/node.py b/tenet/handlers/node.py index 9245346..858b100 100644 --- a/tenet/handlers/node.py +++ b/tenet/handlers/node.py @@ -4,9 +4,10 @@ import pandas as pd from cement import Handler -from tenet.core.exc import Skip +from tenet.core.exc import Skip, TenetError from tenet.core.interfaces import PluginsInterface -from tenet.data.schema import Edge +from tenet.data.plugin import Sources, Sinks +from tenet.data.schema import Node class NodeHandler(PluginsInterface, Handler): @@ -17,8 +18,10 @@ def __init__(self, **kw): super().__init__(**kw) self.path = None self.output = None - self.edge = None self.node = None + self.edge = None + self.sources = Sources() + self.sinks = Sinks() def load_dataset(self, suffix: str = '.csv', terminate: bool = False): """ @@ -33,8 +36,8 @@ def load_dataset(self, suffix: str = '.csv', terminate: bool = False): try: return pd.read_csv(str(self.output)) except pd.errors.EmptyDataError as ede: - self.app.log.warning(ede) - return self.output.open(mode='r').read() + self.app.log.warning(f"Empty Data Error: {ede}") + return pd.DataFrame() self.app.log.error("dataset not found") @@ -43,40 +46,47 @@ def load_dataset(self, suffix: str = '.csv', terminate: bool = False): return None + def check_sink(self, name: str): + if not self.sinks[name]: + raise 
TenetError(f"Sink '{name}' of '{self.node.name}' node not instantiated") + + # check if path exists + if isinstance(self.sinks[name], Path) and not self.sinks[name].exists(): + raise TenetError(f"Path '{self.sinks[name]}' for sink '{name}' of '{self.node.name}' node not found.") + def get(self, attr: str, default: Any = None): try: - return self.app.connectors[self.edge.name, attr] + self.sinks[attr] = self.app.connectors[self.node.name, attr] + self.check_sink(attr) + return self.app.connectors[self.node.name, attr] except KeyError as ke: - self.app.log.warning(f"Key {ke} not found for edge {self.edge.name}") - return default - - def get_sinks(self): - return self.app.connectors.sinks[self.edge.name] + if not default: + raise TenetError(f"Sink '{ke}' of '{self.node.name}' node not found") - def set(self, attr: str, value: Any, skip: bool = True): - self.app.connectors[self.edge.name, attr] = value + self.sinks[attr] = default - if self.has_dataset and self.app.connectors.has_source(self.edge.name) and \ - self.app.connectors.has_values(self.edge.name) and skip: - raise Skip(f"Connectors for source \"{self.edge.name}\" are instantiated and exist.") + def set(self, attr: str, value: Any): + self.app.connectors[self.node.name, attr] = value + self.sources[attr] = value - def load(self, edge: Edge, dataset_name: str, ext: str = '.csv'): + def load(self, node: Node, dataset_name: str, ext: str = '.csv'): """ Loads all the related paths to the workflow - :param edge: edge object + :param node: node object + :param dataset_name: name of the dataset + :param ext: extension of the dataset """ - paths = {p.name: p for p in self.app.workdir.iterdir()} + self.path = self.app.workdir / node.layer / node.name - if edge.name in paths: - self.app.log.info(f"Loading node {edge.name}") - self.path = Path(paths[edge.name]) + if self.path.exists(): + self.app.log.info(f"Loading node {node.name}") + self.path = Path(self.path) # TODO: include correct datasets, and add the layer as well else: - self.app.log.info(f"Making directory for {edge.name}.") - self.path = Path(self.app.workdir / edge.name) - self.path.mkdir() + self.app.log.info(f"Making directory for {node.name}.") + self.path.mkdir(parents=True, exist_ok=True) self.output = self.path / f"{dataset_name}{ext}" @@ -85,7 +95,6 @@ def has_dataset(self): """ Checks whether output exists. """ - return self.output.exists() @property @@ -93,7 +102,7 @@ def is_skippable(self): """ Checks whether node execution can be skipped, that is, output exists and node does not have dependencies. """ - return self.has_dataset and not self.app.connectors.has_source(self.edge.name) + return self.has_dataset and len(self.sources) == 0 def __str__(self): - return self.edge.name if self.edge else "" + return self.node.name if self.node else "" diff --git a/tenet/handlers/pipeline.py b/tenet/handlers/pipeline.py new file mode 100644 index 0000000..1726049 --- /dev/null +++ b/tenet/handlers/pipeline.py @@ -0,0 +1,119 @@ +import yaml + +from pathlib import Path +from cement import Handler +from schema import SchemaError +from collections import OrderedDict +from typing import List + +from tenet.core.exc import TenetError +from tenet.core.interfaces import HandlersInterface +from ..data.schema import parse_pipeline, Connectors, Layer, ParallelLayer, Node, Workflow, Edge + + +class PipelineHandler(HandlersInterface, Handler): + """ + Workflow handler to execute the pipeline. 
+ """ + + class Meta: + label = 'pipeline' + + def __init__(self, **kw): + super().__init__(**kw) + self.edges = {} + self.nodes = {} + self.workflows = {} + + def _init_nodes(self, nodes: list): + for node in nodes: + for k, v in node.items(): + self.nodes[k] = Node(name=k, **v) + + def _init_edges(self, edges: list): + for edge_el in edges: + for e_name, edge in edge_el.items(): + self.edges[e_name] = Edge(name=e_name, at=self.nodes[edge['at']], to=self.nodes[edge['to']]) + + def _init_workflows(self, workflows: dict): + for w_name, workflow in workflows.items(): + layers = OrderedDict() + + for layer in workflow: + if layer not in self.layers: + raise TenetError(f"Layer {layer} not defined") + + layers[layer] = self.layers[layer] + + self.workflows[w_name] = Workflow(name=w_name, layers=layers) + + def _init_layers(self, layers): + self.layers = {} + + for l_name, l_edges in layers.items(): + if any(isinstance(x, list) for x in l_edges): + self.layers[l_name] = ParallelLayer(tracks={i: [self.edges[e] for e in x] for i, x in enumerate(l_edges)}, + name=l_name) + else: + self.layers[l_name] = Layer(edges={le: self.edges[le] for le in l_edges}, name=l_name) + + def _init_connectors(self): + self.app.extend('connectors', Connectors()) + + for name, node in self.nodes.items(): + self.app.connectors.init_sources(name) + self.app.connectors.init_sinks(name) + + def _init_links(self, links): + """ + creates the respective connectors + """ + self.app.log.info("Linking nodes") + parsed_links = {} + + for at_node, at_node_links in links.items(): + for at_node_source, to_node_links in at_node_links.items(): + for to_node_sink, to_nodes in to_node_links.items(): + for to_node in to_nodes: + if at_node not in parsed_links: + parsed_links[at_node] = {} + if to_node not in parsed_links[at_node]: + parsed_links[at_node][to_node] = {} + + parsed_links[at_node][to_node][at_node_source] = to_node_sink + + for at_node, to_nodes in parsed_links.items(): + for to_node, at_node_links in to_nodes.items(): + self.app.connectors.init_links(at_node=at_node, to_node=to_node, links=at_node_links) + + def load(self, path: Path): + """ + Loads and initializes pipeline + """ + + with path.open(mode="r") as stream: + try: + self.app.log.info(f"Parsing pipeline file: {path}") + pipeline = parse_pipeline(yaml.safe_load(stream)) + except SchemaError as se: + raise TenetError(str(se)) + + self._init_nodes(pipeline['nodes']) + self._init_edges(pipeline['edges']) + self._init_layers(pipeline['layers']) + self._init_workflows(pipeline['workflows']) + + self._init_connectors() + self._init_links(pipeline['links']) + + def get_nodes(self, nodes: str) -> List[Node]: + return [self.nodes[node] for node in nodes] + + def get_nodes_from_edges(self, edges: List[Edge]) -> List[Node]: + nodes = [] + for edge in edges: + for n in edge.to_list(): + if self.nodes[n] not in nodes: + nodes.append(self.nodes[n]) + + return nodes diff --git a/tenet/handlers/plugin.py b/tenet/handlers/plugin.py index 031c18d..5dd1131 100644 --- a/tenet/handlers/plugin.py +++ b/tenet/handlers/plugin.py @@ -123,6 +123,20 @@ def run(self, dataset: pd.DataFrame, **kwargs) -> Union[pd.DataFrame, None]: """ pass + @abstractmethod + def set_sources(self): + """ + Sets sources of the plugin for sinks of other plugins + """ + pass + + @abstractmethod + def get_sinks(self): + """ + Inits sinks of the plugin with sources of other plugins + """ + pass + def plot(self, dataset: pd.DataFrame, **kwargs): """ Plots resulting dataframe diff --git a/tenet/handlers/workflow.py 
b/tenet/handlers/workflow.py index 1a9261e..d968fcb 100644 --- a/tenet/handlers/workflow.py +++ b/tenet/handlers/workflow.py @@ -1,16 +1,18 @@ -from typing import Tuple - import pandas as pd import tqdm +from typing import Tuple, List + from cement import Handler from pathlib import Path from tenet.core.exc import Skip, TenetError from tenet.core.interfaces import HandlersInterface -from tenet.data.schema import Edge +from tenet.data.schema import Node, DAGPath, ParallelLayer from collections import OrderedDict +from tenet.handlers.plugin import PluginHandler + class WorkflowHandler(HandlersInterface, Handler): """ @@ -22,77 +24,118 @@ class Meta: def __init__(self, **kw): super().__init__(**kw) - self.traversal = OrderedDict() + self.current_nodes = OrderedDict() + self.previous_node = None - def load(self, workflow_name, dataset_path: Path, layers: list): + def _load(self, dataset_path: Path, path: DAGPath): """ Loads and initializes plugins """ - self.app.log.info(f"Loading {layers} layers for {workflow_name} workflow") - for node, edge in tqdm.tqdm(self.app.pipeline.walk(layers), desc="Initializing plugins", colour='blue'): - if not edge: - edge = Edge(name=node.name, node=node.name) - self.app.log.info(f"Traversing edge {edge.name}") + self.app.log.info(f"Loading nodes in {path} path...") + + for node in tqdm.tqdm(path.flatten(), desc="Initializing plugins", colour='blue'): + if node.name not in self.current_nodes: + node_handler = self.app.get_plugin_handler(node.plugin) + node_handler.load(node, dataset_name=dataset_path.stem) + node_handler.node = node + else: + node_handler = self.current_nodes[node.name] + + self.current_nodes[node.name] = node_handler - node_handler = self.app.get_plugin_handler(node.name) - node_handler.load(edge, dataset_name=dataset_path.stem) + def _exec_node(self, node_handler: PluginHandler, dataframe: pd.DataFrame) -> Tuple[pd.DataFrame, Path]: + self.app.log.info(f"Running node {node_handler}") + node_handler.get_sinks() + node_handler.set_sources() - node_handler.node = node - node_handler.edge = edge - self.traversal[edge.name] = node_handler + if node_handler.is_skippable: + self.app.log.info(f"{node_handler.node.name}: dataset {node_handler.output} exists.") + dataframe = pd.read_csv(node_handler.output) + dataset_path = node_handler.output - self.app.log.info("Linking nodes") - # Instantiates the connectors for the nodes - if hasattr(self.app, 'connectors'): - self.app.connectors = self.app.pipeline.link(workflow_name, self.app.executed_edges, self.app.connectors) + return dataframe, dataset_path + + kwargs = node_handler.node.kwargs.copy() + + if node_handler.node.kwargs: + kwargs.update(node_handler.node.kwargs) + + if node_handler.has_dataset and node_handler.sources.is_init() and \ + self.app.connectors.has_values(node_handler.node.name): + + self.app.log.warning(f"Connectors for source \"{node_handler.node.name}\" are instantiated and exist.") + self.app.log.warning(f"Skipping {node_handler}.") + dataframe = node_handler.load_dataset() + dataset_path = node_handler.output else: - self.app.extend('connectors', self.app.pipeline.link(workflow_name, self.traversal)) + dataframe = node_handler.run(dataset=dataframe, **kwargs) + dataset_path = node_handler.output + + if dataframe is not None: + dataframe.to_csv(str(node_handler.output), index=False) + self.app.log.info(f"Saving dataset {node_handler.output}.") + else: + raise TenetError(f"Node {node_handler} returned no dataframe. 
Stopping execution.") - def __call__(self, dataset_path: Path) -> Tuple[pd.DataFrame, Path]: + self.previous_node = node_handler.node + + return dataframe, dataset_path + + def _exec_layer(self, layer: ParallelLayer, dataframe: pd.DataFrame, dataset_path: Path) \ + -> Tuple[pd.DataFrame, Path]: + # TODO: verify connection between edges in tracks + tracks_outcome = [] + layer_dataset_path = self.app.workdir / layer.name / dataset_path.name + + if not layer_dataset_path.exists(): + for t_name, edges in layer.tracks.items(): + self.app.log.info(f"Executing track {t_name}") + track_outcome = [(dataframe, dataset_path)] + + for edge in edges: + for node in edge.to_list(): + # TODO: change; here we skip the previous executed node that leaded to this layer + if node == self.previous_node: + continue + node_handler = self.current_nodes[node.name] + track_dataframe, track_dataset_path = self._exec_node(node_handler, track_outcome[-1][0]) + track_outcome.append((track_dataframe, track_dataset_path)) + + if not self.app.pargs.suppress_plot: + self.app.log.info(f"{node_handler.node.name} plotting...") + node_handler.plot(dataframe) + + tracks_outcome.append(track_outcome[-1]) + + dataframe = pd.concat([d for d, dp in tracks_outcome], ignore_index=True) + dataframe.to_csv(str(layer_dataset_path), index=False) + self.app.log.info(f"{len(dataframe)} entries after merging {list(layer.tracks.keys())} tracks;") + else: + # TODO: should load the sources from executed nodes + self.app.log.warning(f"Dataset for '{layer.name}' layer exists, loading {layer_dataset_path} ...") + dataframe = pd.read_csv(str(layer_dataset_path)) + + self.previous_node = None + + return dataframe, layer_dataset_path + + def __call__(self, dataset_path: Path, dag_path: DAGPath) -> Tuple[pd.DataFrame, Path]: + self._load(dataset_path, dag_path) dataframe = pd.read_csv(str(dataset_path), sep='\t' if dataset_path.suffix == '.tsv' else ',') if dataframe.empty: raise TenetError(f"Dataset is empty.") - while tqdm.tqdm(self.traversal, desc="Executing pipeline", colour='green'): - node_name, node_handler = self.traversal.popitem(last=False) - - self.app.log.info(f"Running node {node_handler}") - - if node_handler.is_skippable: - self.app.log.info(f"{node_handler.edge.name}: dataset {node_handler.output} exists.") - dataframe = pd.read_csv(node_handler.output) - dataset_path = node_handler.output + for el in tqdm.tqdm(dag_path.nodes, desc="Executing pipeline", colour='green'): + if not isinstance(el, Node): + dataframe, dataset_path = self._exec_layer(el, dataframe, dataset_path) + else: + node_handler = self.current_nodes[el.name] + dataframe, dataset_path = self._exec_node(node_handler, dataframe) if not self.app.pargs.suppress_plot: - self.app.log.info(f"{node_handler.edge.name} plotting...") + self.app.log.info(f"{node_handler.node.name} plotting...") node_handler.plot(dataframe) - else: - kwargs = node_handler.node.kwargs.copy() - - if node_handler.edge.kwargs: - kwargs.update(node_handler.edge.kwargs) - - try: - dataframe = node_handler.run(dataset=dataframe, **kwargs) - dataset_path = node_handler.output - - if dataframe is not None: - dataframe.to_csv(str(node_handler.output), index=False) - self.app.log.info(f"Saving dataset {node_handler.output}.") - else: - raise TenetError(f"Node {node_handler} returned no dataframe. 
Stopping execution.") - if not self.app.pargs.suppress_plot: - node_handler.plot(dataframe) - except Skip as se: - self.app.log.warning(f"{se} Skipping {node_handler}.") - dataframe = node_handler.load_dataset() - dataset_path = node_handler.output - - if not self.app.pargs.suppress_plot: - node_handler.plot(dataframe) - - self.app.executed_edges[node_name] = node_handler return dataframe, dataset_path diff --git a/tenet/main.py b/tenet/main.py index 8635868..7008d00 100644 --- a/tenet/main.py +++ b/tenet/main.py @@ -13,6 +13,7 @@ from tenet.handlers.command import CommandHandler from tenet.handlers.runner import MultiTaskHandler from tenet.handlers.workflow import WorkflowHandler +from tenet.handlers.pipeline import PipelineHandler from tenet.handlers.plugin_loader import PluginLoader from tenet.handlers.container import ContainerHandler from tenet.handlers.code_parser import CodeParserHandler @@ -72,7 +73,7 @@ def get_absolute_path(package, file_path): # register handlers handlers = [ Base, Plugin, CWE, PluginLoader, ContainerHandler, CommandHandler, WorkflowHandler, CodeParserHandler, - MultiTaskHandler, GithubHandler, CWEListHandler, FileParserHandler, SamplingHandler + MultiTaskHandler, GithubHandler, CWEListHandler, FileParserHandler, SamplingHandler, PipelineHandler ] def get_config(self, key: str): @@ -91,7 +92,10 @@ def get_plugin_handler(self, name: str): """ try: - self.plugin.load_plugin(name) + + if name not in self.plugin.get_loaded_plugins(): + self.plugin.load_plugin(name) + plugin = self.handler.resolve('plugins', name) plugin.__init__() plugin._setup(self) diff --git a/tenet/plugins/astminer.py b/tenet/plugins/astminer.py index f9d69df..65d685f 100644 --- a/tenet/plugins/astminer.py +++ b/tenet/plugins/astminer.py @@ -12,8 +12,8 @@ from tenet.data.schema import ContainerCommand from tenet.handlers.plugin import PluginHandler -re_plain = '(?P