From 6cbe54a1d26832d00a8dd36827d9fc26d2677930 Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Fri, 13 Sep 2024 15:35:04 -0400 Subject: [PATCH 01/30] feat: add request_response_controller.py --- .../flow/request_response_controller.py | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 src/solace_ai_connector/flow/request_response_controller.py diff --git a/src/solace_ai_connector/flow/request_response_controller.py b/src/solace_ai_connector/flow/request_response_controller.py new file mode 100644 index 0000000..fae7972 --- /dev/null +++ b/src/solace_ai_connector/flow/request_response_controller.py @@ -0,0 +1,27 @@ +""" +This file will handle sending a message to a named flow and then +receiving the output message from that flow. It will also support the result +message being a streamed message that comes in multiple parts. + +Each component can optionally create multiple of these using the configuration: + +```yaml +- name: example_flow + components: + - component_name: example_component + component_module: custom_component + request_response_controllers: + - name: example_controller + flow_name: llm_flow + streaming: true + streaming_last_message_expression: input.payload:streaming.last_message + timeout_ms: 300000 +``` + +""" + +# +# - Create the request response flow manager class that will hold all the request response flows +# - Create the request response controller class that will hold the request response controller +# config and manage sending messages to the flow and getting the output messages. +# From 4f9f79c0539382fb0693e046d352b25bc5af3748 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Fri, 13 Sep 2024 15:35:39 -0400 Subject: [PATCH 02/30] feat: implement RequestResponseFlowManager and RequestResponseController classes --- .../flow/request_response_controller.py | 69 +++++++++++++++++-- 1 file changed, 64 insertions(+), 5 deletions(-) diff --git a/src/solace_ai_connector/flow/request_response_controller.py b/src/solace_ai_connector/flow/request_response_controller.py index fae7972..bf3a3e4 100644 --- a/src/solace_ai_connector/flow/request_response_controller.py +++ b/src/solace_ai_connector/flow/request_response_controller.py @@ -20,8 +20,67 @@ """ -# -# - Create the request response flow manager class that will hold all the request response flows -# - Create the request response controller class that will hold the request response controller -# config and manage sending messages to the flow and getting the output messages. 
-# +import threading +import queue +import time +from typing import Dict, Any + +class RequestResponseFlowManager: + def __init__(self): + self.flows: Dict[str, Any] = {} + + def add_flow(self, flow_name: str, flow): + self.flows[flow_name] = flow + + def get_flow(self, flow_name: str): + return self.flows.get(flow_name) + +class RequestResponseController: + def __init__(self, config: Dict[str, Any], flow_manager: RequestResponseFlowManager): + self.config = config + self.flow_manager = flow_manager + self.flow_name = config['flow_name'] + self.streaming = config.get('streaming', False) + self.streaming_last_message_expression = config.get('streaming_last_message_expression') + self.timeout_ms = config.get('timeout_ms', 30000) + self.response_queue = queue.Queue() + + def send_message(self, message: Any): + flow = self.flow_manager.get_flow(self.flow_name) + if not flow: + raise ValueError(f"Flow {self.flow_name} not found") + + flow.send_message(message) + + def get_response(self): + try: + if self.streaming: + return self._get_streaming_response() + else: + return self.response_queue.get(timeout=self.timeout_ms / 1000) + except queue.Empty: + raise TimeoutError(f"Timeout waiting for response from flow {self.flow_name}") + + def _get_streaming_response(self): + responses = [] + start_time = time.time() + while True: + try: + response = self.response_queue.get(timeout=(start_time + self.timeout_ms / 1000 - time.time())) + responses.append(response) + if self.streaming_last_message_expression: + if self._is_last_message(response): + break + except queue.Empty: + if responses: + break + raise TimeoutError(f"Timeout waiting for streaming response from flow {self.flow_name}") + return responses + + def _is_last_message(self, message): + # Implement logic to check if this is the last message based on the streaming_last_message_expression + # This might involve parsing the expression and checking the message content + pass + + def handle_response(self, response): + self.response_queue.put(response) From 139ed7087a02e4b423dbfd83f799ef20b79076a8 Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Fri, 13 Sep 2024 15:42:25 -0400 Subject: [PATCH 03/30] style: format code with black and improve readability --- .../flow/request_response_controller.py | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/solace_ai_connector/flow/request_response_controller.py b/src/solace_ai_connector/flow/request_response_controller.py index bf3a3e4..36a890e 100644 --- a/src/solace_ai_connector/flow/request_response_controller.py +++ b/src/solace_ai_connector/flow/request_response_controller.py @@ -25,6 +25,7 @@ import time from typing import Dict, Any + class RequestResponseFlowManager: def __init__(self): self.flows: Dict[str, Any] = {} @@ -35,21 +36,26 @@ def add_flow(self, flow_name: str, flow): def get_flow(self, flow_name: str): return self.flows.get(flow_name) + class RequestResponseController: - def __init__(self, config: Dict[str, Any], flow_manager: RequestResponseFlowManager): + def __init__( + self, config: Dict[str, Any], flow_manager: RequestResponseFlowManager + ): self.config = config self.flow_manager = flow_manager - self.flow_name = config['flow_name'] - self.streaming = config.get('streaming', False) - self.streaming_last_message_expression = config.get('streaming_last_message_expression') - self.timeout_ms = config.get('timeout_ms', 30000) + self.flow_name = config["flow_name"] + self.streaming = config.get("streaming", False) + 
self.streaming_last_message_expression = config.get( + "streaming_last_message_expression" + ) + self.timeout_ms = config.get("timeout_ms", 30000) self.response_queue = queue.Queue() def send_message(self, message: Any): flow = self.flow_manager.get_flow(self.flow_name) if not flow: raise ValueError(f"Flow {self.flow_name} not found") - + flow.send_message(message) def get_response(self): @@ -59,14 +65,18 @@ def get_response(self): else: return self.response_queue.get(timeout=self.timeout_ms / 1000) except queue.Empty: - raise TimeoutError(f"Timeout waiting for response from flow {self.flow_name}") + raise TimeoutError( + f"Timeout waiting for response from flow {self.flow_name}" + ) def _get_streaming_response(self): responses = [] start_time = time.time() while True: try: - response = self.response_queue.get(timeout=(start_time + self.timeout_ms / 1000 - time.time())) + response = self.response_queue.get( + timeout=(start_time + self.timeout_ms / 1000 - time.time()) + ) responses.append(response) if self.streaming_last_message_expression: if self._is_last_message(response): @@ -74,7 +84,9 @@ def _get_streaming_response(self): except queue.Empty: if responses: break - raise TimeoutError(f"Timeout waiting for streaming response from flow {self.flow_name}") + raise TimeoutError( + f"Timeout waiting for streaming response from flow {self.flow_name}" + ) return responses def _is_last_message(self, message): From 1e5404993e8becd6a4bbc18ad58944b15ec7483b Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Fri, 13 Sep 2024 15:42:27 -0400 Subject: [PATCH 04/30] feat: implement RequestResponseController for flow-based request-response handling --- .../components/component_base.py | 16 +++++++++++ src/solace_ai_connector/flow/flow.py | 28 +++++++++++++++++++ .../flow/request_response_controller.py | 23 +++++++++++---- .../solace_ai_connector.py | 20 ++++++++++++- 4 files changed, 80 insertions(+), 7 deletions(-) diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py index bd4c52c..b229eae 100644 --- a/src/solace_ai_connector/components/component_base.py +++ b/src/solace_ai_connector/components/component_base.py @@ -78,6 +78,22 @@ def run(self): self.stop_component() + def process_single_event(self, event): + try: + if self.trace_queue: + self.trace_event(event) + return self.process_event(event) + except Exception as e: + log.error( + "%sComponent has encountered an error: %s\n%s", + self.log_identifier, + e, + traceback.format_exc(), + ) + if self.error_queue: + self.handle_error(e, event) + raise + def get_next_event(self): # Check if there is a get_next_message defined by a # component that inherits from this class - this is diff --git a/src/solace_ai_connector/flow/flow.py b/src/solace_ai_connector/flow/flow.py index 782c7ce..def61dd 100644 --- a/src/solace_ai_connector/flow/flow.py +++ b/src/solace_ai_connector/flow/flow.py @@ -47,6 +47,7 @@ def __init__( trace_queue=None, flow_instance_index=0, connector=None, + for_request_response=False, ): self.flow_config = flow_config self.flow_index = flow_index @@ -64,8 +65,30 @@ def __init__( self.flow_lock_manager = Flow._lock_manager self.flow_kv_store = Flow._kv_store self.cache_service = connector.cache_service if connector else None + self.for_request_response = for_request_response self.create_components() + def create_components(self): + # Loop through the components and create them + for index, component in enumerate(self.flow_config.get("components", [])): + 
self.create_component_group(component, index) + + # Now loop through them again and set the next component + for index, component_group in enumerate(self.component_groups): + if index < len(self.component_groups) - 1: + for component in component_group: + component.set_next_component(self.component_groups[index + 1][0]) + + # For request-response flows, don't create threads + if not self.for_request_response: + # Now one more time to create threads and run them + for index, component_group in enumerate(self.component_groups): + for component in component_group: + thread = component.create_thread_and_run() + self.threads.append(thread) + + self.flow_input_queue = self.component_groups[0][0].get_input_queue() + def create_components(self): # Loop through the components and create them for index, component in enumerate(self.flow_config.get("components", [])): @@ -124,6 +147,11 @@ def create_component_group(self, component, index): ) sibling_component = component_instance + # Set up RequestResponseController if specified + request_response_controllers = component.get("request_response_controllers", []) + for controller_config in request_response_controllers: + self.connector.create_request_response_controller(component_instance, controller_config) + # Add the component to the list component_group.append(component_instance) diff --git a/src/solace_ai_connector/flow/request_response_controller.py b/src/solace_ai_connector/flow/request_response_controller.py index 36a890e..e54d75e 100644 --- a/src/solace_ai_connector/flow/request_response_controller.py +++ b/src/solace_ai_connector/flow/request_response_controller.py @@ -39,10 +39,10 @@ def get_flow(self, flow_name: str): class RequestResponseController: def __init__( - self, config: Dict[str, Any], flow_manager: RequestResponseFlowManager + self, config: Dict[str, Any], connector: 'SolaceAiConnector' ): self.config = config - self.flow_manager = flow_manager + self.connector = connector self.flow_name = config["flow_name"] self.streaming = config.get("streaming", False) self.streaming_last_message_expression = config.get( @@ -50,13 +50,24 @@ def __init__( ) self.timeout_ms = config.get("timeout_ms", 30000) self.response_queue = queue.Queue() + self.flow_instance = self.connector.create_flow_instance(self.flow_name) + self.input_queue = self.flow_instance.get_flow_input_queue() + self.setup_response_queue() + + def setup_response_queue(self): + last_component = self.flow_instance.component_groups[-1][-1] + last_component.set_next_component(self) def send_message(self, message: Any): - flow = self.flow_manager.get_flow(self.flow_name) - if not flow: - raise ValueError(f"Flow {self.flow_name} not found") + if not self.input_queue: + raise ValueError(f"Input queue for flow {self.flow_name} not found") + + event = Event(EventType.MESSAGE, message) + self.input_queue.put(event) - flow.send_message(message) + def enqueue(self, event): + if event.event_type == EventType.MESSAGE: + self.response_queue.put(event.data) def get_response(self): try: diff --git a/src/solace_ai_connector/solace_ai_connector.py b/src/solace_ai_connector/solace_ai_connector.py index f41f50f..882e666 100644 --- a/src/solace_ai_connector/solace_ai_connector.py +++ b/src/solace_ai_connector/solace_ai_connector.py @@ -32,6 +32,15 @@ def __init__(self, config, event_handlers=None, error_queue=None): self.instance_name = self.config.get("instance_name", "solace_ai_connector") self.timer_manager = TimerManager(self.stop_signal) self.cache_service = self.setup_cache_service() + 
self.request_response_controllers = {} + + def create_request_response_controller(self, component, controller_config): + controller = RequestResponseController(controller_config, self) + self.request_response_controllers[component] = controller + return controller + + def get_request_response_controller(self, component): + return self.request_response_controllers.get(component) def run(self): """Run the Solace AI Event Connector""" @@ -63,7 +72,7 @@ def create_flows(self): self.flow_input_queues[flow.get("name")] = flow_input_queue self.flows.append(flow_instance) - def create_flow(self, flow: dict, index: int, flow_instance_index: int): + def create_flow(self, flow: dict, index: int, flow_instance_index: int, for_request_response=False): """Create a single flow""" return Flow( @@ -75,8 +84,17 @@ def create_flow(self, flow: dict, index: int, flow_instance_index: int): instance_name=self.instance_name, trace_queue=self.trace_queue, connector=self, + for_request_response=for_request_response ) + def create_flow_instance(self, flow_name: str): + """Create a new instance of a flow for request-response""" + for flow in self.config.get("flows", []): + if flow.get("name") == flow_name: + new_flow = self.create_flow(flow, -1, -1, for_request_response=True) + return new_flow + raise ValueError(f"Flow '{flow_name}' not found") + def send_message_to_flow(self, flow_name, message): """Send a message to a flow""" flow_input_queue = self.flow_input_queues.get(flow_name) From e16b0caa1e561553c165d2c111ef6cc7934afdc3 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Fri, 13 Sep 2024 15:44:06 -0400 Subject: [PATCH 05/30] feat: implement RequestResponseController for handling request-response patterns --- src/solace_ai_connector/flow/request_response_controller.py | 3 +++ src/solace_ai_connector/solace_ai_connector.py | 2 ++ 2 files changed, 5 insertions(+) diff --git a/src/solace_ai_connector/flow/request_response_controller.py b/src/solace_ai_connector/flow/request_response_controller.py index e54d75e..2cdbf2a 100644 --- a/src/solace_ai_connector/flow/request_response_controller.py +++ b/src/solace_ai_connector/flow/request_response_controller.py @@ -37,6 +37,9 @@ def get_flow(self, flow_name: str): return self.flows.get(flow_name) +from typing import Dict, Any +from ..common.event import Event, EventType + class RequestResponseController: def __init__( self, config: Dict[str, Any], connector: 'SolaceAiConnector' diff --git a/src/solace_ai_connector/solace_ai_connector.py b/src/solace_ai_connector/solace_ai_connector.py index 882e666..2c8eb79 100644 --- a/src/solace_ai_connector/solace_ai_connector.py +++ b/src/solace_ai_connector/solace_ai_connector.py @@ -13,6 +13,8 @@ from .services.cache_service import CacheService, create_storage_backend +from .flow.request_response_controller import RequestResponseController + class SolaceAiConnector: """Solace AI Connector""" From 776622ffb2c8497faf26982edd9891412de47ca8 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Fri, 13 Sep 2024 15:49:40 -0400 Subject: [PATCH 06/30] fix: import SolaceAiConnector for type checking --- src/solace_ai_connector/flow/request_response_controller.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/solace_ai_connector/flow/request_response_controller.py b/src/solace_ai_connector/flow/request_response_controller.py index 2cdbf2a..31971fa 100644 --- a/src/solace_ai_connector/flow/request_response_controller.py +++ b/src/solace_ai_connector/flow/request_response_controller.py @@ 
-37,9 +37,12 @@ def get_flow(self, flow_name: str): return self.flows.get(flow_name) -from typing import Dict, Any +from typing import Dict, Any, TYPE_CHECKING from ..common.event import Event, EventType +if TYPE_CHECKING: + from ..solace_ai_connector import SolaceAiConnector + class RequestResponseController: def __init__( self, config: Dict[str, Any], connector: 'SolaceAiConnector' From e90b1366851f3d34ce3aec37c73a56e7f9973346 Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Fri, 13 Sep 2024 16:04:11 -0400 Subject: [PATCH 07/30] refactor: restructure Flow class and improve code organization --- src/solace_ai_connector/flow/flow.py | 27 +++++-------------- .../solace_ai_connector.py | 13 ++++++--- 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/src/solace_ai_connector/flow/flow.py b/src/solace_ai_connector/flow/flow.py index def61dd..283f460 100644 --- a/src/solace_ai_connector/flow/flow.py +++ b/src/solace_ai_connector/flow/flow.py @@ -89,25 +89,6 @@ def create_components(self): self.flow_input_queue = self.component_groups[0][0].get_input_queue() - def create_components(self): - # Loop through the components and create them - for index, component in enumerate(self.flow_config.get("components", [])): - self.create_component_group(component, index) - - # Now loop through them again and set the next component - for index, component_group in enumerate(self.component_groups): - if index < len(self.component_groups) - 1: - for component in component_group: - component.set_next_component(self.component_groups[index + 1][0]) - - # Now one more time to create threads and run them - for index, component_group in enumerate(self.component_groups): - for component in component_group: - thread = component.create_thread_and_run() - self.threads.append(thread) - - self.flow_input_queue = self.component_groups[0][0].get_input_queue() - def create_component_group(self, component, index): component_module = component.get("component_module", "") base_path = component.get("component_base_path", None) @@ -148,9 +129,13 @@ def create_component_group(self, component, index): sibling_component = component_instance # Set up RequestResponseController if specified - request_response_controllers = component.get("request_response_controllers", []) + request_response_controllers = component.get( + "request_response_controllers", [] + ) for controller_config in request_response_controllers: - self.connector.create_request_response_controller(component_instance, controller_config) + self.connector.create_request_response_controller( + component_instance, controller_config + ) # Add the component to the list component_group.append(component_instance) diff --git a/src/solace_ai_connector/solace_ai_connector.py b/src/solace_ai_connector/solace_ai_connector.py index 2c8eb79..aa101b9 100644 --- a/src/solace_ai_connector/solace_ai_connector.py +++ b/src/solace_ai_connector/solace_ai_connector.py @@ -11,10 +11,9 @@ from .flow.timer_manager import TimerManager from .common.event import Event, EventType from .services.cache_service import CacheService, create_storage_backend - - from .flow.request_response_controller import RequestResponseController + class SolaceAiConnector: """Solace AI Connector""" @@ -74,7 +73,13 @@ def create_flows(self): self.flow_input_queues[flow.get("name")] = flow_input_queue self.flows.append(flow_instance) - def create_flow(self, flow: dict, index: int, flow_instance_index: int, for_request_response=False): + def create_flow( + self, + flow: dict, + index: int, + 
flow_instance_index: int, + for_request_response=False, + ): """Create a single flow""" return Flow( @@ -86,7 +91,7 @@ def create_flow(self, flow: dict, index: int, flow_instance_index: int, for_requ instance_name=self.instance_name, trace_queue=self.trace_queue, connector=self, - for_request_response=for_request_response + for_request_response=for_request_response, ) def create_flow_instance(self, flow_name: str): From e9ab2f5dbcce6b875cc19f7a860af2094dfd68d5 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Fri, 13 Sep 2024 16:04:13 -0400 Subject: [PATCH 08/30] feat: implement multiple named RequestResponseControllers per component --- src/solace_ai_connector/components/component_base.py | 4 ++++ src/solace_ai_connector/flow/flow.py | 10 ++++------ src/solace_ai_connector/solace_ai_connector.py | 7 ++----- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py index b229eae..e0bfdc3 100644 --- a/src/solace_ai_connector/components/component_base.py +++ b/src/solace_ai_connector/components/component_base.py @@ -38,6 +38,8 @@ def __init__(self, module_info, **kwargs): resolve_config_values(self.component_config) + self.request_response_controllers = {} + self.next_component = None self.thread = None self.queue_timeout_ms = DEFAULT_QUEUE_TIMEOUT_MS @@ -381,3 +383,5 @@ def cleanup(self): self.input_queue.get_nowait() except queue.Empty: break + def get_request_response_controller(self, name): + return self.request_response_controllers.get(name) diff --git a/src/solace_ai_connector/flow/flow.py b/src/solace_ai_connector/flow/flow.py index 283f460..171543a 100644 --- a/src/solace_ai_connector/flow/flow.py +++ b/src/solace_ai_connector/flow/flow.py @@ -128,13 +128,11 @@ def create_component_group(self, component, index): ) sibling_component = component_instance - # Set up RequestResponseController if specified - request_response_controllers = component.get( - "request_response_controllers", [] - ) - for controller_config in request_response_controllers: + # Set up RequestResponseControllers if specified + request_response_controllers = component.get("request_response_controllers", {}) + for controller_name, controller_config in request_response_controllers.items(): self.connector.create_request_response_controller( - component_instance, controller_config + component_instance, controller_name, controller_config ) # Add the component to the list diff --git a/src/solace_ai_connector/solace_ai_connector.py b/src/solace_ai_connector/solace_ai_connector.py index aa101b9..93c13de 100644 --- a/src/solace_ai_connector/solace_ai_connector.py +++ b/src/solace_ai_connector/solace_ai_connector.py @@ -35,14 +35,11 @@ def __init__(self, config, event_handlers=None, error_queue=None): self.cache_service = self.setup_cache_service() self.request_response_controllers = {} - def create_request_response_controller(self, component, controller_config): + def create_request_response_controller(self, component, controller_name, controller_config): controller = RequestResponseController(controller_config, self) - self.request_response_controllers[component] = controller + component.request_response_controllers[controller_name] = controller return controller - def get_request_response_controller(self, component): - return self.request_response_controllers.get(component) - def run(self): """Run the Solace AI Event Connector""" log.debug("Starting Solace AI Event Connector") From 
11b345ba533cfdd5f158861c6ecae25181f46ac2 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Fri, 13 Sep 2024 16:07:42 -0400 Subject: [PATCH 09/30] refactor: initialize request-response controllers in ComponentBase --- src/solace_ai_connector/components/component_base.py | 10 ++++++++++ src/solace_ai_connector/flow/flow.py | 7 ------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py index e0bfdc3..ec0d1bf 100644 --- a/src/solace_ai_connector/components/component_base.py +++ b/src/solace_ai_connector/components/component_base.py @@ -39,6 +39,7 @@ def __init__(self, module_info, **kwargs): resolve_config_values(self.component_config) self.request_response_controllers = {} + self.initialize_request_response_controllers() self.next_component = None self.thread = None @@ -383,5 +384,14 @@ def cleanup(self): self.input_queue.get_nowait() except queue.Empty: break + + def initialize_request_response_controllers(self): + if self.connector: + request_response_controllers = self.config.get("request_response_controllers", {}) + for controller_name, controller_config in request_response_controllers.items(): + self.connector.create_request_response_controller( + self, controller_name, controller_config + ) + def get_request_response_controller(self, name): return self.request_response_controllers.get(name) diff --git a/src/solace_ai_connector/flow/flow.py b/src/solace_ai_connector/flow/flow.py index 171543a..b591128 100644 --- a/src/solace_ai_connector/flow/flow.py +++ b/src/solace_ai_connector/flow/flow.py @@ -128,13 +128,6 @@ def create_component_group(self, component, index): ) sibling_component = component_instance - # Set up RequestResponseControllers if specified - request_response_controllers = component.get("request_response_controllers", {}) - for controller_name, controller_config in request_response_controllers.items(): - self.connector.create_request_response_controller( - component_instance, controller_name, controller_config - ) - # Add the component to the list component_group.append(component_instance) From 9f5817acf68866b87c1b48bec88339973e1e1cf6 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Sun, 15 Sep 2024 16:04:33 -0400 Subject: [PATCH 10/30] test: add request_response_controller functionality tests --- tests/test_request_response_controller.py | 180 ++++++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 tests/test_request_response_controller.py diff --git a/tests/test_request_response_controller.py b/tests/test_request_response_controller.py new file mode 100644 index 0000000..31ec39b --- /dev/null +++ b/tests/test_request_response_controller.py @@ -0,0 +1,180 @@ +import sys +import pytest +from unittest.mock import MagicMock + +sys.path.append("src") + +from solace_ai_connector.test_utils.utils_for_test_files import ( + create_test_flows, + dispose_connector, + send_message_to_flow, + get_message_from_flow, +) +from solace_ai_connector.common.message import Message +from solace_ai_connector.flow.request_response_controller import RequestResponseController + + +def test_request_response_controller_basic(): + """Test basic functionality of the RequestResponseController""" + config_yaml = """ +log: + log_file_level: DEBUG + log_file: solace_ai_connector.log +flows: + - name: request_flow + components: + - component_name: requester + component_module: pass_through + request_response_controllers: + 
test_controller: + flow_name: response_flow + timeout_ms: 5000 + - name: response_flow + components: + - component_name: responder + component_module: pass_through +""" + connector, flows = create_test_flows(config_yaml) + request_flow, response_flow = flows + + try: + # Mock the send_message_to_flow method of the connector + connector.send_message_to_flow = MagicMock() + + # Get the RequestResponseController from the requester component + requester_component = request_flow['flow'].component_groups[0][0] + controller = requester_component.get_request_response_controller("test_controller") + + assert controller is not None, "RequestResponseController not found" + + # Test sending a message + request_data = { + "payload": {"test": "data"}, + "topic": "test/topic", + "user_properties": {} + } + response = controller.send_message(request_data) + + # Check that send_message_to_flow was called with the correct arguments + connector.send_message_to_flow.assert_called_once() + call_args = connector.send_message_to_flow.call_args + assert call_args[0][0] == "response_flow" + sent_message = call_args[0][1] + assert sent_message.get_payload() == {"test": "data"} + assert sent_message.get_topic() == "test/topic" + + # Simulate a response + response_message = Message(payload={"response": "data"}) + send_message_to_flow(response_flow, response_message) + + # Check the response + assert response == {"response": "data"} + + finally: + dispose_connector(connector) + + +def test_request_response_controller_timeout(): + """Test timeout functionality of the RequestResponseController""" + config_yaml = """ +log: + log_file_level: DEBUG + log_file: solace_ai_connector.log +flows: + - name: request_flow + components: + - component_name: requester + component_module: pass_through + request_response_controllers: + test_controller: + flow_name: response_flow + timeout_ms: 100 # Very short timeout for testing + - name: response_flow + components: + - component_name: responder + component_module: pass_through +""" + connector, flows = create_test_flows(config_yaml) + request_flow = flows[0] + + try: + # Get the RequestResponseController from the requester component + requester_component = request_flow['flow'].component_groups[0][0] + controller = requester_component.get_request_response_controller("test_controller") + + assert controller is not None, "RequestResponseController not found" + + # Test sending a message + request_data = { + "payload": {"test": "data"}, + "topic": "test/topic", + "user_properties": {} + } + + with pytest.raises(TimeoutError): + controller.send_message(request_data) + + finally: + dispose_connector(connector) + + +def test_multiple_request_response_controllers(): + """Test multiple RequestResponseControllers in a single component""" + config_yaml = """ +log: + log_file_level: DEBUG + log_file: solace_ai_connector.log +flows: + - name: request_flow + components: + - component_name: requester + component_module: pass_through + request_response_controllers: + controller1: + flow_name: response_flow1 + timeout_ms: 5000 + controller2: + flow_name: response_flow2 + timeout_ms: 5000 + - name: response_flow1 + components: + - component_name: responder1 + component_module: pass_through + - name: response_flow2 + components: + - component_name: responder2 + component_module: pass_through +""" + connector, flows = create_test_flows(config_yaml) + request_flow, response_flow1, response_flow2 = flows + + try: + # Mock the send_message_to_flow method of the connector + connector.send_message_to_flow = 
MagicMock() + + # Get the RequestResponseControllers from the requester component + requester_component = request_flow['flow'].component_groups[0][0] + controller1 = requester_component.get_request_response_controller("controller1") + controller2 = requester_component.get_request_response_controller("controller2") + + assert controller1 is not None, "RequestResponseController 1 not found" + assert controller2 is not None, "RequestResponseController 2 not found" + + # Test sending messages to both controllers + request_data = { + "payload": {"test": "data"}, + "topic": "test/topic", + "user_properties": {} + } + + controller1.send_message(request_data) + controller2.send_message(request_data) + + # Check that send_message_to_flow was called twice with different flow names + assert connector.send_message_to_flow.call_count == 2 + call_args_list = connector.send_message_to_flow.call_args_list + assert call_args_list[0][0][0] == "response_flow1" + assert call_args_list[1][0][0] == "response_flow2" + + finally: + dispose_connector(connector) From 03e8292c859e914d5ca45c37d6cf21ad77fa704f Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Mon, 16 Sep 2024 08:45:55 -0400 Subject: [PATCH 11/30] feat: finished implementation and added some tests --- .../components/component_base.py | 59 ++-- .../components/general/delay.py | 3 +- .../general/for_testing/handler_callback.py | 67 ++++ src/solace_ai_connector/flow/flow.py | 26 +- .../flow/request_response_controller.py | 151 ++++---- .../solace_ai_connector.py | 33 +- .../test_utils/utils_for_test_files.py | 2 + tests/test_request_response_controller.py | 331 ++++++++++-------- 8 files changed, 405 insertions(+), 267 deletions(-) create mode 100644 src/solace_ai_connector/components/general/for_testing/handler_callback.py diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py index ec0d1bf..86dd1a3 100644 --- a/src/solace_ai_connector/components/component_base.py +++ b/src/solace_ai_connector/components/component_base.py @@ -10,6 +10,7 @@ from ..common.message import Message from ..common.trace_message import TraceMessage from ..common.event import Event, EventType +from ..flow.request_response_controller import RequestResponseController DEFAULT_QUEUE_TIMEOUT_MS = 200 DEFAULT_QUEUE_MAX_DEPTH = 5 @@ -39,7 +40,6 @@ def __init__(self, module_info, **kwargs): resolve_config_values(self.component_config) self.request_response_controllers = {} - self.initialize_request_response_controllers() self.next_component = None self.thread = None @@ -61,6 +61,10 @@ def create_thread_and_run(self): return self.thread def run(self): + # Init the request response controllers here so that we know + # the connector is fully initialized and all flows are created + self.initialize_request_response_controllers() + while not self.stop_signal.is_set(): event = None try: @@ -81,22 +85,6 @@ def run(self): self.stop_component() - def process_single_event(self, event): - try: - if self.trace_queue: - self.trace_event(event) - return self.process_event(event) - except Exception as e: - log.error( - "%sComponent has encountered an error: %s\n%s", - self.log_identifier, - e, - traceback.format_exc(), - ) - if self.error_queue: - self.handle_error(e, event) - raise - def get_next_event(self): # Check if there is a get_next_message defined by a # component that inherits from this class - this is @@ -233,7 +221,11 @@ def get_config(self, key=None, default=None): val = self.component_config.get(key, None) if val is 
None: val = self.config.get(key, default) - if callable(val): + if callable(val) and key not in [ + "invoke_handler", + "get_next_event_handler", + "send_message_handler", + ]: if self.current_message is None: raise ValueError( f"Component {self.log_identifier} is trying to use an `invoke` config " @@ -386,12 +378,33 @@ def cleanup(self): break def initialize_request_response_controllers(self): - if self.connector: - request_response_controllers = self.config.get("request_response_controllers", {}) - for controller_name, controller_config in request_response_controllers.items(): - self.connector.create_request_response_controller( - self, controller_name, controller_config + request_response_controllers_config = self.config.get( + "request_response_controllers", [] + ) + if request_response_controllers_config: + for rrc_config in request_response_controllers_config: + name = rrc_config.get("name") + if not name: + raise ValueError( + f"Request Response Controller in component {self.name} does not have a name" + ) + + rrc = RequestResponseController( + config=rrc_config, connector=self.connector ) + if not rrc: + raise ValueError( + f"Request Response Controller failed to initialize in component {self.name}" + ) + + self.request_response_controllers[name] = rrc + def get_request_response_controller(self, name): return self.request_response_controllers.get(name) + + def send_request_response_message(self, rrc_name, message, data): + rrc = self.get_request_response_controller(rrc_name) + if rrc: + return rrc.send_message(message, data) + return None diff --git a/src/solace_ai_connector/components/general/delay.py b/src/solace_ai_connector/components/general/delay.py index d4a05d0..8d8aaf0 100644 --- a/src/solace_ai_connector/components/general/delay.py +++ b/src/solace_ai_connector/components/general/delay.py @@ -38,5 +38,6 @@ def __init__(self, **kwargs): super().__init__(info, **kwargs) def invoke(self, message, data): - sleep(self.get_config("delay")) + delay = self.get_config("delay") + sleep(delay) return deepcopy(data) diff --git a/src/solace_ai_connector/components/general/for_testing/handler_callback.py b/src/solace_ai_connector/components/general/for_testing/handler_callback.py new file mode 100644 index 0000000..12d0ea7 --- /dev/null +++ b/src/solace_ai_connector/components/general/for_testing/handler_callback.py @@ -0,0 +1,67 @@ +"""This test component allows a tester to configure callback handlers for + get_next_event, send_message and invoke methods""" + +from ...component_base import ComponentBase + + +info = { + "class_name": "HandlerCallback", + "description": ( + "This test component allows a tester to configure callback handlers for " + "get_next_event, send_message and invoke methods" + ), + "config_parameters": [ + { + "name": "get_next_event_handler", + "required": False, + "description": "The callback handler for the get_next_event method", + "type": "function", + }, + { + "name": "send_message_handler", + "required": False, + "description": "The callback handler for the send_message method", + "type": "function", + }, + { + "name": "invoke_handler", + "required": False, + "description": "The callback handler for the invoke method", + "type": "function", + }, + ], + "input_schema": { + "type": "object", + "properties": {}, + }, + "output_schema": { + "type": "object", + "properties": {}, + }, +} + + +class HandlerCallback(ComponentBase): + def __init__(self, **kwargs): + super().__init__(info, **kwargs) + self.get_next_event_handler = 
self.get_config("get_next_event_handler") + self.send_message_handler = self.get_config("send_message_handler") + self.invoke_handler = self.get_config("invoke_handler") + + def get_next_event(self): + if self.get_next_event_handler: + return self.get_next_event_handler(self) + else: + return super().get_next_event() + + def send_message(self, message): + if self.send_message_handler: + return self.send_message_handler(self, message) + else: + return super().send_message(message) + + def invoke(self, message, data): + if self.invoke_handler: + return self.invoke_handler(self, message, data) + else: + return super().invoke(message, data) diff --git a/src/solace_ai_connector/flow/flow.py b/src/solace_ai_connector/flow/flow.py index b591128..ea5091c 100644 --- a/src/solace_ai_connector/flow/flow.py +++ b/src/solace_ai_connector/flow/flow.py @@ -47,7 +47,6 @@ def __init__( trace_queue=None, flow_instance_index=0, connector=None, - for_request_response=False, ): self.flow_config = flow_config self.flow_index = flow_index @@ -65,9 +64,11 @@ def __init__( self.flow_lock_manager = Flow._lock_manager self.flow_kv_store = Flow._kv_store self.cache_service = connector.cache_service if connector else None - self.for_request_response = for_request_response self.create_components() + def get_input_queue(self): + return self.flow_input_queue + def create_components(self): # Loop through the components and create them for index, component in enumerate(self.flow_config.get("components", [])): @@ -79,16 +80,15 @@ def create_components(self): for component in component_group: component.set_next_component(self.component_groups[index + 1][0]) - # For request-response flows, don't create threads - if not self.for_request_response: - # Now one more time to create threads and run them - for index, component_group in enumerate(self.component_groups): - for component in component_group: - thread = component.create_thread_and_run() - self.threads.append(thread) - self.flow_input_queue = self.component_groups[0][0].get_input_queue() + def run(self): + # Now one more time to create threads and run them + for _index, component_group in enumerate(self.component_groups): + for component in component_group: + thread = component.create_thread_and_run() + self.threads.append(thread) + def create_component_group(self, component, index): component_module = component.get("component_module", "") base_path = component.get("component_base_path", None) @@ -137,6 +137,12 @@ def create_component_group(self, component, index): def get_flow_input_queue(self): return self.flow_input_queue + # This will set the next component in all the components in the + # last component group + def set_next_component(self, component): + for comp in self.component_groups[-1]: + comp.set_next_component(component) + def wait_for_threads(self): for thread in self.threads: thread.join() diff --git a/src/solace_ai_connector/flow/request_response_controller.py b/src/solace_ai_connector/flow/request_response_controller.py index 31971fa..67978ac 100644 --- a/src/solace_ai_connector/flow/request_response_controller.py +++ b/src/solace_ai_connector/flow/request_response_controller.py @@ -20,96 +20,111 @@ """ -import threading import queue import time from typing import Dict, Any +from ..common.message import Message +from ..common.event import Event, EventType -class RequestResponseFlowManager: - def __init__(self): - self.flows: Dict[str, Any] = {} - - def add_flow(self, flow_name: str, flow): - self.flows[flow_name] = flow - - def get_flow(self, flow_name: 
str): - return self.flows.get(flow_name) +# This is a very basic component which will be stitched onto the final component in the flow +class RequestResponseControllerOuputComponent: + def __init__(self, controller): + self.controller = controller -from typing import Dict, Any, TYPE_CHECKING -from ..common.event import Event, EventType + def enqueue(self, event): + self.controller.enqueue_response(event) -if TYPE_CHECKING: - from ..solace_ai_connector import SolaceAiConnector +# This is the main class that will be used to send messages to a flow and receive the response class RequestResponseController: - def __init__( - self, config: Dict[str, Any], connector: 'SolaceAiConnector' - ): + def __init__(self, config: Dict[str, Any], connector: "SolaceAiConnector"): self.config = config self.connector = connector - self.flow_name = config["flow_name"] + self.flow_name = config.get("flow_name") self.streaming = config.get("streaming", False) self.streaming_last_message_expression = config.get( "streaming_last_message_expression" ) - self.timeout_ms = config.get("timeout_ms", 30000) - self.response_queue = queue.Queue() - self.flow_instance = self.connector.create_flow_instance(self.flow_name) - self.input_queue = self.flow_instance.get_flow_input_queue() - self.setup_response_queue() + self.timeout_s = config.get("timeout_ms", 30000) / 1000 + self.input_queue = None + self.response_queue = None + self.enqueue_time = None + self.request_outstanding = False + + flow = connector.get_flow(self.flow_name) + + if not flow: + raise ValueError(f"Flow {self.flow_name} not found") + + self.setup_queues(flow) - def setup_response_queue(self): - last_component = self.flow_instance.component_groups[-1][-1] - last_component.set_next_component(self) + def setup_queues(self, flow): + # Input queue to send the message to the flow + self.input_queue = flow.get_input_queue() + + # Response queue to receive the response from the flow + self.response_queue = queue.Queue() + rrcComponent = RequestResponseControllerOuputComponent(self) + flow.set_next_component(rrcComponent) + + def send_message(self, message: Message, data: Any): + # Make a new message, but copy the data from the original message + payload = message.get_payload() + topic = message.get_topic() + user_properties = message.get_user_properties() + new_message = Message( + payload=payload, topic=topic, user_properties=user_properties + ) + new_message.set_previous(data) - def send_message(self, message: Any): if not self.input_queue: raise ValueError(f"Input queue for flow {self.flow_name} not found") - event = Event(EventType.MESSAGE, message) + event = Event(EventType.MESSAGE, new_message) + self.enqueue_time = time.time() + self.request_outstanding = True self.input_queue.put(event) + return self.response_iterator - def enqueue(self, event): - if event.event_type == EventType.MESSAGE: - self.response_queue.put(event.data) - - def get_response(self): - try: + def response_iterator(self): + while True: + now = time.time() + elapsed_time = now - self.enqueue_time + remaining_timeout = self.timeout_s - elapsed_time if self.streaming: - return self._get_streaming_response() + # If we are in streaming mode, we will return individual messages + # until we receive the last message. 
Use the expression to determine + # if this is the last message + while True: + try: + event = self.response_queue.get(timeout=remaining_timeout) + if event.event_type == EventType.MESSAGE: + message = event.data + yield message, message.get_previous() + if self.streaming_last_message_expression: + last_message = message.get_data( + self.streaming_last_message_expression + ) + if last_message: + return + except queue.Empty: + if (time.time() - self.enqueue_time) > self.timeout_s: + raise TimeoutError("Timeout waiting for response") + else: - return self.response_queue.get(timeout=self.timeout_ms / 1000) - except queue.Empty: - raise TimeoutError( - f"Timeout waiting for response from flow {self.flow_name}" - ) - - def _get_streaming_response(self): - responses = [] - start_time = time.time() - while True: - try: - response = self.response_queue.get( - timeout=(start_time + self.timeout_ms / 1000 - time.time()) - ) - responses.append(response) - if self.streaming_last_message_expression: - if self._is_last_message(response): - break - except queue.Empty: - if responses: - break - raise TimeoutError( - f"Timeout waiting for streaming response from flow {self.flow_name}" - ) - return responses - - def _is_last_message(self, message): - # Implement logic to check if this is the last message based on the streaming_last_message_expression - # This might involve parsing the expression and checking the message content - pass - - def handle_response(self, response): - self.response_queue.put(response) + # If we are not in streaming mode, we will return a single message + # and then stop the iterator + try: + event = self.response_queue.get(timeout=remaining_timeout) + if event.event_type == EventType.MESSAGE: + message = event.data + yield message, message.get_previous() + return + except queue.Empty: + if (time.time() - self.enqueue_time) > self.timeout_s: + raise TimeoutError("Timeout waiting for response") + + def enqueue_response(self, event): + self.response_queue.put(event) diff --git a/src/solace_ai_connector/solace_ai_connector.py b/src/solace_ai_connector/solace_ai_connector.py index 93c13de..7c4a688 100644 --- a/src/solace_ai_connector/solace_ai_connector.py +++ b/src/solace_ai_connector/solace_ai_connector.py @@ -11,7 +11,6 @@ from .flow.timer_manager import TimerManager from .common.event import Event, EventType from .services.cache_service import CacheService, create_storage_backend -from .flow.request_response_controller import RequestResponseController class SolaceAiConnector: @@ -33,12 +32,6 @@ def __init__(self, config, event_handlers=None, error_queue=None): self.instance_name = self.config.get("instance_name", "solace_ai_connector") self.timer_manager = TimerManager(self.stop_signal) self.cache_service = self.setup_cache_service() - self.request_response_controllers = {} - - def create_request_response_controller(self, component, controller_name, controller_config): - controller = RequestResponseController(controller_config, self) - component.request_response_controllers[controller_name] = controller - return controller def run(self): """Run the Solace AI Event Connector""" @@ -69,14 +62,10 @@ def create_flows(self): flow_input_queue = flow_instance.get_flow_input_queue() self.flow_input_queues[flow.get("name")] = flow_input_queue self.flows.append(flow_instance) + for flow in self.flows: + flow.run() - def create_flow( - self, - flow: dict, - index: int, - flow_instance_index: int, - for_request_response=False, - ): + def create_flow(self, flow: dict, index: int, 
flow_instance_index: int): """Create a single flow""" return Flow( @@ -88,17 +77,8 @@ def create_flow( instance_name=self.instance_name, trace_queue=self.trace_queue, connector=self, - for_request_response=for_request_response, ) - def create_flow_instance(self, flow_name: str): - """Create a new instance of a flow for request-response""" - for flow in self.config.get("flows", []): - if flow.get("name") == flow_name: - new_flow = self.create_flow(flow, -1, -1, for_request_response=True) - return new_flow - raise ValueError(f"Flow '{flow_name}' not found") - def send_message_to_flow(self, flow_name, message): """Send a message to a flow""" flow_input_queue = self.flow_input_queues.get(flow_name) @@ -224,6 +204,13 @@ def get_flows(self): """Return the flows""" return self.flows + def get_flow(self, flow_name): + """Return a specific flow by name""" + for flow in self.flows: + if flow.name == flow_name: + return flow + return None + def setup_cache_service(self): """Setup the cache service""" cache_config = self.config.get("cache", {}) diff --git a/src/solace_ai_connector/test_utils/utils_for_test_files.py b/src/solace_ai_connector/test_utils/utils_for_test_files.py index fec9bad..15a6492 100644 --- a/src/solace_ai_connector/test_utils/utils_for_test_files.py +++ b/src/solace_ai_connector/test_utils/utils_for_test_files.py @@ -156,6 +156,8 @@ def create_test_flows( # For each of the flows, add the input and output components flow_info = [] for flow in flows: + if flow.flow_config.get("test_ignore", False): + continue input_component = TestInputComponent( flow.component_groups[0][0].get_input_queue() ) diff --git a/tests/test_request_response_controller.py b/tests/test_request_response_controller.py index 31ec39b..fa401ae 100644 --- a/tests/test_request_response_controller.py +++ b/tests/test_request_response_controller.py @@ -1,6 +1,5 @@ import sys import pytest -from unittest.mock import MagicMock sys.path.append("src") @@ -11,170 +10,218 @@ get_message_from_flow, ) from solace_ai_connector.common.message import Message -from solace_ai_connector.flow.request_response_controller import RequestResponseController +from solace_ai_connector.flow.request_response_controller import ( + RequestResponseController, +) def test_request_response_controller_basic(): """Test basic functionality of the RequestResponseController""" - config_yaml = """ -log: - log_file_level: DEBUG - log_file: solace_ai_connector.log -flows: - - name: request_flow - components: - - component_name: requester - component_module: pass_through - request_response_controllers: - test_controller: - flow_name: response_flow - timeout_ms: 5000 - - name: response_flow - components: - - component_name: responder - component_module: pass_through -""" - connector, flows = create_test_flows(config_yaml) - request_flow, response_flow = flows + + def test_invoke_handler(component, message, data): + # Call the request_response_flow + data_iter = component.send_request_response_message( + "test_controller", message, {"test": "data"} + ) + + # Just a single message with no streaming + for message, data in data_iter(): + assert message.get_data("previous") == {"test": "data"} + assert message.get_data("input.payload") == {"text": "Hello, World!"} + + return "done" + + config = { + "flows": [ + { + "name": "test_flow", + "components": [ + { + "component_name": "requester", + "component_module": "handler_callback", + "component_config": { + "invoke_handler": test_invoke_handler, + }, + "request_response_controllers": [ + { + "name": 
"test_controller", + "flow_name": "request_response_flow", + "timeout_ms": 500000, + } + ], + } + ], + }, + { + "name": "request_response_flow", + "test_ignore": True, + "components": [ + {"component_name": "responder", "component_module": "pass_through"} + ], + }, + ] + } + connector, flows = create_test_flows(config) + + test_flow = flows[0] try: - # Mock the send_message_to_flow method of the connector - connector.send_message_to_flow = MagicMock() - - # Get the RequestResponseController from the requester component - requester_component = request_flow['flow'].component_groups[0][0] - controller = requester_component.get_request_response_controller("test_controller") - - assert controller is not None, "RequestResponseController not found" - - # Test sending a message - request_data = { - "payload": {"test": "data"}, - "topic": "test/topic", - "user_properties": {} - } - response = controller.send_message(request_data) - - # Check that send_message_to_flow was called with the correct arguments - connector.send_message_to_flow.assert_called_once() - call_args = connector.send_message_to_flow.call_args - assert call_args[0][0] == "response_flow" - sent_message = call_args[0][1] - assert sent_message.get_payload() == {"test": "data"} - assert sent_message.get_topic() == "test/topic" - - # Simulate a response - response_message = Message(payload={"response": "data"}) - send_message_to_flow(response_flow, response_message) - - # Check the response - assert response == {"response": "data"} + + # Send a message to the input flow + send_message_to_flow(test_flow, Message(payload={"text": "Hello, World!"})) + + # Get the output message + output_message = get_message_from_flow(test_flow) + + assert output_message.get_data("previous") == "done" finally: dispose_connector(connector) -def test_request_response_controller_timeout(): - """Test timeout functionality of the RequestResponseController""" - config_yaml = """ -log: - log_file_level: DEBUG - log_file: solace_ai_connector.log -flows: - - name: request_flow - components: - - component_name: requester - component_module: pass_through - request_response_controllers: - test_controller: - flow_name: response_flow - timeout_ms: 100 # Very short timeout for testing - - name: response_flow - components: - - component_name: responder - component_module: pass_through -""" - connector, flows = create_test_flows(config_yaml) - request_flow = flows[0] +# Test simple streaming request response +# Use the iterate component to break a single message into multiple messages +def test_request_response_controller_streaming(): + """Test streaming functionality of the RequestResponseController""" + + def test_invoke_handler(component, message, data): + # Call the request_response_flow + data_iter = component.send_request_response_message( + "test_controller", + message, + [ + {"test": "data1", "streaming": {"last_message": False}}, + {"test": "data2", "streaming": {"last_message": False}}, + {"test": "data3", "streaming": {"last_message": True}}, + ], + ) + + # Expecting 3 messages + results = [] + for message, data in data_iter(): + results.append(data.get("test")) + + assert results == ["data1", "data2", "data3"] + return "done" + + config = { + "flows": [ + { + "name": "test_flow", + "components": [ + { + "component_name": "requester", + "component_module": "handler_callback", + "component_config": { + "invoke_handler": test_invoke_handler, + }, + "request_response_controllers": [ + { + "name": "test_controller", + "flow_name": "request_response_flow", + 
"streaming": True, + "streaming_last_message_expression": "previous:streaming.last_message", + "timeout_ms": 500000, + } + ], + } + ], + }, + { + "name": "request_response_flow", + "test_ignore": True, + "components": [ + {"component_name": "responder", "component_module": "iterate"} + ], + }, + ] + } + connector, flows = create_test_flows(config) + + test_flow = flows[0] try: - # Get the RequestResponseController from the requester component - requester_component = request_flow['flow'].component_groups[0][0] - controller = requester_component.get_request_response_controller("test_controller") - assert controller is not None, "RequestResponseController not found" + # Send a message to the input flow + send_message_to_flow(test_flow, Message(payload={"text": "Hello, World!"})) - # Test sending a message - request_data = { - "payload": {"test": "data"}, - "topic": "test/topic", - "user_properties": {} - } + # Get the output message + output_message = get_message_from_flow(test_flow) - with pytest.raises(TimeoutError): - controller.send_message(request_data) + assert output_message.get_data("previous") == "done" finally: dispose_connector(connector) -def test_multiple_request_response_controllers(): - """Test multiple RequestResponseControllers in a single component""" - config_yaml = """ -log: - log_file_level: DEBUG - log_file: solace_ai_connector.log -flows: - - name: request_flow - components: - - component_name: requester - component_module: pass_through - request_response_controllers: - controller1: - flow_name: response_flow1 - timeout_ms: 5000 - controller2: - flow_name: response_flow2 - timeout_ms: 5000 - - name: response_flow1 - components: - - component_name: responder1 - component_module: pass_through - - name: response_flow2 - components: - - component_name: responder2 - component_module: pass_through -""" - connector, flows = create_test_flows(config_yaml) - request_flow, response_flow1, response_flow2 = flows +# Test the timeout functionality +def test_request_response_controller_timeout(): + """Test timeout functionality of the RequestResponseController""" + + def test_invoke_handler(component, message, data): + # Call the request_response_flow + data_iter = component.send_request_response_message( + "test_controller", message, {"test": "data"} + ) + + # This will timeout + try: + for message, data in data_iter(): + assert message.get_data("previous") == {"test": "data"} + assert message.get_data("input.payload") == {"text": "Hello, World!"} + except TimeoutError: + return "timeout" + return "done" + + config = { + "flows": [ + { + "name": "test_flow", + "components": [ + { + "component_name": "requester", + "component_module": "handler_callback", + "component_config": { + "invoke_handler": test_invoke_handler, + }, + "request_response_controllers": [ + { + "name": "test_controller", + "flow_name": "request_response_flow", + "timeout_ms": 1000, + } + ], + } + ], + }, + { + "name": "request_response_flow", + "test_ignore": True, + "components": [ + { + "component_name": "responder", + "component_module": "delay", + "component_config": { + "delay": 5, + }, + } + ], + }, + ] + } + connector, flows = create_test_flows(config) + + test_flow = flows[0] try: - # Mock the send_message_to_flow method of the connector - connector.send_message_to_flow = MagicMock() - - # Get the RequestResponseControllers from the requester component - requester_component = request_flow['flow'].component_groups[0][0] - controller1 = requester_component.get_request_response_controller("controller1") - 
controller2 = requester_component.get_request_response_controller("controller2") - - assert controller1 is not None, "RequestResponseController 1 not found" - assert controller2 is not None, "RequestResponseController 2 not found" - - # Test sending messages to both controllers - request_data = { - "payload": {"test": "data"}, - "topic": "test/topic", - "user_properties": {} - } - - controller1.send_message(request_data) - controller2.send_message(request_data) - - # Check that send_message_to_flow was called twice with different flow names - assert connector.send_message_to_flow.call_count == 2 - call_args_list = connector.send_message_to_flow.call_args_list - assert call_args_list[0][0][0] == "response_flow1" - assert call_args_list[1][0][0] == "response_flow2" + + # Send a message to the input flow + send_message_to_flow(test_flow, Message(payload={"text": "Hello, World!"})) + + # Get the output message + output_message = get_message_from_flow(test_flow) + + assert output_message.get_data("previous") == "timeout" finally: dispose_connector(connector) From 9943637bc054e6ca172a58dbdab10c8226df9bb3 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Mon, 16 Sep 2024 08:53:01 -0400 Subject: [PATCH 12/30] refactor: rename RequestResponseController to RequestResponseFlowController --- .../components/component_base.py | 38 +++++++++---------- .../flow/request_response_controller.py | 2 +- tests/test_request_response_controller.py | 10 ++--- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py index 86dd1a3..c029b53 100644 --- a/src/solace_ai_connector/components/component_base.py +++ b/src/solace_ai_connector/components/component_base.py @@ -39,7 +39,7 @@ def __init__(self, module_info, **kwargs): resolve_config_values(self.component_config) - self.request_response_controllers = {} + self.request_response_flow_controllers = {} self.next_component = None self.thread = None @@ -377,34 +377,34 @@ def cleanup(self): except queue.Empty: break - def initialize_request_response_controllers(self): - request_response_controllers_config = self.config.get( - "request_response_controllers", [] + def initialize_request_response_flow_controllers(self): + request_response_flow_controllers_config = self.config.get( + "request_response_flow_controllers", [] ) - if request_response_controllers_config: - for rrc_config in request_response_controllers_config: - name = rrc_config.get("name") + if request_response_flow_controllers_config: + for rrfc_config in request_response_flow_controllers_config: + name = rrfc_config.get("name") if not name: raise ValueError( - f"Request Response Controller in component {self.name} does not have a name" + f"Request Response Flow Controller in component {self.name} does not have a name" ) - rrc = RequestResponseController( - config=rrc_config, connector=self.connector + rrfc = RequestResponseFlowController( + config=rrfc_config, connector=self.connector ) - if not rrc: + if not rrfc: raise ValueError( - f"Request Response Controller failed to initialize in component {self.name}" + f"Request Response Flow Controller failed to initialize in component {self.name}" ) - self.request_response_controllers[name] = rrc + self.request_response_flow_controllers[name] = rrfc - def get_request_response_controller(self, name): - return self.request_response_controllers.get(name) + def get_request_response_flow_controller(self, name): + return 
self.request_response_flow_controllers.get(name) - def send_request_response_message(self, rrc_name, message, data): - rrc = self.get_request_response_controller(rrc_name) - if rrc: - return rrc.send_message(message, data) + def send_request_response_flow_message(self, rrfc_name, message, data): + rrfc = self.get_request_response_flow_controller(rrfc_name) + if rrfc: + return rrfc.send_message(message, data) return None diff --git a/src/solace_ai_connector/flow/request_response_controller.py b/src/solace_ai_connector/flow/request_response_controller.py index 67978ac..ef0aae2 100644 --- a/src/solace_ai_connector/flow/request_response_controller.py +++ b/src/solace_ai_connector/flow/request_response_controller.py @@ -38,7 +38,7 @@ def enqueue(self, event): # This is the main class that will be used to send messages to a flow and receive the response -class RequestResponseController: +class RequestResponseFlowController: def __init__(self, config: Dict[str, Any], connector: "SolaceAiConnector"): self.config = config self.connector = connector diff --git a/tests/test_request_response_controller.py b/tests/test_request_response_controller.py index fa401ae..1343603 100644 --- a/tests/test_request_response_controller.py +++ b/tests/test_request_response_controller.py @@ -11,16 +11,16 @@ ) from solace_ai_connector.common.message import Message from solace_ai_connector.flow.request_response_controller import ( - RequestResponseController, + RequestResponseFlowController, ) -def test_request_response_controller_basic(): - """Test basic functionality of the RequestResponseController""" +def test_request_response_flow_controller_basic(): + """Test basic functionality of the RequestResponseFlowController""" def test_invoke_handler(component, message, data): # Call the request_response_flow - data_iter = component.send_request_response_message( + data_iter = component.send_request_response_flow_message( "test_controller", message, {"test": "data"} ) @@ -42,7 +42,7 @@ def test_invoke_handler(component, message, data): "component_config": { "invoke_handler": test_invoke_handler, }, - "request_response_controllers": [ + "request_response_flow_controllers": [ { "name": "test_controller", "flow_name": "request_response_flow", From 49796c873333e2c2abdf6a516758338c6a06ec19 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Mon, 16 Sep 2024 08:53:42 -0400 Subject: [PATCH 13/30] refactor: rename RequestResponseController to RequestResponseFlowController --- src/solace_ai_connector/components/component_base.py | 2 +- src/solace_ai_connector/flow/request_response_controller.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py index c029b53..34eab10 100644 --- a/src/solace_ai_connector/components/component_base.py +++ b/src/solace_ai_connector/components/component_base.py @@ -10,7 +10,7 @@ from ..common.message import Message from ..common.trace_message import TraceMessage from ..common.event import Event, EventType -from ..flow.request_response_controller import RequestResponseController +from ..flow.request_response_controller import RequestResponseFlowController DEFAULT_QUEUE_TIMEOUT_MS = 200 DEFAULT_QUEUE_MAX_DEPTH = 5 diff --git a/src/solace_ai_connector/flow/request_response_controller.py b/src/solace_ai_connector/flow/request_response_controller.py index ef0aae2..9973165 100644 --- a/src/solace_ai_connector/flow/request_response_controller.py +++ 
b/src/solace_ai_connector/flow/request_response_controller.py @@ -22,7 +22,10 @@ import queue import time -from typing import Dict, Any +from typing import Dict, Any, TYPE_CHECKING + +if TYPE_CHECKING: + from ..solace_ai_connector import SolaceAiConnector from ..common.message import Message from ..common.event import Event, EventType From 3db7fa9e05f6065720a939b550089097abba1d9d Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Mon, 16 Sep 2024 09:07:56 -0400 Subject: [PATCH 14/30] refactor: some name changes --- .../components/component_base.py | 4 ++-- ...er.py => request_response_flow_controller.py} | 7 ++----- tests/test_request_response_controller.py | 16 ++++++++-------- 3 files changed, 12 insertions(+), 15 deletions(-) rename src/solace_ai_connector/flow/{request_response_controller.py => request_response_flow_controller.py} (96%) diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py index 34eab10..e20ecae 100644 --- a/src/solace_ai_connector/components/component_base.py +++ b/src/solace_ai_connector/components/component_base.py @@ -10,7 +10,7 @@ from ..common.message import Message from ..common.trace_message import TraceMessage from ..common.event import Event, EventType -from ..flow.request_response_controller import RequestResponseFlowController +from ..flow.request_response_flow_controller import RequestResponseFlowController DEFAULT_QUEUE_TIMEOUT_MS = 200 DEFAULT_QUEUE_MAX_DEPTH = 5 @@ -63,7 +63,7 @@ def create_thread_and_run(self): def run(self): # Init the request response controllers here so that we know # the connector is fully initialized and all flows are created - self.initialize_request_response_controllers() + self.initialize_request_response_flow_controllers() while not self.stop_signal.is_set(): event = None diff --git a/src/solace_ai_connector/flow/request_response_controller.py b/src/solace_ai_connector/flow/request_response_flow_controller.py similarity index 96% rename from src/solace_ai_connector/flow/request_response_controller.py rename to src/solace_ai_connector/flow/request_response_flow_controller.py index 9973165..d81d711 100644 --- a/src/solace_ai_connector/flow/request_response_controller.py +++ b/src/solace_ai_connector/flow/request_response_flow_controller.py @@ -22,10 +22,7 @@ import queue import time -from typing import Dict, Any, TYPE_CHECKING - -if TYPE_CHECKING: - from ..solace_ai_connector import SolaceAiConnector +from typing import Dict, Any from ..common.message import Message from ..common.event import Event, EventType @@ -42,7 +39,7 @@ def enqueue(self, event): # This is the main class that will be used to send messages to a flow and receive the response class RequestResponseFlowController: - def __init__(self, config: Dict[str, Any], connector: "SolaceAiConnector"): + def __init__(self, config: Dict[str, Any], connector): self.config = config self.connector = connector self.flow_name = config.get("flow_name") diff --git a/tests/test_request_response_controller.py b/tests/test_request_response_controller.py index 1343603..da3063a 100644 --- a/tests/test_request_response_controller.py +++ b/tests/test_request_response_controller.py @@ -1,5 +1,4 @@ import sys -import pytest sys.path.append("src") @@ -10,22 +9,19 @@ get_message_from_flow, ) from solace_ai_connector.common.message import Message -from solace_ai_connector.flow.request_response_controller import ( - RequestResponseFlowController, -) def test_request_response_flow_controller_basic(): """Test basic 
functionality of the RequestResponseFlowController""" - def test_invoke_handler(component, message, data): + def test_invoke_handler(component, message, _data): # Call the request_response_flow data_iter = component.send_request_response_flow_message( "test_controller", message, {"test": "data"} ) # Just a single message with no streaming - for message, data in data_iter(): + for message, _data in data_iter(): assert message.get_data("previous") == {"test": "data"} assert message.get_data("input.payload") == {"text": "Hello, World!"} @@ -86,7 +82,7 @@ def test_request_response_controller_streaming(): def test_invoke_handler(component, message, data): # Call the request_response_flow - data_iter = component.send_request_response_message( + data_iter = component.send_request_response_flow_message( "test_controller", message, [ @@ -150,6 +146,10 @@ def test_invoke_handler(component, message, data): assert output_message.get_data("previous") == "done" + except Exception as e: + print(e) + assert False + finally: dispose_connector(connector) @@ -160,7 +160,7 @@ def test_request_response_controller_timeout(): def test_invoke_handler(component, message, data): # Call the request_response_flow - data_iter = component.send_request_response_message( + data_iter = component.send_request_response_flow_message( "test_controller", message, {"test": "data"} ) From a6c0c5dda1f0a4a51959c8589dbd6dc0dd166002 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Mon, 16 Sep 2024 09:08:28 -0400 Subject: [PATCH 15/30] fix: update test function names for RequestResponseFlowController --- tests/test_request_response_controller.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_request_response_controller.py b/tests/test_request_response_controller.py index da3063a..d8cc3a1 100644 --- a/tests/test_request_response_controller.py +++ b/tests/test_request_response_controller.py @@ -77,8 +77,8 @@ def test_invoke_handler(component, message, _data): # Test simple streaming request response # Use the iterate component to break a single message into multiple messages -def test_request_response_controller_streaming(): - """Test streaming functionality of the RequestResponseController""" +def test_request_response_flow_controller_streaming(): + """Test streaming functionality of the RequestResponseFlowController""" def test_invoke_handler(component, message, data): # Call the request_response_flow @@ -155,8 +155,8 @@ def test_invoke_handler(component, message, data): # Test the timeout functionality -def test_request_response_controller_timeout(): - """Test timeout functionality of the RequestResponseController""" +def test_request_response_flow_controller_timeout(): + """Test timeout functionality of the RequestResponseFlowController""" def test_invoke_handler(component, message, data): # Call the request_response_flow From b836da7dc9d067fd70a7ae627464e0b431071d9a Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Mon, 16 Sep 2024 09:10:46 -0400 Subject: [PATCH 16/30] refactor: more name changes --- .../flow/request_response_flow_controller.py | 2 +- tests/test_request_response_controller.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/solace_ai_connector/flow/request_response_flow_controller.py b/src/solace_ai_connector/flow/request_response_flow_controller.py index d81d711..373f2ec 100644 --- a/src/solace_ai_connector/flow/request_response_flow_controller.py +++ b/src/solace_ai_connector/flow/request_response_flow_controller.py @@ -10,7 
+10,7 @@ components: - component_name: example_component component_module: custom_component - request_response_controllers: + request_response_flow_controllers: - name: example_controller flow_name: llm_flow streaming: true diff --git a/tests/test_request_response_controller.py b/tests/test_request_response_controller.py index d8cc3a1..5719bad 100644 --- a/tests/test_request_response_controller.py +++ b/tests/test_request_response_controller.py @@ -111,7 +111,7 @@ def test_invoke_handler(component, message, data): "component_config": { "invoke_handler": test_invoke_handler, }, - "request_response_controllers": [ + "request_response_flow_controllers": [ { "name": "test_controller", "flow_name": "request_response_flow", @@ -184,7 +184,7 @@ def test_invoke_handler(component, message, data): "component_config": { "invoke_handler": test_invoke_handler, }, - "request_response_controllers": [ + "request_response_flow_controllers": [ { "name": "test_controller", "flow_name": "request_response_flow", From 4ab4bfc4e91ccec3bd6b7d52196b34b3cd60fd36 Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Sat, 21 Sep 2024 14:50:52 -0400 Subject: [PATCH 17/30] Ed/req_resp_examples_and_fixes (#41) * feat: Added a request_response_flow example and fixed a few issues along the way * feat: Reworked the broker_request_response built-in ability of components to be simpler. Instead of having to have a defined flow and then name that flow, it will automatically create a flow with a single broker_request_response component in it. Now there is a straightforward interating function call to allow components to issue a request and get streaming or non-streaming responses from that flow. * chore: fix the request_response example and remove the old one --- examples/llm/custom_components/__init__.py | 0 .../llm_streaming_custom_component.py | 52 +++++ .../openai_component_request_response.yaml | 146 +++++++++++++ src/solace_ai_connector/common/event.py | 5 +- .../components/component_base.py | 80 ++++--- .../general/openai/openai_chat_model_base.py | 110 ++++++++-- .../components/inputs_outputs/broker_base.py | 9 +- .../components/inputs_outputs/broker_input.py | 3 +- .../inputs_outputs/broker_request_response.py | 164 +++++++++++--- .../components/inputs_outputs/stdin_input.py | 22 +- .../inputs_outputs/stdout_output.py | 19 +- .../flow/request_response_flow_controller.py | 154 +++++++------ .../solace_ai_connector.py | 9 - tests/test_request_response_controller.py | 205 ++++++++++-------- 14 files changed, 726 insertions(+), 252 deletions(-) create mode 100644 examples/llm/custom_components/__init__.py create mode 100644 examples/llm/custom_components/llm_streaming_custom_component.py create mode 100644 examples/llm/openai_component_request_response.yaml diff --git a/examples/llm/custom_components/__init__.py b/examples/llm/custom_components/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/llm/custom_components/llm_streaming_custom_component.py b/examples/llm/custom_components/llm_streaming_custom_component.py new file mode 100644 index 0000000..a1363d4 --- /dev/null +++ b/examples/llm/custom_components/llm_streaming_custom_component.py @@ -0,0 +1,52 @@ +# A simple pass-through component - what goes in comes out + +import sys + +sys.path.append("src") + +from solace_ai_connector.components.component_base import ComponentBase +from solace_ai_connector.common.message import Message + + +info = { + "class_name": "LlmStreamingCustomComponent", + "description": "Do a blocking LLM 
request/response", + "config_parameters": [ + { + "name": "llm_request_topic", + "description": "The topic to send the request to", + "type": "string", + } + ], + "input_schema": { + "type": "object", + "properties": {}, + }, + "output_schema": { + "type": "object", + "properties": {}, + }, +} + + +class LlmStreamingCustomComponent(ComponentBase): + def __init__(self, **kwargs): + super().__init__(info, **kwargs) + self.llm_request_topic = self.get_config("llm_request_topic") + + def invoke(self, message, data): + llm_message = Message(payload=data, topic=self.llm_request_topic) + for message, last_message in self.do_broker_request_response( + llm_message, + stream=True, + streaming_complete_expression="input.payload:last_chunk", + ): + text = message.get_data("input.payload:chunk") + if not text: + text = message.get_data("input.payload:content") or "no response" + if last_message: + return {"chunk": text} + self.output_streaming(message, {"chunk": text}) + + def output_streaming(self, message, data): + return self.process_post_invoke(data, message) diff --git a/examples/llm/openai_component_request_response.yaml b/examples/llm/openai_component_request_response.yaml new file mode 100644 index 0000000..5147719 --- /dev/null +++ b/examples/llm/openai_component_request_response.yaml @@ -0,0 +1,146 @@ +# This example demostrates how to use the request_response_flow_controller to +# inject another flow into an existing flow. This is commonly used when +# you want to call a service that is only accessible via the broker. +# +# Main flow: STDIN -> llm_streaming_custom_component -> STDOUT +# | +# v +# request_response +# | ^ +# v | +# Broker Broker +# +# +# LLM flow: Broker -> OpenAI -> Broker +# +# +# While this looks a bit complicated, it allows you to very easily use all +# the benefits of the broker to distribute service requests, such as load +# balancing, failover, and scaling to LLMs. +# +# It will subscribe to `demo/question` and expect an event with the payload: +# +# The input message has the following schema: +# { +# "text": "" +# } +# +# It will then send an event back to Solace with the topic: `demo/question/response` +# +# Dependencies: +# pip install -U langchain_openai openai +# +# required ENV variables: +# - OPENAI_API_KEY +# - OPENAI_API_ENDPOINT +# - OPENAI_MODEL_NAME +# - SOLACE_BROKER_URL +# - SOLACE_BROKER_USERNAME +# - SOLACE_BROKER_PASSWORD +# - SOLACE_BROKER_VPN + +--- +log: + stdout_log_level: INFO + log_file_level: DEBUG + log_file: solace_ai_connector.log + +shared_config: + - broker_config: &broker_connection + broker_type: solace + broker_url: ${SOLACE_BROKER_URL} + broker_username: ${SOLACE_BROKER_USERNAME} + broker_password: ${SOLACE_BROKER_PASSWORD} + broker_vpn: ${SOLACE_BROKER_VPN} + +# Take from input broker and publish back to Solace +flows: + # broker input processing + - name: main_flow + components: + # Input from a Solace broker + - component_name: input + component_module: stdin_input + + # Our custom component + - component_name: llm_streaming_custom_component + component_module: llm_streaming_custom_component + component_base_path: examples/llm/custom_components + component_config: + llm_request_topic: example/llm/best + broker_request_response: + enabled: true + broker_config: *broker_connection + request_expiry_ms: 60000 + payload_encoding: utf-8 + payload_format: json + input_transforms: + - type: copy + source_expression: | + template:You are a helpful AI assistant. 
Please help with the user's request below: + + {{text://input.payload:text}} + + dest_expression: user_data.llm_input:messages.0.content + - type: copy + source_expression: static:user + dest_expression: user_data.llm_input:messages.0.role + input_selection: + source_expression: user_data.llm_input + + # Send response to stdout + - component_name: send_response + component_module: stdout_output + component_config: + add_new_line_between_messages: false + input_selection: + source_expression: previous:chunk + + + + # The LLM flow that is accessible via the broker + - name: llm_flow + components: + # Input from a Solace broker + - component_name: solace_sw_broker + component_module: broker_input + component_config: + <<: *broker_connection + broker_queue_name: example_flow_streaming + broker_subscriptions: + - topic: example/llm/best + qos: 1 + payload_encoding: utf-8 + payload_format: json + + # Do an LLM request + - component_name: llm_request + component_module: openai_chat_model + component_config: + api_key: ${OPENAI_API_KEY} + base_url: ${OPENAI_API_ENDPOINT} + model: ${MODEL_NAME} + temperature: 0.01 + llm_mode: stream + stream_to_next_component: true + stream_batch_size: 20 + input_selection: + source_expression: input.payload + + # Send response back to broker + - component_name: send_response + component_module: broker_output + component_config: + <<: *broker_connection + payload_encoding: utf-8 + payload_format: json + copy_user_properties: true + input_transforms: + - type: copy + source_expression: previous + dest_expression: user_data.output:payload + - type: copy + source_expression: input.user_properties:__solace_ai_connector_broker_request_reply_topic__ + dest_expression: user_data.output:topic + input_selection: + source_expression: user_data.output \ No newline at end of file diff --git a/src/solace_ai_connector/common/event.py b/src/solace_ai_connector/common/event.py index cc302c2..98f4217 100644 --- a/src/solace_ai_connector/common/event.py +++ b/src/solace_ai_connector/common/event.py @@ -6,7 +6,10 @@ class EventType(Enum): MESSAGE = "message" TIMER = "timer" CACHE_EXPIRY = "cache_expiry" - # Add more event types as needed + # Add more event types as need + + def __eq__(self, other): + return self.value == other.value class Event: diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py index e20ecae..e848183 100644 --- a/src/solace_ai_connector/components/component_base.py +++ b/src/solace_ai_connector/components/component_base.py @@ -35,6 +35,9 @@ def __init__(self, module_info, **kwargs): self.cache_service = kwargs.pop("cache_service", None) self.component_config = self.config.get("component_config") or {} + self.broker_request_response_config = self.config.get( + "broker_request_response", None + ) self.name = self.config.get("component_name", "") resolve_config_values(self.component_config) @@ -54,6 +57,7 @@ def __init__(self, module_info, **kwargs): self.validate_config() self.setup_transforms() self.setup_communications() + self.setup_broker_request_response() def create_thread_and_run(self): self.thread = threading.Thread(target=self.run) @@ -61,10 +65,6 @@ def create_thread_and_run(self): return self.thread def run(self): - # Init the request response controllers here so that we know - # the connector is fully initialized and all flows are created - self.initialize_request_response_flow_controllers() - while not self.stop_signal.is_set(): event = None try: @@ -73,6 +73,8 @@ def run(self): if 
self.trace_queue: self.trace_event(event) self.process_event(event) + except AssertionError as e: + raise e except Exception as e: log.error( "%sComponent has crashed: %s\n%s", @@ -273,6 +275,29 @@ def setup_communications(self): else: self.input_queue = queue.Queue(maxsize=self.queue_max_depth) + def setup_broker_request_response(self): + if ( + not self.broker_request_response_config + or not self.broker_request_response_config.get("enabled", False) + ): + self.broker_request_response_controller = None + return + broker_config = self.broker_request_response_config.get("broker_config", {}) + request_expiry_ms = self.broker_request_response_config.get( + "request_expiry_ms", 30000 + ) + if not broker_config: + raise ValueError( + f"Broker request response config not found for component {self.name}" + ) + rrc_config = { + "broker_config": broker_config, + "request_expiry_ms": request_expiry_ms, + } + self.broker_request_response_controller = RequestResponseFlowController( + config=rrc_config, connector=self.connector + ) + def setup_transforms(self): self.transforms = Transforms( self.config.get("input_transforms", []), log_identifier=self.log_identifier @@ -377,34 +402,25 @@ def cleanup(self): except queue.Empty: break - def initialize_request_response_flow_controllers(self): - request_response_flow_controllers_config = self.config.get( - "request_response_flow_controllers", [] - ) - if request_response_flow_controllers_config: - for rrfc_config in request_response_flow_controllers_config: - name = rrfc_config.get("name") - if not name: - raise ValueError( - f"Request Response Flow Controller in component {self.name} does not have a name" + # This should be used to do an on-the-fly broker request response + def do_broker_request_response( + self, message, stream=False, streaming_complete_expression=None + ): + if self.broker_request_response_controller: + if stream: + return ( + self.broker_request_response_controller.do_broker_request_response( + message, stream, streaming_complete_expression ) - - rrfc = RequestResponseFlowController( - config=rrfc_config, connector=self.connector ) - - if not rrfc: - raise ValueError( - f"Request Response Flow Controller failed to initialize in component {self.name}" + else: + generator = ( + self.broker_request_response_controller.do_broker_request_response( + message ) - - self.request_response_flow_controllers[name] = rrfc - - def get_request_response_flow_controller(self, name): - return self.request_response_flow_controllers.get(name) - - def send_request_response_flow_message(self, rrfc_name, message, data): - rrfc = self.get_request_response_flow_controller(rrfc_name) - if rrfc: - return rrfc.send_message(message, data) - return None + ) + next_message, last = next(generator, None) + return next_message + raise ValueError( + f"Broker request response controller not found for component {self.name}" + ) diff --git a/src/solace_ai_connector/components/general/openai/openai_chat_model_base.py b/src/solace_ai_connector/components/general/openai/openai_chat_model_base.py index 6a0884b..0ce6c90 100644 --- a/src/solace_ai_connector/components/general/openai/openai_chat_model_base.py +++ b/src/solace_ai_connector/components/general/openai/openai_chat_model_base.py @@ -1,9 +1,10 @@ """Base class for OpenAI chat models""" +import uuid + from openai import OpenAI from ...component_base import ComponentBase from ....common.message import Message -import uuid openai_info_base = { "class_name": "OpenAIChatModelBase", @@ -34,13 +35,29 @@ { "name": 
"stream_to_flow", "required": False, - "description": "Name the flow to stream the output to - this must be configured for llm_mode='stream'.", + "description": ( + "Name the flow to stream the output to - this must be configured for " + "llm_mode='stream'. This is mutually exclusive with stream_to_next_component." + ), "default": "", }, + { + "name": "stream_to_next_component", + "required": False, + "description": ( + "Whether to stream the output to the next component in the flow. " + "This is mutually exclusive with stream_to_flow." + ), + "default": False, + }, { "name": "llm_mode", "required": False, - "description": "The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response.", + "description": ( + "The mode for streaming results: 'sync' or 'stream'. 'stream' " + "will just stream the results to the named flow. 'none' will " + "wait for the full response." + ), "default": "none", }, { @@ -52,7 +69,11 @@ { "name": "set_response_uuid_in_user_properties", "required": False, - "description": "Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response.", + "description": ( + "Whether to set the response_uuid in the user_properties of the " + "input_message. This will allow other components to correlate " + "streaming chunks with the full response." + ), "default": False, "type": "boolean", }, @@ -90,8 +111,6 @@ } -import uuid - class OpenAIChatModelBase(ComponentBase): def __init__(self, module_info, **kwargs): super().__init__(module_info, **kwargs) @@ -101,16 +120,22 @@ def init(self): self.model = self.get_config("model") self.temperature = self.get_config("temperature") self.stream_to_flow = self.get_config("stream_to_flow") + self.stream_to_next_component = self.get_config("stream_to_next_component") self.llm_mode = self.get_config("llm_mode") self.stream_batch_size = self.get_config("stream_batch_size") - self.set_response_uuid_in_user_properties = self.get_config("set_response_uuid_in_user_properties") + self.set_response_uuid_in_user_properties = self.get_config( + "set_response_uuid_in_user_properties" + ) + if self.stream_to_flow and self.stream_to_next_component: + raise ValueError( + "stream_to_flow and stream_to_next_component are mutually exclusive" + ) def invoke(self, message, data): messages = data.get("messages", []) client = OpenAI( - api_key=self.get_config("api_key"), - base_url=self.get_config("base_url") + api_key=self.get_config("api_key"), base_url=self.get_config("base_url") ) if self.llm_mode == "stream": @@ -134,7 +159,7 @@ def invoke_stream(self, client, message, messages): messages=messages, model=self.model, temperature=self.temperature, - stream=True + stream=True, ): if chunk.choices[0].delta.content is not None: content = chunk.choices[0].delta.content @@ -148,11 +173,31 @@ def invoke_stream(self, client, message, messages): aggregate_result, response_uuid, first_chunk, - False + False, + ) + elif self.stream_to_next_component: + self.send_to_next_component( + message, + current_batch, + aggregate_result, + response_uuid, + first_chunk, + False, ) current_batch = "" first_chunk = False + if self.stream_to_next_component: + # Just return the last chunk + return { + "content": aggregate_result, + "chunk": current_batch, + "uuid": response_uuid, + "first_chunk": first_chunk, + "last_chunk": True, + "streaming": True, + } + if self.stream_to_flow: 
self.send_streaming_message( message, @@ -160,12 +205,20 @@ def invoke_stream(self, client, message, messages): aggregate_result, response_uuid, first_chunk, - True + True, ) return {"content": aggregate_result, "uuid": response_uuid} - def send_streaming_message(self, input_message, chunk, aggregate_result, response_uuid, first_chunk=False, last_chunk=False): + def send_streaming_message( + self, + input_message, + chunk, + aggregate_result, + response_uuid, + first_chunk=False, + last_chunk=False, + ): message = Message( payload={ "chunk": chunk, @@ -177,3 +230,34 @@ def send_streaming_message(self, input_message, chunk, aggregate_result, respons user_properties=input_message.get_user_properties(), ) self.send_to_flow(self.stream_to_flow, message) + + def send_to_next_component( + self, + input_message, + chunk, + aggregate_result, + response_uuid, + first_chunk=False, + last_chunk=False, + ): + message = Message( + payload={ + "chunk": chunk, + "aggregate_result": aggregate_result, + "response_uuid": response_uuid, + "first_chunk": first_chunk, + "last_chunk": last_chunk, + }, + user_properties=input_message.get_user_properties(), + ) + + result = { + "content": aggregate_result, + "chunk": chunk, + "uuid": response_uuid, + "first_chunk": first_chunk, + "last_chunk": last_chunk, + "streaming": True, + } + + self.process_post_invoke(result, message) diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_base.py b/src/solace_ai_connector/components/inputs_outputs/broker_base.py index b6fd113..fac4207 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_base.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_base.py @@ -37,9 +37,12 @@ class BrokerBase(ComponentBase): def __init__(self, module_info, **kwargs): super().__init__(module_info, **kwargs) self.broker_properties = self.get_broker_properties() - self.messaging_service = ( - MessagingServiceBuilder().from_properties(self.broker_properties).build() - ) + if self.broker_properties["broker_type"] not in ["test", "test_streaming"]: + self.messaging_service = ( + MessagingServiceBuilder() + .from_properties(self.broker_properties) + .build() + ) self.current_broker_message = None self.messages_to_ack = [] self.connected = False diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_input.py b/src/solace_ai_connector/components/inputs_outputs/broker_input.py index 841c8ee..3aabd8d 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_input.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_input.py @@ -44,7 +44,8 @@ { "name": "temporary_queue", "required": False, - "description": "Whether to create a temporary queue that will be deleted after disconnection, defaulted to True if broker_queue_name is not provided", + "description": "Whether to create a temporary queue that will be deleted " + "after disconnection, defaulted to True if broker_queue_name is not provided", "default": False, }, { diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py b/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py index 3e4a2cd..b363776 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py @@ -3,6 +3,8 @@ import threading import uuid import json +import queue +from copy import deepcopy # from typing import Dict, Any @@ -63,6 +65,19 @@ "description": "Expiry time for cached requests in 
milliseconds", "default": 60000, }, + { + "name": "streaming", + "required": False, + "description": "The response will arrive in multiple pieces. If True, " + "the streaming_complete_expression must be set and will be used to " + "determine when the last piece has arrived.", + }, + { + "name": "streaming_complete_expression", + "required": False, + "description": "The source expression to determine when the last piece of a " + "streaming response has arrived.", + }, ], "input_schema": { "type": "object", @@ -79,6 +94,16 @@ "type": "object", "description": "User properties to send with the request message", }, + "stream": { + "type": "boolean", + "description": "Whether this will have a streaming response", + "default": False, + }, + "streaming_complete_expression": { + "type": "string", + "description": "Expression to determine when the last piece of a " + "streaming response has arrived. Required if stream is True.", + }, }, "required": ["payload", "topic"], }, @@ -115,6 +140,11 @@ def __init__(self, **kwargs): self.reply_queue_name = f"reply-queue-{uuid.uuid4()}" self.reply_topic = f"reply/{uuid.uuid4()}" self.response_thread = None + self.streaming = self.get_config("streaming") + self.streaming_complete_expression = self.get_config( + "streaming_complete_expression" + ) + self.broker_type = self.broker_properties.get("broker_type", "solace") self.broker_properties["temporary_queue"] = True self.broker_properties["queue_name"] = self.reply_queue_name self.broker_properties["subscriptions"] = [ @@ -123,7 +153,13 @@ def __init__(self, **kwargs): "qos": 1, } ] - self.connect() + self.test_mode = False + + if self.broker_type == "solace": + self.connect() + elif self.broker_type == "test" or self.broker_type == "test_streaming": + self.test_mode = True + self.setup_test_pass_through() self.start() def start(self): @@ -135,8 +171,16 @@ def setup_reply_queue(self): self.reply_queue_name, [self.reply_topic], temporary=True ) + def setup_test_pass_through(self): + self.pass_through_queue = queue.Queue() + def start_response_thread(self): - self.response_thread = threading.Thread(target=self.handle_responses) + if self.test_mode: + self.response_thread = threading.Thread( + target=self.handle_test_pass_through + ) + else: + self.response_thread = threading.Thread(target=self.handle_responses) self.response_thread.start() def handle_responses(self): @@ -148,61 +192,76 @@ def handle_responses(self): except Exception as e: log.error("Error handling response: %s", e) + def handle_test_pass_through(self): + while not self.stop_signal.is_set(): + try: + message = self.pass_through_queue.get(timeout=1) + decoded_payload = self.decode_payload(message.get_payload()) + message.set_payload(decoded_payload) + self.process_response(message) + except queue.Empty: + continue + except Exception as e: + log.error("Error handling test passthrough: %s", e) + def process_response(self, broker_message): - payload = broker_message.get_payload_as_string() - if payload is None: - payload = broker_message.get_payload_as_bytes() - payload = self.decode_payload(payload) - topic = broker_message.get_destination_name() - user_properties = broker_message.get_properties() + if self.test_mode: + payload = broker_message.get_payload() + topic = broker_message.get_topic() + user_properties = broker_message.get_user_properties() + else: + payload = broker_message.get_payload_as_string() + if payload is None: + payload = broker_message.get_payload_as_bytes() + payload = self.decode_payload(payload) + topic = 
broker_message.get_destination_name() + user_properties = broker_message.get_properties() metadata_json = user_properties.get( "__solace_ai_connector_broker_request_reply_metadata__" ) if not metadata_json: - log.warning("Received response without metadata: %s", payload) + log.error("Received response without metadata: %s", payload) return try: metadata_stack = json.loads(metadata_json) except json.JSONDecodeError: - log.warning( - "Received response with invalid metadata JSON: %s", metadata_json - ) + log.error("Received response with invalid metadata JSON: %s", metadata_json) return if not metadata_stack: - log.warning("Received response with empty metadata stack: %s", payload) + log.error("Received response with empty metadata stack: %s", payload) return try: current_metadata = metadata_stack.pop() except IndexError: - log.warning( + log.error( "Received response with invalid metadata stack: %s", metadata_stack ) return request_id = current_metadata.get("request_id") if not request_id: - log.warning("Received response without request_id in metadata: %s", payload) + log.error("Received response without request_id in metadata: %s", payload) return cached_request = self.cache_service.get_data(request_id) if not cached_request: - log.warning("Received response for unknown request_id: %s", request_id) + log.error("Received response for unknown request_id: %s", request_id) return + stream = cached_request.get("stream", False) + streaming_complete_expression = cached_request.get( + "streaming_complete_expression" + ) + response = { "payload": payload, "topic": topic, "user_properties": user_properties, } - result = { - "request": cached_request, - "response": response, - } - # Update the metadata in the response if metadata_stack: response["user_properties"][ @@ -221,8 +280,23 @@ def process_response(self, broker_message): "__solace_ai_connector_broker_request_reply_topic__", None ) - self.process_post_invoke(result, Message(payload=result)) - self.cache_service.remove_data(request_id) + message = Message( + payload=payload, + user_properties=user_properties, + topic=topic, + ) + self.process_post_invoke(response, message) + + # Only remove the cache entry if this isn't a streaming response or + # if it is the last piece of a streaming response + last_piece = True + if stream and streaming_complete_expression: + is_last = message.get_data(streaming_complete_expression) + if not is_last: + last_piece = False + + if last_piece: + self.cache_service.remove_data(request_id) def invoke(self, message, data): request_id = str(uuid.uuid4()) @@ -230,6 +304,12 @@ def invoke(self, message, data): if "user_properties" not in data: data["user_properties"] = {} + stream = False + if "stream" in data: + stream = data["stream"] + if "streaming_complete_expression" in data: + streaming_complete_expression = data["streaming_complete_expression"] + metadata = {"request_id": request_id, "reply_topic": self.reply_topic} if ( @@ -266,13 +346,39 @@ def invoke(self, message, data): "__solace_ai_connector_broker_request_reply_topic__" ] = self.reply_topic - encoded_payload = self.encode_payload(data["payload"]) + if self.test_mode: + if self.broker_type == "test_streaming": + # The payload should be an array. 
Send one message per item in the array + if not isinstance(data["payload"], list): + raise ValueError("Payload must be a list for test_streaming broker") + for item in data["payload"]: + encoded_payload = self.encode_payload(item) + self.pass_through_queue.put( + Message( + payload=encoded_payload, + user_properties=deepcopy(data["user_properties"]), + topic=data["topic"], + ) + ) + else: + encoded_payload = self.encode_payload(data["payload"]) + self.pass_through_queue.put( + Message( + payload=encoded_payload, + user_properties=data["user_properties"], + topic=data["topic"], + ) + ) + else: + encoded_payload = self.encode_payload(data["payload"]) + self.messaging_service.send_message( + destination_name=data["topic"], + payload=encoded_payload, + user_properties=data["user_properties"], + ) - self.messaging_service.send_message( - destination_name=data["topic"], - payload=encoded_payload, - user_properties=data["user_properties"], - ) + data["stream"] = stream + data["streaming_complete_expression"] = streaming_complete_expression self.cache_service.add_data( key=request_id, diff --git a/src/solace_ai_connector/components/inputs_outputs/stdin_input.py b/src/solace_ai_connector/components/inputs_outputs/stdin_input.py index a4fb83e..53f54ec 100644 --- a/src/solace_ai_connector/components/inputs_outputs/stdin_input.py +++ b/src/solace_ai_connector/components/inputs_outputs/stdin_input.py @@ -1,5 +1,7 @@ # An input component that reads from STDIN +import threading + from copy import deepcopy from ..component_base import ComponentBase from ...common.message import Message @@ -27,16 +29,28 @@ class Stdin(ComponentBase): def __init__(self, **kwargs): super().__init__(info, **kwargs) + self.need_acknowledgement = True + self.next_input_signal = threading.Event() + self.next_input_signal.set() def get_next_message(self): + # Wait for the next input signal + self.next_input_signal.wait() + + # Reset the event for the next use + self.next_input_signal.clear() + # Get the next message from STDIN - obj = {"text": input(self.config.get("prompt", "Enter text: "))} + obj = {"text": input(self.config.get("prompt", "\nEnter text: "))} # Create and return a message object return Message(payload=obj) - # def get_input_data(self, message): - # return message.payload - def invoke(self, message, data): return deepcopy(message.get_payload()) + + def acknowledge_message(self): + self.next_input_signal.set() + + def get_acknowledgement_callback(self): + return self.acknowledge_message diff --git a/src/solace_ai_connector/components/inputs_outputs/stdout_output.py b/src/solace_ai_connector/components/inputs_outputs/stdout_output.py index 0309f1f..edba4aa 100644 --- a/src/solace_ai_connector/components/inputs_outputs/stdout_output.py +++ b/src/solace_ai_connector/components/inputs_outputs/stdout_output.py @@ -6,7 +6,15 @@ info = { "class_name": "Stdout", "description": "STDOUT output component", - "config_parameters": [], + "config_parameters": [ + { + "name": "add_new_line_between_messages", + "required": False, + "description": "Add a new line between messages", + "type": "boolean", + "default": True, + } + ], "input_schema": { "type": "object", "properties": { @@ -22,8 +30,15 @@ class Stdout(ComponentBase): def __init__(self, **kwargs): super().__init__(info, **kwargs) + self.add_newline = self.get_config("add_new_line_between_messages") def invoke(self, message, data): # Print the message to STDOUT - print(yaml.dump(data)) + if isinstance(data, dict) or isinstance(data, list): + print(yaml.dump(data)) + else: 
+ print(data, end="") + if self.add_newline: + print() + return data diff --git a/src/solace_ai_connector/flow/request_response_flow_controller.py b/src/solace_ai_connector/flow/request_response_flow_controller.py index 373f2ec..36dda71 100644 --- a/src/solace_ai_connector/flow/request_response_flow_controller.py +++ b/src/solace_ai_connector/flow/request_response_flow_controller.py @@ -14,8 +14,8 @@ - name: example_controller flow_name: llm_flow streaming: true - streaming_last_message_expression: input.payload:streaming.last_message - timeout_ms: 300000 + streaming_complete_expression: input.payload:streaming.last_message + request_expiry_ms: 300000 ``` """ @@ -42,23 +42,31 @@ class RequestResponseFlowController: def __init__(self, config: Dict[str, Any], connector): self.config = config self.connector = connector - self.flow_name = config.get("flow_name") - self.streaming = config.get("streaming", False) - self.streaming_last_message_expression = config.get( - "streaming_last_message_expression" - ) - self.timeout_s = config.get("timeout_ms", 30000) / 1000 + self.broker_config = config.get("broker_config") + self.request_expiry_ms = config.get("request_expiry_ms", 300000) + self.request_expiry_s = self.request_expiry_ms / 1000 self.input_queue = None self.response_queue = None self.enqueue_time = None self.request_outstanding = False - flow = connector.get_flow(self.flow_name) - - if not flow: - raise ValueError(f"Flow {self.flow_name} not found") - - self.setup_queues(flow) + self.flow = self.create_broker_request_response_flow() + self.setup_queues(self.flow) + self.flow.run() + + def create_broker_request_response_flow(self): + self.broker_config["request_expiry_ms"] = self.request_expiry_ms + config = { + "name": "_internal_broker_request_response_flow", + "components": [ + { + "component_name": "_internal_broker_request_response", + "component_module": "broker_request_response", + "component_config": self.broker_config, + } + ], + } + return self.connector.create_flow(flow=config, index=0, flow_instance_index=0) def setup_queues(self, flow): # Input queue to send the message to the flow @@ -69,62 +77,80 @@ def setup_queues(self, flow): rrcComponent = RequestResponseControllerOuputComponent(self) flow.set_next_component(rrcComponent) - def send_message(self, message: Message, data: Any): + def do_broker_request_response( + self, request_message, stream=False, streaming_complete_expression=None + ): + # Send the message to the broker + self.send_message(request_message, stream, streaming_complete_expression) + + # Now we will wait for the response + now = time.time() + elapsed_time = now - self.enqueue_time + remaining_timeout = self.request_expiry_s - elapsed_time + if stream: + # If we are in streaming mode, we will return individual messages + # until we receive the last message. 
Use the expression to determine + # if this is the last message + while True: + try: + event = self.response_queue.get(timeout=remaining_timeout) + if event.event_type == EventType.MESSAGE: + message = event.data + last_message = message.get_data(streaming_complete_expression) + yield message, last_message + if last_message: + return + except queue.Empty: + if (time.time() - self.enqueue_time) > self.request_expiry_s: + raise TimeoutError( # pylint: disable=raise-missing-from + "Timeout waiting for response" + ) + except Exception as e: + raise e + + now = time.time() + elapsed_time = now - self.enqueue_time + remaining_timeout = self.request_expiry_s - elapsed_time + + # If we are not in streaming mode, we will return a single message + # and then stop the iterator + try: + event = self.response_queue.get(timeout=remaining_timeout) + if event.event_type == EventType.MESSAGE: + message = event.data + yield message, True + return + except queue.Empty: + if (time.time() - self.enqueue_time) > self.request_expiry_s: + raise TimeoutError( # pylint: disable=raise-missing-from + "Timeout waiting for response" + ) + except Exception as e: + raise e + + def send_message( + self, message: Message, stream=False, streaming_complete_expression=None + ): # Make a new message, but copy the data from the original message - payload = message.get_payload() - topic = message.get_topic() - user_properties = message.get_user_properties() - new_message = Message( - payload=payload, topic=topic, user_properties=user_properties - ) - new_message.set_previous(data) - if not self.input_queue: - raise ValueError(f"Input queue for flow {self.flow_name} not found") + raise ValueError(f"Input queue for flow {self.flow.name} not found") + + # Need to set the previous object to the required input for the + # broker_request_response component + message.set_previous( + { + "payload": message.get_payload(), + "user_properties": message.get_user_properties(), + "topic": message.get_topic(), + "stream": stream, + "streaming_complete_expression": streaming_complete_expression, + }, + ) - event = Event(EventType.MESSAGE, new_message) + event = Event(EventType.MESSAGE, message) self.enqueue_time = time.time() self.request_outstanding = True self.input_queue.put(event) - return self.response_iterator - - def response_iterator(self): - while True: - now = time.time() - elapsed_time = now - self.enqueue_time - remaining_timeout = self.timeout_s - elapsed_time - if self.streaming: - # If we are in streaming mode, we will return individual messages - # until we receive the last message. 
Use the expression to determine - # if this is the last message - while True: - try: - event = self.response_queue.get(timeout=remaining_timeout) - if event.event_type == EventType.MESSAGE: - message = event.data - yield message, message.get_previous() - if self.streaming_last_message_expression: - last_message = message.get_data( - self.streaming_last_message_expression - ) - if last_message: - return - except queue.Empty: - if (time.time() - self.enqueue_time) > self.timeout_s: - raise TimeoutError("Timeout waiting for response") - - else: - # If we are not in streaming mode, we will return a single message - # and then stop the iterator - try: - event = self.response_queue.get(timeout=remaining_timeout) - if event.event_type == EventType.MESSAGE: - message = event.data - yield message, message.get_previous() - return - except queue.Empty: - if (time.time() - self.enqueue_time) > self.timeout_s: - raise TimeoutError("Timeout waiting for response") def enqueue_response(self, event): self.response_queue.put(event) diff --git a/src/solace_ai_connector/solace_ai_connector.py b/src/solace_ai_connector/solace_ai_connector.py index 7c4a688..40240cc 100644 --- a/src/solace_ai_connector/solace_ai_connector.py +++ b/src/solace_ai_connector/solace_ai_connector.py @@ -100,15 +100,6 @@ def wait_for_flows(self): self.stop() self.cleanup() - def stop(self): - """Stop the Solace AI Event Connector""" - log.info("Stopping Solace AI Event Connector") - self.stop_signal.set() - self.timer_manager.stop() # Stop the timer manager first - self.wait_for_flows() - if self.trace_thread: - self.trace_thread.join() - def cleanup(self): """Clean up resources and ensure all threads are properly joined""" log.info("Cleaning up Solace AI Event Connector") diff --git a/tests/test_request_response_controller.py b/tests/test_request_response_controller.py index 5719bad..6ca6073 100644 --- a/tests/test_request_response_controller.py +++ b/tests/test_request_response_controller.py @@ -15,17 +15,17 @@ def test_request_response_flow_controller_basic(): """Test basic functionality of the RequestResponseFlowController""" def test_invoke_handler(component, message, _data): - # Call the request_response_flow - data_iter = component.send_request_response_flow_message( - "test_controller", message, {"test": "data"} - ) - - # Just a single message with no streaming - for message, _data in data_iter(): - assert message.get_data("previous") == {"test": "data"} - assert message.get_data("input.payload") == {"text": "Hello, World!"} - - return "done" + # Call the request_response + message = component.do_broker_request_response(message) + try: + assert message.get_data("previous") == { + "payload": {"text": "Hello, World!"}, + "topic": None, + "user_properties": {}, + } + except AssertionError as e: + return e + return "Pass" config = { "flows": [ @@ -38,23 +38,22 @@ def test_invoke_handler(component, message, _data): "component_config": { "invoke_handler": test_invoke_handler, }, - "request_response_flow_controllers": [ - { - "name": "test_controller", - "flow_name": "request_response_flow", - "timeout_ms": 500000, - } - ], + "broker_request_response": { + "enabled": True, + "broker_config": { + "broker_type": "test", + "broker_url": "test", + "broker_username": "test", + "broker_password": "test", + "broker_vpn": "test", + "payload_encoding": "utf-8", + "payload_format": "json", + }, + "request_expiry_ms": 500000, + }, } ], }, - { - "name": "request_response_flow", - "test_ignore": True, - "components": [ - {"component_name": 
"responder", "component_module": "pass_through"} - ], - }, ] } connector, flows = create_test_flows(config) @@ -69,7 +68,17 @@ def test_invoke_handler(component, message, _data): # Get the output message output_message = get_message_from_flow(test_flow) - assert output_message.get_data("previous") == "done" + result = output_message.get_data("previous") + + # if the result is an AssertionError, then raise it + if isinstance(result, AssertionError): + raise result + + assert result == "Pass" + + except Exception as e: + print(e) + assert False finally: dispose_connector(connector) @@ -81,24 +90,22 @@ def test_request_response_flow_controller_streaming(): """Test streaming functionality of the RequestResponseFlowController""" def test_invoke_handler(component, message, data): - # Call the request_response_flow - data_iter = component.send_request_response_flow_message( - "test_controller", - message, - [ - {"test": "data1", "streaming": {"last_message": False}}, - {"test": "data2", "streaming": {"last_message": False}}, - {"test": "data3", "streaming": {"last_message": True}}, - ], - ) - - # Expecting 3 messages - results = [] - for message, data in data_iter(): - results.append(data.get("test")) + result = [] + for message, last_message in component.do_broker_request_response( + message, stream=True, streaming_complete_expression="input.payload:last" + ): + payload = message.get_data("input.payload") + result.append(payload) + if last_message: + assert payload == {"text": "Chunk3", "last": True} + + assert result == [ + {"text": "Chunk1", "last": False}, + {"text": "Chunk2", "last": False}, + {"text": "Chunk3", "last": True}, + ] - assert results == ["data1", "data2", "data3"] - return "done" + return "Pass" config = { "flows": [ @@ -111,25 +118,22 @@ def test_invoke_handler(component, message, data): "component_config": { "invoke_handler": test_invoke_handler, }, - "request_response_flow_controllers": [ - { - "name": "test_controller", - "flow_name": "request_response_flow", - "streaming": True, - "streaming_last_message_expression": "previous:streaming.last_message", - "timeout_ms": 500000, - } - ], + "broker_request_response": { + "enabled": True, + "broker_config": { + "broker_type": "test_streaming", + "broker_url": "test", + "broker_username": "test", + "broker_password": "test", + "broker_vpn": "test", + "payload_encoding": "utf-8", + "payload_format": "json", + }, + "request_expiry_ms": 500000, + }, } ], }, - { - "name": "request_response_flow", - "test_ignore": True, - "components": [ - {"component_name": "responder", "component_module": "iterate"} - ], - }, ] } connector, flows = create_test_flows(config) @@ -139,12 +143,21 @@ def test_invoke_handler(component, message, data): try: # Send a message to the input flow - send_message_to_flow(test_flow, Message(payload={"text": "Hello, World!"})) + send_message_to_flow( + test_flow, + Message( + payload=[ + {"text": "Chunk1", "last": False}, + {"text": "Chunk2", "last": False}, + {"text": "Chunk3", "last": True}, + ] + ), + ) # Get the output message output_message = get_message_from_flow(test_flow) - assert output_message.get_data("previous") == "done" + assert output_message.get_data("previous") == "Pass" except Exception as e: print(e) @@ -159,19 +172,29 @@ def test_request_response_flow_controller_timeout(): """Test timeout functionality of the RequestResponseFlowController""" def test_invoke_handler(component, message, data): - # Call the request_response_flow - data_iter = component.send_request_response_flow_message( - 
"test_controller", message, {"test": "data"} - ) - - # This will timeout + # # Call the request_response_flow + # data_iter = component.send_request_response_flow_message( + # "test_controller", message, {"test": "data"} + # ) + + # # This will timeout + # try: + # for message, data, _last_message in data_iter(): + # assert message.get_data("previous") == {"test": "data"} + # assert message.get_data("input.payload") == {"text": "Hello, World!"} + # except TimeoutError: + # return "timeout" + # return "done" + + # Do it the new way try: - for message, data in data_iter(): - assert message.get_data("previous") == {"test": "data"} - assert message.get_data("input.payload") == {"text": "Hello, World!"} + for message, _last_message in component.do_broker_request_response( + message, stream=True, streaming_complete_expression="input.payload:last" + ): + pass except TimeoutError: - return "timeout" - return "done" + return "Timeout" + return "Fail" config = { "flows": [ @@ -184,25 +207,18 @@ def test_invoke_handler(component, message, data): "component_config": { "invoke_handler": test_invoke_handler, }, - "request_response_flow_controllers": [ - { - "name": "test_controller", - "flow_name": "request_response_flow", - "timeout_ms": 1000, - } - ], - } - ], - }, - { - "name": "request_response_flow", - "test_ignore": True, - "components": [ - { - "component_name": "responder", - "component_module": "delay", - "component_config": { - "delay": 5, + "broker_request_response": { + "enabled": True, + "broker_config": { + "broker_type": "test_streaming", + "broker_url": "test", + "broker_username": "test", + "broker_password": "test", + "broker_vpn": "test", + "payload_encoding": "utf-8", + "payload_format": "json", + }, + "request_expiry_ms": 2000, }, } ], @@ -215,13 +231,14 @@ def test_invoke_handler(component, message, data): try: - # Send a message to the input flow - send_message_to_flow(test_flow, Message(payload={"text": "Hello, World!"})) + # Send a message with an empty list in the payload to the test_streaming broker type + # This will not send any chunks and should timeout + send_message_to_flow(test_flow, Message(payload=[])) # Get the output message output_message = get_message_from_flow(test_flow) - assert output_message.get_data("previous") == "timeout" + assert output_message.get_data("previous") == "Timeout" finally: dispose_connector(connector) From 516163757f495dbe400c162d242da67fb382a67d Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Sun, 22 Sep 2024 10:31:24 -0400 Subject: [PATCH 18/30] docs: add broker request-response configuration --- docs/configuration.md | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index 8c2b511..63544d1 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -238,6 +238,34 @@ Each component configuration is a dictionary with the following keys: - `input_selection`: - A `source_expression` or `source_value` to use as the input to the component. Check [Expression Syntax](#expression-syntax) for more details. [Optional: If not specified, the complete previous component output will be used] - `queue_depth`: - The depth of the input queue for the component. - `num_instances`: - The number of instances of the component to run (Starts multiple threads to process messages) +- `broker_request_response`: - Configuration for the broker request-response functionality. 
[Optional] + +### Broker Request-Response Configuration + +The `broker_request_response` configuration allows components to perform request-response operations with a broker. It has the following structure: + +```yaml +broker_request_response: + enabled: + broker_config: + broker_type: + broker_url: + broker_username: + broker_password: + broker_vpn: + request_expiry_ms: +``` + +- `enabled`: Set to `true` to enable broker request-response functionality for the component. +- `broker_config`: Configuration for the broker connection. + - `broker_type`: Type of the broker (e.g., "solace"). + - `broker_url`: URL of the broker. + - `broker_username`: Username for broker authentication. + - `broker_password`: Password for broker authentication. + - `broker_vpn`: VPN name for the broker connection. +- `request_expiry_ms`: Expiry time for requests in milliseconds. + +For more details on using this functionality, see the [Advanced Component Features](advanced_component_features.md#broker-request-response) documentation. ### component_module From 38797b7d77b891f725ee8cf151667765606e2a88 Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Sun, 22 Sep 2024 10:36:45 -0400 Subject: [PATCH 19/30] docs: added advanced_component_features.md --- docs/advanced_component_features.md | 50 +++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 docs/advanced_component_features.md diff --git a/docs/advanced_component_features.md b/docs/advanced_component_features.md new file mode 100644 index 0000000..742fc92 --- /dev/null +++ b/docs/advanced_component_features.md @@ -0,0 +1,50 @@ +# Advanced Component Features + +This document describes advanced features available to custom components in the Solace AI Connector. + +## Table of Contents +- [Broker Request-Response](#broker-request-response) +- [Cache Manager](#cache-manager) +- [Timer Features](#timer-features) + +## Broker Request-Response + +Components can perform a request and get a response from the broker using the `do_broker_request_response` method. This method supports both simple request-response and streamed responses. + +### Usage + +```python +response = self.do_broker_request_response(message, stream=False) +``` + +For streamed responses: + +```python +for chunk, is_last in self.do_broker_request_response(message, stream=True, streaming_complete_expression="input.payload:streaming.last_message"): + # Process each chunk + if is_last: + break +``` + +### Parameters + +- `message`: The message to send to the broker. This must have a topic and payload. +- `stream` (optional): Boolean indicating whether to expect a streamed response. Default is False. +- `streaming_complete_expression` (optional): An expression to evaluate on each response chunk to determine if it's the last one. This is required when `stream=True`. + +### Return Value + +- For non-streamed responses: Returns the response message. +- For streamed responses: Returns a generator that yields tuples of (chunk, is_last). Each chunk is a fully formed message with the format of the response. `is_last` is a boolean indicating if the chunk is the last one. + +## Memory Cache + + +Describe the cache service features based on the implementation in the cache_service.py file. + + +## Timer Features + + +Describe the timer features based on the implementation in the timer_manager.py file. 
+ From cefb69a0e68799934be2ad97d82fdb7bfb5f9c4c Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Sun, 22 Sep 2024 10:44:48 -0400 Subject: [PATCH 20/30] docs: add broker request-response configuration details --- docs/configuration.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index 63544d1..399e3f1 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -387,6 +387,33 @@ The `queue_depth` is an integer that specifies the depth of the input queue for The `num_instances` is an integer that specifies the number of instances of the component to run. This is the number of threads that will be started to process messages from the input queue. By default, the number of instances is 1. +### Broker Request-Response Configuration + +The `broker_request_response` configuration allows components to perform request-response operations with a broker. It has the following structure: + +```yaml +broker_request_response: + enabled: + broker_config: + broker_type: + broker_url: + broker_username: + broker_password: + broker_vpn: + request_expiry_ms: +``` + +- `enabled`: Set to `true` to enable broker request-response functionality for the component. +- `broker_config`: Configuration for the broker connection. + - `broker_type`: Type of the broker (e.g., "solace"). + - `broker_url`: URL of the broker. + - `broker_username`: Username for broker authentication. + - `broker_password`: Password for broker authentication. + - `broker_vpn`: VPN name for the broker connection. +- `request_expiry_ms`: Expiry time for requests in milliseconds. + +For more details on using this functionality, see the [Advanced Component Features](advanced_component_features.md#broker-request-response) documentation. + ### Built-in components The AI Event Connector comes with a number of built-in components that can be used to process messages. For a list of all built-in components, see the [Components](components/index.md) documentation. From 577991fa426b03b08fc03d30e119564105898141 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Sun, 22 Sep 2024 10:46:08 -0400 Subject: [PATCH 21/30] docs: add payload encoding and format to broker config --- docs/configuration.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index 399e3f1..4d53404 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -253,6 +253,8 @@ broker_request_response: broker_username: broker_password: broker_vpn: + payload_encoding: + payload_format: request_expiry_ms: ``` @@ -263,6 +265,8 @@ broker_request_response: - `broker_username`: Username for broker authentication. - `broker_password`: Password for broker authentication. - `broker_vpn`: VPN name for the broker connection. + - `payload_encoding`: Encoding for the payload (e.g., "utf-8", "base64"). + - `payload_format`: Format of the payload (e.g., "json", "text"). - `request_expiry_ms`: Expiry time for requests in milliseconds. For more details on using this functionality, see the [Advanced Component Features](advanced_component_features.md#broker-request-response) documentation. 
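For reference, a filled-in version of the `broker_request_response` block described in the configuration docs above might look like the following under a component entry. The broker address, credentials, and VPN name are placeholder values for illustration only; the field names and nesting follow the structure used in the connector's tests.

```yaml
broker_request_response:
  enabled: true
  broker_config:
    broker_type: solace
    broker_url: tcp://localhost:55555   # placeholder address
    broker_username: example-user       # placeholder credentials
    broker_password: example-password
    broker_vpn: example-vpn
    payload_encoding: utf-8
    payload_format: json
  request_expiry_ms: 30000              # 30 seconds
```

As documented, `request_expiry_ms` is the request expiry in milliseconds, so it effectively bounds how long a component waits for a reply before the request is treated as expired.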
From 1e8e7b4b6ad64dd3cbbd83e42b95b1e07c892475 Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Sun, 22 Sep 2024 10:47:28 -0400 Subject: [PATCH 22/30] docs: add cache service and timer manager to advanced_component_features.md --- docs/advanced_component_features.md | 80 ++++++++++++++++++++++++++--- 1 file changed, 74 insertions(+), 6 deletions(-) diff --git a/docs/advanced_component_features.md b/docs/advanced_component_features.md index 742fc92..d8a9681 100644 --- a/docs/advanced_component_features.md +++ b/docs/advanced_component_features.md @@ -39,12 +39,80 @@ for chunk, is_last in self.do_broker_request_response(message, stream=True, stre ## Memory Cache - -Describe the cache service features based on the implementation in the cache_service.py file. - +The cache service provides a flexible way to store and retrieve data with optional expiration. It supports different storage backends and offers features like automatic expiry checks. + +### Features + +1. Multiple storage backends: + - In-memory storage + - SQLAlchemy-based storage (for persistent storage) + +2. Key-value storage with metadata and expiry support +3. Automatic expiry checks in a background thread +4. Thread-safe operations + +### Usage + +Components can access the cache service through `self.cache_service`. Here are some common operations: + +```python +# Set a value with expiry +self.cache_service.set("key", "value", expiry=300) # Expires in 300 seconds + +# Get a value +value = self.cache_service.get("key") + +# Delete a value +self.cache_service.delete("key") + +# Get all values (including metadata and expiry) +all_data = self.cache_service.get_all() +``` + +### Configuration + +The cache service can be configured in the main configuration file: + +```yaml +cache: + backend: "memory" # or "sqlalchemy" + connection_string: "sqlite:///cache.db" # for SQLAlchemy backend +``` ## Timer Features - -Describe the timer features based on the implementation in the timer_manager.py file. - +The timer manager allows components to schedule one-time or recurring timer events. This is useful for implementing delayed actions, periodic tasks, or timeouts. + +### Features + +1. One-time and recurring timers +2. Customizable timer IDs for easy management +3. Optional payloads for timer events + +### Usage + +Components can access the timer manager through `self.timer_manager`. Here are some common operations: + +```python +# Add a one-time timer +self.add_timer(delay_ms=5000, timer_id="my_timer", payload={"key": "value"}) + +# Add a recurring timer +self.add_timer(delay_ms=5000, timer_id="recurring_timer", interval_ms=10000, payload={"type": "recurring"}) + +# Cancel a timer +self.cancel_timer(timer_id="my_timer") +``` + +### Handling Timer Events + +To handle timer events, components should implement the `handle_timer_event` method: + +```python +def handle_timer_event(self, timer_data): + timer_id = timer_data["timer_id"] + payload = timer_data["payload"] + # Process the timer event +``` + +Timer events are automatically dispatched to the appropriate component by the timer manager. 
\ No newline at end of file From bc0d67fa43a35058ba07516c5b5384f2848e1435 Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Sun, 22 Sep 2024 10:49:06 -0400 Subject: [PATCH 23/30] docs: add configuration requirement for broker request-response --- docs/advanced_component_features.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/advanced_component_features.md b/docs/advanced_component_features.md index d8a9681..84ac4af 100644 --- a/docs/advanced_component_features.md +++ b/docs/advanced_component_features.md @@ -9,6 +9,10 @@ This document describes advanced features available to custom components in the ## Broker Request-Response + +This section needs to mention that the component's configuration must include a broker_request_response section. Add a link to the appropriate section in the configuration documentation. + + Components can perform a request and get a response from the broker using the `do_broker_request_response` method. This method supports both simple request-response and streamed responses. ### Usage From 1efbdcb0797f6a3b8df2118d0eddd3355d0a98c0 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Sun, 22 Sep 2024 10:49:07 -0400 Subject: [PATCH 24/30] docs: update broker request-response section with configuration info --- docs/advanced_component_features.md | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/docs/advanced_component_features.md b/docs/advanced_component_features.md index 84ac4af..c11f0a7 100644 --- a/docs/advanced_component_features.md +++ b/docs/advanced_component_features.md @@ -9,11 +9,7 @@ This document describes advanced features available to custom components in the ## Broker Request-Response - -This section needs to mention that the component's configuration must include a broker_request_response section. Add a link to the appropriate section in the configuration documentation. - - -Components can perform a request and get a response from the broker using the `do_broker_request_response` method. This method supports both simple request-response and streamed responses. +Components can perform a request and get a response from the broker using the `do_broker_request_response` method. This method supports both simple request-response and streamed responses. To use this feature, the component's configuration must include a `broker_request_response` section. For details on how to configure this section, refer to the [Broker Request-Response Configuration](configuration.md#broker-request-response-configuration) in the configuration documentation. ### Usage @@ -119,4 +115,4 @@ def handle_timer_event(self, timer_data): # Process the timer event ``` -Timer events are automatically dispatched to the appropriate component by the timer manager. \ No newline at end of file +Timer events are automatically dispatched to the appropriate component by the timer manager. 
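To make the two call patterns concrete, here is a minimal sketch of a custom component `invoke()` that uses `do_broker_request_response` in both its simple and streamed forms (the two calls are shown back-to-back only to illustrate both forms). The class name, the `input.payload:last` completion expression, and the shape of the returned dictionary are illustrative assumptions, not fixed parts of the connector API; the required `info` dictionary and constructor boilerplate are omitted.

```python
from solace_ai_connector.components.component_base import ComponentBase


class BrokerRequester(ComponentBase):
    # `info` dict and __init__ omitted for brevity -- see the custom
    # component documentation for the required boilerplate.

    def invoke(self, message, data):
        # Simple form: send the current message and block until the reply
        # arrives (bounded by the configured request_expiry_ms).
        response = self.do_broker_request_response(message)
        reply_payload = response.get_data("input.payload")

        # Streamed form: iterate response chunks until the completion
        # expression evaluates to a truthy value on a chunk.
        # "input.payload:last" is a hypothetical expression -- use whatever
        # field the responding flow sets on its final chunk.
        chunks = []
        for chunk, is_last in self.do_broker_request_response(
            message, stream=True, streaming_complete_expression="input.payload:last"
        ):
            chunks.append(chunk.get_data("input.payload"))
            if is_last:
                break

        return {"reply": reply_payload, "chunks": chunks}
```

In the non-streamed case a single response message comes back; in the streamed case each chunk is itself a fully formed message, which is why `get_data` is used on both.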
From 5319ef355a5d73131331fad1482f2381e3d2e08c Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Sun, 22 Sep 2024 10:52:24 -0400 Subject: [PATCH 25/30] docs: a bit more detail about do_broker_request_response --- docs/advanced_component_features.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/advanced_component_features.md b/docs/advanced_component_features.md index c11f0a7..7053ff6 100644 --- a/docs/advanced_component_features.md +++ b/docs/advanced_component_features.md @@ -11,6 +11,8 @@ This document describes advanced features available to custom components in the Components can perform a request and get a response from the broker using the `do_broker_request_response` method. This method supports both simple request-response and streamed responses. To use this feature, the component's configuration must include a `broker_request_response` section. For details on how to configure this section, refer to the [Broker Request-Response Configuration](configuration.md#broker-request-response-configuration) in the configuration documentation. +This feature would be used in the invoke method of a custom component. When the `do_broker_request_response` method is called, the component will send a message to the broker and then block until a response (or a series of streamed chunks) is received. This makes it very easy to call services that are available via the broker. + ### Usage ```python From 9347eb93fa5ed23473b813cb95aaffa0e053f557 Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Sun, 22 Sep 2024 10:54:03 -0400 Subject: [PATCH 26/30] docs: add link to advanced features page in table of contents --- docs/index.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/index.md b/docs/index.md index 6450165..32f33c0 100644 --- a/docs/index.md +++ b/docs/index.md @@ -4,6 +4,10 @@ This connector application makes it easy to connect your AI/ML models to Solace ## Table of Contents + +add link to the advanced features page + + - [Overview](overview.md) - [Getting Started](getting_started.md) - [Configuration](configuration.md) From 47c2c40393b66db0939cdbd1b08eed5ddb58a072 Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Sun, 22 Sep 2024 10:54:04 -0400 Subject: [PATCH 27/30] docs: add link to advanced features page --- docs/index.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/docs/index.md b/docs/index.md index 32f33c0..f6a3d51 100644 --- a/docs/index.md +++ b/docs/index.md @@ -4,11 +4,8 @@ This connector application makes it easy to connect your AI/ML models to Solace ## Table of Contents - -add link to the advanced features page - - - [Overview](overview.md) +- [Advanced Component Features](advanced_component_features.md) - [Getting Started](getting_started.md) - [Configuration](configuration.md) - [Components](components/index.md) From ae17c3d74ed12ba812b920c9c62d8c089b8a8e8a Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Sun, 22 Sep 2024 10:57:57 -0400 Subject: [PATCH 28/30] docs: reorder table of contents in index.md --- docs/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/index.md b/docs/index.md index f6a3d51..3dab2f7 100644 --- a/docs/index.md +++ b/docs/index.md @@ -5,11 +5,11 @@ This connector application makes it easy to connect your AI/ML models to Solace ## Table of Contents - [Overview](overview.md) -- [Advanced Component Features](advanced_component_features.md) - [Getting Started](getting_started.md) - [Configuration](configuration.md) - [Components](components/index.md) - 
[Transforms](transforms/index.md) +- [Advanced Component Features](advanced_component_features.md) - [Tips and Tricks](tips_and_tricks.md) - [Examples](../examples/) - [Contributing](../CONTRIBUTING.md) From 959c743dea5308e92fbaae4eaeabb70636426c9f Mon Sep 17 00:00:00 2001 From: "Edward Funnekotter (aider)" Date: Sun, 22 Sep 2024 10:58:04 -0400 Subject: [PATCH 29/30] docs: add custom components documentation --- docs/custom_components.md | 83 +++++++++++++++++++++++++++++++++++++++ docs/index.md | 1 + 2 files changed, 84 insertions(+) create mode 100644 docs/custom_components.md diff --git a/docs/custom_components.md b/docs/custom_components.md new file mode 100644 index 0000000..cc71bb8 --- /dev/null +++ b/docs/custom_components.md @@ -0,0 +1,83 @@ +# Custom Components + +## Purpose + +Custom components provide a way to extend the functionality of the Solace AI Connector beyond what's possible with the built-in components and configuration options. Sometimes, it's easier and more efficient to add custom code than to build a complex configuration file, especially for specialized or unique processing requirements. + +## Requirements of a Custom Component + +To create a custom component, you need to follow these requirements: + +1. **Inherit from ComponentBase**: Your custom component class should inherit from the `ComponentBase` class. + +2. **Info Section**: Define an `info` dictionary with the following keys: + - `class_name`: The name of your custom component class. + - `config_parameters`: A list of dictionaries describing the configuration parameters for your component. + - `input_schema`: A dictionary describing the expected input schema. + - `output_schema`: A dictionary describing the expected output schema. + +3. **Implement the `invoke` method**: This is the main method where your component's logic will be implemented. + +Here's a basic template for a custom component: + +```python +from solace_ai_connector.components.component_base import ComponentBase + +info = { + "class_name": "MyCustomComponent", + "config_parameters": [ + { + "name": "my_param", + "type": "string", + "required": True, + "description": "A custom parameter" + } + ], + "input_schema": { + "type": "object", + "properties": { + "input_data": {"type": "string"} + } + }, + "output_schema": { + "type": "object", + "properties": { + "output_data": {"type": "string"} + } + } +} + +class MyCustomComponent(ComponentBase): + def __init__(self, **kwargs): + super().__init__(info, **kwargs) + self.my_param = self.get_config("my_param") + + def invoke(self, message, data): + # Your custom logic here + result = f"{self.my_param}: {data['input_data']}" + return {"output_data": result} +``` + +## Overrideable Methods + +While the `invoke` method is the main one you'll implement, there are several other methods you can override to customize your component's behavior: + +1. `invoke(self, message, data)`: The main processing method for your component. +2. `get_next_event(self)`: Customize how your component receives events. +3. `send_message(self, message)`: Customize how your component sends messages to the next component. +4. `handle_timer_event(self, timer_data)`: Handle timer events if your component uses timers. +5. `handle_cache_expiry_event(self, timer_data)`: Handle cache expiry events if your component uses the cache service. +6. `process_pre_invoke(self, message)`: Customize preprocessing before `invoke` is called. +7. `process_post_invoke(self, result, message)`: Customize postprocessing after `invoke` is called. 
+ +## Advanced Features + +Custom components can take advantage of advanced features provided by the Solace AI Connector. These include: + +- Broker request-response functionality +- Cache services +- Timer management + +For more information on these advanced features and how to use them in your custom components, please refer to the [Advanced Component Features](advanced_component_features.md) documentation. + +By creating custom components, you can extend the Solace AI Connector to meet your specific needs while still benefiting from the framework's built-in capabilities for event processing, flow management, and integration with Solace event brokers. diff --git a/docs/index.md b/docs/index.md index 3dab2f7..c669132 100644 --- a/docs/index.md +++ b/docs/index.md @@ -10,6 +10,7 @@ This connector application makes it easy to connect your AI/ML models to Solace - [Components](components/index.md) - [Transforms](transforms/index.md) - [Advanced Component Features](advanced_component_features.md) +- [Custom Components](custom_components.md) - [Tips and Tricks](tips_and_tricks.md) - [Examples](../examples/) - [Contributing](../CONTRIBUTING.md) From 44fffec74c2984dc96b01575813dc27716c37c9b Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Sun, 22 Sep 2024 11:00:51 -0400 Subject: [PATCH 30/30] docs: Remove advanced component features from table of contents --- docs/index.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/index.md b/docs/index.md index c669132..5fe2423 100644 --- a/docs/index.md +++ b/docs/index.md @@ -9,7 +9,6 @@ This connector application makes it easy to connect your AI/ML models to Solace - [Configuration](configuration.md) - [Components](components/index.md) - [Transforms](transforms/index.md) -- [Advanced Component Features](advanced_component_features.md) - [Custom Components](custom_components.md) - [Tips and Tricks](tips_and_tricks.md) - [Examples](../examples/)
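As a closing illustration of how the custom component skeleton combines with the cache service and timer manager described in these docs, consider the following sketch. The class name, timer id, cache keys, intervals, and the choice to register the timer in the constructor are all illustrative assumptions rather than requirements of the framework.

```python
from solace_ai_connector.components.component_base import ComponentBase

info = {
    "class_name": "CachingHeartbeat",
    "config_parameters": [],
    "input_schema": {"type": "object", "properties": {}},
    "output_schema": {"type": "object", "properties": {}},
}


class CachingHeartbeat(ComponentBase):
    def __init__(self, **kwargs):
        super().__init__(info, **kwargs)
        # Illustrative schedule: fire once after 5 seconds, then every 10 seconds.
        self.add_timer(
            delay_ms=5000,
            timer_id="heartbeat",
            interval_ms=10000,
            payload={"type": "heartbeat"},
        )

    def invoke(self, message, data):
        # Remember the most recent input for five minutes.
        self.cache_service.set("last_input", data, expiry=300)
        return data

    def handle_timer_event(self, timer_data):
        # On each heartbeat, read back whatever was cached last and keep a
        # simple running count alongside it.
        if timer_data["timer_id"] == "heartbeat":
            last_input = self.cache_service.get("last_input")
            count = (self.cache_service.get("heartbeat_count") or 0) + 1
            self.cache_service.set("heartbeat_count", count)
            # A real component would act on last_input here (emit a message,
            # refresh state, etc.); this sketch only updates the counter.
```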