From 91986f06a7c8874412e01aff451fe2c382610793 Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Thu, 22 Jul 2021 21:20:03 +0100 Subject: [PATCH 01/17] (fix): Updated views due to bug in code for getAllMetadata endpoint and updated data migration to support new block type. --- .../0002_data_migration_for_block_registry.py | 39 +++++++++++++++++++ orchestrator/views.py | 23 ++++------- 2 files changed, 47 insertions(+), 15 deletions(-) diff --git a/orchestrator/migrations/0002_data_migration_for_block_registry.py b/orchestrator/migrations/0002_data_migration_for_block_registry.py index 4d4be0f8..e500f054 100644 --- a/orchestrator/migrations/0002_data_migration_for_block_registry.py +++ b/orchestrator/migrations/0002_data_migration_for_block_registry.py @@ -104,6 +104,45 @@ def seed_blocks_into_registry(apps, schema_editor): }, ).save() + BlockRegistry( + block_type="SIGNAL_BLOCK", + block_id=2, + block_name="Saddle", + inputs=[ + { + "fieldData": {"base": "/saddleType", "method": "GET"}, + "fieldName": "Saddle Type", + "fieldType": "dropdown", + "fieldVariableName": "saddle_type", + }, + { + "fieldData": {"base": "/eventAction", "method": "GET"}, + "fieldName": "Event Action", + "fieldType": "dropdown", + "fieldVariableName": "event_action", + }, + { + "fieldName": "Consecutive Up", + "fieldVariableName": "consecutive_up", + "fieldType": "input", + }, + { + "fieldName": "Consecutive Down", + "fieldVariableName": "consecutive_down", + "fieldType": "input", + }, + ], + validations={ + "input": { + "required": [{"blockType": "COMPUTATIONAL_BLOCK", "number": 1}], + "allowed_blocks": [ + {"blockId": "1", "blockType": "COMPUTATIONAL_BLOCK"} + ], + }, + "output": [{"blockType": "SIGNAL_BLOCK", "number": 2}], + }, + ).save() + BlockRegistry( block_type="STRATEGY_BLOCK", block_id=1, diff --git a/orchestrator/views.py b/orchestrator/views.py index fa379934..ac84929f 100644 --- a/orchestrator/views.py +++ b/orchestrator/views.py @@ -24,15 +24,14 @@ def get(self, request): response = {} for block_registry in all_blocks_from_registry: - response = { - **response, - block_registry.block_type: { - block_registry.block_id: { - "blockName": block_registry.block_name, - "blockMetadata": f"/orchestration/${block_registry.block_type}/${block_registry.block_id}/", - } - }, - } + if block_registry.block_type not in response: + response[block_registry.block_type] = {} + + if block_registry.block_id not in response[block_registry.block_type]: + response[block_registry.block_type][block_registry.block_id] = { + "blockName": block_registry.block_name, + "blockMetadata": f"/orchestration/{block_registry.block_type}/{block_registry.block_id}/", + } return JsonResponse({"response": response}) @@ -43,12 +42,6 @@ class MetadataView(APIView): def get(self, request, block_type, block_id): try: - # block_registry = ( - # BlockRegistry.objects.all() - # .filter(block_type=block_type) - # .filter(block_id=block_id)[0] - # ) - block_registry = BlockRegistry.objects.get( block_type=block_type, block_id=block_id ) From fd8ccd39d611c9c5a1ff2cb84e9c478ec020e637 Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Thu, 22 Jul 2021 21:28:15 +0100 Subject: [PATCH 02/17] (fix): Updated getAllMetadata test with new values. 
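Note: this tracks the URL fix from PATCH 01. The old view built paths
with `${...}` inside a Python f-string, where `$` has no special
meaning and is emitted literally; hence the stray `$DATA_BLOCK/$1/`
segments the old tests asserted. A minimal repro of the bug:

    block_type, block_id = "DATA_BLOCK", 1
    buggy = f"/orchestration/${block_type}/${block_id}/"
    fixed = f"/orchestration/{block_type}/{block_id}/"
    assert buggy == "/orchestration/$DATA_BLOCK/$1/"
    assert fixed == "/orchestration/DATA_BLOCK/1/"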
--- orchestrator/tests/test_views.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/orchestrator/tests/test_views.py b/orchestrator/tests/test_views.py index 7274f95a..a6538295 100644 --- a/orchestrator/tests/test_views.py +++ b/orchestrator/tests/test_views.py @@ -25,25 +25,29 @@ def test_ok(self): "DATA_BLOCK": { "1": { "blockName": "Raw Data", - "blockMetadata": "/orchestration/$DATA_BLOCK/$1/", + "blockMetadata": "/orchestration/DATA_BLOCK/1/", } }, "COMPUTATIONAL_BLOCK": { "1": { "blockName": "Technical Analysis", - "blockMetadata": "/orchestration/$COMPUTATIONAL_BLOCK/$1/", + "blockMetadata": "/orchestration/COMPUTATIONAL_BLOCK/1/", } }, "SIGNAL_BLOCK": { "1": { "blockName": "Event", - "blockMetadata": "/orchestration/$SIGNAL_BLOCK/$1/", + "blockMetadata": "/orchestration/SIGNAL_BLOCK/1/", + }, + "2": { + "blockName": "Saddle", + "blockMetadata": "/orchestration/SIGNAL_BLOCK/2/", } }, "STRATEGY_BLOCK": { "1": { "blockName": "Backtest", - "blockMetadata": "/orchestration/$STRATEGY_BLOCK/$1/", + "blockMetadata": "/orchestration/STRATEGY_BLOCK/1/", } }, } From 899b3377fd8f2c64e96756a985794035cf4996c4 Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Thu, 22 Jul 2021 21:40:34 +0100 Subject: [PATCH 03/17] (fix): Updated data migrations to include strategy block as dependency. --- .../migrations/0002_data_migration_for_block_registry.py | 1 + 1 file changed, 1 insertion(+) diff --git a/orchestrator/migrations/0002_data_migration_for_block_registry.py b/orchestrator/migrations/0002_data_migration_for_block_registry.py index e500f054..df94ef49 100644 --- a/orchestrator/migrations/0002_data_migration_for_block_registry.py +++ b/orchestrator/migrations/0002_data_migration_for_block_registry.py @@ -187,6 +187,7 @@ def seed_blocks_into_registry(apps, schema_editor): ], "allowed_blocks": [ {"blockId": "1", "blockType": "SIGNAL_BLOCK"}, + {"blockId": "2", "blockType": "SIGNAL_BLOCK"}, {"blockId": "1", "blockType": "DATA_BLOCK"}, ], }, From f9acaa1b9bde4b71f3a6556a791ccdcc6c33c7d4 Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Thu, 22 Jul 2021 21:43:24 +0100 Subject: [PATCH 04/17] (lint): Linted files. --- orchestrator/tests/test_views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestrator/tests/test_views.py b/orchestrator/tests/test_views.py index a6538295..326d24c1 100644 --- a/orchestrator/tests/test_views.py +++ b/orchestrator/tests/test_views.py @@ -42,7 +42,7 @@ def test_ok(self): "2": { "blockName": "Saddle", "blockMetadata": "/orchestration/SIGNAL_BLOCK/2/", - } + }, }, "STRATEGY_BLOCK": { "1": { From 5fdd4fa6ec88b70ba7cbc1330118449f4c11c628 Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Mon, 26 Jul 2021 13:07:06 +0100 Subject: [PATCH 05/17] (feature): Added new block to metadata. 
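Note: registering a second DATA_BLOCK only works because PATCH 01 made
getAllMetadata accumulate into nested dicts instead of rebuilding the
response on every row. A sketch of the same accumulation via
dict.setdefault (a close equivalent of the membership checks now in
views.py), using the block names as of this patch:

    response = {}
    for block_type, block_id, name in [
        ("DATA_BLOCK", 1, "Raw Data"),
        ("DATA_BLOCK", 2, "Crypto Data"),
    ]:
        response.setdefault(block_type, {})[block_id] = {"blockName": name}
    assert set(response["DATA_BLOCK"]) == {1, 2}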
--- .../0002_data_migration_for_block_registry.py | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/orchestrator/migrations/0002_data_migration_for_block_registry.py b/orchestrator/migrations/0002_data_migration_for_block_registry.py index df94ef49..9cbf26a8 100644 --- a/orchestrator/migrations/0002_data_migration_for_block_registry.py +++ b/orchestrator/migrations/0002_data_migration_for_block_registry.py @@ -50,6 +50,38 @@ def seed_blocks_into_registry(apps, schema_editor): }, ).save() + BlockRegistry( + block_type="DATA_BLOCK", + block_id=2, + block_name="Crypto Data", + inputs=[ + { + "fieldData": { + "base": "/cryptoName?name=", + "method": "GET", + }, + "fieldName": "Crypto Name", + "fieldType": "search", + "fieldVariableName": "crypto_name", + }, + { + "fieldData": {"base": "/candlestick", "method": "GET"}, + "fieldName": "Candlesticks", + "fieldType": "dropdown", + "fieldVariableName": "candlestick", + }, + { + "fieldName": "Date Range", + "fieldType": "date_range", + "fieldVariableNames": ["start_date", "end_date"], + }, + ], + validations={ + "input": {"required": [], "allowed_blocks": []}, + "output": [{"blockType": "DATA_BLOCK", "number": 1}], + }, + ).save() + BlockRegistry( block_type="COMPUTATIONAL_BLOCK", block_id=1, @@ -69,7 +101,7 @@ def seed_blocks_into_registry(apps, schema_editor): validations={ "input": { "required": [{"blockType": "DATA_BLOCK", "number": 1}], - "allowed_blocks": [{"blockId": "1", "blockType": "DATA_BLOCK"}], + "allowed_blocks": [{"blockId": "1", "blockType": "DATA_BLOCK"}, {"blockId": "2", "blockType": "DATA_BLOCK"}], }, "output": [{"blockType": "COMPUTATIONAL_BLOCK", "number": 1}], }, @@ -189,6 +221,7 @@ def seed_blocks_into_registry(apps, schema_editor): {"blockId": "1", "blockType": "SIGNAL_BLOCK"}, {"blockId": "2", "blockType": "SIGNAL_BLOCK"}, {"blockId": "1", "blockType": "DATA_BLOCK"}, + {"blockId": "2", "blockType": "DATA_BLOCK"} ], }, "output": [{"blockType": "STRATEGY_BLOCK", "number": 1}], From e77c465d6469f913aa6d4eac7419c07f5b17c2f0 Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Mon, 26 Jul 2021 13:12:53 +0100 Subject: [PATCH 06/17] (fix): Updated the getAllMetadata response. --- orchestrator/tests/test_views.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/orchestrator/tests/test_views.py b/orchestrator/tests/test_views.py index 326d24c1..7b0d2754 100644 --- a/orchestrator/tests/test_views.py +++ b/orchestrator/tests/test_views.py @@ -26,6 +26,10 @@ def test_ok(self): "1": { "blockName": "Raw Data", "blockMetadata": "/orchestration/DATA_BLOCK/1/", + }, + "2": { + "blockName": "Crypto Data", + "blockMetadata": "/orchestration/DATA_BLOCK/2/", } }, "COMPUTATIONAL_BLOCK": { From 76d41d66f1aec4ad39fcbd1e5a2717826481f11a Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Mon, 26 Jul 2021 13:19:51 +0100 Subject: [PATCH 07/17] (lint): Linted files. 
--- .../migrations/0002_data_migration_for_block_registry.py | 7 +++++-- orchestrator/tests/test_views.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/orchestrator/migrations/0002_data_migration_for_block_registry.py b/orchestrator/migrations/0002_data_migration_for_block_registry.py index 9cbf26a8..9e536fcd 100644 --- a/orchestrator/migrations/0002_data_migration_for_block_registry.py +++ b/orchestrator/migrations/0002_data_migration_for_block_registry.py @@ -101,7 +101,10 @@ def seed_blocks_into_registry(apps, schema_editor): validations={ "input": { "required": [{"blockType": "DATA_BLOCK", "number": 1}], - "allowed_blocks": [{"blockId": "1", "blockType": "DATA_BLOCK"}, {"blockId": "2", "blockType": "DATA_BLOCK"}], + "allowed_blocks": [ + {"blockId": "1", "blockType": "DATA_BLOCK"}, + {"blockId": "2", "blockType": "DATA_BLOCK"}, + ], }, "output": [{"blockType": "COMPUTATIONAL_BLOCK", "number": 1}], }, @@ -221,7 +224,7 @@ def seed_blocks_into_registry(apps, schema_editor): {"blockId": "1", "blockType": "SIGNAL_BLOCK"}, {"blockId": "2", "blockType": "SIGNAL_BLOCK"}, {"blockId": "1", "blockType": "DATA_BLOCK"}, - {"blockId": "2", "blockType": "DATA_BLOCK"} + {"blockId": "2", "blockType": "DATA_BLOCK"}, ], }, "output": [{"blockType": "STRATEGY_BLOCK", "number": 1}], diff --git a/orchestrator/tests/test_views.py b/orchestrator/tests/test_views.py index 7b0d2754..8f86fe6a 100644 --- a/orchestrator/tests/test_views.py +++ b/orchestrator/tests/test_views.py @@ -30,7 +30,7 @@ def test_ok(self): "2": { "blockName": "Crypto Data", "blockMetadata": "/orchestration/DATA_BLOCK/2/", - } + }, }, "COMPUTATIONAL_BLOCK": { "1": { From 667e23042f3133324cc4e5de820814cdd30b84f5 Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Mon, 26 Jul 2021 17:50:02 +0100 Subject: [PATCH 08/17] (fix): Updated migrations. --- .../0002_data_migration_for_block_registry.py | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/orchestrator/migrations/0002_data_migration_for_block_registry.py b/orchestrator/migrations/0002_data_migration_for_block_registry.py index 9e536fcd..ed5ad7c7 100644 --- a/orchestrator/migrations/0002_data_migration_for_block_registry.py +++ b/orchestrator/migrations/0002_data_migration_for_block_registry.py @@ -9,7 +9,7 @@ def seed_blocks_into_registry(apps, schema_editor): BlockRegistry( block_type="DATA_BLOCK", block_id=1, - block_name="Raw Data", + block_name="US Stock Data", inputs=[ { "fieldData": { @@ -19,24 +19,12 @@ def seed_blocks_into_registry(apps, schema_editor): "fieldName": "Equity Name", "fieldType": "search", "fieldVariableName": "equity_name", - }, # TODO: Change fieldType from dropdown to search - { - "fieldData": {"base": "/dataType", "method": "GET"}, - "fieldName": "Data Type", - "fieldType": "dropdown", - "fieldVariableName": "data_type", - }, - { - "fieldData": {"base": "/interval", "method": "GET"}, - "fieldName": "Interval", - "fieldType": "dropdown", - "fieldVariableName": "interval", }, { - "fieldData": {"base": "/outputSize", "method": "GET"}, - "fieldName": "Output Size", + "fieldData": {"base": "/candlestick", "method": "GET"}, + "fieldName": "Candlesticks", "fieldType": "dropdown", - "fieldVariableName": "outputsize", + "fieldVariableName": "candlestick", }, { "fieldName": "Date Range", From c6372d53f7faee98b5d6dd78865ec3850ca51e39 Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Mon, 26 Jul 2021 17:58:22 +0100 Subject: [PATCH 09/17] (feature): Updated test_views with updated block name. 
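Note: PATCH 08 renamed DATA_BLOCK 1 to "US Stock Data" and collapsed
its inputs down to an equity search, a candlestick dropdown, and a
date range. For reference, every registry input follows the same field
shape (copied from the migration); the fieldData block presumably lets
the frontend fetch dropdown options with a GET against "base":

    field = {
        "fieldData": {"base": "/candlestick", "method": "GET"},
        "fieldName": "Candlesticks",
        "fieldType": "dropdown",
        "fieldVariableName": "candlestick",
    }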
--- orchestrator/tests/test_views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestrator/tests/test_views.py b/orchestrator/tests/test_views.py index 8f86fe6a..16d0fe98 100644 --- a/orchestrator/tests/test_views.py +++ b/orchestrator/tests/test_views.py @@ -24,7 +24,7 @@ def test_ok(self): "response": { "DATA_BLOCK": { "1": { - "blockName": "Raw Data", + "blockName": "US Stock Data", "blockMetadata": "/orchestration/DATA_BLOCK/1/", }, "2": { From 5670b063f5263ee6eefb055f28dffd453a6d85ff Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Mon, 26 Jul 2021 18:03:33 +0100 Subject: [PATCH 10/17] (feature): Update test views with new metadata endpoint. --- orchestrator/tests/test_views.py | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/orchestrator/tests/test_views.py b/orchestrator/tests/test_views.py index 16d0fe98..214c2236 100644 --- a/orchestrator/tests/test_views.py +++ b/orchestrator/tests/test_views.py @@ -73,7 +73,7 @@ def test_ok(self): self.assertDictEqual( response.json(), { - "blockName": "Raw Data", + "blockName": "US Stock Data", "blockType": "DATA_BLOCK", "blockId": 1, "inputs": [ @@ -84,22 +84,10 @@ def test_ok(self): "fieldVariableName": "equity_name", }, { - "fieldData": {"base": "/dataType", "method": "GET"}, - "fieldName": "Data Type", + "fieldData": {"base": "/candlestick", "method": "GET"}, + "fieldName": "Candlesticks", "fieldType": "dropdown", - "fieldVariableName": "data_type", - }, - { - "fieldData": {"base": "/interval", "method": "GET"}, - "fieldName": "Interval", - "fieldType": "dropdown", - "fieldVariableName": "interval", - }, - { - "fieldData": {"base": "/outputSize", "method": "GET"}, - "fieldName": "Output Size", - "fieldType": "dropdown", - "fieldVariableName": "outputsize", + "fieldVariableName": "candlestick", }, { "fieldName": "Date Range", From c416f5b57dc01fc800874902a57023a3fd35a6ad Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Wed, 28 Jul 2021 07:38:34 +0100 Subject: [PATCH 11/17] (cleanup): Cleaned up unused code. 
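Note: this folds spectrum_flow_v2 back into spectrum_flow and deletes
the unused run.py shim. Execution order still comes from
DependencyGraph.generate_batched_tasks, which peels zero-dependency
nodes off the transposed graph level by level (Kahn-style); a
standalone sketch of that loop, mirroring the method in the diff:

    from copy import deepcopy

    def batch(dependencies):
        graph, batches = deepcopy(dependencies), []
        while graph:
            # Nodes whose dependency sets are empty can run now.
            ready = {node for node, deps in graph.items() if not deps}
            if not ready:
                raise ValueError("Circular Dependency Found")
            for node in ready:
                del graph[node]
            # Completed nodes stop blocking everything downstream.
            for deps in graph.values():
                deps.difference_update(ready)
            batches.append(ready)
        return batches

    # 1 -> 2 -> 3 runs as three sequential batches:
    assert batch({"1": set(), "2": {"1"}, "3": {"2"}}) == [{"1"}, {"2"}, {"3"}]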
--- orchestrator/services/flow/run.py | 6 - orchestrator/services/flow/spectrum_flow.py | 542 ++++++++---------- .../services/flow/spectrum_flow_v2.py | 360 ------------ orchestrator/tests/test_dependency_graph.py | 2 +- orchestrator/tests/test_views.py | 3 + orchestrator/views.py | 3 +- 6 files changed, 238 insertions(+), 678 deletions(-) delete mode 100644 orchestrator/services/flow/run.py delete mode 100644 orchestrator/services/flow/spectrum_flow_v2.py diff --git a/orchestrator/services/flow/run.py b/orchestrator/services/flow/run.py deleted file mode 100644 index c2a1ac39..00000000 --- a/orchestrator/services/flow/run.py +++ /dev/null @@ -1,6 +0,0 @@ -from orchestrator.services.flow.spectrum_flow import SpectrumFlow - - -def run(vertices, edges): - spectrum_flow = SpectrumFlow(vertices, edges) - return spectrum_flow diff --git a/orchestrator/services/flow/spectrum_flow.py b/orchestrator/services/flow/spectrum_flow.py index 7f3e11ad..711dab0e 100644 --- a/orchestrator/services/flow/spectrum_flow.py +++ b/orchestrator/services/flow/spectrum_flow.py @@ -1,39 +1,53 @@ import json -import logging import requests from os import environ from copy import deepcopy from orchestrator.models import BlockRegistry -from orchestrator.services.flow.graph import Graph -class SpectrumFlow: - """ - Ingests a list of vertices and edges to create a representation of a "spectrum flow" - """ +class Graph: + def __init__(self): + self.adjacency_list = {} + + def __repr__(self): + return repr(self.adjacency_list) + + def insert(self, source_vertex, dest_vertex): + """ + Inserts one or both vertices (depending on if either exists) and + connects one vertex to the other + + Attributes: + source_vertex: Start / Originating Vertex + dest_vertex: Destination Vertex + """ + if not (source_vertex in self.adjacency_list): + self.adjacency_list[source_vertex] = set() + + self.adjacency_list[source_vertex].add(dest_vertex) + +class DependencyGraph: def __init__(self, vertices, edges): self.vertices = vertices self.edges = edges - self.graph = self.generate_adjacency_list() + self.graph = self.generate_graph() self.dependency_graph = self.generate_dependency_graph() - self.batched_tasks = self.get_batched_tasks() + self.batched_tasks = self.generate_batched_tasks() - def generate_adjacency_list(self): + def generate_graph(self): """ - Creates an adjacency list when passed in a list of nodes and edges + Generates an adjacency list graph representation using the node and edge pairs """ graph = Graph() - # Initializes the adjacency list with initial values - for vertex in self.vertices: - id = vertex["id"] - if not id in graph.adjacency_list: - graph.adjacency_list[id] = set() + # Initializes the Adjacency List of Blocks + for block_id, vertex in self.vertices.items(): + if block_id not in graph.adjacency_list: + graph.adjacency_list[block_id] = set() - # Iterates through the edges and populates the values in the adjacency list for edge in self.edges: graph.insert(edge["source"], edge["target"]) @@ -41,18 +55,18 @@ def generate_adjacency_list(self): def generate_dependency_graph(self): """ - Creates a depedency graph by transposing the adjacency list + Transposes the adjacency list to create a graph of dependencies """ dependency_graph = Graph() # Initializes the adjacency list with initial values for source_vertex, _ in self.graph.adjacency_list.items(): - if not source_vertex in dependency_graph.adjacency_list: + if source_vertex not in dependency_graph.adjacency_list: dependency_graph.adjacency_list[source_vertex] = 
set() # Reverses the direction of the node connections for source_vertex, dest_vertices in self.graph.adjacency_list.items(): - if not source_vertex in dependency_graph.adjacency_list: + if source_vertex not in dependency_graph.adjacency_list: dependency_graph.adjacency_list[source_vertex] = set() for dest_vertex in dest_vertices: @@ -60,9 +74,9 @@ def generate_dependency_graph(self): return dependency_graph - def get_batched_tasks(self): + def generate_batched_tasks(self): """ - Traverses through the adjacency list to sequence through tasks + Creates a series of batches tasks that need to be executed sequentially """ batches = [] @@ -87,326 +101,201 @@ def get_batched_tasks(self): return batches - def validate_strategy(self): - def get_block_by_id(id): - """ - Retrieves a block from the list of vertices passed in initially - Attributes: - id: ID of Block in Flow - """ - for vertex in self.vertices: - if vertex["id"] == id: - return vertex - - def dfs( - visited, - block_id_in_flow, - allowed_block_data, - target_block_data, - blocks_found, - ): - """ - Performs a DFS recursively that iterates through the directed adjacency list. - It attempts to determine which blocks downstream in the sequence have their required data - Attributes: - visited: Set of blocks that have already been traversed - block_id_in_flow: The current block ID being iterated on - allowed_block_data: List of permitted input blocks - target_block_data: States the block type and number of blocks being searched for - """ +class SpectrumFlow: + def __init__(self, vertices, edges): + self.vertices = vertices + self.edges = edges - block_data = get_block_by_id(block_id_in_flow) - - if ( - block_data["data"]["metadata"]["blockType"] - == target_block_data["blockType"] - ): - for allowed_block in allowed_block_data: - if str(block_data["data"]["metadata"]["blockType"]) == str( - allowed_block["blockType"] - ) and str(block_data["data"]["metadata"]["blockId"]) == str( - allowed_block["blockId"] - ): - blocks_found.append(block_id_in_flow) - - # Stopping Condition - if len(blocks_found) == int(target_block_data["number"]): - return - - if block_id_in_flow not in visited: - visited.add(block_id_in_flow) - for neighbor in self.dependency_graph.adjacency_list[block_id_in_flow]: - dfs( - visited, - neighbor, - allowed_block_data, - target_block_data, - blocks_found, - ) + graph = DependencyGraph(vertices, edges) - def get_block_data_from_registry(block_data, block_id_in_flow): - """ - Retrieves the Block Data from the registry given the block ID in the flow + self.graph = graph.graph.adjacency_list + self.dependency_graph = graph.dependency_graph.adjacency_list + self.batched_tasks = graph.batched_tasks - Attributes: - block_data: Full JSON of Block Data from Front-End - block_id_in_flow: The Block ID generated by the front-end when assembling a flow - """ - # Block Data from Block Registry - # TODO: Maybe this should be optimized since its querying the whole table - block_registry_data = ( + self.is_valid = self.run(mode="VALIDATE") + + def _get_block_by_id(self, block_id): + """ + Retrieves a block by its ID + + block_id: ID of block in flow + """ + try: + return self.vertices[block_id] + except KeyError: + raise Exception(f"The Block ID {block_id} could not be found") + + @staticmethod + def _get_block_data_from_registry(block_type, block_id): + try: + return ( BlockRegistry.objects.all() - .filter(block_type=block_data["data"]["metadata"]["blockType"]) - .filter(block_id=block_data["data"]["metadata"]["blockId"])[0] + 
.filter(block_type=block_type) + .filter(block_id=block_id) + .first() ) + except Exception as e: + raise Exception(e) + + def _dfs( + self, + visited, + block_id_in_flow, + allowed_block_data, + target_block_data, + blocks_found, + ): + """ + Recursively iterates through directed adjancency list - return block_registry_data + Attempts to determine which blocks downstream in the sequence have required data - is_valid = True - # Main Running Code - for task in self.batched_tasks: - # Goes through a set of tasks. Each FOR loop should make a request - # TODO: This part of the process can be asynchronous - for task_to_be_run in task: - # Gets the full sent data about the block - block_data = get_block_by_id(task_to_be_run) - block_registry_data = get_block_data_from_registry( - block_data, task_to_be_run + Attributes: + visited: Set of blocks that have been traversed + block_id_in_flow: Current block ID being unpacked + allowed_block_data: List of allowed input blocks + target_block_data: Block Type and Number of Blocks being searched for + """ + block_data = self._get_block_by_id(block_id_in_flow) + + if block_data["blockType"] == target_block_data["blockType"]: + for allowed_block in allowed_block_data: + if str(block_data["blockType"]) == str( + allowed_block["blockType"] + ) and str(block_data["blockId"]) == str(allowed_block["blockId"]): + blocks_found.append(block_id_in_flow) + + if len(blocks_found) == int(target_block_data["number"]): + return + + if block_id_in_flow not in visited: + visited.add(block_id_in_flow) + + for neighbor in self.dependency_graph[block_id_in_flow]: + self._dfs( + visited, + neighbor, + allowed_block_data, + target_block_data, + blocks_found, ) - if len(self.dependency_graph.adjacency_list[task_to_be_run]) == 0: - is_valid = ( - is_valid - and len(block_registry_data.validations["input"]["required"]) - == 0 - ) - else: - blocks_found = [] - for required_block in block_registry_data.validations["input"][ - "required" - ]: - # Visited Set - visited = set() - - dfs( - visited, - task_to_be_run, - block_registry_data.validations["input"]["allowed_blocks"], - required_block, - blocks_found, - ) - - assembled_dependency_list_from_flow = {} - for item in set(blocks_found): - item_block_data = get_block_by_id(item) - if ( - item_block_data["data"]["metadata"]["blockType"] - not in assembled_dependency_list_from_flow - ): - assembled_dependency_list_from_flow[ - item_block_data["data"]["metadata"]["blockType"] - ] = 0 - assembled_dependency_list_from_flow[ - item_block_data["data"]["metadata"]["blockType"] - ] += 1 - - for required in block_registry_data.validations["input"][ - "required" - ]: - is_valid = is_valid and ( - assembled_dependency_list_from_flow[required["blockType"]] - == required["number"] - ) - return is_valid + def _make_run_request( + self, block_id_in_flow, block_registry_data, input_payload, output_payload + ): + """ + Hits the `/run` endpoint for each block to complete the request - def run_batched_tasks_v3(self): - def get_block_by_id(id): - """ - Retrieves a block from the list of vertices passed in initially - Attributes: - id: ID of Block in Flow - """ - for vertex in self.vertices: - if vertex["id"] == id: - return vertex - - def dfs( - visited, - block_id_in_flow, - allowed_block_data, - target_block_data, - blocks_found, - ): - """ - Performs a DFS recursively that iterates through the directed adjacency list. 
- It attempts to determine which blocks downstream in the sequence have their required data + Attributes + block_id_in_flow: Block ID generated by the frontend + block_registry_data: Block Data queried from the frontend + input_payload: Input Payload + output_payload: Output Payload + """ - Attributes: - visited: Set of blocks that have already been traversed - block_id_in_flow: The current block ID being iterated on - allowed_block_data: List of permitted input blocks - target_block_data: States the block type and number of blocks being searched for - """ + request_url = f"{environ['API_BASE_URL']}/{block_registry_data.block_type}/{block_registry_data.block_id}/run" - block_data = get_block_by_id(block_id_in_flow) - - if ( - block_data["data"]["metadata"]["blockType"] - == target_block_data["blockType"] - ): - for allowed_block in allowed_block_data: - if str(block_data["data"]["metadata"]["blockType"]) == str( - allowed_block["blockType"] - ) and str(block_data["data"]["metadata"]["blockId"]) == str( - allowed_block["blockId"] - ): - blocks_found.append(block_id_in_flow) - - # Stopping Condition - if len(blocks_found) == int(target_block_data["number"]): - return - - if block_id_in_flow not in visited: - visited.add(block_id_in_flow) - for neighbor in self.dependency_graph.adjacency_list[block_id_in_flow]: - dfs( - visited, - neighbor, - allowed_block_data, - target_block_data, - blocks_found, - ) + # Input Transformation + input_cleaned_payload = {} + for k, v in input_payload.items(): + if type(v) is dict and "value" in v: + input_cleaned_payload[k] = v["value"] - def get_block_data_from_registry(block_data, block_id_in_flow): - """ - Retrieves the Block Data from the registry given the block ID in the flow + request_payload = {"input": input_cleaned_payload, "output": output_payload} - Attributes: - block_data: Full JSON of Block Data from Front-End - block_id_in_flow: The Block ID generated by the front-end when assembling a flow - """ - # Block Data from Block Registry - # TODO: Maybe this should be optimized since its querying the whole table - block_registry_data = ( - BlockRegistry.objects.all() - .filter(block_type=block_data["data"]["metadata"]["blockType"]) - .filter(block_id=block_data["data"]["metadata"]["blockId"])[0] - ) + r = requests.post(request_url, json=request_payload) - return block_registry_data + output = {} + if r.status_code == 200: + block_type_id_key = f"{block_registry_data.block_type}-{block_registry_data.block_id}-{block_id_in_flow}" - def make_run_request( - block_id_in_flow, block_registry_data, input_payload, output_payload - ): - """ - Makes a request against remote resources to complete the request + if block_type_id_key not in output.keys(): + output[block_type_id_key] = {} - Attributes: - block_id_in_flow: The Block ID generated by the front-end when assembling a flow - block_registry_data: Block Data queried from the front-end - input_payload: JSON Payload of Form Inputs - output_payload: JSON Payload of required information from previous steps - """ - # Make a POST request to a run endpoint to run the block - request_url = f"{environ['API_BASE_URL']}/{block_registry_data.block_type}/{block_registry_data.block_id}/run" + try: + response_json = r.json() + if "response" in response_json: + output[block_type_id_key] = response_json["response"] + else: + raise Exception("JSON Key 'response' could not be found") - request_payload = { - "input": input_payload, - "output": output_payload, - } + except json.decoder.JSONDecodeError as e: + raise 
Exception("JSON Decode Error") + except Exception as e: + raise Exception("Unhandled Exception: ", e) + else: + print("Error: ", r.json()) - # TODO: Remove once done debugging - # with open(f"input-payload-{block_id_in_flow}.json", "w") as outfile: - # json.dump(request_payload, outfile) - - r = requests.post(request_url, json=request_payload) - - output = {} - if r.status_code == 200: - # Updates the Block Outputs overall JSON - block_type_id_key = f"{block_registry_data.block_type}-{block_registry_data.block_id}-{block_id_in_flow}" # TODO: Add the block_id_in_flow to the cache key in case 2 blocks of same type are used - print("Block Type ID Key: ", block_type_id_key) - if block_type_id_key not in list(output.keys()): - output[block_type_id_key] = {} - - try: - response_json = r.json() - # Standardized Return From Block with key "response" - if "response" in response_json: - output[block_type_id_key] = response_json["response"] - except json.decoder.JSONDecodeError as e: - print("JSON Decode Error") - except Exception as e: - print("Generic Exception: ", e) - else: - logging.error( - f"A Response {r.status_code} when querying URL {request_url} with ..." - ) + return output - return output + def run(self, mode="VALIDATE"): + """ + Validates a flow to ensure that all nodes are connected correctly + """ output_cache = {} - def get_data_from_cache(block_id_in_flow): + def _get_data_from_cache(block_id_in_flow): """ - Makes a request to the output cache to retrieve data + Retrieves data about block from cache Attributes: - block_id_in_flow: The Block ID generated by the front-end when assembling a flow + block_id: Block ID from Flow """ - block_data = get_block_by_id(block_id_in_flow) - block_registry_data = get_block_data_from_registry( - block_data, block_id_in_flow + block_data = self._get_block_by_id(block_id_in_flow) + block_registry_data = self._get_block_data_from_registry( + block_data["blockType"], block_data["blockId"] ) cache_key = f"{block_registry_data.block_type}-{block_registry_data.block_id}-{block_id_in_flow}" - if cache_key in list(output_cache.keys()): + if cache_key in output_cache.keys(): return cache_key, output_cache[cache_key] else: - print("Cache Key: ", cache_key) - print("Output Cache: ", output_cache.keys()) - raise f"Data does not exist in cache for {block_id_in_flow} with {cache_key}" + raise Exception( + f"Data does not exist in cache for {block_id_in_flow} with {cache_key}" + ) + + is_valid = True + + if len(self.batched_tasks) == 0: + is_valid = False - # Main Running Code for task in self.batched_tasks: - # Goes through a set of tasks. 
Each FOR loop should make a request - # TODO: This part of the process can be asynchronous for task_to_be_run in task: - # Gets the full sent data about the block - block_data = get_block_by_id(task_to_be_run) - block_registry_data = get_block_data_from_registry( - block_data, task_to_be_run + block_data = self._get_block_by_id(task_to_be_run) + block_registry_data = self._get_block_data_from_registry( + block_data["blockType"], block_data["blockId"] ) - # If the task has no dependencies, make the request immediately, - # otherwise perform a DFS search to extract all related dependencies - if len(self.dependency_graph.adjacency_list[task_to_be_run]) == 0: - response = make_run_request( - task_to_be_run, - block_registry_data, - block_data["data"]["input"], - {}, - ) + # Iterate through block data to gauge whether inputs exist + for key, value in block_data.items(): + if type(value) is dict and "value" in value.keys(): + if value["value"] == "": + is_valid = False - # Adds to a cache to ensure that requests don't need to be re-run - output_cache = {**output_cache, **response} - else: - # TODO: Implement DFS code to get the list of related objects + if len(self.dependency_graph[task_to_be_run]) == 0: + is_valid = ( + is_valid + and len(block_registry_data.validations["input"]["required"]) + == 0 + ) - # The following variables are used in the DFS + if mode == "RUN": + response = self._make_run_request( + task_to_be_run, block_registry_data, block_data, {} + ) - # Contains list of Block ID's from the flow that are dependencies - # for running the block associated with the `task_to_be_run` + # Adds to a cache to ensure that requests don't need to be re-run + output_cache = {**output_cache, **response} + else: blocks_found = [] for required_block in block_registry_data.validations["input"][ "required" ]: - # Visited Set visited = set() - - dfs( + self._dfs( visited, task_to_be_run, block_registry_data.validations["input"]["allowed_blocks"], @@ -414,23 +303,58 @@ def get_data_from_cache(block_id_in_flow): blocks_found, ) - print(f"Task {task_to_be_run} - {blocks_found}") - - # Assembles all dependency data into the output_payload variable output_payload = {} - for block_id in blocks_found: - cache_key, response = get_data_from_cache(block_id) + assembled_dependency_list_from_flow = {} + for item in set(blocks_found): + item_block_data = self._get_block_by_id(item) - output_payload = {**output_payload, cache_key: response} + if ( + item_block_data["blockType"] + not in assembled_dependency_list_from_flow + ): + assembled_dependency_list_from_flow[ + item_block_data["blockType"] + ] = 0 + assembled_dependency_list_from_flow[ + item_block_data["blockType"] + ] += 1 - response = make_run_request( - task_to_be_run, - block_registry_data, - block_data["data"]["input"], - output_payload, - ) + if mode == "RUN": + cache_key, response = _get_data_from_cache(item) + output_payload = {**output_payload, cache_key: response} + + if mode == "RUN": + response = self._make_run_request( + task_to_be_run, + block_registry_data, + block_data, + output_payload, + ) - # Adds to a cache to ensure that requests don't need to be re-run - output_cache = {**output_cache, **response} + # Adds to a cache to ensure that requests don't need to be re-run + output_cache = {**output_cache, **response} - return output_cache + for required in block_registry_data.validations["input"][ + "required" + ]: + # print("Block Type:", required["blockType"]) + # print("Required Block Number: ", required["number"]) + # 
print(assembled_dependency_list_from_flow) + + if required["blockType"] in assembled_dependency_list_from_flow: + is_valid = ( + is_valid + and assembled_dependency_list_from_flow[ + required["blockType"] + ] + == required["number"] + ) + else: + is_valid = False + + if mode == "VALIDATE": + return is_valid + elif mode == "RUN": + return output_cache + else: + return None diff --git a/orchestrator/services/flow/spectrum_flow_v2.py b/orchestrator/services/flow/spectrum_flow_v2.py deleted file mode 100644 index 711dab0e..00000000 --- a/orchestrator/services/flow/spectrum_flow_v2.py +++ /dev/null @@ -1,360 +0,0 @@ -import json -import requests -from os import environ -from copy import deepcopy - -from orchestrator.models import BlockRegistry - - -class Graph: - def __init__(self): - self.adjacency_list = {} - - def __repr__(self): - return repr(self.adjacency_list) - - def insert(self, source_vertex, dest_vertex): - """ - Inserts one or both vertices (depending on if either exists) and - connects one vertex to the other - - Attributes: - source_vertex: Start / Originating Vertex - dest_vertex: Destination Vertex - """ - if not (source_vertex in self.adjacency_list): - self.adjacency_list[source_vertex] = set() - - self.adjacency_list[source_vertex].add(dest_vertex) - - -class DependencyGraph: - def __init__(self, vertices, edges): - self.vertices = vertices - self.edges = edges - - self.graph = self.generate_graph() - self.dependency_graph = self.generate_dependency_graph() - self.batched_tasks = self.generate_batched_tasks() - - def generate_graph(self): - """ - Generates an adjacency list graph representation using the node and edge pairs - """ - graph = Graph() - - # Initializes the Adjacency List of Blocks - for block_id, vertex in self.vertices.items(): - if block_id not in graph.adjacency_list: - graph.adjacency_list[block_id] = set() - - for edge in self.edges: - graph.insert(edge["source"], edge["target"]) - - return graph - - def generate_dependency_graph(self): - """ - Transposes the adjacency list to create a graph of dependencies - """ - dependency_graph = Graph() - - # Initializes the adjacency list with initial values - for source_vertex, _ in self.graph.adjacency_list.items(): - if source_vertex not in dependency_graph.adjacency_list: - dependency_graph.adjacency_list[source_vertex] = set() - - # Reverses the direction of the node connections - for source_vertex, dest_vertices in self.graph.adjacency_list.items(): - if source_vertex not in dependency_graph.adjacency_list: - dependency_graph.adjacency_list[source_vertex] = set() - - for dest_vertex in dest_vertices: - dependency_graph.insert(dest_vertex, source_vertex) - - return dependency_graph - - def generate_batched_tasks(self): - """ - Creates a series of batches tasks that need to be executed sequentially - """ - batches = [] - - dependency_graph = deepcopy(self.dependency_graph) - - while dependency_graph.adjacency_list: - # Retrieves nodes with no dependencies - nodes_with_no_dependencies = { - k for k, v in dependency_graph.adjacency_list.items() if not v - } - - if not nodes_with_no_dependencies: - raise ValueError("Circular Dependency Found") - - for node in nodes_with_no_dependencies: - del dependency_graph.adjacency_list[node] - - for deps in dependency_graph.adjacency_list.values(): - deps.difference_update(nodes_with_no_dependencies) - - batches.append({name for name in nodes_with_no_dependencies}) - - return batches - - -class SpectrumFlow: - def __init__(self, vertices, edges): - self.vertices = vertices - 
self.edges = edges - - graph = DependencyGraph(vertices, edges) - - self.graph = graph.graph.adjacency_list - self.dependency_graph = graph.dependency_graph.adjacency_list - self.batched_tasks = graph.batched_tasks - - self.is_valid = self.run(mode="VALIDATE") - - def _get_block_by_id(self, block_id): - """ - Retrieves a block by its ID - - block_id: ID of block in flow - """ - try: - return self.vertices[block_id] - except KeyError: - raise Exception(f"The Block ID {block_id} could not be found") - - @staticmethod - def _get_block_data_from_registry(block_type, block_id): - try: - return ( - BlockRegistry.objects.all() - .filter(block_type=block_type) - .filter(block_id=block_id) - .first() - ) - except Exception as e: - raise Exception(e) - - def _dfs( - self, - visited, - block_id_in_flow, - allowed_block_data, - target_block_data, - blocks_found, - ): - """ - Recursively iterates through directed adjancency list - - Attempts to determine which blocks downstream in the sequence have required data - - Attributes: - visited: Set of blocks that have been traversed - block_id_in_flow: Current block ID being unpacked - allowed_block_data: List of allowed input blocks - target_block_data: Block Type and Number of Blocks being searched for - """ - block_data = self._get_block_by_id(block_id_in_flow) - - if block_data["blockType"] == target_block_data["blockType"]: - for allowed_block in allowed_block_data: - if str(block_data["blockType"]) == str( - allowed_block["blockType"] - ) and str(block_data["blockId"]) == str(allowed_block["blockId"]): - blocks_found.append(block_id_in_flow) - - if len(blocks_found) == int(target_block_data["number"]): - return - - if block_id_in_flow not in visited: - visited.add(block_id_in_flow) - - for neighbor in self.dependency_graph[block_id_in_flow]: - self._dfs( - visited, - neighbor, - allowed_block_data, - target_block_data, - blocks_found, - ) - - def _make_run_request( - self, block_id_in_flow, block_registry_data, input_payload, output_payload - ): - """ - Hits the `/run` endpoint for each block to complete the request - - Attributes - block_id_in_flow: Block ID generated by the frontend - block_registry_data: Block Data queried from the frontend - input_payload: Input Payload - output_payload: Output Payload - """ - - request_url = f"{environ['API_BASE_URL']}/{block_registry_data.block_type}/{block_registry_data.block_id}/run" - - # Input Transformation - input_cleaned_payload = {} - for k, v in input_payload.items(): - if type(v) is dict and "value" in v: - input_cleaned_payload[k] = v["value"] - - request_payload = {"input": input_cleaned_payload, "output": output_payload} - - r = requests.post(request_url, json=request_payload) - - output = {} - if r.status_code == 200: - block_type_id_key = f"{block_registry_data.block_type}-{block_registry_data.block_id}-{block_id_in_flow}" - - if block_type_id_key not in output.keys(): - output[block_type_id_key] = {} - - try: - response_json = r.json() - if "response" in response_json: - output[block_type_id_key] = response_json["response"] - else: - raise Exception("JSON Key 'response' could not be found") - - except json.decoder.JSONDecodeError as e: - raise Exception("JSON Decode Error") - except Exception as e: - raise Exception("Unhandled Exception: ", e) - else: - print("Error: ", r.json()) - - return output - - def run(self, mode="VALIDATE"): - """ - Validates a flow to ensure that all nodes are connected correctly - """ - - output_cache = {} - - def _get_data_from_cache(block_id_in_flow): - """ - Retrieves 
data about block from cache - - Attributes: - block_id: Block ID from Flow - """ - block_data = self._get_block_by_id(block_id_in_flow) - block_registry_data = self._get_block_data_from_registry( - block_data["blockType"], block_data["blockId"] - ) - - cache_key = f"{block_registry_data.block_type}-{block_registry_data.block_id}-{block_id_in_flow}" - - if cache_key in output_cache.keys(): - return cache_key, output_cache[cache_key] - else: - raise Exception( - f"Data does not exist in cache for {block_id_in_flow} with {cache_key}" - ) - - is_valid = True - - if len(self.batched_tasks) == 0: - is_valid = False - - for task in self.batched_tasks: - for task_to_be_run in task: - block_data = self._get_block_by_id(task_to_be_run) - block_registry_data = self._get_block_data_from_registry( - block_data["blockType"], block_data["blockId"] - ) - - # Iterate through block data to gauge whether inputs exist - for key, value in block_data.items(): - if type(value) is dict and "value" in value.keys(): - if value["value"] == "": - is_valid = False - - if len(self.dependency_graph[task_to_be_run]) == 0: - is_valid = ( - is_valid - and len(block_registry_data.validations["input"]["required"]) - == 0 - ) - - if mode == "RUN": - response = self._make_run_request( - task_to_be_run, block_registry_data, block_data, {} - ) - - # Adds to a cache to ensure that requests don't need to be re-run - output_cache = {**output_cache, **response} - else: - blocks_found = [] - for required_block in block_registry_data.validations["input"][ - "required" - ]: - visited = set() - self._dfs( - visited, - task_to_be_run, - block_registry_data.validations["input"]["allowed_blocks"], - required_block, - blocks_found, - ) - - output_payload = {} - assembled_dependency_list_from_flow = {} - for item in set(blocks_found): - item_block_data = self._get_block_by_id(item) - - if ( - item_block_data["blockType"] - not in assembled_dependency_list_from_flow - ): - assembled_dependency_list_from_flow[ - item_block_data["blockType"] - ] = 0 - assembled_dependency_list_from_flow[ - item_block_data["blockType"] - ] += 1 - - if mode == "RUN": - cache_key, response = _get_data_from_cache(item) - output_payload = {**output_payload, cache_key: response} - - if mode == "RUN": - response = self._make_run_request( - task_to_be_run, - block_registry_data, - block_data, - output_payload, - ) - - # Adds to a cache to ensure that requests don't need to be re-run - output_cache = {**output_cache, **response} - - for required in block_registry_data.validations["input"][ - "required" - ]: - # print("Block Type:", required["blockType"]) - # print("Required Block Number: ", required["number"]) - # print(assembled_dependency_list_from_flow) - - if required["blockType"] in assembled_dependency_list_from_flow: - is_valid = ( - is_valid - and assembled_dependency_list_from_flow[ - required["blockType"] - ] - == required["number"] - ) - else: - is_valid = False - - if mode == "VALIDATE": - return is_valid - elif mode == "RUN": - return output_cache - else: - return None diff --git a/orchestrator/tests/test_dependency_graph.py b/orchestrator/tests/test_dependency_graph.py index 21405eda..27312db5 100644 --- a/orchestrator/tests/test_dependency_graph.py +++ b/orchestrator/tests/test_dependency_graph.py @@ -1,6 +1,6 @@ from django.test import TestCase -from orchestrator.services.flow.spectrum_flow_v2 import DependencyGraph +from orchestrator.services.flow.spectrum_flow import DependencyGraph from orchestrator.tests.data.test_data_validation import 
SINGLE_FULL_FLOW_VALID diff --git a/orchestrator/tests/test_views.py b/orchestrator/tests/test_views.py index 214c2236..e05d5a5b 100644 --- a/orchestrator/tests/test_views.py +++ b/orchestrator/tests/test_views.py @@ -325,6 +325,9 @@ def test_invalid(self): self.assertDictEqual(response.json(), {"valid": False}) +class ValidateFlowTestUpdated(TestCase): + def test_simple_flow(self): + auth = setup_authentication() class RunFlowTest(TestCase): @responses.activate diff --git a/orchestrator/views.py b/orchestrator/views.py index ac84929f..3a89ed76 100644 --- a/orchestrator/views.py +++ b/orchestrator/views.py @@ -11,8 +11,7 @@ from authentication.decorators import SpectrumAuthentication, SpectrumIsAuthenticated from orchestrator.models import BlockRegistry -from orchestrator.services.flow.run import run -from orchestrator.services.flow.spectrum_flow_v2 import SpectrumFlow +from orchestrator.services.flow.spectrum_flow import SpectrumFlow class AllMetadataView(APIView): From 99f4a3b980ebfa4502ad9004248fa649c66f1907 Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Wed, 28 Jul 2021 07:43:06 +0100 Subject: [PATCH 12/17] (fix): Updated tests that were breaking. --- orchestrator/tests/test_views.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/orchestrator/tests/test_views.py b/orchestrator/tests/test_views.py index e05d5a5b..214c2236 100644 --- a/orchestrator/tests/test_views.py +++ b/orchestrator/tests/test_views.py @@ -325,9 +325,6 @@ def test_invalid(self): self.assertDictEqual(response.json(), {"valid": False}) -class ValidateFlowTestUpdated(TestCase): - def test_simple_flow(self): - auth = setup_authentication() class RunFlowTest(TestCase): @responses.activate From fda634e64def0e4a0edc36057d7b3573e04a4f64 Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Wed, 28 Jul 2021 08:43:38 +0100 Subject: [PATCH 13/17] (feature): Added an edge validator function. 
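Note: an edge is accepted when its source block appears in the
target's validations["input"]["allowed_blocks"]. IDs are compared as
strings since the registry stores blockId as a string while nodeList
payloads send integers. A minimal sketch of the check (shapes as in
the request payloads below):

    def edge_ok(source_block, allowed_blocks):
        # True iff the source matches any (blockId, blockType) pair the
        # target block declares as an allowed input.
        return any(
            str(b["blockId"]) == str(source_block["blockId"])
            and str(b["blockType"]) == str(source_block["blockType"])
            for b in allowed_blocks
        )

    source = {"blockType": "DATA_BLOCK", "blockId": 1}
    assert edge_ok(source, [{"blockId": "1", "blockType": "DATA_BLOCK"}])
    assert not edge_ok(source, [{"blockId": "1", "blockType": "SIGNAL_BLOCK"}])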
--- orchestrator/services/flow/spectrum_flow.py | 39 +++++++++-- orchestrator/tests/test_views.py | 76 ++++++++++++++++++++- orchestrator/views.py | 2 +- 3 files changed, 110 insertions(+), 7 deletions(-) diff --git a/orchestrator/services/flow/spectrum_flow.py b/orchestrator/services/flow/spectrum_flow.py index 711dab0e..09143efe 100644 --- a/orchestrator/services/flow/spectrum_flow.py +++ b/orchestrator/services/flow/spectrum_flow.py @@ -113,6 +113,7 @@ def __init__(self, vertices, edges): self.dependency_graph = graph.dependency_graph.adjacency_list self.batched_tasks = graph.batched_tasks + self.edge_validation = {} self.is_valid = self.run(mode="VALIDATE") def _get_block_by_id(self, block_id): @@ -275,6 +276,7 @@ def _get_data_from_cache(block_id_in_flow): if value["value"] == "": is_valid = False + # Checks if the graph has an edge if len(self.dependency_graph[task_to_be_run]) == 0: is_valid = ( is_valid @@ -337,10 +339,6 @@ def _get_data_from_cache(block_id_in_flow): for required in block_registry_data.validations["input"][ "required" ]: - # print("Block Type:", required["blockType"]) - # print("Required Block Number: ", required["number"]) - # print(assembled_dependency_list_from_flow) - if required["blockType"] in assembled_dependency_list_from_flow: is_valid = ( is_valid @@ -353,6 +351,39 @@ def _get_data_from_cache(block_id_in_flow): is_valid = False if mode == "VALIDATE": + for edge in self.edges: + source_block = self._get_block_by_id(edge["source"]) + target_block = self._get_block_by_id(edge["target"]) + + target_block_data = self._get_block_data_from_registry( + target_block["blockType"], target_block["blockId"] + ) + + # print () + # print ('Edge: ', edge) + # print () + + is_edge_valid = False + for allowed_block in target_block_data.validations["input"][ + "allowed_blocks" + ]: + # print () + # print ('Source Block: ', source_block) + # print ('Allowed Block: ', allowed_block) + # print () + + is_edge_valid = is_edge_valid or ( + (str(allowed_block["blockId"]) == str(source_block["blockId"])) + and ( + str(allowed_block["blockType"]) + == str(source_block["blockType"]) + ) + ) + + # print ('Is Edge Valid: ', is_edge_valid) + + self.edge_validation[edge["id"]] = is_edge_valid + return is_valid elif mode == "RUN": return output_cache diff --git a/orchestrator/tests/test_views.py b/orchestrator/tests/test_views.py index 214c2236..b9230723 100644 --- a/orchestrator/tests/test_views.py +++ b/orchestrator/tests/test_views.py @@ -303,6 +303,56 @@ def test_block_id_dne(self): class ValidateFlowTest(TestCase): + def test_single_edge_invalid(self): + request_payload = { + "nodeList": { + "1": { + "blockType": "DATA_BLOCK", + "blockId": 1, + "equity_name": {"value": "AAPL", "options": []}, + "data_type": { + "value": "intraday", + "options": ["intraday", "daily_adjusted"], + }, + "interval": {"value": "1min", "options": ["1min"]}, + "outputsize": {"value": "compact", "options": ["compact", "full"]}, + "start_date": {"value": "2021-06-21 19:58:00"}, + "end_date": {"value": "2021-06-21 20:00:00"}, + }, + "4": { + "blockType": "SIGNAL_BLOCK", + "blockId": 1, + "event_type": {"value": "INTERSECT"}, + "event_action": {"value": "BUY"}, + }, + }, + "edgeList": [ + { + "source": "1", + "sourceHandle": "output_id888", + "target": "4", + "targetHandle": "input_id891", + "type": "edge", + "id": "reactflow__edge-1output_id888-4input_id891", + }, + ], + } + + auth = set_up_authentication() + response = self.client.post( + "/orchestration/validate", + json.dumps(request_payload), + 
content_type="application/json", + **{"HTTP_AUTHORIZATION": f"Bearer {auth['token']}"}, + ) + + expected_response = { + "valid": False, + "edges": {"reactflow__edge-1output_id888-4input_id891": False}, + } + + self.assertDictEqual(response.json(), expected_response) + def test_ok(self): auth = set_up_authentication() response = self.client.post( @@ -312,7 +362,19 @@ def test_ok(self): **{"HTTP_AUTHORIZATION": f"Bearer {auth['token']}"}, ) - self.assertDictEqual(response.json(), {"valid": True}) + self.assertDictEqual( + response.json(), + { + "edges": { + "reactflow__edge-1output_id888-2input_id891": True, + "reactflow__edge-1output_id1136-3input_id1143": True, + "reactflow__edge-2output_id1356-4input_id1363": True, + "reactflow__edge-3output_id1576-4input_id1579": True, + "reactflow__edge-4output_id1796-5input_id1799": True, + }, + "valid": True, + }, + ) def test_invalid(self): auth = set_up_authentication() @@ -323,7 +385,17 @@ def test_invalid(self): **{"HTTP_AUTHORIZATION": f"Bearer {auth['token']}"}, ) - self.assertDictEqual(response.json(), {"valid": False}) + self.assertDictEqual( + response.json(), + { + "valid": False, + "edges": { + "reactflow__edge-1output_id502-4input_id589": False, + "reactflow__edge-4output_id986-2input_id1089": False, + "reactflow__edge-4output_id1230-3input_id1417": False, + }, + }, + ) class RunFlowTest(TestCase): diff --git a/orchestrator/views.py b/orchestrator/views.py index 3a89ed76..b994cdb2 100644 --- a/orchestrator/views.py +++ b/orchestrator/views.py @@ -100,7 +100,7 @@ def post(self, request): if request_body["nodeList"] is not {} and request_body["edgeList"] is not []: flow = SpectrumFlow(request_body["nodeList"], request_body["edgeList"]) - return JsonResponse({"valid": flow.is_valid}) + return JsonResponse({"valid": flow.is_valid, "edges": flow.edge_validation}) else: return JsonResponse({"valid": False}) From 4a89e32cd29ab5ef1fca922e09a3d887b3f29b96 Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Wed, 28 Jul 2021 10:46:01 +0100 Subject: [PATCH 14/17] (feature): Implemented additional details for validation endpoint to enrich errors UI. 
--- orchestrator/services/flow/spectrum_flow.py | 30 +++++++---- orchestrator/tests/test_views.py | 60 +++++++++++++++------ 2 files changed, 64 insertions(+), 26 deletions(-) diff --git a/orchestrator/services/flow/spectrum_flow.py b/orchestrator/services/flow/spectrum_flow.py index 09143efe..9eb1e87f 100644 --- a/orchestrator/services/flow/spectrum_flow.py +++ b/orchestrator/services/flow/spectrum_flow.py @@ -359,19 +359,10 @@ def _get_data_from_cache(block_id_in_flow): target_block["blockType"], target_block["blockId"] ) - # print () - # print ('Edge: ', edge) - # print () - is_edge_valid = False for allowed_block in target_block_data.validations["input"][ "allowed_blocks" ]: - # print () - # print ('Source Block: ', source_block) - # print ('Allowed Block: ', allowed_block) - # print () - is_edge_valid = is_edge_valid or ( (str(allowed_block["blockId"]) == str(source_block["blockId"])) and ( @@ -380,11 +371,28 @@ def _get_data_from_cache(block_id_in_flow): ) ) - # print ('Is Edge Valid: ', is_edge_valid) + target_block = self._get_block_data_from_registry( + target_block["blockType"], target_block["blockId"] + ) + + allowed_connections = [] + if not is_edge_valid: + for allowed_block in target_block_data.validations["input"][ + "allowed_blocks" + ]: + block_data = self._get_block_data_from_registry( + allowed_block["blockType"], allowed_block["blockId"] + ) + allowed_connections.append(block_data.block_name) - self.edge_validation[edge["id"]] = is_edge_valid + self.edge_validation[edge["id"]] = { + "status": is_edge_valid, + "target_block": target_block.block_name, + "allowed_connections": allowed_connections, + } return is_valid + elif mode == "RUN": return output_cache else: diff --git a/orchestrator/tests/test_views.py b/orchestrator/tests/test_views.py index b9230723..2729e322 100644 --- a/orchestrator/tests/test_views.py +++ b/orchestrator/tests/test_views.py @@ -346,12 +346,18 @@ def test_single_edge_invalid(self): **{"HTTP_AUTHORIZATION": f"Bearer {auth['token']}"}, ) - expected_response = { - "valid": False, - "edges": {"reactflow__edge-1output_id888-4input_id891": False}, - } - - self.assertDictEqual(response.json(), expected_response) + self.assertDictEqual( + response.json(), + { + "valid": False, + "edges": { + "reactflow__edge-1output_id888-4input_id891": { + "status": False, + "allowed_connections": ["Technical Analysis"], + } + }, + }, + ) def test_ok(self): auth = set_up_authentication() @@ -365,14 +371,29 @@ def test_ok(self): self.assertDictEqual( response.json(), { + "valid": True, "edges": { - "reactflow__edge-1output_id888-2input_id891": True, - "reactflow__edge-1output_id1136-3input_id1143": True, - "reactflow__edge-2output_id1356-4input_id1363": True, - "reactflow__edge-3output_id1576-4input_id1579": True, - "reactflow__edge-4output_id1796-5input_id1799": True, + "reactflow__edge-1output_id888-2input_id891": { + "status": True, + "allowed_connections": [], + }, + "reactflow__edge-1output_id1136-3input_id1143": { + "status": True, + "allowed_connections": [], + }, + "reactflow__edge-2output_id1356-4input_id1363": { + "status": True, + "allowed_connections": [], + }, + "reactflow__edge-3output_id1576-4input_id1579": { + "status": True, + "allowed_connections": [], + }, + "reactflow__edge-4output_id1796-5input_id1799": { + "status": True, + "allowed_connections": [], + }, }, - "valid": True, }, ) @@ -390,9 +411,18 @@ def test_invalid(self): { "valid": False, "edges": { - "reactflow__edge-1output_id502-4input_id589": False, - 
"reactflow__edge-4output_id986-2input_id1089": False, - "reactflow__edge-4output_id1230-3input_id1417": False, + "reactflow__edge-1output_id502-4input_id589": { + "status": False, + "allowed_connections": ["Technical Analysis"], + }, + "reactflow__edge-4output_id986-2input_id1089": { + "status": False, + "allowed_connections": ["US Stock Data", "Crypto Data"], + }, + "reactflow__edge-4output_id1230-3input_id1417": { + "status": False, + "allowed_connections": ["Technical Analysis"], + }, }, }, ) From 0d6616dbc4394216822337e64678c16597d74ff8 Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Wed, 28 Jul 2021 12:56:06 +0100 Subject: [PATCH 15/17] (fix): Updated tests. --- orchestrator/services/flow/spectrum_flow.py | 2 +- orchestrator/tests/test_views.py | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/orchestrator/services/flow/spectrum_flow.py b/orchestrator/services/flow/spectrum_flow.py index 9eb1e87f..b3fbe8f8 100644 --- a/orchestrator/services/flow/spectrum_flow.py +++ b/orchestrator/services/flow/spectrum_flow.py @@ -384,7 +384,7 @@ def _get_data_from_cache(block_id_in_flow): allowed_block["blockType"], allowed_block["blockId"] ) allowed_connections.append(block_data.block_name) - + self.edge_validation[edge["id"]] = { "status": is_edge_valid, "target_block": target_block.block_name, diff --git a/orchestrator/tests/test_views.py b/orchestrator/tests/test_views.py index 2729e322..b759dd78 100644 --- a/orchestrator/tests/test_views.py +++ b/orchestrator/tests/test_views.py @@ -353,6 +353,7 @@ def test_single_edge_invalid(self): "edges": { "reactflow__edge-1output_id888-4input_id891": { "status": False, + "target_block": "Event", "allowed_connections": ["Technical Analysis"], } }, @@ -375,22 +376,27 @@ def test_ok(self): "edges": { "reactflow__edge-1output_id888-2input_id891": { "status": True, + "target_block": "Technical Analysis", "allowed_connections": [], }, "reactflow__edge-1output_id1136-3input_id1143": { "status": True, + "target_block": "Technical Analysis", "allowed_connections": [], }, "reactflow__edge-2output_id1356-4input_id1363": { "status": True, + "target_block": "Event", "allowed_connections": [], }, "reactflow__edge-3output_id1576-4input_id1579": { "status": True, + "target_block": "Event", "allowed_connections": [], }, "reactflow__edge-4output_id1796-5input_id1799": { "status": True, + "target_block": "Backtest", "allowed_connections": [], }, }, @@ -413,14 +419,17 @@ def test_invalid(self): "edges": { "reactflow__edge-1output_id502-4input_id589": { "status": False, + "target_block": "Event", "allowed_connections": ["Technical Analysis"], }, "reactflow__edge-4output_id986-2input_id1089": { "status": False, + "target_block": "Technical Analysis", "allowed_connections": ["US Stock Data", "Crypto Data"], }, "reactflow__edge-4output_id1230-3input_id1417": { "status": False, + "target_block": "Event", "allowed_connections": ["Technical Analysis"], }, }, From a5c15d5e51c5618f83cfb7bb003d2d9ababad657 Mon Sep 17 00:00:00 2001 From: Rahul V Brahmal Date: Wed, 28 Jul 2021 12:56:21 +0100 Subject: [PATCH 16/17] (lint): Linted files. 
--- orchestrator/services/flow/spectrum_flow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestrator/services/flow/spectrum_flow.py b/orchestrator/services/flow/spectrum_flow.py index b3fbe8f8..9eb1e87f 100644 --- a/orchestrator/services/flow/spectrum_flow.py +++ b/orchestrator/services/flow/spectrum_flow.py @@ -384,7 +384,7 @@ def _get_data_from_cache(block_id_in_flow): allowed_block["blockType"], allowed_block["blockId"] ) allowed_connections.append(block_data.block_name) - + self.edge_validation[edge["id"]] = { "status": is_edge_valid, "target_block": target_block.block_name, From f89a4f022dca4ffbfb67392fd7890bd97a0b6234 Mon Sep 17 00:00:00 2001 From: Renovate Bot Date: Wed, 28 Jul 2021 12:11:33 +0000 Subject: [PATCH 17/17] chore(deps): update dependency python-dotenv to v0.19.0 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index f35b0760..a7d867e3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -36,7 +36,7 @@ pylint==2.8.3 pylint-django==2.4.4 pylint-plugin-utils==0.6 python-dateutil==2.8.1 -python-dotenv==0.18.0 +python-dotenv==0.19.0 python3-openid==3.2.0 pytz==2021.1 regex==2021.7.6