Skip to content

Commit

Permalink
AI-124: Add a feature to provide simple blocking broker request/respo…
Browse files Browse the repository at this point in the history
…nse ability for components (#42)

* feat: add request_response_controller.py

* feat: implement RequestResponseFlowManager and RequestResponseController classes

* style: format code with black and improve readability

* feat: implement RequestResponseController for flow-based request-response handling

* feat: implement RequestResponseController for handling request-response patterns

* fix: import SolaceAiConnector for type checking

* refactor: restructure Flow class and improve code organization

* feat: implement multiple named RequestResponseControllers per component

* refactor: initialize request-response controllers in ComponentBase

* test: add request_response_controller functionality tests

* feat: finished implementation and added some tests

* refactor: rename RequestResponseController to RequestResponseFlowController

* refactor: rename RequestResponseController to RequestResponseFlowController

* refactor: some name changes

* fix: update test function names for RequestResponseFlowController

* refactor: more name changes

* Ed/req_resp_examples_and_fixes (#41)

* feat: Added a request_response_flow example and fixed a few issues along the way

* feat: Reworked the broker_request_response built-in ability of components to be simpler. Instead of having to have a defined flow and then name that flow, it will automatically create a flow with a single broker_request_response component in it. Now there is a straightforward interating function call to allow components to issue a request and get streaming or non-streaming responses from that flow.

* chore: fix the request_response example and remove the old one

* docs: add broker request-response configuration

* docs: added advanced_component_features.md

* docs: add broker request-response configuration details

* docs: add payload encoding and format to broker config

* docs: add cache service and timer manager to advanced_component_features.md

* docs: add configuration requirement for broker request-response

* docs: update broker request-response section with configuration info

* docs: a bit more detail about do_broker_request_response

* docs: add link to advanced features page in table of contents

* docs: add link to advanced features page

* docs: reorder table of contents in index.md

* docs: add custom components documentation

* docs: Remove advanced component features from table of contents

* docs: clean up a double inclusion of the same section

* docs: small example change

* chore: remove dead code

* chore: add some extra comments to explain some test code

* docs: Update description of STDIN input component

Update the description of the STDIN input component to clarify that it waits for its output message to be acknowledged before prompting for the next input. This change is made in the `stdin_input.py` file.

* chore: add is_broker_request_response_enabled method

* chore: Some changes after review
  • Loading branch information
efunneko authored Sep 23, 2024
1 parent 2e1c1c6 commit c2d77a4
Show file tree
Hide file tree
Showing 22 changed files with 1,289 additions and 70 deletions.
120 changes: 120 additions & 0 deletions docs/advanced_component_features.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Advanced Component Features

This document describes advanced features available to custom components in the Solace AI Connector.

## Table of Contents
- [Broker Request-Response](#broker-request-response)
- [Cache Manager](#cache-manager)
- [Timer Features](#timer-features)

## Broker Request-Response

Components can perform a request and get a response from the broker using the `do_broker_request_response` method. This method supports both simple request-response and streamed responses. To use this feature, the component's configuration must include a `broker_request_response` section. For details on how to configure this section, refer to the [Broker Request-Response Configuration](configuration.md#broker-request-response-configuration) in the configuration documentation.

This feature would be used in the invoke method of a custom component. When the `do_broker_request_response` method is called, the component will send a message to the broker and then block until a response (or a series of streamed chunks) is received. This makes it very easy to call services that are available via the broker.

### Usage

```python
response = self.do_broker_request_response(message, stream=False)
```

For streamed responses:

```python
for chunk, is_last in self.do_broker_request_response(message, stream=True, streaming_complete_expression="input.payload:streaming.last_message"):
# Process each chunk
if is_last:
break
```

### Parameters

- `message`: The message to send to the broker. This must have a topic and payload.
- `stream` (optional): Boolean indicating whether to expect a streamed response. Default is False.
- `streaming_complete_expression` (optional): An expression to evaluate on each response chunk to determine if it's the last one. This is required when `stream=True`.

### Return Value

- For non-streamed responses: Returns the response message.
- For streamed responses: Returns a generator that yields tuples of (chunk, is_last). Each chunk is a fully formed message with the format of the response. `is_last` is a boolean indicating if the chunk is the last one.

## Memory Cache

The cache service provides a flexible way to store and retrieve data with optional expiration. It supports different storage backends and offers features like automatic expiry checks.

### Features

1. Multiple storage backends:
- In-memory storage
- SQLAlchemy-based storage (for persistent storage)

2. Key-value storage with metadata and expiry support
3. Automatic expiry checks in a background thread
4. Thread-safe operations

### Usage

Components can access the cache service through `self.cache_service`. Here are some common operations:

```python
# Set a value with expiry
self.cache_service.set("key", "value", expiry=300) # Expires in 300 seconds

# Get a value
value = self.cache_service.get("key")

# Delete a value
self.cache_service.delete("key")

# Get all values (including metadata and expiry)
all_data = self.cache_service.get_all()
```

### Configuration

The cache service can be configured in the main configuration file:

```yaml
cache:
backend: "memory" # or "sqlalchemy"
connection_string: "sqlite:///cache.db" # for SQLAlchemy backend
```
## Timer Features
The timer manager allows components to schedule one-time or recurring timer events. This is useful for implementing delayed actions, periodic tasks, or timeouts.
### Features
1. One-time and recurring timers
2. Customizable timer IDs for easy management
3. Optional payloads for timer events
### Usage
Components can access the timer manager through `self.timer_manager`. Here are some common operations:

```python
# Add a one-time timer
self.add_timer(delay_ms=5000, timer_id="my_timer", payload={"key": "value"})
# Add a recurring timer
self.add_timer(delay_ms=5000, timer_id="recurring_timer", interval_ms=10000, payload={"type": "recurring"})
# Cancel a timer
self.cancel_timer(timer_id="my_timer")
```

### Handling Timer Events

To handle timer events, components should implement the `handle_timer_event` method:

```python
def handle_timer_event(self, timer_data):
timer_id = timer_data["timer_id"]
payload = timer_data["payload"]
# Process the timer event
```

Timer events are automatically dispatched to the appropriate component by the timer manager.
33 changes: 32 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ Each component configuration is a dictionary with the following keys:
- `input_selection`: <dictionary> - A `source_expression` or `source_value` to use as the input to the component. Check [Expression Syntax](#expression-syntax) for more details. [Optional: If not specified, the complete previous component output will be used]
- `queue_depth`: <int> - The depth of the input queue for the component.
- `num_instances`: <int> - The number of instances of the component to run (Starts multiple threads to process messages)

- `broker_request_response`: <dictionary> - Configuration for the broker request-response functionality. [Optional]

### component_module

Expand Down Expand Up @@ -359,6 +359,37 @@ The `queue_depth` is an integer that specifies the depth of the input queue for

The `num_instances` is an integer that specifies the number of instances of the component to run. This is the number of threads that will be started to process messages from the input queue. By default, the number of instances is 1.

### Broker Request-Response Configuration

The `broker_request_response` configuration allows components to perform request-response operations with a broker. It has the following structure:

```yaml
broker_request_response:
enabled: <boolean>
broker_config:
broker_type: <string>
broker_url: <string>
broker_username: <string>
broker_password: <string>
broker_vpn: <string>
payload_encoding: <string>
payload_format: <string>
request_expiry_ms: <int>
```

- `enabled`: Set to `true` to enable broker request-response functionality for the component.
- `broker_config`: Configuration for the broker connection.
- `broker_type`: Type of the broker (e.g., "solace").
- `broker_url`: URL of the broker.
- `broker_username`: Username for broker authentication.
- `broker_password`: Password for broker authentication.
- `broker_vpn`: VPN name for the broker connection.
- `payload_encoding`: Encoding for the payload (e.g., "utf-8", "base64").
- `payload_format`: Format of the payload (e.g., "json", "text").
- `request_expiry_ms`: Expiry time for requests in milliseconds.

For more details on using this functionality, see the [Advanced Component Features](advanced_component_features.md#broker-request-response) documentation.

### Built-in components

The AI Event Connector comes with a number of built-in components that can be used to process messages. For a list of all built-in components, see the [Components](components/index.md) documentation.
Expand Down
90 changes: 90 additions & 0 deletions docs/custom_components.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Custom Components

## Purpose

Custom components provide a way to extend the functionality of the Solace AI Connector beyond what's possible with the built-in components and configuration options. Sometimes, it's easier and more efficient to add custom code than to build a complex configuration file, especially for specialized or unique processing requirements.

## Requirements of a Custom Component

To create a custom component, you need to follow these requirements:

1. **Inherit from ComponentBase**: Your custom component class should inherit from the `ComponentBase` class.

2. **Info Section**: Define an `info` dictionary with the following keys:
- `class_name`: The name of your custom component class.
- `config_parameters`: A list of dictionaries describing the configuration parameters for your component.
- `input_schema`: A dictionary describing the expected input schema.
- `output_schema`: A dictionary describing the expected output schema.

3. **Implement the `invoke` method**: This is the main method where your component's logic will be implemented.

Here's a basic template for a custom component:

```python
from solace_ai_connector.components.component_base import ComponentBase

info = {
"class_name": "MyCustomComponent",
"config_parameters": [
{
"name": "my_param",
"type": "string",
"required": True,
"description": "A custom parameter"
}
],
"input_schema": {
"type": "object",
"properties": {
"input_data": {"type": "string"}
}
},
"output_schema": {
"type": "object",
"properties": {
"output_data": {"type": "string"}
}
}
}

class MyCustomComponent(ComponentBase):
def __init__(self, **kwargs):
super().__init__(info, **kwargs)
self.my_param = self.get_config("my_param")

def invoke(self, message, data):
# Your custom logic here
result = f"{self.my_param}: {data['input_data']}"
return {"output_data": result}
```

## Overrideable Methods

While the `invoke` method is the main one you'll implement, there are several other methods you can override to customize your component's behavior:

1. `invoke(self, message, data)`: The main processing method for your component.
2. `get_next_event(self)`: Customize how your component receives events.
3. `send_message(self, message)`: Customize how your component sends messages to the next component.
4. `handle_timer_event(self, timer_data)`: Handle timer events if your component uses timers.
5. `handle_cache_expiry_event(self, timer_data)`: Handle cache expiry events if your component uses the cache service.
6. `process_pre_invoke(self, message)`: Customize preprocessing before `invoke` is called.
7. `process_post_invoke(self, result, message)`: Customize postprocessing after `invoke` is called.

## Advanced Features

Custom components can take advantage of advanced features provided by the Solace AI Connector. These include:

- Broker request-response functionality
- Cache services
- Timer management

For more information on these advanced features and how to use them in your custom components, please refer to the [Advanced Component Features](advanced_component_features.md) documentation.

By creating custom components, you can extend the Solace AI Connector to meet your specific needs while still benefiting from the framework's built-in capabilities for event processing, flow management, and integration with Solace event brokers.

## Example

See the [Tips and Tricks page](tips_and_tricks.md) for an example of creating a custom component.


[]
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This connector application makes it easy to connect your AI/ML models to Solace
- [Configuration](configuration.md)
- [Components](components/index.md)
- [Transforms](transforms/index.md)
- [Custom Components](custom_components.md)
- [Tips and Tricks](tips_and_tricks.md)
- [Guides](guides/index.md)
- [Examples](../examples/)
Expand Down
Empty file.
52 changes: 52 additions & 0 deletions examples/llm/custom_components/llm_streaming_custom_component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# A simple pass-through component - what goes in comes out

import sys

sys.path.append("src")

from solace_ai_connector.components.component_base import ComponentBase
from solace_ai_connector.common.message import Message


info = {
"class_name": "LlmStreamingCustomComponent",
"description": "Do a blocking LLM request/response",
"config_parameters": [
{
"name": "llm_request_topic",
"description": "The topic to send the request to",
"type": "string",
}
],
"input_schema": {
"type": "object",
"properties": {},
},
"output_schema": {
"type": "object",
"properties": {},
},
}


class LlmStreamingCustomComponent(ComponentBase):
def __init__(self, **kwargs):
super().__init__(info, **kwargs)
self.llm_request_topic = self.get_config("llm_request_topic")

def invoke(self, message, data):
llm_message = Message(payload=data, topic=self.llm_request_topic)
for message, last_message in self.do_broker_request_response(
llm_message,
stream=True,
streaming_complete_expression="input.payload:last_chunk",
):
text = message.get_data("input.payload:chunk")
if not text:
text = message.get_data("input.payload:content") or "no response"
if last_message:
return {"chunk": text}
self.output_streaming(message, {"chunk": text})

def output_streaming(self, message, data):
return self.process_post_invoke(data, message)
Loading

0 comments on commit c2d77a4

Please sign in to comment.