diff --git a/orchestrator/migrations/0002_data_migration_for_block_registry.py b/orchestrator/migrations/0002_data_migration_for_block_registry.py
index 4d4be0f8..ed5ad7c7 100644
--- a/orchestrator/migrations/0002_data_migration_for_block_registry.py
+++ b/orchestrator/migrations/0002_data_migration_for_block_registry.py
@@ -9,7 +9,7 @@ def seed_blocks_into_registry(apps, schema_editor):
     BlockRegistry(
         block_type="DATA_BLOCK",
         block_id=1,
-        block_name="Raw Data",
+        block_name="US Stock Data",
         inputs=[
             {
                 "fieldData": {
@@ -19,24 +19,44 @@ def seed_blocks_into_registry(apps, schema_editor):
                 "fieldName": "Equity Name",
                 "fieldType": "search",
                 "fieldVariableName": "equity_name",
-            },  # TODO: Change fieldType from dropdown to search
+            },
             {
-                "fieldData": {"base": "/dataType", "method": "GET"},
-                "fieldName": "Data Type",
+                "fieldData": {"base": "/candlestick", "method": "GET"},
+                "fieldName": "Candlesticks",
                 "fieldType": "dropdown",
-                "fieldVariableName": "data_type",
+                "fieldVariableName": "candlestick",
             },
             {
-                "fieldData": {"base": "/interval", "method": "GET"},
-                "fieldName": "Interval",
-                "fieldType": "dropdown",
-                "fieldVariableName": "interval",
+                "fieldName": "Date Range",
+                "fieldType": "date_range",
+                "fieldVariableNames": ["start_date", "end_date"],
+            },
+        ],
+        validations={
+            "input": {"required": [], "allowed_blocks": []},
+            "output": [{"blockType": "DATA_BLOCK", "number": 1}],
+        },
+    ).save()
+
+    BlockRegistry(
+        block_type="DATA_BLOCK",
+        block_id=2,
+        block_name="Crypto Data",
+        inputs=[
+            {
+                "fieldData": {
+                    "base": "/cryptoName?name=",
+                    "method": "GET",
+                },
+                "fieldName": "Crypto Name",
+                "fieldType": "search",
+                "fieldVariableName": "crypto_name",
             },
             {
-                "fieldData": {"base": "/outputSize", "method": "GET"},
-                "fieldName": "Output Size",
+                "fieldData": {"base": "/candlestick", "method": "GET"},
+                "fieldName": "Candlesticks",
                 "fieldType": "dropdown",
-                "fieldVariableName": "outputsize",
+                "fieldVariableName": "candlestick",
             },
             {
                 "fieldName": "Date Range",
@@ -69,7 +89,10 @@ def seed_blocks_into_registry(apps, schema_editor):
         validations={
             "input": {
                 "required": [{"blockType": "DATA_BLOCK", "number": 1}],
-                "allowed_blocks": [{"blockId": "1", "blockType": "DATA_BLOCK"}],
+                "allowed_blocks": [
+                    {"blockId": "1", "blockType": "DATA_BLOCK"},
+                    {"blockId": "2", "blockType": "DATA_BLOCK"},
+                ],
             },
             "output": [{"blockType": "COMPUTATIONAL_BLOCK", "number": 1}],
         },
@@ -104,6 +127,45 @@ def seed_blocks_into_registry(apps, schema_editor):
         },
     ).save()
 
+    BlockRegistry(
+        block_type="SIGNAL_BLOCK",
+        block_id=2,
+        block_name="Saddle",
+        inputs=[
+            {
+                "fieldData": {"base": "/saddleType", "method": "GET"},
+                "fieldName": "Saddle Type",
+                "fieldType": "dropdown",
+                "fieldVariableName": "saddle_type",
+            },
+            {
+                "fieldData": {"base": "/eventAction", "method": "GET"},
+                "fieldName": "Event Action",
+                "fieldType": "dropdown",
+                "fieldVariableName": "event_action",
+            },
+            {
+                "fieldName": "Consecutive Up",
+                "fieldVariableName": "consecutive_up",
+                "fieldType": "input",
+            },
+            {
+                "fieldName": "Consecutive Down",
+                "fieldVariableName": "consecutive_down",
+                "fieldType": "input",
+            },
+        ],
+        validations={
+            "input": {
+                "required": [{"blockType": "COMPUTATIONAL_BLOCK", "number": 1}],
+                "allowed_blocks": [
+                    {"blockId": "1", "blockType": "COMPUTATIONAL_BLOCK"}
+                ],
+            },
+            "output": [{"blockType": "SIGNAL_BLOCK", "number": 2}],
+        },
+    ).save()
+
     BlockRegistry(
         block_type="STRATEGY_BLOCK",
         block_id=1,
@@ -148,7 +210,9 @@ def seed_blocks_into_registry(apps, schema_editor):
             ],
             "allowed_blocks": [
{"blockId": "1", "blockType": "SIGNAL_BLOCK"}, + {"blockId": "2", "blockType": "SIGNAL_BLOCK"}, {"blockId": "1", "blockType": "DATA_BLOCK"}, + {"blockId": "2", "blockType": "DATA_BLOCK"}, ], }, "output": [{"blockType": "STRATEGY_BLOCK", "number": 1}], diff --git a/orchestrator/services/flow/run.py b/orchestrator/services/flow/run.py deleted file mode 100644 index c2a1ac39..00000000 --- a/orchestrator/services/flow/run.py +++ /dev/null @@ -1,6 +0,0 @@ -from orchestrator.services.flow.spectrum_flow import SpectrumFlow - - -def run(vertices, edges): - spectrum_flow = SpectrumFlow(vertices, edges) - return spectrum_flow diff --git a/orchestrator/services/flow/spectrum_flow.py b/orchestrator/services/flow/spectrum_flow.py index 7f3e11ad..9eb1e87f 100644 --- a/orchestrator/services/flow/spectrum_flow.py +++ b/orchestrator/services/flow/spectrum_flow.py @@ -1,39 +1,53 @@ import json -import logging import requests from os import environ from copy import deepcopy from orchestrator.models import BlockRegistry -from orchestrator.services.flow.graph import Graph -class SpectrumFlow: - """ - Ingests a list of vertices and edges to create a representation of a "spectrum flow" - """ +class Graph: + def __init__(self): + self.adjacency_list = {} + + def __repr__(self): + return repr(self.adjacency_list) + def insert(self, source_vertex, dest_vertex): + """ + Inserts one or both vertices (depending on if either exists) and + connects one vertex to the other + + Attributes: + source_vertex: Start / Originating Vertex + dest_vertex: Destination Vertex + """ + if not (source_vertex in self.adjacency_list): + self.adjacency_list[source_vertex] = set() + + self.adjacency_list[source_vertex].add(dest_vertex) + + +class DependencyGraph: def __init__(self, vertices, edges): self.vertices = vertices self.edges = edges - self.graph = self.generate_adjacency_list() + self.graph = self.generate_graph() self.dependency_graph = self.generate_dependency_graph() - self.batched_tasks = self.get_batched_tasks() + self.batched_tasks = self.generate_batched_tasks() - def generate_adjacency_list(self): + def generate_graph(self): """ - Creates an adjacency list when passed in a list of nodes and edges + Generates an adjacency list graph representation using the node and edge pairs """ graph = Graph() - # Initializes the adjacency list with initial values - for vertex in self.vertices: - id = vertex["id"] - if not id in graph.adjacency_list: - graph.adjacency_list[id] = set() + # Initializes the Adjacency List of Blocks + for block_id, vertex in self.vertices.items(): + if block_id not in graph.adjacency_list: + graph.adjacency_list[block_id] = set() - # Iterates through the edges and populates the values in the adjacency list for edge in self.edges: graph.insert(edge["source"], edge["target"]) @@ -41,18 +55,18 @@ def generate_adjacency_list(self): def generate_dependency_graph(self): """ - Creates a depedency graph by transposing the adjacency list + Transposes the adjacency list to create a graph of dependencies """ dependency_graph = Graph() # Initializes the adjacency list with initial values for source_vertex, _ in self.graph.adjacency_list.items(): - if not source_vertex in dependency_graph.adjacency_list: + if source_vertex not in dependency_graph.adjacency_list: dependency_graph.adjacency_list[source_vertex] = set() # Reverses the direction of the node connections for source_vertex, dest_vertices in self.graph.adjacency_list.items(): - if not source_vertex in dependency_graph.adjacency_list: + if source_vertex not in 
dependency_graph.adjacency_list: dependency_graph.adjacency_list[source_vertex] = set() for dest_vertex in dest_vertices: @@ -60,9 +74,9 @@ def generate_dependency_graph(self): return dependency_graph - def get_batched_tasks(self): + def generate_batched_tasks(self): """ - Traverses through the adjacency list to sequence through tasks + Creates a series of batches tasks that need to be executed sequentially """ batches = [] @@ -87,326 +101,203 @@ def get_batched_tasks(self): return batches - def validate_strategy(self): - def get_block_by_id(id): - """ - Retrieves a block from the list of vertices passed in initially - Attributes: - id: ID of Block in Flow - """ - for vertex in self.vertices: - if vertex["id"] == id: - return vertex - - def dfs( - visited, - block_id_in_flow, - allowed_block_data, - target_block_data, - blocks_found, - ): - """ - Performs a DFS recursively that iterates through the directed adjacency list. - It attempts to determine which blocks downstream in the sequence have their required data - Attributes: - visited: Set of blocks that have already been traversed - block_id_in_flow: The current block ID being iterated on - allowed_block_data: List of permitted input blocks - target_block_data: States the block type and number of blocks being searched for - """ +class SpectrumFlow: + def __init__(self, vertices, edges): + self.vertices = vertices + self.edges = edges - block_data = get_block_by_id(block_id_in_flow) - - if ( - block_data["data"]["metadata"]["blockType"] - == target_block_data["blockType"] - ): - for allowed_block in allowed_block_data: - if str(block_data["data"]["metadata"]["blockType"]) == str( - allowed_block["blockType"] - ) and str(block_data["data"]["metadata"]["blockId"]) == str( - allowed_block["blockId"] - ): - blocks_found.append(block_id_in_flow) - - # Stopping Condition - if len(blocks_found) == int(target_block_data["number"]): - return - - if block_id_in_flow not in visited: - visited.add(block_id_in_flow) - for neighbor in self.dependency_graph.adjacency_list[block_id_in_flow]: - dfs( - visited, - neighbor, - allowed_block_data, - target_block_data, - blocks_found, - ) + graph = DependencyGraph(vertices, edges) - def get_block_data_from_registry(block_data, block_id_in_flow): - """ - Retrieves the Block Data from the registry given the block ID in the flow + self.graph = graph.graph.adjacency_list + self.dependency_graph = graph.dependency_graph.adjacency_list + self.batched_tasks = graph.batched_tasks - Attributes: - block_data: Full JSON of Block Data from Front-End - block_id_in_flow: The Block ID generated by the front-end when assembling a flow - """ - # Block Data from Block Registry - # TODO: Maybe this should be optimized since its querying the whole table - block_registry_data = ( + self.edge_validation = {} + self.is_valid = self.run(mode="VALIDATE") + + def _get_block_by_id(self, block_id): + """ + Retrieves a block by its ID + + block_id: ID of block in flow + """ + try: + return self.vertices[block_id] + except KeyError: + raise Exception(f"The Block ID {block_id} could not be found") + + @staticmethod + def _get_block_data_from_registry(block_type, block_id): + try: + return ( BlockRegistry.objects.all() - .filter(block_type=block_data["data"]["metadata"]["blockType"]) - .filter(block_id=block_data["data"]["metadata"]["blockId"])[0] + .filter(block_type=block_type) + .filter(block_id=block_id) + .first() ) + except Exception as e: + raise Exception(e) + + def _dfs( + self, + visited, + block_id_in_flow, + allowed_block_data, 
+        target_block_data,
+        blocks_found,
+    ):
+        """
+        Recursively iterates through directed adjacency list
-            return block_registry_data
+
+        Attempts to determine which blocks downstream in the sequence have required data
-        is_valid = True
-        # Main Running Code
-        for task in self.batched_tasks:
-            # Goes through a set of tasks. Each FOR loop should make a request
-            # TODO: This part of the process can be asynchronous
-            for task_to_be_run in task:
-                # Gets the full sent data about the block
-                block_data = get_block_by_id(task_to_be_run)
-                block_registry_data = get_block_data_from_registry(
-                    block_data, task_to_be_run
+
+        Attributes:
+            visited: Set of blocks that have been traversed
+            block_id_in_flow: Current block ID being unpacked
+            allowed_block_data: List of allowed input blocks
+            target_block_data: Block Type and Number of Blocks being searched for
+        """
+        block_data = self._get_block_by_id(block_id_in_flow)
+
+        if block_data["blockType"] == target_block_data["blockType"]:
+            for allowed_block in allowed_block_data:
+                if str(block_data["blockType"]) == str(
+                    allowed_block["blockType"]
+                ) and str(block_data["blockId"]) == str(allowed_block["blockId"]):
+                    blocks_found.append(block_id_in_flow)
+
+        if len(blocks_found) == int(target_block_data["number"]):
+            return
+
+        if block_id_in_flow not in visited:
+            visited.add(block_id_in_flow)
+
+            for neighbor in self.dependency_graph[block_id_in_flow]:
+                self._dfs(
+                    visited,
+                    neighbor,
+                    allowed_block_data,
+                    target_block_data,
+                    blocks_found,
                 )
-                if len(self.dependency_graph.adjacency_list[task_to_be_run]) == 0:
-                    is_valid = (
-                        is_valid
-                        and len(block_registry_data.validations["input"]["required"])
-                        == 0
-                    )
-                else:
-                    blocks_found = []
-                    for required_block in block_registry_data.validations["input"][
-                        "required"
-                    ]:
-                        # Visited Set
-                        visited = set()
-
-                        dfs(
-                            visited,
-                            task_to_be_run,
-                            block_registry_data.validations["input"]["allowed_blocks"],
-                            required_block,
-                            blocks_found,
-                        )
-
-                    assembled_dependency_list_from_flow = {}
-                    for item in set(blocks_found):
-                        item_block_data = get_block_by_id(item)
-                        if (
-                            item_block_data["data"]["metadata"]["blockType"]
-                            not in assembled_dependency_list_from_flow
-                        ):
-                            assembled_dependency_list_from_flow[
-                                item_block_data["data"]["metadata"]["blockType"]
-                            ] = 0
-                        assembled_dependency_list_from_flow[
-                            item_block_data["data"]["metadata"]["blockType"]
-                        ] += 1
-
-                    for required in block_registry_data.validations["input"][
-                        "required"
-                    ]:
-                        is_valid = is_valid and (
-                            assembled_dependency_list_from_flow[required["blockType"]]
-                            == required["number"]
-                        )
-        return is_valid
+    def _make_run_request(
+        self, block_id_in_flow, block_registry_data, input_payload, output_payload
+    ):
+        """
+        Hits the `/run` endpoint for each block to complete the request
-    def run_batched_tasks_v3(self):
-        def get_block_by_id(id):
-            """
-            Retrieves a block from the list of vertices passed in initially
-            Attributes:
-                id: ID of Block in Flow
-            """
-            for vertex in self.vertices:
-                if vertex["id"] == id:
-                    return vertex
-
-        def dfs(
-            visited,
-            block_id_in_flow,
-            allowed_block_data,
-            target_block_data,
-            blocks_found,
-        ):
-            """
-            Performs a DFS recursively that iterates through the directed adjacency list.
-            It attempts to determine which blocks downstream in the sequence have their required data
+
+        Attributes:
+            block_id_in_flow: Block ID generated by the frontend
+            block_registry_data: Block Data queried from the Block Registry
+            input_payload: Input Payload
+            output_payload: Output Payload
+        """
-            Attributes:
-                visited: Set of blocks that have already been traversed
-                block_id_in_flow: The current block ID being iterated on
-                allowed_block_data: List of permitted input blocks
-                target_block_data: States the block type and number of blocks being searched for
-            """
+
+        request_url = f"{environ['API_BASE_URL']}/{block_registry_data.block_type}/{block_registry_data.block_id}/run"
-            block_data = get_block_by_id(block_id_in_flow)
-
-            if (
-                block_data["data"]["metadata"]["blockType"]
-                == target_block_data["blockType"]
-            ):
-                for allowed_block in allowed_block_data:
-                    if str(block_data["data"]["metadata"]["blockType"]) == str(
-                        allowed_block["blockType"]
-                    ) and str(block_data["data"]["metadata"]["blockId"]) == str(
-                        allowed_block["blockId"]
-                    ):
-                        blocks_found.append(block_id_in_flow)
-
-            # Stopping Condition
-            if len(blocks_found) == int(target_block_data["number"]):
-                return
-
-            if block_id_in_flow not in visited:
-                visited.add(block_id_in_flow)
-                for neighbor in self.dependency_graph.adjacency_list[block_id_in_flow]:
-                    dfs(
-                        visited,
-                        neighbor,
-                        allowed_block_data,
-                        target_block_data,
-                        blocks_found,
-                    )
+        # Input Transformation
+        input_cleaned_payload = {}
+        for k, v in input_payload.items():
+            if type(v) is dict and "value" in v:
+                input_cleaned_payload[k] = v["value"]
 
-        def get_block_data_from_registry(block_data, block_id_in_flow):
-            """
-            Retrieves the Block Data from the registry given the block ID in the flow
+        request_payload = {"input": input_cleaned_payload, "output": output_payload}
 
-            Attributes:
-                block_data: Full JSON of Block Data from Front-End
-                block_id_in_flow: The Block ID generated by the front-end when assembling a flow
-            """
-            # Block Data from Block Registry
-            # TODO: Maybe this should be optimized since its querying the whole table
-            block_registry_data = (
-                BlockRegistry.objects.all()
-                .filter(block_type=block_data["data"]["metadata"]["blockType"])
-                .filter(block_id=block_data["data"]["metadata"]["blockId"])[0]
-            )
+        r = requests.post(request_url, json=request_payload)
 
-            return block_registry_data
+        output = {}
+        if r.status_code == 200:
+            block_type_id_key = f"{block_registry_data.block_type}-{block_registry_data.block_id}-{block_id_in_flow}"
 
-        def make_run_request(
-            block_id_in_flow, block_registry_data, input_payload, output_payload
-        ):
-            """
-            Makes a request against remote resources to complete the request
+            if block_type_id_key not in output.keys():
+                output[block_type_id_key] = {}
 
-            Attributes:
-                block_id_in_flow: The Block ID generated by the front-end when assembling a flow
-                block_registry_data: Block Data queried from the front-end
-                input_payload: JSON Payload of Form Inputs
-                output_payload: JSON Payload of required information from previous steps
-            """
-            # Make a POST request to a run endpoint to run the block
-            request_url = f"{environ['API_BASE_URL']}/{block_registry_data.block_type}/{block_registry_data.block_id}/run"
+            try:
+                response_json = r.json()
+                if "response" in response_json:
+                    output[block_type_id_key] = response_json["response"]
+                else:
+                    raise Exception("JSON Key 'response' could not be found")
 
-            request_payload = {
-                "input": input_payload,
-                "output": output_payload,
-            }
+            except json.decoder.JSONDecodeError as e:
+                raise Exception("JSON Decode Error")
+            except Exception as e:
+                raise Exception("Unhandled Exception: ", e)
+        else:
+            print("Error: ", r.json())
 
-            # TODO: Remove once done debugging
-            # with open(f"input-payload-{block_id_in_flow}.json", "w") as outfile:
-            #     json.dump(request_payload, outfile)
-
-            r = requests.post(request_url, json=request_payload)
-
-            output = {}
-            if r.status_code == 200:
-                # Updates the Block Outputs overall JSON
-                block_type_id_key = f"{block_registry_data.block_type}-{block_registry_data.block_id}-{block_id_in_flow}"  # TODO: Add the block_id_in_flow to the cache key in case 2 blocks of same type are used
-                print("Block Type ID Key: ", block_type_id_key)
-                if block_type_id_key not in list(output.keys()):
-                    output[block_type_id_key] = {}
-
-                try:
-                    response_json = r.json()
-                    # Standardized Return From Block with key "response"
-                    if "response" in response_json:
-                        output[block_type_id_key] = response_json["response"]
-                except json.decoder.JSONDecodeError as e:
-                    print("JSON Decode Error")
-                except Exception as e:
-                    print("Generic Exception: ", e)
-            else:
-                logging.error(
-                    f"A Response {r.status_code} when querying URL {request_url} with ..."
-                )
+        return output
 
-            return output
+    def run(self, mode="VALIDATE"):
+        """
+        Validates a flow to ensure that all nodes are connected correctly
+        """
 
         output_cache = {}
 
-        def get_data_from_cache(block_id_in_flow):
+        def _get_data_from_cache(block_id_in_flow):
             """
-            Makes a request to the output cache to retrieve data
+            Retrieves data about block from cache
 
             Attributes:
-                block_id_in_flow: The Block ID generated by the front-end when assembling a flow
+                block_id: Block ID from Flow
             """
-            block_data = get_block_by_id(block_id_in_flow)
-            block_registry_data = get_block_data_from_registry(
-                block_data, block_id_in_flow
+            block_data = self._get_block_by_id(block_id_in_flow)
+            block_registry_data = self._get_block_data_from_registry(
+                block_data["blockType"], block_data["blockId"]
             )
 
             cache_key = f"{block_registry_data.block_type}-{block_registry_data.block_id}-{block_id_in_flow}"
 
-            if cache_key in list(output_cache.keys()):
+            if cache_key in output_cache.keys():
                 return cache_key, output_cache[cache_key]
             else:
-                print("Cache Key: ", cache_key)
-                print("Output Cache: ", output_cache.keys())
-                raise f"Data does not exist in cache for {block_id_in_flow} with {cache_key}"
+                raise Exception(
+                    f"Data does not exist in cache for {block_id_in_flow} with {cache_key}"
                )
+
+        is_valid = True
+
+        if len(self.batched_tasks) == 0:
+            is_valid = False
 
-        # Main Running Code
         for task in self.batched_tasks:
-            # Goes through a set of tasks. Each FOR loop should make a request
-            # TODO: This part of the process can be asynchronous
             for task_to_be_run in task:
-                # Gets the full sent data about the block
-                block_data = get_block_by_id(task_to_be_run)
-                block_registry_data = get_block_data_from_registry(
-                    block_data, task_to_be_run
+                block_data = self._get_block_by_id(task_to_be_run)
+                block_registry_data = self._get_block_data_from_registry(
+                    block_data["blockType"], block_data["blockId"]
                 )
 
-                # If the task has no dependencies, make the request immediately,
-                # otherwise perform a DFS search to extract all related dependencies
-                if len(self.dependency_graph.adjacency_list[task_to_be_run]) == 0:
-                    response = make_run_request(
-                        task_to_be_run,
-                        block_registry_data,
-                        block_data["data"]["input"],
-                        {},
-                    )
+                # Iterate through block data to gauge whether inputs exist
+                for key, value in block_data.items():
+                    if type(value) is dict and "value" in value.keys():
+                        if value["value"] == "":
+                            is_valid = False
 
-                    # Adds to a cache to ensure that requests don't need to be re-run
-                    output_cache = {**output_cache, **response}
-                else:
-                    # TODO: Implement DFS code to get the list of related objects
+                # Checks if the graph has an edge
+                if len(self.dependency_graph[task_to_be_run]) == 0:
+                    is_valid = (
+                        is_valid
+                        and len(block_registry_data.validations["input"]["required"])
+                        == 0
+                    )
 
-                    # The following variables are used in the DFS
+                    if mode == "RUN":
+                        response = self._make_run_request(
+                            task_to_be_run, block_registry_data, block_data, {}
+                        )
 
-                    # Contains list of Block ID's from the flow that are dependencies
-                    # for running the block associated with the `task_to_be_run`
+                        # Adds to a cache to ensure that requests don't need to be re-run
+                        output_cache = {**output_cache, **response}
+                else:
                     blocks_found = []
                     for required_block in block_registry_data.validations["input"][
                         "required"
                     ]:
-                        # Visited Set
                         visited = set()
-
-                        dfs(
+                        self._dfs(
                             visited,
                             task_to_be_run,
                             block_registry_data.validations["input"]["allowed_blocks"],
@@ -414,23 +305,95 @@ def get_data_from_cache(block_id_in_flow):
                             blocks_found,
                         )
 
-                    print(f"Task {task_to_be_run} - {blocks_found}")
-
-                    # Assembles all dependency data into the output_payload variable
                     output_payload = {}
-                    for block_id in blocks_found:
-                        cache_key, response = get_data_from_cache(block_id)
+                    assembled_dependency_list_from_flow = {}
+                    for item in set(blocks_found):
+                        item_block_data = self._get_block_by_id(item)
+
+                        if (
+                            item_block_data["blockType"]
+                            not in assembled_dependency_list_from_flow
+                        ):
+                            assembled_dependency_list_from_flow[
+                                item_block_data["blockType"]
+                            ] = 0
+                        assembled_dependency_list_from_flow[
+                            item_block_data["blockType"]
+                        ] += 1
 
-                        output_payload = {**output_payload, cache_key: response}
+                        if mode == "RUN":
+                            cache_key, response = _get_data_from_cache(item)
+                            output_payload = {**output_payload, cache_key: response}
 
-                    response = make_run_request(
-                        task_to_be_run,
-                        block_registry_data,
-                        block_data["data"]["input"],
-                        output_payload,
+                    if mode == "RUN":
+                        response = self._make_run_request(
+                            task_to_be_run,
+                            block_registry_data,
+                            block_data,
+                            output_payload,
+                        )
+
+                        # Adds to a cache to ensure that requests don't need to be re-run
+                        output_cache = {**output_cache, **response}
+
+                    for required in block_registry_data.validations["input"][
+                        "required"
+                    ]:
+                        if required["blockType"] in assembled_dependency_list_from_flow:
+                            is_valid = (
+                                is_valid
+                                and assembled_dependency_list_from_flow[
+                                    required["blockType"]
+                                ]
+                                == required["number"]
+                            )
+                        else:
+                            is_valid = False
+
+        if mode == "VALIDATE":
+            for edge in self.edges:
+                source_block = self._get_block_by_id(edge["source"])
+                target_block = self._get_block_by_id(edge["target"])
+
+                target_block_data = self._get_block_data_from_registry(
+                    target_block["blockType"], target_block["blockId"]
+                )
+
+                is_edge_valid = False
+                for allowed_block in target_block_data.validations["input"][
+                    "allowed_blocks"
+                ]:
+                    is_edge_valid = is_edge_valid or (
+                        (str(allowed_block["blockId"]) == str(source_block["blockId"]))
+                        and (
+                            str(allowed_block["blockType"])
+                            == str(source_block["blockType"])
+                        )
                     )
 
-                    # Adds to a cache to ensure that requests don't need to be re-run
-                    output_cache = {**output_cache, **response}
+                target_block = self._get_block_data_from_registry(
+                    target_block["blockType"], target_block["blockId"]
+                )
+
+                allowed_connections = []
+                if not is_edge_valid:
+                    for allowed_block in target_block_data.validations["input"][
+                        "allowed_blocks"
+                    ]:
+                        block_data = self._get_block_data_from_registry(
+                            allowed_block["blockType"], allowed_block["blockId"]
+                        )
+                        allowed_connections.append(block_data.block_name)
+
+                self.edge_validation[edge["id"]] = {
+                    "status": is_edge_valid,
+                    "target_block": target_block.block_name,
+                    "allowed_connections": allowed_connections,
+                }
+
+            return is_valid
 
-        return output_cache
+        elif mode == "RUN":
+            return output_cache
+        else:
+            return None
diff --git a/orchestrator/services/flow/spectrum_flow_v2.py b/orchestrator/services/flow/spectrum_flow_v2.py
deleted file mode 100644
index 711dab0e..00000000
--- a/orchestrator/services/flow/spectrum_flow_v2.py
+++ /dev/null
@@ -1,360 +0,0 @@
-import json
-import requests
-from os import environ
-from copy import deepcopy
-
-from orchestrator.models import BlockRegistry
-
-
-class Graph:
-    def __init__(self):
-        self.adjacency_list = {}
-
-    def __repr__(self):
-        return repr(self.adjacency_list)
-
-    def insert(self, source_vertex, dest_vertex):
-        """
-        Inserts one or both vertices (depending on if either exists) and
-        connects one vertex to the other
-
-        Attributes:
-            source_vertex: Start / Originating Vertex
-            dest_vertex: Destination Vertex
-        """
-        if not (source_vertex in self.adjacency_list):
-            self.adjacency_list[source_vertex] = set()
-
-        self.adjacency_list[source_vertex].add(dest_vertex)
-
-
-class DependencyGraph:
-    def __init__(self, vertices, edges):
-        self.vertices = vertices
-        self.edges = edges
-
-        self.graph = self.generate_graph()
-        self.dependency_graph = self.generate_dependency_graph()
-        self.batched_tasks = self.generate_batched_tasks()
-
-    def generate_graph(self):
-        """
-        Generates an adjacency list graph representation using the node and edge pairs
-        """
-        graph = Graph()
-
-        # Initializes the Adjacency List of Blocks
-        for block_id, vertex in self.vertices.items():
-            if block_id not in graph.adjacency_list:
-                graph.adjacency_list[block_id] = set()
-
-        for edge in self.edges:
-            graph.insert(edge["source"], edge["target"])
-
-        return graph
-
-    def generate_dependency_graph(self):
-        """
-        Transposes the adjacency list to create a graph of dependencies
-        """
-        dependency_graph = Graph()
-
-        # Initializes the adjacency list with initial values
-        for source_vertex, _ in self.graph.adjacency_list.items():
-            if source_vertex not in dependency_graph.adjacency_list:
-                dependency_graph.adjacency_list[source_vertex] = set()
-
-        # Reverses the direction of the node connections
-        for source_vertex, dest_vertices in self.graph.adjacency_list.items():
-            if source_vertex not in dependency_graph.adjacency_list:
-                dependency_graph.adjacency_list[source_vertex] = set()
-
-            for dest_vertex in dest_vertices:
-                dependency_graph.insert(dest_vertex, source_vertex)
-
-        return dependency_graph
-
-    def generate_batched_tasks(self):
-        """
-        Creates a series of batches tasks that need to be executed sequentially
-        """
-        batches = []
-
-        dependency_graph = deepcopy(self.dependency_graph)
-
-        while dependency_graph.adjacency_list:
-            # Retrieves nodes with no dependencies
-            nodes_with_no_dependencies = {
-                k for k, v in dependency_graph.adjacency_list.items() if not v
-            }
-
-            if not nodes_with_no_dependencies:
-                raise ValueError("Circular Dependency Found")
-
-            for node in nodes_with_no_dependencies:
-                del dependency_graph.adjacency_list[node]
-
-            for deps in dependency_graph.adjacency_list.values():
-                deps.difference_update(nodes_with_no_dependencies)
-
-            batches.append({name for name in nodes_with_no_dependencies})
-
-        return batches
-
-
-class SpectrumFlow:
-    def __init__(self, vertices, edges):
-        self.vertices = vertices
-        self.edges = edges
-
-        graph = DependencyGraph(vertices, edges)
-
-        self.graph = graph.graph.adjacency_list
-        self.dependency_graph = graph.dependency_graph.adjacency_list
-        self.batched_tasks = graph.batched_tasks
-
-        self.is_valid = self.run(mode="VALIDATE")
-
-    def _get_block_by_id(self, block_id):
-        """
-        Retrieves a block by its ID
-
-        block_id: ID of block in flow
-        """
-        try:
-            return self.vertices[block_id]
-        except KeyError:
-            raise Exception(f"The Block ID {block_id} could not be found")
-
-    @staticmethod
-    def _get_block_data_from_registry(block_type, block_id):
-        try:
-            return (
-                BlockRegistry.objects.all()
-                .filter(block_type=block_type)
-                .filter(block_id=block_id)
-                .first()
-            )
-        except Exception as e:
-            raise Exception(e)
-
-    def _dfs(
-        self,
-        visited,
-        block_id_in_flow,
-        allowed_block_data,
-        target_block_data,
-        blocks_found,
-    ):
-        """
-        Recursively iterates through directed adjancency list
-
-        Attempts to determine which blocks downstream in the sequence have required data
-
-        Attributes:
-            visited: Set of blocks that have been traversed
-            block_id_in_flow: Current block ID being unpacked
-            allowed_block_data: List of allowed input blocks
-            target_block_data: Block Type and Number of Blocks being searched for
-        """
-        block_data = self._get_block_by_id(block_id_in_flow)
-
-        if block_data["blockType"] == target_block_data["blockType"]:
-            for allowed_block in allowed_block_data:
-                if str(block_data["blockType"]) == str(
-                    allowed_block["blockType"]
-                ) and str(block_data["blockId"]) == str(allowed_block["blockId"]):
-                    blocks_found.append(block_id_in_flow)
-
-        if len(blocks_found) == int(target_block_data["number"]):
-            return
-
-        if block_id_in_flow not in visited:
-            visited.add(block_id_in_flow)
-
-            for neighbor in self.dependency_graph[block_id_in_flow]:
-                self._dfs(
-                    visited,
-                    neighbor,
-                    allowed_block_data,
-                    target_block_data,
-                    blocks_found,
-                )
-
-    def _make_run_request(
-        self, block_id_in_flow, block_registry_data, input_payload, output_payload
-    ):
-        """
-        Hits the `/run` endpoint for each block to complete the request
-
-        Attributes
-            block_id_in_flow: Block ID generated by the frontend
-            block_registry_data: Block Data queried from the frontend
-            input_payload: Input Payload
-            output_payload: Output Payload
-        """
-
-        request_url = f"{environ['API_BASE_URL']}/{block_registry_data.block_type}/{block_registry_data.block_id}/run"
-
-        # Input Transformation
-        input_cleaned_payload = {}
-        for k, v in input_payload.items():
-            if type(v) is dict and "value" in v:
-                input_cleaned_payload[k] = v["value"]
-
-        request_payload = {"input": input_cleaned_payload, "output": output_payload}
-
-        r = requests.post(request_url, json=request_payload)
-
-        output = {}
-        if r.status_code == 200:
-            block_type_id_key = f"{block_registry_data.block_type}-{block_registry_data.block_id}-{block_id_in_flow}"
-
-            if block_type_id_key not in output.keys():
-                output[block_type_id_key] = {}
-
-            try:
-                response_json = r.json()
-                if "response" in response_json:
-                    output[block_type_id_key] = response_json["response"]
-                else:
-                    raise Exception("JSON Key 'response' could not be found")
-
-            except json.decoder.JSONDecodeError as e:
-                raise Exception("JSON Decode Error")
-            except Exception as e:
-                raise Exception("Unhandled Exception: ", e)
-        else:
-            print("Error: ", r.json())
-
-        return output
-
-    def run(self, mode="VALIDATE"):
-        """
-        Validates a flow to ensure that all nodes are connected correctly
-        """
-
-        output_cache = {}
-
-        def _get_data_from_cache(block_id_in_flow):
-            """
-            Retrieves data about block from cache
-
-            Attributes:
-                block_id: Block ID from Flow
-            """
-            block_data = self._get_block_by_id(block_id_in_flow)
-            block_registry_data = self._get_block_data_from_registry(
-                block_data["blockType"], block_data["blockId"]
-            )
-
-            cache_key = f"{block_registry_data.block_type}-{block_registry_data.block_id}-{block_id_in_flow}"
-
-            if cache_key in output_cache.keys():
-                return cache_key, output_cache[cache_key]
-            else:
-                raise Exception(
-                    f"Data does not exist in cache for {block_id_in_flow} with {cache_key}"
-                )
-
-        is_valid = True
-
-        if len(self.batched_tasks) == 0:
-            is_valid = False
-
-        for task in self.batched_tasks:
-            for task_to_be_run in task:
-                block_data = self._get_block_by_id(task_to_be_run)
-                block_registry_data = self._get_block_data_from_registry(
-                    block_data["blockType"], block_data["blockId"]
-                )
-
-                # Iterate through block data to gauge whether inputs exist
-                for key, value in block_data.items():
-                    if type(value) is dict and "value" in value.keys():
-                        if value["value"] == "":
-                            is_valid = False
-
-                if len(self.dependency_graph[task_to_be_run]) == 0:
-                    is_valid = (
-                        is_valid
-                        and len(block_registry_data.validations["input"]["required"])
-                        == 0
-                    )
-
-                    if mode == "RUN":
-                        response = self._make_run_request(
-                            task_to_be_run, block_registry_data, block_data, {}
-                        )
-
-                        # Adds to a cache to ensure that requests don't need to be re-run
-                        output_cache = {**output_cache, **response}
-                else:
-                    blocks_found = []
-                    for required_block in block_registry_data.validations["input"][
-                        "required"
-                    ]:
-                        visited = set()
-                        self._dfs(
-                            visited,
-                            task_to_be_run,
-                            block_registry_data.validations["input"]["allowed_blocks"],
-                            required_block,
-                            blocks_found,
-                        )
-
-                    output_payload = {}
-                    assembled_dependency_list_from_flow = {}
-                    for item in set(blocks_found):
-                        item_block_data = self._get_block_by_id(item)
-
-                        if (
-                            item_block_data["blockType"]
-                            not in assembled_dependency_list_from_flow
-                        ):
-                            assembled_dependency_list_from_flow[
-                                item_block_data["blockType"]
-                            ] = 0
-                        assembled_dependency_list_from_flow[
-                            item_block_data["blockType"]
-                        ] += 1
-
-                        if mode == "RUN":
-                            cache_key, response = _get_data_from_cache(item)
-                            output_payload = {**output_payload, cache_key: response}
-
-                    if mode == "RUN":
-                        response = self._make_run_request(
-                            task_to_be_run,
-                            block_registry_data,
-                            block_data,
-                            output_payload,
-                        )
-
-                        # Adds to a cache to ensure that requests don't need to be re-run
-                        output_cache = {**output_cache, **response}
-
-                    for required in block_registry_data.validations["input"][
-                        "required"
-                    ]:
-                        # print("Block Type:", required["blockType"])
-                        # print("Required Block Number: ", required["number"])
-                        # print(assembled_dependency_list_from_flow)
-
-                        if required["blockType"] in assembled_dependency_list_from_flow:
-                            is_valid = (
-                                is_valid
-                                and assembled_dependency_list_from_flow[
-                                    required["blockType"]
-                                ]
-                                == required["number"]
-                            )
-                        else:
-                            is_valid = False
-
-        if mode == "VALIDATE":
-            return is_valid
-        elif mode == "RUN":
-            return output_cache
-        else:
-            return None
diff --git a/orchestrator/tests/test_dependency_graph.py b/orchestrator/tests/test_dependency_graph.py
index 21405eda..27312db5 100644
--- a/orchestrator/tests/test_dependency_graph.py
+++ b/orchestrator/tests/test_dependency_graph.py
@@ -1,6 +1,6 @@
 from django.test import TestCase
 
-from orchestrator.services.flow.spectrum_flow_v2 import DependencyGraph
+from orchestrator.services.flow.spectrum_flow import DependencyGraph
 from orchestrator.tests.data.test_data_validation import SINGLE_FULL_FLOW_VALID
 
 
diff --git a/orchestrator/tests/test_views.py b/orchestrator/tests/test_views.py
index 7274f95a..b759dd78 100644
--- a/orchestrator/tests/test_views.py
+++ b/orchestrator/tests/test_views.py
@@ -24,26 +24,34 @@ def test_ok(self):
                 "response": {
                     "DATA_BLOCK": {
                         "1": {
-                            "blockName": "Raw Data",
-                            "blockMetadata": "/orchestration/$DATA_BLOCK/$1/",
-                        }
+                            "blockName": "US Stock Data",
+                            "blockMetadata": "/orchestration/DATA_BLOCK/1/",
+                        },
+                        "2": {
+                            "blockName": "Crypto Data",
+                            "blockMetadata": "/orchestration/DATA_BLOCK/2/",
+                        },
                     },
                     "COMPUTATIONAL_BLOCK": {
                         "1": {
                             "blockName": "Technical Analysis",
-                            "blockMetadata": "/orchestration/$COMPUTATIONAL_BLOCK/$1/",
+                            "blockMetadata": "/orchestration/COMPUTATIONAL_BLOCK/1/",
                         }
                     },
                     "SIGNAL_BLOCK": {
                         "1": {
                             "blockName": "Event",
-                            "blockMetadata": "/orchestration/$SIGNAL_BLOCK/$1/",
-                        }
+                            "blockMetadata": "/orchestration/SIGNAL_BLOCK/1/",
+                        },
+                        "2": {
+                            "blockName": "Saddle",
+                            "blockMetadata": "/orchestration/SIGNAL_BLOCK/2/",
+                        },
                     },
                     "STRATEGY_BLOCK": {
                         "1": {
                             "blockName": "Backtest",
-                            "blockMetadata": "/orchestration/$STRATEGY_BLOCK/$1/",
+                            "blockMetadata": "/orchestration/STRATEGY_BLOCK/1/",
                         }
                     },
                 }
@@ -65,7 +73,7 @@ def test_ok(self):
         self.assertDictEqual(
             response.json(),
             {
-                "blockName": "Raw Data",
+                "blockName": "US Stock Data",
                 "blockType": "DATA_BLOCK",
                 "blockId": 1,
                 "inputs": [
@@ -76,22 +84,10 @@ def test_ok(self):
                         "fieldVariableName": "equity_name",
                     },
                     {
-                        "fieldData": {"base": "/dataType", "method": "GET"},
-                        "fieldName": "Data Type",
+                        "fieldData": {"base": "/candlestick", "method": "GET"},
+                        "fieldName": "Candlesticks",
                         "fieldType": "dropdown",
-                        "fieldVariableName": "data_type",
-                    },
-                    {
-                        "fieldData": {"base": "/interval", "method": "GET"},
-                        "fieldName": "Interval",
-                        "fieldType": "dropdown",
-                        "fieldVariableName": "interval",
-                    },
-                    {
-                        "fieldData": {"base": "/outputSize", "method": "GET"},
-                        "fieldName": "Output Size",
-                        "fieldType": "dropdown",
-                        "fieldVariableName": "outputsize",
+                        "fieldVariableName": "candlestick",
                     },
                     {
                         "fieldName": "Date Range",
@@ -307,6 +303,63 @@ def test_block_id_dne(self):
 
 
 class ValidateFlowTest(TestCase):
+    def test_single_edge_invalid(self):
+        request_payload = {
+            "nodeList": {
+                "1": {
+                    "blockType": "DATA_BLOCK",
+                    "blockId": 1,
+                    "equity_name": {"value": "AAPL", "options": []},
+                    "data_type": {
+                        "value": "intraday",
+                        "options": ["intraday", "daily_adjusted"],
+                    },
+                    "interval": {"value": "1min", "options": ["1min"]},
+                    "outputsize": {"value": "compact", "options": ["compact", "full"]},
+                    "start_date": {"value": "2021-06-21 19:58:00"},
+                    "end_date": {"value": "2021-06-21 20:00:00"},
+                },
+                "4": {
+                    "blockType": "SIGNAL_BLOCK",
+                    "blockId": 1,
+                    "event_type": {"value": "INTERSECT"},
+                    "event_action": {"value": "BUY"},
+                },
+            },
+            "edgeList": [
+                {
+                    "source": "1",
+                    "sourceHandle": "output_id888",
+                    "target": "4",
+                    "targetHandle": "input_id891",
+                    "type": "edge",
+                    "id": "reactflow__edge-1output_id888-4input_id891",
+                },
+            ],
+        }
+
+        auth = set_up_authentication()
+        response = self.client.post(
+            "/orchestration/validate",
+            json.dumps(request_payload),
+            content_type="application/json",
+            **{"HTTP_AUTHORIZATION": f"Bearer {auth['token']}"},
+        )
+
+        self.assertDictEqual(
+            response.json(),
+            {
+                "valid": False,
+                "edges": {
+                    "reactflow__edge-1output_id888-4input_id891": {
+                        "status": False,
+                        "target_block": "Event",
+                        "allowed_connections": ["Technical Analysis"],
+                    }
+                },
+            },
+        )
+
     def test_ok(self):
         auth = set_up_authentication()
         response = self.client.post(
@@ -316,7 +369,39 @@ def test_ok(self):
             **{"HTTP_AUTHORIZATION": f"Bearer {auth['token']}"},
         )
 
-        self.assertDictEqual(response.json(), {"valid": True})
+        self.assertDictEqual(
+            response.json(),
+            {
+                "valid": True,
+                "edges": {
+                    "reactflow__edge-1output_id888-2input_id891": {
+                        "status": True,
+                        "target_block": "Technical Analysis",
+                        "allowed_connections": [],
+                    },
+                    "reactflow__edge-1output_id1136-3input_id1143": {
+                        "status": True,
+                        "target_block": "Technical Analysis",
+                        "allowed_connections": [],
+                    },
+                    "reactflow__edge-2output_id1356-4input_id1363": {
+                        "status": True,
+                        "target_block": "Event",
+                        "allowed_connections": [],
+                    },
+                    "reactflow__edge-3output_id1576-4input_id1579": {
+                        "status": True,
+                        "target_block": "Event",
+                        "allowed_connections": [],
+                    },
+                    "reactflow__edge-4output_id1796-5input_id1799": {
+                        "status": True,
+                        "target_block": "Backtest",
+                        "allowed_connections": [],
+                    },
+                },
+            },
+        )
 
     def test_invalid(self):
         auth = set_up_authentication()
@@ -327,7 +412,29 @@ def test_invalid(self):
             **{"HTTP_AUTHORIZATION": f"Bearer {auth['token']}"},
         )
 
-        self.assertDictEqual(response.json(), {"valid": False})
+        self.assertDictEqual(
+            response.json(),
+            {
+                "valid": False,
+                "edges": {
+                    "reactflow__edge-1output_id502-4input_id589": {
+                        "status": False,
+                        "target_block": "Event",
+                        "allowed_connections": ["Technical Analysis"],
+                    },
+                    "reactflow__edge-4output_id986-2input_id1089": {
+                        "status": False,
+                        "target_block": "Technical Analysis",
+                        "allowed_connections": ["US Stock Data", "Crypto Data"],
+                    },
+                    "reactflow__edge-4output_id1230-3input_id1417": {
+                        "status": False,
+                        "target_block": "Event",
+                        "allowed_connections": ["Technical Analysis"],
+                    },
+                },
+            },
+        )
 
 
 class RunFlowTest(TestCase):
diff --git a/orchestrator/views.py b/orchestrator/views.py
index fa379934..b994cdb2 100644
--- a/orchestrator/views.py
+++ b/orchestrator/views.py
@@ -11,8 +11,7 @@
 from authentication.decorators import SpectrumAuthentication, SpectrumIsAuthenticated
 from orchestrator.models import BlockRegistry
-from orchestrator.services.flow.run import run
-from orchestrator.services.flow.spectrum_flow_v2 import SpectrumFlow
+from orchestrator.services.flow.spectrum_flow import SpectrumFlow
 
 
 class AllMetadataView(APIView):
@@ -24,15 +23,14 @@ def get(self, request):
         response = {}
 
         for block_registry in all_blocks_from_registry:
-            response = {
-                **response,
-                block_registry.block_type: {
-                    block_registry.block_id: {
-                        "blockName": block_registry.block_name,
-                        "blockMetadata": f"/orchestration/${block_registry.block_type}/${block_registry.block_id}/",
-                    }
-                },
-            }
+            if block_registry.block_type not in response:
+                response[block_registry.block_type] = {}
+
+            if block_registry.block_id not in response[block_registry.block_type]:
+                response[block_registry.block_type][block_registry.block_id] = {
+                    "blockName": block_registry.block_name,
+                    "blockMetadata": f"/orchestration/{block_registry.block_type}/{block_registry.block_id}/",
+                }
 
         return JsonResponse({"response": response})
 
@@ -43,12 +41,6 @@ class MetadataView(APIView):
 
     def get(self, request, block_type, block_id):
         try:
-            # block_registry = (
-            #     BlockRegistry.objects.all()
-            #     .filter(block_type=block_type)
-            #     .filter(block_id=block_id)[0]
-            # )
-
             block_registry = BlockRegistry.objects.get(
                 block_type=block_type, block_id=block_id
             )
@@ -108,7 +100,7 @@ def post(self, request):
 
         if request_body["nodeList"] is not {} and request_body["edgeList"] is not []:
             flow = SpectrumFlow(request_body["nodeList"], request_body["edgeList"])
-            return JsonResponse({"valid": flow.is_valid})
+            return JsonResponse({"valid": flow.is_valid, "edges": flow.edge_validation})
         else:
             return JsonResponse({"valid": False})
 
diff --git a/requirements.txt b/requirements.txt
index 165ec2b2..b760821b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -36,7 +36,7 @@ pylint==2.8.3
 pylint-django==2.4.4
 pylint-plugin-utils==0.6
 python-dateutil==2.8.1
-python-dotenv==0.18.0
+python-dotenv==0.19.0
 python3-openid==3.2.0
 pytz==2021.1
 regex==2021.7.6