diff --git a/docs/advanced_component_features.md b/docs/advanced_component_features.md new file mode 100644 index 0000000..7053ff6 --- /dev/null +++ b/docs/advanced_component_features.md @@ -0,0 +1,120 @@ +# Advanced Component Features + +This document describes advanced features available to custom components in the Solace AI Connector. + +## Table of Contents +- [Broker Request-Response](#broker-request-response) +- [Cache Manager](#cache-manager) +- [Timer Features](#timer-features) + +## Broker Request-Response + +Components can perform a request and get a response from the broker using the `do_broker_request_response` method. This method supports both simple request-response and streamed responses. To use this feature, the component's configuration must include a `broker_request_response` section. For details on how to configure this section, refer to the [Broker Request-Response Configuration](configuration.md#broker-request-response-configuration) in the configuration documentation. + +This feature would be used in the invoke method of a custom component. When the `do_broker_request_response` method is called, the component will send a message to the broker and then block until a response (or a series of streamed chunks) is received. This makes it very easy to call services that are available via the broker. + +### Usage + +```python +response = self.do_broker_request_response(message, stream=False) +``` + +For streamed responses: + +```python +for chunk, is_last in self.do_broker_request_response(message, stream=True, streaming_complete_expression="input.payload:streaming.last_message"): + # Process each chunk + if is_last: + break +``` + +### Parameters + +- `message`: The message to send to the broker. This must have a topic and payload. +- `stream` (optional): Boolean indicating whether to expect a streamed response. Default is False. 
+- `streaming_complete_expression` (optional): An expression to evaluate on each response chunk to determine if it's the last one. This is required when `stream=True`. + +### Return Value + +- For non-streamed responses: Returns the response message. +- For streamed responses: Returns a generator that yields tuples of (chunk, is_last). Each chunk is a fully formed message with the format of the response. `is_last` is a boolean indicating if the chunk is the last one. + +## Cache Manager + +The cache service provides a flexible way to store and retrieve data with optional expiration. It supports different storage backends and offers features like automatic expiry checks. + +### Features + +1. Multiple storage backends: + - In-memory storage + - SQLAlchemy-based storage (for persistent storage) + +2. Key-value storage with metadata and expiry support +3. Automatic expiry checks in a background thread +4. Thread-safe operations + +### Usage + +Components can access the cache service through `self.cache_service`. Here are some common operations: + +```python +# Set a value with expiry +self.cache_service.set("key", "value", expiry=300) # Expires in 300 seconds + +# Get a value +value = self.cache_service.get("key") + +# Delete a value +self.cache_service.delete("key") + +# Get all values (including metadata and expiry) +all_data = self.cache_service.get_all() +``` + +### Configuration + +The cache service can be configured in the main configuration file: + +```yaml +cache: + backend: "memory" # or "sqlalchemy" + connection_string: "sqlite:///cache.db" # for SQLAlchemy backend +``` + +## Timer Features + +The timer manager allows components to schedule one-time or recurring timer events. This is useful for implementing delayed actions, periodic tasks, or timeouts. + +### Features + +1. One-time and recurring timers +2. Customizable timer IDs for easy management +3.
Optional payloads for timer events + +### Usage + +Components can access the timer manager through `self.timer_manager`. Here are some common operations: + +```python +# Add a one-time timer +self.add_timer(delay_ms=5000, timer_id="my_timer", payload={"key": "value"}) + +# Add a recurring timer +self.add_timer(delay_ms=5000, timer_id="recurring_timer", interval_ms=10000, payload={"type": "recurring"}) + +# Cancel a timer +self.cancel_timer(timer_id="my_timer") +``` + +### Handling Timer Events + +To handle timer events, components should implement the `handle_timer_event` method: + +```python +def handle_timer_event(self, timer_data): + timer_id = timer_data["timer_id"] + payload = timer_data["payload"] + # Process the timer event +``` + +Timer events are automatically dispatched to the appropriate component by the timer manager. diff --git a/docs/configuration.md b/docs/configuration.md index 8c2b511..9394aa8 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -238,7 +238,7 @@ Each component configuration is a dictionary with the following keys: - `input_selection`: - A `source_expression` or `source_value` to use as the input to the component. Check [Expression Syntax](#expression-syntax) for more details. [Optional: If not specified, the complete previous component output will be used] - `queue_depth`: - The depth of the input queue for the component. - `num_instances`: - The number of instances of the component to run (Starts multiple threads to process messages) - +- `broker_request_response`: - Configuration for the broker request-response functionality. [Optional] ### component_module @@ -359,6 +359,37 @@ The `queue_depth` is an integer that specifies the depth of the input queue for The `num_instances` is an integer that specifies the number of instances of the component to run. This is the number of threads that will be started to process messages from the input queue. By default, the number of instances is 1. 
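+The component-level settings described above come together in a flow entry like the following sketch (the flow and component names here are illustrative, not taken from a shipped example):

```yaml
flows:
  - name: example_flow
    components:
      - component_name: my_processor
        component_module: pass_through
        queue_depth: 10      # input queue buffers up to 10 messages
        num_instances: 3     # three threads consume from the input queue
        input_selection:
          source_expression: input.payload
```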
+### Broker Request-Response Configuration + +The `broker_request_response` configuration allows components to perform request-response operations with a broker. It has the following structure: + +```yaml +broker_request_response: + enabled: + broker_config: + broker_type: + broker_url: + broker_username: + broker_password: + broker_vpn: + payload_encoding: + payload_format: + request_expiry_ms: +``` + +- `enabled`: Set to `true` to enable broker request-response functionality for the component. +- `broker_config`: Configuration for the broker connection. + - `broker_type`: Type of the broker (e.g., "solace"). + - `broker_url`: URL of the broker. + - `broker_username`: Username for broker authentication. + - `broker_password`: Password for broker authentication. + - `broker_vpn`: VPN name for the broker connection. + - `payload_encoding`: Encoding for the payload (e.g., "utf-8", "base64"). + - `payload_format`: Format of the payload (e.g., "json", "text"). +- `request_expiry_ms`: Expiry time for requests in milliseconds. + +For more details on using this functionality, see the [Advanced Component Features](advanced_component_features.md#broker-request-response) documentation. + ### Built-in components The AI Event Connector comes with a number of built-in components that can be used to process messages. For a list of all built-in components, see the [Components](components/index.md) documentation. diff --git a/docs/custom_components.md b/docs/custom_components.md new file mode 100644 index 0000000..f34da91 --- /dev/null +++ b/docs/custom_components.md @@ -0,0 +1,90 @@ +# Custom Components + +## Purpose + +Custom components provide a way to extend the functionality of the Solace AI Connector beyond what's possible with the built-in components and configuration options. Sometimes, it's easier and more efficient to add custom code than to build a complex configuration file, especially for specialized or unique processing requirements. 
+ +## Requirements of a Custom Component + +To create a custom component, you need to follow these requirements: + +1. **Inherit from ComponentBase**: Your custom component class should inherit from the `ComponentBase` class. + +2. **Info Section**: Define an `info` dictionary with the following keys: + - `class_name`: The name of your custom component class. + - `config_parameters`: A list of dictionaries describing the configuration parameters for your component. + - `input_schema`: A dictionary describing the expected input schema. + - `output_schema`: A dictionary describing the expected output schema. + +3. **Implement the `invoke` method**: This is the main method where your component's logic will be implemented. + +Here's a basic template for a custom component: + +```python +from solace_ai_connector.components.component_base import ComponentBase + +info = { + "class_name": "MyCustomComponent", + "config_parameters": [ + { + "name": "my_param", + "type": "string", + "required": True, + "description": "A custom parameter" + } + ], + "input_schema": { + "type": "object", + "properties": { + "input_data": {"type": "string"} + } + }, + "output_schema": { + "type": "object", + "properties": { + "output_data": {"type": "string"} + } + } +} + +class MyCustomComponent(ComponentBase): + def __init__(self, **kwargs): + super().__init__(info, **kwargs) + self.my_param = self.get_config("my_param") + + def invoke(self, message, data): + # Your custom logic here + result = f"{self.my_param}: {data['input_data']}" + return {"output_data": result} +``` + +## Overrideable Methods + +While the `invoke` method is the main one you'll implement, there are several other methods you can override to customize your component's behavior: + +1. `invoke(self, message, data)`: The main processing method for your component. +2. `get_next_event(self)`: Customize how your component receives events. +3. 
`send_message(self, message)`: Customize how your component sends messages to the next component. +4. `handle_timer_event(self, timer_data)`: Handle timer events if your component uses timers. +5. `handle_cache_expiry_event(self, timer_data)`: Handle cache expiry events if your component uses the cache service. +6. `process_pre_invoke(self, message)`: Customize preprocessing before `invoke` is called. +7. `process_post_invoke(self, result, message)`: Customize postprocessing after `invoke` is called. + +## Advanced Features + +Custom components can take advantage of advanced features provided by the Solace AI Connector. These include: + +- Broker request-response functionality +- Cache services +- Timer management + +For more information on these advanced features and how to use them in your custom components, please refer to the [Advanced Component Features](advanced_component_features.md) documentation. + +By creating custom components, you can extend the Solace AI Connector to meet your specific needs while still benefiting from the framework's built-in capabilities for event processing, flow management, and integration with Solace event brokers. + +## Example + +See the [Tips and Tricks page](tips_and_tricks.md) for an example of creating a custom component. 
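To make the pre/post hooks listed above concrete, here is a self-contained sketch. `MinimalBase` is a stand-in for `ComponentBase` (the real class wires these hooks into the event loop and passes `Message` objects rather than plain dicts), so treat this as an illustration of the override pattern, not the framework API:

```python
# MinimalBase stands in for ComponentBase so this sketch runs on its own;
# in a real component you would inherit from
# solace_ai_connector.components.component_base.ComponentBase instead.
class MinimalBase:
    def __init__(self, info, **kwargs):
        self.info = info

    def process_pre_invoke(self, message):
        # The real base class applies input_transforms here
        return message

    def process_post_invoke(self, result, message):
        # The real base class forwards the result to the next component here
        return result


class TaggingComponent(MinimalBase):
    """Overrides the pre/post hooks to add bookkeeping around invoke."""

    def process_pre_invoke(self, message):
        # Normalize the payload before invoke sees it
        message["text"] = message["text"].strip()
        return super().process_pre_invoke(message)

    def invoke(self, message, data):
        # Core logic: upper-case the normalized text
        return {"output_data": message["text"].upper()}

    def process_post_invoke(self, result, message):
        # Stamp every result with the component that produced it
        result["component"] = self.info["class_name"]
        return super().process_post_invoke(result, message)


# The framework normally drives this sequence; it is inlined here only
# so the sketch is runnable end to end.
component = TaggingComponent({"class_name": "TaggingComponent"})
message = component.process_pre_invoke({"text": "  hello  "})
result = component.process_post_invoke(component.invoke(message, None), message)
print(result)  # {'output_data': 'HELLO', 'component': 'TaggingComponent'}
```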
+ + \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index 6f1481e..1b89909 100644 --- a/docs/index.md +++ b/docs/index.md @@ -9,6 +9,7 @@ This connector application makes it easy to connect your AI/ML models to Solace - [Configuration](configuration.md) - [Components](components/index.md) - [Transforms](transforms/index.md) +- [Custom Components](custom_components.md) - [Tips and Tricks](tips_and_tricks.md) - [Guides](guides/index.md) - [Examples](../examples/) diff --git a/examples/llm/custom_components/__init__.py b/examples/llm/custom_components/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/llm/custom_components/llm_streaming_custom_component.py b/examples/llm/custom_components/llm_streaming_custom_component.py new file mode 100644 index 0000000..a1363d4 --- /dev/null +++ b/examples/llm/custom_components/llm_streaming_custom_component.py @@ -0,0 +1,52 @@ +# A custom component that performs a streaming LLM request/response over the broker + +import sys + +sys.path.append("src") + +from solace_ai_connector.components.component_base import ComponentBase +from solace_ai_connector.common.message import Message + + +info = { + "class_name": "LlmStreamingCustomComponent", + "description": "Do a blocking LLM request/response", + "config_parameters": [ + { + "name": "llm_request_topic", + "description": "The topic to send the request to", + "type": "string", + } + ], + "input_schema": { + "type": "object", + "properties": {}, + }, + "output_schema": { + "type": "object", + "properties": {}, + }, +} + + +class LlmStreamingCustomComponent(ComponentBase): + def __init__(self, **kwargs): + super().__init__(info, **kwargs) + self.llm_request_topic = self.get_config("llm_request_topic") + + def invoke(self, message, data): + llm_message = Message(payload=data, topic=self.llm_request_topic) + for message, last_message in self.do_broker_request_response( + llm_message, + stream=True, + streaming_complete_expression="input.payload:last_chunk", +
): + text = message.get_data("input.payload:chunk") + if not text: + text = message.get_data("input.payload:content") or "no response" + if last_message: + return {"chunk": text} + self.output_streaming(message, {"chunk": text}) + + def output_streaming(self, message, data): + return self.process_post_invoke(data, message) diff --git a/examples/llm/openai_component_request_response.yaml b/examples/llm/openai_component_request_response.yaml new file mode 100644 index 0000000..bb102a6 --- /dev/null +++ b/examples/llm/openai_component_request_response.yaml @@ -0,0 +1,147 @@ +# This example demonstrates how to use the request_response_flow_controller to +# inject another flow into an existing flow. This is commonly used when +# you want to call a service that is only accessible via the broker. +# +# Main flow: STDIN -> llm_streaming_custom_component -> STDOUT +# | ^ +# v | +# do_broker_request_response() +# | ^ +# v | +# Broker Broker +# +# +# LLM flow: Broker -> OpenAI -> Broker +# +# +# While this looks a bit complicated, it allows you to very easily use all +# the benefits of the broker to distribute service requests, such as load +# balancing, failover, and scaling to LLMs.
+# +# The main flow reads a question from STDIN, sends it to the LLM flow via the +# broker topic `example/llm/best`, and streams the LLM's response chunks to +# STDOUT as they arrive. +# +# Dependencies: +# pip install -U langchain_openai openai +# +# required ENV variables: +# - OPENAI_API_KEY +# - OPENAI_API_ENDPOINT +# - OPENAI_MODEL_NAME +# - SOLACE_BROKER_URL +# - SOLACE_BROKER_USERNAME +# - SOLACE_BROKER_PASSWORD +# - SOLACE_BROKER_VPN + +--- +log: + stdout_log_level: INFO + log_file_level: DEBUG + log_file: solace_ai_connector.log + +shared_config: + - broker_config: &broker_connection + broker_type: solace + broker_url: ${SOLACE_BROKER_URL} + broker_username: ${SOLACE_BROKER_USERNAME} + broker_password: ${SOLACE_BROKER_PASSWORD} + broker_vpn: ${SOLACE_BROKER_VPN} + +# Take from STDIN and write the LLM response to STDOUT +flows: + # main processing flow + - name: main_flow + components: + # Input from STDIN + - component_name: input + component_module: stdin_input + + # Our custom component + - component_name: llm_streaming_custom_component + component_module: llm_streaming_custom_component + # Relative path to the component + component_base_path: examples/llm/custom_components + component_config: + llm_request_topic: example/llm/best + broker_request_response: + enabled: true + broker_config: *broker_connection + request_expiry_ms: 60000 + payload_encoding: utf-8 + payload_format: json + input_transforms: + - type: copy + source_expression: | + template:You are a helpful AI assistant. 
Please help with the user's request below: + + {{text://input.payload:text}} + + dest_expression: user_data.llm_input:messages.0.content + - type: copy + source_expression: static:user + dest_expression: user_data.llm_input:messages.0.role + input_selection: + source_expression: user_data.llm_input + + # Send response to stdout + - component_name: send_response + component_module: stdout_output + component_config: + add_new_line_between_messages: false + input_selection: + source_expression: previous:chunk + + + + # The LLM flow that is accessible via the broker + - name: llm_flow + components: + # Input from a Solace broker + - component_name: solace_sw_broker + component_module: broker_input + component_config: + <<: *broker_connection + broker_queue_name: example_flow_streaming + broker_subscriptions: + - topic: example/llm/best + qos: 1 + payload_encoding: utf-8 + payload_format: json + + # Do an LLM request + - component_name: llm_request + component_module: openai_chat_model + component_config: + api_key: ${OPENAI_API_KEY} + base_url: ${OPENAI_API_ENDPOINT} + model: ${OPENAI_MODEL_NAME} + temperature: 0.01 + llm_mode: stream + stream_to_next_component: true + stream_batch_size: 20 + input_selection: + source_expression: input.payload + + # Send response back to broker + - component_name: send_response + component_module: broker_output + component_config: + <<: *broker_connection + payload_encoding: utf-8 + payload_format: json + copy_user_properties: true + input_transforms: + - type: copy + source_expression: previous + dest_expression: user_data.output:payload + - type: copy + source_expression: input.user_properties:__solace_ai_connector_broker_request_reply_topic__ + dest_expression: user_data.output:topic + input_selection: + source_expression: user_data.output \ No newline at end of file diff --git a/src/solace_ai_connector/common/event.py b/src/solace_ai_connector/common/event.py index cc302c2..98f4217 100644 --- a/src/solace_ai_connector/common/event.py +++
b/src/solace_ai_connector/common/event.py @@ -6,7 +6,10 @@ class EventType(Enum): MESSAGE = "message" TIMER = "timer" CACHE_EXPIRY = "cache_expiry" - # Add more event types as needed + # Add more event types as needed + + def __eq__(self, other): + return self.value == other.value class Event: diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py index bd4c52c..f059c06 100644 --- a/src/solace_ai_connector/components/component_base.py +++ b/src/solace_ai_connector/components/component_base.py @@ -10,6 +10,7 @@ from ..common.message import Message from ..common.trace_message import TraceMessage from ..common.event import Event, EventType +from ..flow.request_response_flow_controller import RequestResponseFlowController DEFAULT_QUEUE_TIMEOUT_MS = 200 DEFAULT_QUEUE_MAX_DEPTH = 5 @@ -34,6 +35,9 @@ def __init__(self, module_info, **kwargs): self.cache_service = kwargs.pop("cache_service", None) self.component_config = self.config.get("component_config") or {} + self.broker_request_response_config = self.config.get( + "broker_request_response", None + ) self.name = self.config.get("component_name", "") resolve_config_values(self.component_config) @@ -51,6 +55,7 @@ def __init__(self, module_info, **kwargs): self.validate_config() self.setup_transforms() self.setup_communications() + self.setup_broker_request_response() def create_thread_and_run(self): self.thread = threading.Thread(target=self.run) @@ -66,6 +71,8 @@ def run(self): if self.trace_queue: self.trace_event(event) self.process_event(event) + except AssertionError as e: + raise e except Exception as e: log.error( "%sComponent has crashed: %s\n%s", @@ -214,7 +221,15 @@ def get_config(self, key=None, default=None): val = self.component_config.get(key, None) if val is None: val = self.config.get(key, default) - if callable(val): + + # We reserve a few callable function names for internal use + # They are used for the handler_callback component which is
used + # in testing (search the tests directory for example uses) + if callable(val) and key not in [ + "invoke_handler", + "get_next_event_handler", + "send_message_handler", + ]: if self.current_message is None: raise ValueError( f"Component {self.log_identifier} is trying to use an `invoke` config " @@ -262,6 +277,32 @@ def setup_communications(self): else: self.input_queue = queue.Queue(maxsize=self.queue_max_depth) + def setup_broker_request_response(self): + if ( + not self.broker_request_response_config + or not self.broker_request_response_config.get("enabled", False) + ): + self.broker_request_response_controller = None + return + broker_config = self.broker_request_response_config.get("broker_config", {}) + request_expiry_ms = self.broker_request_response_config.get( + "request_expiry_ms", 30000 + ) + if not broker_config: + raise ValueError( + f"Broker request response config not found for component {self.name}" + ) + rrc_config = { + "broker_config": broker_config, + "request_expiry_ms": request_expiry_ms, + } + self.broker_request_response_controller = RequestResponseFlowController( + config=rrc_config, connector=self.connector + ) + + def is_broker_request_response_enabled(self): + return self.broker_request_response_controller is not None + def setup_transforms(self): self.transforms = Transforms( self.config.get("input_transforms", []), log_identifier=self.log_identifier @@ -365,3 +406,26 @@ def cleanup(self): self.input_queue.get_nowait() except queue.Empty: break + + # This should be used to do an on-the-fly broker request response + def do_broker_request_response( + self, message, stream=False, streaming_complete_expression=None + ): + if self.broker_request_response_controller: + if stream: + return ( + self.broker_request_response_controller.do_broker_request_response( + message, stream, streaming_complete_expression + ) + ) + else: + generator = ( + self.broker_request_response_controller.do_broker_request_response( + message + ) + ) + 
next_message, last = next(generator, (None, None))  # tuple default so unpacking succeeds when no response arrives + return next_message + raise ValueError( + f"Broker request response controller not found for component {self.name}" + ) diff --git a/src/solace_ai_connector/components/general/delay.py b/src/solace_ai_connector/components/general/delay.py index d4a05d0..8d8aaf0 100644 --- a/src/solace_ai_connector/components/general/delay.py +++ b/src/solace_ai_connector/components/general/delay.py @@ -38,5 +38,6 @@ def __init__(self, **kwargs): super().__init__(info, **kwargs) def invoke(self, message, data): - sleep(self.get_config("delay")) + delay = self.get_config("delay") + sleep(delay) return deepcopy(data) diff --git a/src/solace_ai_connector/components/general/for_testing/handler_callback.py b/src/solace_ai_connector/components/general/for_testing/handler_callback.py new file mode 100644 index 0000000..12d0ea7 --- /dev/null +++ b/src/solace_ai_connector/components/general/for_testing/handler_callback.py @@ -0,0 +1,67 @@ +"""This test component allows a tester to configure callback handlers for + get_next_event, send_message and invoke methods""" + +from ...component_base import ComponentBase + + +info = { + "class_name": "HandlerCallback", + "description": ( + "This test component allows a tester to configure callback handlers for " + "get_next_event, send_message and invoke methods" + ), + "config_parameters": [ + { + "name": "get_next_event_handler", + "required": False, + "description": "The callback handler for the get_next_event method", + "type": "function", + }, + { + "name": "send_message_handler", + "required": False, + "description": "The callback handler for the send_message method", + "type": "function", + }, + { + "name": "invoke_handler", + "required": False, + "description": "The callback handler for the invoke method", + "type": "function", + }, + ], + "input_schema": { + "type": "object", + "properties": {}, + }, + "output_schema": { + "type": "object", + "properties": {}, + }, +} + + +class
HandlerCallback(ComponentBase): + def __init__(self, **kwargs): + super().__init__(info, **kwargs) + self.get_next_event_handler = self.get_config("get_next_event_handler") + self.send_message_handler = self.get_config("send_message_handler") + self.invoke_handler = self.get_config("invoke_handler") + + def get_next_event(self): + if self.get_next_event_handler: + return self.get_next_event_handler(self) + else: + return super().get_next_event() + + def send_message(self, message): + if self.send_message_handler: + return self.send_message_handler(self, message) + else: + return super().send_message(message) + + def invoke(self, message, data): + if self.invoke_handler: + return self.invoke_handler(self, message, data) + else: + return super().invoke(message, data) diff --git a/src/solace_ai_connector/components/general/openai/openai_chat_model_base.py b/src/solace_ai_connector/components/general/openai/openai_chat_model_base.py index 6a0884b..0ce6c90 100644 --- a/src/solace_ai_connector/components/general/openai/openai_chat_model_base.py +++ b/src/solace_ai_connector/components/general/openai/openai_chat_model_base.py @@ -1,9 +1,10 @@ """Base class for OpenAI chat models""" +import uuid + from openai import OpenAI from ...component_base import ComponentBase from ....common.message import Message -import uuid openai_info_base = { "class_name": "OpenAIChatModelBase", @@ -34,13 +35,29 @@ { "name": "stream_to_flow", "required": False, - "description": "Name the flow to stream the output to - this must be configured for llm_mode='stream'.", + "description": ( + "Name the flow to stream the output to - this must be configured for " + "llm_mode='stream'. This is mutually exclusive with stream_to_next_component." + ), "default": "", }, + { + "name": "stream_to_next_component", + "required": False, + "description": ( + "Whether to stream the output to the next component in the flow. " + "This is mutually exclusive with stream_to_flow." 
+ ), + "default": False, + }, { "name": "llm_mode", "required": False, - "description": "The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response.", + "description": ( + "The mode for streaming results: 'sync' or 'stream'. 'stream' " + "will just stream the results to the named flow. 'none' will " + "wait for the full response." + ), "default": "none", }, { @@ -52,7 +69,11 @@ { "name": "set_response_uuid_in_user_properties", "required": False, - "description": "Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response.", + "description": ( + "Whether to set the response_uuid in the user_properties of the " + "input_message. This will allow other components to correlate " + "streaming chunks with the full response." + ), "default": False, "type": "boolean", }, @@ -90,8 +111,6 @@ } -import uuid - class OpenAIChatModelBase(ComponentBase): def __init__(self, module_info, **kwargs): super().__init__(module_info, **kwargs) @@ -101,16 +120,22 @@ def init(self): self.model = self.get_config("model") self.temperature = self.get_config("temperature") self.stream_to_flow = self.get_config("stream_to_flow") + self.stream_to_next_component = self.get_config("stream_to_next_component") self.llm_mode = self.get_config("llm_mode") self.stream_batch_size = self.get_config("stream_batch_size") - self.set_response_uuid_in_user_properties = self.get_config("set_response_uuid_in_user_properties") + self.set_response_uuid_in_user_properties = self.get_config( + "set_response_uuid_in_user_properties" + ) + if self.stream_to_flow and self.stream_to_next_component: + raise ValueError( + "stream_to_flow and stream_to_next_component are mutually exclusive" + ) def invoke(self, message, data): messages = data.get("messages", []) client = OpenAI( - api_key=self.get_config("api_key"), - 
base_url=self.get_config("base_url") + api_key=self.get_config("api_key"), base_url=self.get_config("base_url") ) if self.llm_mode == "stream": @@ -134,7 +159,7 @@ def invoke_stream(self, client, message, messages): messages=messages, model=self.model, temperature=self.temperature, - stream=True + stream=True, ): if chunk.choices[0].delta.content is not None: content = chunk.choices[0].delta.content @@ -148,11 +173,31 @@ def invoke_stream(self, client, message, messages): aggregate_result, response_uuid, first_chunk, - False + False, + ) + elif self.stream_to_next_component: + self.send_to_next_component( + message, + current_batch, + aggregate_result, + response_uuid, + first_chunk, + False, ) current_batch = "" first_chunk = False + if self.stream_to_next_component: + # Just return the last chunk + return { + "content": aggregate_result, + "chunk": current_batch, + "uuid": response_uuid, + "first_chunk": first_chunk, + "last_chunk": True, + "streaming": True, + } + if self.stream_to_flow: self.send_streaming_message( message, @@ -160,12 +205,20 @@ def invoke_stream(self, client, message, messages): aggregate_result, response_uuid, first_chunk, - True + True, ) return {"content": aggregate_result, "uuid": response_uuid} - def send_streaming_message(self, input_message, chunk, aggregate_result, response_uuid, first_chunk=False, last_chunk=False): + def send_streaming_message( + self, + input_message, + chunk, + aggregate_result, + response_uuid, + first_chunk=False, + last_chunk=False, + ): message = Message( payload={ "chunk": chunk, @@ -177,3 +230,34 @@ def send_streaming_message(self, input_message, chunk, aggregate_result, respons user_properties=input_message.get_user_properties(), ) self.send_to_flow(self.stream_to_flow, message) + + def send_to_next_component( + self, + input_message, + chunk, + aggregate_result, + response_uuid, + first_chunk=False, + last_chunk=False, + ): + message = Message( + payload={ + "chunk": chunk, + "aggregate_result": 
aggregate_result,
+                "response_uuid": response_uuid,
+                "first_chunk": first_chunk,
+                "last_chunk": last_chunk,
+            },
+            user_properties=input_message.get_user_properties(),
+        )
+
+        result = {
+            "content": aggregate_result,
+            "chunk": chunk,
+            "uuid": response_uuid,
+            "first_chunk": first_chunk,
+            "last_chunk": last_chunk,
+            "streaming": True,
+        }
+
+        self.process_post_invoke(result, message)
diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_base.py b/src/solace_ai_connector/components/inputs_outputs/broker_base.py
index b6fd113..fac4207 100644
--- a/src/solace_ai_connector/components/inputs_outputs/broker_base.py
+++ b/src/solace_ai_connector/components/inputs_outputs/broker_base.py
@@ -37,9 +37,12 @@ class BrokerBase(ComponentBase):
     def __init__(self, module_info, **kwargs):
         super().__init__(module_info, **kwargs)
         self.broker_properties = self.get_broker_properties()
-        self.messaging_service = (
-            MessagingServiceBuilder().from_properties(self.broker_properties).build()
-        )
+        if self.broker_properties["broker_type"] not in ["test", "test_streaming"]:
+            self.messaging_service = (
+                MessagingServiceBuilder()
+                .from_properties(self.broker_properties)
+                .build()
+            )
         self.current_broker_message = None
         self.messages_to_ack = []
         self.connected = False
diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_input.py b/src/solace_ai_connector/components/inputs_outputs/broker_input.py
index 841c8ee..3aabd8d 100644
--- a/src/solace_ai_connector/components/inputs_outputs/broker_input.py
+++ b/src/solace_ai_connector/components/inputs_outputs/broker_input.py
@@ -44,7 +44,8 @@
         {
             "name": "temporary_queue",
             "required": False,
-            "description": "Whether to create a temporary queue that will be deleted after disconnection, defaulted to True if broker_queue_name is not provided",
+            "description": "Whether to create a temporary queue that will be deleted "
+            "after disconnection, defaulted to True if broker_queue_name is not provided",
             "default": False,
         },
         {
diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py b/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py
index 3e4a2cd..b363776 100644
--- a/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py
+++ b/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py
@@ -3,6 +3,8 @@
 import threading
 import uuid
 import json
+import queue
+from copy import deepcopy

 # from typing import Dict, Any

@@ -63,6 +65,19 @@
             "description": "Expiry time for cached requests in milliseconds",
             "default": 60000,
         },
+        {
+            "name": "streaming",
+            "required": False,
+            "description": "The response will arrive in multiple pieces. If True, "
+            "the streaming_complete_expression must be set and will be used to "
+            "determine when the last piece has arrived.",
+        },
+        {
+            "name": "streaming_complete_expression",
+            "required": False,
+            "description": "The source expression to determine when the last piece of a "
+            "streaming response has arrived.",
+        },
     ],
     "input_schema": {
         "type": "object",
@@ -79,6 +94,16 @@
                 "type": "object",
                 "description": "User properties to send with the request message",
             },
+            "stream": {
+                "type": "boolean",
+                "description": "Whether this will have a streaming response",
+                "default": False,
+            },
+            "streaming_complete_expression": {
+                "type": "string",
+                "description": "Expression to determine when the last piece of a "
+                "streaming response has arrived. Required if stream is True.",
+            },
         },
         "required": ["payload", "topic"],
     },
@@ -115,6 +140,11 @@ def __init__(self, **kwargs):
         self.reply_queue_name = f"reply-queue-{uuid.uuid4()}"
         self.reply_topic = f"reply/{uuid.uuid4()}"
         self.response_thread = None
+        self.streaming = self.get_config("streaming")
+        self.streaming_complete_expression = self.get_config(
+            "streaming_complete_expression"
+        )
+        self.broker_type = self.broker_properties.get("broker_type", "solace")
         self.broker_properties["temporary_queue"] = True
         self.broker_properties["queue_name"] = self.reply_queue_name
         self.broker_properties["subscriptions"] = [
@@ -123,7 +153,13 @@ def __init__(self, **kwargs):
                 "qos": 1,
             }
         ]
-        self.connect()
+        self.test_mode = False
+
+        if self.broker_type == "solace":
+            self.connect()
+        elif self.broker_type == "test" or self.broker_type == "test_streaming":
+            self.test_mode = True
+            self.setup_test_pass_through()
         self.start()

     def start(self):
@@ -135,8 +171,16 @@ def setup_reply_queue(self):
             self.reply_queue_name, [self.reply_topic], temporary=True
         )

+    def setup_test_pass_through(self):
+        self.pass_through_queue = queue.Queue()
+
     def start_response_thread(self):
-        self.response_thread = threading.Thread(target=self.handle_responses)
+        if self.test_mode:
+            self.response_thread = threading.Thread(
+                target=self.handle_test_pass_through
+            )
+        else:
+            self.response_thread = threading.Thread(target=self.handle_responses)
         self.response_thread.start()

     def handle_responses(self):
@@ -148,61 +192,76 @@ def handle_responses(self):
             except Exception as e:
                 log.error("Error handling response: %s", e)

+    def handle_test_pass_through(self):
+        while not self.stop_signal.is_set():
+            try:
+                message = self.pass_through_queue.get(timeout=1)
+                decoded_payload = self.decode_payload(message.get_payload())
+                message.set_payload(decoded_payload)
+                self.process_response(message)
+            except queue.Empty:
+                continue
+            except Exception as e:
+                log.error("Error handling test passthrough: %s", e)
+
     def process_response(self, broker_message):
-        payload = broker_message.get_payload_as_string()
-        if payload is None:
-            payload = broker_message.get_payload_as_bytes()
-        payload = self.decode_payload(payload)
-        topic = broker_message.get_destination_name()
-        user_properties = broker_message.get_properties()
+        if self.test_mode:
+            payload = broker_message.get_payload()
+            topic = broker_message.get_topic()
+            user_properties = broker_message.get_user_properties()
+        else:
+            payload = broker_message.get_payload_as_string()
+            if payload is None:
+                payload = broker_message.get_payload_as_bytes()
+            payload = self.decode_payload(payload)
+            topic = broker_message.get_destination_name()
+            user_properties = broker_message.get_properties()

         metadata_json = user_properties.get(
             "__solace_ai_connector_broker_request_reply_metadata__"
         )
         if not metadata_json:
-            log.warning("Received response without metadata: %s", payload)
+            log.error("Received response without metadata: %s", payload)
             return

         try:
             metadata_stack = json.loads(metadata_json)
         except json.JSONDecodeError:
-            log.warning(
-                "Received response with invalid metadata JSON: %s", metadata_json
-            )
+            log.error("Received response with invalid metadata JSON: %s", metadata_json)
             return

         if not metadata_stack:
-            log.warning("Received response with empty metadata stack: %s", payload)
+            log.error("Received response with empty metadata stack: %s", payload)
             return

         try:
             current_metadata = metadata_stack.pop()
         except IndexError:
-            log.warning(
+            log.error(
                 "Received response with invalid metadata stack: %s", metadata_stack
             )
             return

         request_id = current_metadata.get("request_id")
         if not request_id:
-            log.warning("Received response without request_id in metadata: %s", payload)
+            log.error("Received response without request_id in metadata: %s", payload)
             return

         cached_request = self.cache_service.get_data(request_id)
         if not cached_request:
-            log.warning("Received response for unknown request_id: %s", request_id)
+            log.error("Received response for unknown request_id: %s", request_id)
             return

+        stream = cached_request.get("stream", False)
+        streaming_complete_expression = cached_request.get(
+            "streaming_complete_expression"
+        )
+
         response = {
             "payload": payload,
             "topic": topic,
             "user_properties": user_properties,
         }

-        result = {
-            "request": cached_request,
-            "response": response,
-        }
-
         # Update the metadata in the response
         if metadata_stack:
             response["user_properties"][
@@ -221,8 +280,23 @@ def process_response(self, broker_message):
                 "__solace_ai_connector_broker_request_reply_topic__", None
             )

-        self.process_post_invoke(result, Message(payload=result))
-        self.cache_service.remove_data(request_id)
+        message = Message(
+            payload=payload,
+            user_properties=user_properties,
+            topic=topic,
+        )
+        self.process_post_invoke(response, message)
+
+        # Only remove the cache entry if this isn't a streaming response or
+        # if it is the last piece of a streaming response
+        last_piece = True
+        if stream and streaming_complete_expression:
+            is_last = message.get_data(streaming_complete_expression)
+            if not is_last:
+                last_piece = False
+
+        if last_piece:
+            self.cache_service.remove_data(request_id)

     def invoke(self, message, data):
         request_id = str(uuid.uuid4())
@@ -230,6 +304,13 @@ def invoke(self, message, data):
         if "user_properties" not in data:
             data["user_properties"] = {}

+        stream = False
+        streaming_complete_expression = None
+        if "stream" in data:
+            stream = data["stream"]
+        if "streaming_complete_expression" in data:
+            streaming_complete_expression = data["streaming_complete_expression"]
+
         metadata = {"request_id": request_id, "reply_topic": self.reply_topic}

         if (
@@ -266,13 +347,39 @@ def invoke(self, message, data):
             "__solace_ai_connector_broker_request_reply_topic__"
         ] = self.reply_topic

-        encoded_payload = self.encode_payload(data["payload"])
+        if self.test_mode:
+            if self.broker_type == "test_streaming":
+                # The payload should be an array. Send one message per item in the array
+                if not isinstance(data["payload"], list):
+                    raise ValueError("Payload must be a list for test_streaming broker")
+                for item in data["payload"]:
+                    encoded_payload = self.encode_payload(item)
+                    self.pass_through_queue.put(
+                        Message(
+                            payload=encoded_payload,
+                            user_properties=deepcopy(data["user_properties"]),
+                            topic=data["topic"],
+                        )
+                    )
+            else:
+                encoded_payload = self.encode_payload(data["payload"])
+                self.pass_through_queue.put(
+                    Message(
+                        payload=encoded_payload,
+                        user_properties=data["user_properties"],
+                        topic=data["topic"],
+                    )
+                )
+        else:
+            encoded_payload = self.encode_payload(data["payload"])
+            self.messaging_service.send_message(
+                destination_name=data["topic"],
+                payload=encoded_payload,
+                user_properties=data["user_properties"],
+            )

-        self.messaging_service.send_message(
-            destination_name=data["topic"],
-            payload=encoded_payload,
-            user_properties=data["user_properties"],
-        )
+        data["stream"] = stream
+        data["streaming_complete_expression"] = streaming_complete_expression

         self.cache_service.add_data(
             key=request_id,
diff --git a/src/solace_ai_connector/components/inputs_outputs/stdin_input.py b/src/solace_ai_connector/components/inputs_outputs/stdin_input.py
index a4fb83e..568333c 100644
--- a/src/solace_ai_connector/components/inputs_outputs/stdin_input.py
+++ b/src/solace_ai_connector/components/inputs_outputs/stdin_input.py
@@ -1,5 +1,7 @@
 # An input component that reads from STDIN

+import threading
+
 from copy import deepcopy
 from ..component_base import ComponentBase
 from ...common.message import Message
@@ -9,9 +11,17 @@
     "class_name": "Stdin",
     "description": (
         "STDIN input component. The component will prompt for "
-        "input, which will then be placed in the message payload using the output schema below."
+        "input, which will then be placed in the message payload using the output schema below. "
+        "The component will wait for its output message to be acknowledged before prompting for "
+        "the next input."
     ),
-    "config_parameters": [],
+    "config_parameters": [
+        {
+            "name": "prompt",
+            "required": False,
+            "description": "The prompt to display when asking for input",
+        }
+    ],
     "output_schema": {
         "type": "object",
         "properties": {
@@ -27,16 +37,28 @@ class Stdin(ComponentBase):
     def __init__(self, **kwargs):
         super().__init__(info, **kwargs)
+        self.need_acknowledgement = True
+        self.next_input_signal = threading.Event()
+        self.next_input_signal.set()

     def get_next_message(self):
+        # Wait for the next input signal
+        self.next_input_signal.wait()
+
+        # Reset the event for the next use
+        self.next_input_signal.clear()
+
         # Get the next message from STDIN
-        obj = {"text": input(self.config.get("prompt", "Enter text: "))}
+        obj = {"text": input(self.config.get("prompt", "\nEnter text: "))}

         # Create and return a message object
         return Message(payload=obj)

-    # def get_input_data(self, message):
-    #     return message.payload
-
     def invoke(self, message, data):
         return deepcopy(message.get_payload())
+
+    def acknowledge_message(self):
+        self.next_input_signal.set()
+
+    def get_acknowledgement_callback(self):
+        return self.acknowledge_message
diff --git a/src/solace_ai_connector/components/inputs_outputs/stdout_output.py b/src/solace_ai_connector/components/inputs_outputs/stdout_output.py
index 0309f1f..edba4aa 100644
--- a/src/solace_ai_connector/components/inputs_outputs/stdout_output.py
+++ b/src/solace_ai_connector/components/inputs_outputs/stdout_output.py
@@ -6,7 +6,15 @@
 info = {
     "class_name": "Stdout",
     "description": "STDOUT output component",
-    "config_parameters": [],
+    "config_parameters": [
+        {
+            "name": "add_new_line_between_messages",
+            "required": False,
+            "description": "Add a new line between messages",
+            "type": "boolean",
+            "default": True,
+        }
+    ],
     "input_schema": {
         "type": "object",
         "properties": {
@@ -22,8 +30,15 @@ class Stdout(ComponentBase):
     def __init__(self, **kwargs):
         super().__init__(info, **kwargs)
+        self.add_newline = self.get_config("add_new_line_between_messages")

     def invoke(self, message, data):
         # Print the message to STDOUT
-        print(yaml.dump(data))
+        if isinstance(data, dict) or isinstance(data, list):
+            print(yaml.dump(data))
+        else:
+            print(data, end="")
+        if self.add_newline:
+            print()
+        return data
diff --git a/src/solace_ai_connector/flow/flow.py b/src/solace_ai_connector/flow/flow.py
index 782c7ce..ea5091c 100644
--- a/src/solace_ai_connector/flow/flow.py
+++ b/src/solace_ai_connector/flow/flow.py
@@ -66,6 +66,9 @@ def __init__(
         self.cache_service = connector.cache_service if connector else None
         self.create_components()

+    def get_input_queue(self):
+        return self.flow_input_queue
+
     def create_components(self):
         # Loop through the components and create them
         for index, component in enumerate(self.flow_config.get("components", [])):
@@ -77,14 +80,15 @@ def create_components(self):
             for component in component_group:
                 component.set_next_component(self.component_groups[index + 1][0])

+        self.flow_input_queue = self.component_groups[0][0].get_input_queue()
+
+    def run(self):
         # Now one more time to create threads and run them
-        for index, component_group in enumerate(self.component_groups):
+        for _index, component_group in enumerate(self.component_groups):
             for component in component_group:
                 thread = component.create_thread_and_run()
                 self.threads.append(thread)

-        self.flow_input_queue = self.component_groups[0][0].get_input_queue()
-
     def create_component_group(self, component, index):
         component_module = component.get("component_module", "")
         base_path = component.get("component_base_path", None)
@@ -133,6 +137,12 @@ def create_component_group(self, component, index):
     def get_flow_input_queue(self):
         return self.flow_input_queue

+    # This will set the next component in all the components in the
+    # last component group
+    def set_next_component(self, component):
+        for comp in self.component_groups[-1]:
+            comp.set_next_component(component)
+
     def wait_for_threads(self):
         for thread in self.threads:
             thread.join()
diff --git a/src/solace_ai_connector/flow/request_response_flow_controller.py b/src/solace_ai_connector/flow/request_response_flow_controller.py
new file mode 100644
index 0000000..36dda71
--- /dev/null
+++ b/src/solace_ai_connector/flow/request_response_flow_controller.py
@@ -0,0 +1,156 @@
+"""
+This file will handle sending a message to a named flow and then
+receiving the output message from that flow. It will also support the result
+message being a streamed message that comes in multiple parts.
+
+Each component can optionally create multiple of these using the configuration:
+
+```yaml
+- name: example_flow
+  components:
+    - component_name: example_component
+      component_module: custom_component
+      request_response_flow_controllers:
+        - name: example_controller
+          flow_name: llm_flow
+          streaming: true
+          streaming_complete_expression: input.payload:streaming.last_message
+          request_expiry_ms: 300000
+```
+
+"""
+
+import queue
+import time
+from typing import Dict, Any
+
+from ..common.message import Message
+from ..common.event import Event, EventType
+
+
+# This is a very basic component which will be stitched onto the final component in the flow
+class RequestResponseControllerOuputComponent:
+    def __init__(self, controller):
+        self.controller = controller
+
+    def enqueue(self, event):
+        self.controller.enqueue_response(event)
+
+
+# This is the main class that will be used to send messages to a flow and receive the response
+class RequestResponseFlowController:
+    def __init__(self, config: Dict[str, Any], connector):
+        self.config = config
+        self.connector = connector
+        self.broker_config = config.get("broker_config")
+        self.request_expiry_ms = config.get("request_expiry_ms", 300000)
+        self.request_expiry_s = self.request_expiry_ms / 1000
+        self.input_queue = None
+        self.response_queue = None
+        self.enqueue_time = None
+        self.request_outstanding = False
+
+        self.flow = self.create_broker_request_response_flow()
+        self.setup_queues(self.flow)
+        self.flow.run()
+
+    def create_broker_request_response_flow(self):
+        self.broker_config["request_expiry_ms"] = self.request_expiry_ms
+        config = {
+            "name": "_internal_broker_request_response_flow",
+            "components": [
+                {
+                    "component_name": "_internal_broker_request_response",
+                    "component_module": "broker_request_response",
+                    "component_config": self.broker_config,
+                }
+            ],
+        }
+        return self.connector.create_flow(flow=config, index=0, flow_instance_index=0)
+
+    def setup_queues(self, flow):
+        # Input queue to send the message to the flow
+        self.input_queue = flow.get_input_queue()
+
+        # Response queue to receive the response from the flow
+        self.response_queue = queue.Queue()
+        rrcComponent = RequestResponseControllerOuputComponent(self)
+        flow.set_next_component(rrcComponent)
+
+    def do_broker_request_response(
+        self, request_message, stream=False, streaming_complete_expression=None
+    ):
+        # Send the message to the broker
+        self.send_message(request_message, stream, streaming_complete_expression)
+
+        # Now we will wait for the response
+        now = time.time()
+        elapsed_time = now - self.enqueue_time
+        remaining_timeout = self.request_expiry_s - elapsed_time
+        if stream:
+            # If we are in streaming mode, we will return individual messages
+            # until we receive the last message. Use the expression to determine
+            # if this is the last message
+            while True:
+                try:
+                    event = self.response_queue.get(timeout=remaining_timeout)
+                    if event.event_type == EventType.MESSAGE:
+                        message = event.data
+                        last_message = message.get_data(streaming_complete_expression)
+                        yield message, last_message
+                        if last_message:
+                            return
+                except queue.Empty:
+                    if (time.time() - self.enqueue_time) > self.request_expiry_s:
+                        raise TimeoutError(  # pylint: disable=raise-missing-from
+                            "Timeout waiting for response"
+                        )
+                except Exception as e:
+                    raise e
+
+                now = time.time()
+                elapsed_time = now - self.enqueue_time
+                remaining_timeout = self.request_expiry_s - elapsed_time
+
+        # If we are not in streaming mode, we will return a single message
+        # and then stop the iterator
+        try:
+            event = self.response_queue.get(timeout=remaining_timeout)
+            if event.event_type == EventType.MESSAGE:
+                message = event.data
+                yield message, True
+                return
+        except queue.Empty:
+            if (time.time() - self.enqueue_time) > self.request_expiry_s:
+                raise TimeoutError(  # pylint: disable=raise-missing-from
+                    "Timeout waiting for response"
+                )
+        except Exception as e:
+            raise e
+
+    def send_message(
+        self, message: Message, stream=False, streaming_complete_expression=None
+    ):
+        # Make a new message, but copy the data from the original message
+        if not self.input_queue:
+            raise ValueError(f"Input queue for flow {self.flow.name} not found")
+
+        # Need to set the previous object to the required input for the
+        # broker_request_response component
+        message.set_previous(
+            {
+                "payload": message.get_payload(),
+                "user_properties": message.get_user_properties(),
+                "topic": message.get_topic(),
+                "stream": stream,
+                "streaming_complete_expression": streaming_complete_expression,
+            },
+        )
+
+        event = Event(EventType.MESSAGE, message)
+        self.enqueue_time = time.time()
+        self.request_outstanding = True
+        self.input_queue.put(event)
+
+    def enqueue_response(self, event):
+        self.response_queue.put(event)
diff --git a/src/solace_ai_connector/solace_ai_connector.py b/src/solace_ai_connector/solace_ai_connector.py
index f41f50f..40240cc 100644
--- a/src/solace_ai_connector/solace_ai_connector.py
+++ b/src/solace_ai_connector/solace_ai_connector.py
@@ -62,6 +62,8 @@ def create_flows(self):
             flow_input_queue = flow_instance.get_flow_input_queue()
             self.flow_input_queues[flow.get("name")] = flow_input_queue
             self.flows.append(flow_instance)
+        for flow in self.flows:
+            flow.run()

     def create_flow(self, flow: dict, index: int, flow_instance_index: int):
         """Create a single flow"""
@@ -98,15 +100,6 @@ def wait_for_flows(self):
             self.stop()
             self.cleanup()

-    def stop(self):
-        """Stop the Solace AI Event Connector"""
-        log.info("Stopping Solace AI Event Connector")
-        self.stop_signal.set()
-        self.timer_manager.stop()  # Stop the timer manager first
-        self.wait_for_flows()
-        if self.trace_thread:
-            self.trace_thread.join()
-
     def cleanup(self):
         """Clean up resources and ensure all threads are properly joined"""
         log.info("Cleaning up Solace AI Event Connector")
@@ -202,6 +195,13 @@ def get_flows(self):
         """Return the flows"""
         return self.flows

+    def get_flow(self, flow_name):
+        """Return a specific flow by name"""
+        for flow in self.flows:
+            if flow.name == flow_name:
+                return flow
+        return None
+
     def setup_cache_service(self):
         """Setup the cache service"""
         cache_config = self.config.get("cache", {})
diff --git a/src/solace_ai_connector/test_utils/utils_for_test_files.py b/src/solace_ai_connector/test_utils/utils_for_test_files.py
index 1b38e98..2b3421b 100644
--- a/src/solace_ai_connector/test_utils/utils_for_test_files.py
+++ b/src/solace_ai_connector/test_utils/utils_for_test_files.py
@@ -168,6 +168,8 @@ def create_test_flows(
     # For each of the flows, add the input and output components
     flow_info = []
     for flow in flows:
+        if flow.flow_config.get("test_ignore", False):
+            continue
         input_component = TestInputComponent(
             flow.component_groups[0][0].get_input_queue()
         )
diff --git a/tests/test_request_response_controller.py b/tests/test_request_response_controller.py
new file mode 100644
index 0000000..6ca6073
--- /dev/null
+++ b/tests/test_request_response_controller.py
@@ -0,0 +1,229 @@
+import sys
+
+sys.path.append("src")
+
+from solace_ai_connector.test_utils.utils_for_test_files import (
+    create_test_flows,
+    dispose_connector,
+    send_message_to_flow,
+    get_message_from_flow,
+)
+from solace_ai_connector.common.message import Message
+
+
+def test_request_response_flow_controller_basic():
+    """Test basic functionality of the RequestResponseFlowController"""
+
+    def test_invoke_handler(component, message, _data):
+        # Call the request_response
+        message = component.do_broker_request_response(message)
+        try:
+            assert message.get_data("previous") == {
+                "payload": {"text": "Hello, World!"},
+                "topic": None,
+                "user_properties": {},
+            }
+        except AssertionError as e:
+            return e
+        return "Pass"
+
+    config = {
+        "flows": [
+            {
+                "name": "test_flow",
+                "components": [
+                    {
+                        "component_name": "requester",
+                        "component_module": "handler_callback",
+                        "component_config": {
+                            "invoke_handler": test_invoke_handler,
+                        },
+                        "broker_request_response": {
+                            "enabled": True,
+                            "broker_config": {
+                                "broker_type": "test",
+                                "broker_url": "test",
+                                "broker_username": "test",
+                                "broker_password": "test",
+                                "broker_vpn": "test",
+                                "payload_encoding": "utf-8",
+                                "payload_format": "json",
+                            },
+                            "request_expiry_ms": 500000,
+                        },
+                    }
+                ],
+            },
+        ]
+    }
+    connector, flows = create_test_flows(config)
+
+    test_flow = flows[0]
+
+    try:
+
+        # Send a message to the input flow
+        send_message_to_flow(test_flow, Message(payload={"text": "Hello, World!"}))
+
+        # Get the output message
+        output_message = get_message_from_flow(test_flow)
+
+        result = output_message.get_data("previous")
+
+        # If the result is an AssertionError, then raise it
+        if isinstance(result, AssertionError):
+            raise result
+
+        assert result == "Pass"
+
+    except Exception as e:
+        print(e)
+        assert False
+
+    finally:
+        dispose_connector(connector)
+
+
+# Test simple streaming request response
+# Use the iterate component to break a single message into multiple messages
+def test_request_response_flow_controller_streaming():
+    """Test streaming functionality of the RequestResponseFlowController"""
+
+    def test_invoke_handler(component, message, data):
+        result = []
+        for message, last_message in component.do_broker_request_response(
+            message, stream=True, streaming_complete_expression="input.payload:last"
+        ):
+            payload = message.get_data("input.payload")
+            result.append(payload)
+            if last_message:
+                assert payload == {"text": "Chunk3", "last": True}
+
+        assert result == [
+            {"text": "Chunk1", "last": False},
+            {"text": "Chunk2", "last": False},
+            {"text": "Chunk3", "last": True},
+        ]
+
+        return "Pass"
+
+    config = {
+        "flows": [
+            {
+                "name": "test_flow",
+                "components": [
+                    {
+                        "component_name": "requester",
+                        "component_module": "handler_callback",
+                        "component_config": {
+                            "invoke_handler": test_invoke_handler,
+                        },
+                        "broker_request_response": {
+                            "enabled": True,
+                            "broker_config": {
+                                "broker_type": "test_streaming",
+                                "broker_url": "test",
+                                "broker_username": "test",
+                                "broker_password": "test",
+                                "broker_vpn": "test",
+                                "payload_encoding": "utf-8",
+                                "payload_format": "json",
+                            },
+                            "request_expiry_ms": 500000,
+                        },
+                    }
+                ],
+            },
+        ]
+    }
+    connector, flows = create_test_flows(config)
+
+    test_flow = flows[0]
+
+    try:
+
+        # Send a message to the input flow
+        send_message_to_flow(
+            test_flow,
+            Message(
+                payload=[
+                    {"text": "Chunk1", "last": False},
+                    {"text": "Chunk2", "last": False},
+                    {"text": "Chunk3", "last": True},
+                ]
+            ),
+        )
+
+        # Get the output message
+        output_message = get_message_from_flow(test_flow)
+
+        assert output_message.get_data("previous") == "Pass"
+
+    except Exception as e:
+        print(e)
+        assert False
+
+    finally:
+        dispose_connector(connector)
+
+
+# Test the timeout functionality
+def test_request_response_flow_controller_timeout():
+    """Test timeout functionality of the RequestResponseFlowController"""
+
+    def test_invoke_handler(component, message, data):
+        try:
+            for message, _last_message in component.do_broker_request_response(
+                message, stream=True, streaming_complete_expression="input.payload:last"
+            ):
+                pass
+        except TimeoutError:
+            return "Timeout"
+        return "Fail"
+
+    config = {
+        "flows": [
+            {
+                "name": "test_flow",
+                "components": [
+                    {
+                        "component_name": "requester",
+                        "component_module": "handler_callback",
+                        "component_config": {
+                            "invoke_handler": test_invoke_handler,
+                        },
+                        "broker_request_response": {
+                            "enabled": True,
+                            "broker_config": {
+                                "broker_type": "test_streaming",
+                                "broker_url": "test",
+                                "broker_username": "test",
+                                "broker_password": "test",
+                                "broker_vpn": "test",
+                                "payload_encoding": "utf-8",
+                                "payload_format": "json",
+                            },
+                            "request_expiry_ms": 2000,
+                        },
+                    }
+                ],
+            },
+        ]
+    }
+    connector, flows = create_test_flows(config)
+
+    test_flow = flows[0]
+
+    try:
+
+        # Send a message with an empty list in the payload to the test_streaming broker type
+        # This will not send any chunks and should timeout
+        send_message_to_flow(test_flow, Message(payload=[]))
+
+        # Get the output message
+        output_message = get_message_from_flow(test_flow)
+
+        assert output_message.get_data("previous") == "Timeout"
+
+    finally:
+        dispose_connector(connector)