-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
AI-124: Add a feature to provide simple blocking broker request/respo…
…nse ability for components (#42) * feat: add request_response_controller.py * feat: implement RequestResponseFlowManager and RequestResponseController classes * style: format code with black and improve readability * feat: implement RequestResponseController for flow-based request-response handling * feat: implement RequestResponseController for handling request-response patterns * fix: import SolaceAiConnector for type checking * refactor: restructure Flow class and improve code organization * feat: implement multiple named RequestResponseControllers per component * refactor: initialize request-response controllers in ComponentBase * test: add request_response_controller functionality tests * feat: finished implementation and added some tests * refactor: rename RequestResponseController to RequestResponseFlowController * refactor: rename RequestResponseController to RequestResponseFlowController * refactor: some name changes * fix: update test function names for RequestResponseFlowController * refactor: more name changes * Ed/req_resp_examples_and_fixes (#41) * feat: Added a request_response_flow example and fixed a few issues along the way * feat: Reworked the broker_request_response built-in ability of components to be simpler. Instead of having to have a defined flow and then name that flow, it will automatically create a flow with a single broker_request_response component in it. Now there is a straightforward interating function call to allow components to issue a request and get streaming or non-streaming responses from that flow. * chore: fix the request_response example and remove the old one * docs: add broker request-response configuration * docs: added advanced_component_features.md * docs: add broker request-response configuration details * docs: add payload encoding and format to broker config * docs: add cache service and timer manager to advanced_component_features.md * docs: add configuration requirement for broker request-response * docs: update broker request-response section with configuration info * docs: a bit more detail about do_broker_request_response * docs: add link to advanced features page in table of contents * docs: add link to advanced features page * docs: reorder table of contents in index.md * docs: add custom components documentation * docs: Remove advanced component features from table of contents * docs: clean up a double inclusion of the same section * docs: small example change * chore: remove dead code * chore: add some extra comments to explain some test code * docs: Update description of STDIN input component Update the description of the STDIN input component to clarify that it waits for its output message to be acknowledged before prompting for the next input. This change is made in the `stdin_input.py` file. * chore: add is_broker_request_response_enabled method * chore: Some changes after review
- Loading branch information
Showing
22 changed files
with
1,289 additions
and
70 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
# Advanced Component Features | ||
|
||
This document describes advanced features available to custom components in the Solace AI Connector. | ||
|
||
## Table of Contents | ||
- [Broker Request-Response](#broker-request-response) | ||
- [Cache Manager](#cache-manager) | ||
- [Timer Features](#timer-features) | ||
|
||
## Broker Request-Response | ||
|
||
Components can perform a request and get a response from the broker using the `do_broker_request_response` method. This method supports both simple request-response and streamed responses. To use this feature, the component's configuration must include a `broker_request_response` section. For details on how to configure this section, refer to the [Broker Request-Response Configuration](configuration.md#broker-request-response-configuration) in the configuration documentation. | ||
|
||
This feature would be used in the invoke method of a custom component. When the `do_broker_request_response` method is called, the component will send a message to the broker and then block until a response (or a series of streamed chunks) is received. This makes it very easy to call services that are available via the broker. | ||
|
||
### Usage | ||
|
||
```python | ||
response = self.do_broker_request_response(message, stream=False) | ||
``` | ||
|
||
For streamed responses: | ||
|
||
```python | ||
for chunk, is_last in self.do_broker_request_response(message, stream=True, streaming_complete_expression="input.payload:streaming.last_message"): | ||
# Process each chunk | ||
if is_last: | ||
break | ||
``` | ||
|
||
### Parameters | ||
|
||
- `message`: The message to send to the broker. This must have a topic and payload. | ||
- `stream` (optional): Boolean indicating whether to expect a streamed response. Default is False. | ||
- `streaming_complete_expression` (optional): An expression to evaluate on each response chunk to determine if it's the last one. This is required when `stream=True`. | ||
|
||
### Return Value | ||
|
||
- For non-streamed responses: Returns the response message. | ||
- For streamed responses: Returns a generator that yields tuples of (chunk, is_last). Each chunk is a fully formed message with the format of the response. `is_last` is a boolean indicating if the chunk is the last one. | ||
|
||
## Memory Cache | ||
|
||
The cache service provides a flexible way to store and retrieve data with optional expiration. It supports different storage backends and offers features like automatic expiry checks. | ||
|
||
### Features | ||
|
||
1. Multiple storage backends: | ||
- In-memory storage | ||
- SQLAlchemy-based storage (for persistent storage) | ||
|
||
2. Key-value storage with metadata and expiry support | ||
3. Automatic expiry checks in a background thread | ||
4. Thread-safe operations | ||
|
||
### Usage | ||
|
||
Components can access the cache service through `self.cache_service`. Here are some common operations: | ||
|
||
```python | ||
# Set a value with expiry | ||
self.cache_service.set("key", "value", expiry=300) # Expires in 300 seconds | ||
|
||
# Get a value | ||
value = self.cache_service.get("key") | ||
|
||
# Delete a value | ||
self.cache_service.delete("key") | ||
|
||
# Get all values (including metadata and expiry) | ||
all_data = self.cache_service.get_all() | ||
``` | ||
|
||
### Configuration | ||
|
||
The cache service can be configured in the main configuration file: | ||
|
||
```yaml | ||
cache: | ||
backend: "memory" # or "sqlalchemy" | ||
connection_string: "sqlite:///cache.db" # for SQLAlchemy backend | ||
``` | ||
## Timer Features | ||
The timer manager allows components to schedule one-time or recurring timer events. This is useful for implementing delayed actions, periodic tasks, or timeouts. | ||
### Features | ||
1. One-time and recurring timers | ||
2. Customizable timer IDs for easy management | ||
3. Optional payloads for timer events | ||
### Usage | ||
Components can access the timer manager through `self.timer_manager`. Here are some common operations: | ||
|
||
```python | ||
# Add a one-time timer | ||
self.add_timer(delay_ms=5000, timer_id="my_timer", payload={"key": "value"}) | ||
# Add a recurring timer | ||
self.add_timer(delay_ms=5000, timer_id="recurring_timer", interval_ms=10000, payload={"type": "recurring"}) | ||
# Cancel a timer | ||
self.cancel_timer(timer_id="my_timer") | ||
``` | ||
|
||
### Handling Timer Events | ||
|
||
To handle timer events, components should implement the `handle_timer_event` method: | ||
|
||
```python | ||
def handle_timer_event(self, timer_data): | ||
timer_id = timer_data["timer_id"] | ||
payload = timer_data["payload"] | ||
# Process the timer event | ||
``` | ||
|
||
Timer events are automatically dispatched to the appropriate component by the timer manager. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
# Custom Components | ||
|
||
## Purpose | ||
|
||
Custom components provide a way to extend the functionality of the Solace AI Connector beyond what's possible with the built-in components and configuration options. Sometimes, it's easier and more efficient to add custom code than to build a complex configuration file, especially for specialized or unique processing requirements. | ||
|
||
## Requirements of a Custom Component | ||
|
||
To create a custom component, you need to follow these requirements: | ||
|
||
1. **Inherit from ComponentBase**: Your custom component class should inherit from the `ComponentBase` class. | ||
|
||
2. **Info Section**: Define an `info` dictionary with the following keys: | ||
- `class_name`: The name of your custom component class. | ||
- `config_parameters`: A list of dictionaries describing the configuration parameters for your component. | ||
- `input_schema`: A dictionary describing the expected input schema. | ||
- `output_schema`: A dictionary describing the expected output schema. | ||
|
||
3. **Implement the `invoke` method**: This is the main method where your component's logic will be implemented. | ||
|
||
Here's a basic template for a custom component: | ||
|
||
```python | ||
from solace_ai_connector.components.component_base import ComponentBase | ||
|
||
info = { | ||
"class_name": "MyCustomComponent", | ||
"config_parameters": [ | ||
{ | ||
"name": "my_param", | ||
"type": "string", | ||
"required": True, | ||
"description": "A custom parameter" | ||
} | ||
], | ||
"input_schema": { | ||
"type": "object", | ||
"properties": { | ||
"input_data": {"type": "string"} | ||
} | ||
}, | ||
"output_schema": { | ||
"type": "object", | ||
"properties": { | ||
"output_data": {"type": "string"} | ||
} | ||
} | ||
} | ||
|
||
class MyCustomComponent(ComponentBase): | ||
def __init__(self, **kwargs): | ||
super().__init__(info, **kwargs) | ||
self.my_param = self.get_config("my_param") | ||
|
||
def invoke(self, message, data): | ||
# Your custom logic here | ||
result = f"{self.my_param}: {data['input_data']}" | ||
return {"output_data": result} | ||
``` | ||
|
||
## Overrideable Methods | ||
|
||
While the `invoke` method is the main one you'll implement, there are several other methods you can override to customize your component's behavior: | ||
|
||
1. `invoke(self, message, data)`: The main processing method for your component. | ||
2. `get_next_event(self)`: Customize how your component receives events. | ||
3. `send_message(self, message)`: Customize how your component sends messages to the next component. | ||
4. `handle_timer_event(self, timer_data)`: Handle timer events if your component uses timers. | ||
5. `handle_cache_expiry_event(self, timer_data)`: Handle cache expiry events if your component uses the cache service. | ||
6. `process_pre_invoke(self, message)`: Customize preprocessing before `invoke` is called. | ||
7. `process_post_invoke(self, result, message)`: Customize postprocessing after `invoke` is called. | ||
|
||
## Advanced Features | ||
|
||
Custom components can take advantage of advanced features provided by the Solace AI Connector. These include: | ||
|
||
- Broker request-response functionality | ||
- Cache services | ||
- Timer management | ||
|
||
For more information on these advanced features and how to use them in your custom components, please refer to the [Advanced Component Features](advanced_component_features.md) documentation. | ||
|
||
By creating custom components, you can extend the Solace AI Connector to meet your specific needs while still benefiting from the framework's built-in capabilities for event processing, flow management, and integration with Solace event brokers. | ||
|
||
## Example | ||
|
||
See the [Tips and Tricks page](tips_and_tricks.md) for an example of creating a custom component. | ||
|
||
|
||
[] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
52 changes: 52 additions & 0 deletions
52
examples/llm/custom_components/llm_streaming_custom_component.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
# A simple pass-through component - what goes in comes out | ||
|
||
import sys | ||
|
||
sys.path.append("src") | ||
|
||
from solace_ai_connector.components.component_base import ComponentBase | ||
from solace_ai_connector.common.message import Message | ||
|
||
|
||
info = { | ||
"class_name": "LlmStreamingCustomComponent", | ||
"description": "Do a blocking LLM request/response", | ||
"config_parameters": [ | ||
{ | ||
"name": "llm_request_topic", | ||
"description": "The topic to send the request to", | ||
"type": "string", | ||
} | ||
], | ||
"input_schema": { | ||
"type": "object", | ||
"properties": {}, | ||
}, | ||
"output_schema": { | ||
"type": "object", | ||
"properties": {}, | ||
}, | ||
} | ||
|
||
|
||
class LlmStreamingCustomComponent(ComponentBase): | ||
def __init__(self, **kwargs): | ||
super().__init__(info, **kwargs) | ||
self.llm_request_topic = self.get_config("llm_request_topic") | ||
|
||
def invoke(self, message, data): | ||
llm_message = Message(payload=data, topic=self.llm_request_topic) | ||
for message, last_message in self.do_broker_request_response( | ||
llm_message, | ||
stream=True, | ||
streaming_complete_expression="input.payload:last_chunk", | ||
): | ||
text = message.get_data("input.payload:chunk") | ||
if not text: | ||
text = message.get_data("input.payload:content") or "no response" | ||
if last_message: | ||
return {"chunk": text} | ||
self.output_streaming(message, {"chunk": text}) | ||
|
||
def output_streaming(self, message, data): | ||
return self.process_post_invoke(data, message) |
Oops, something went wrong.