Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AI-124: Add a feature to provide simple blocking broker request/response ability for components #42

Merged
merged 38 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
6cbe54a
feat: add request_response_controller.py
efunneko Sep 13, 2024
4f9f79c
feat: implement RequestResponseFlowManager and RequestResponseControl…
efunneko Sep 13, 2024
139ed70
style: format code with black and improve readability
efunneko Sep 13, 2024
1e54049
feat: implement RequestResponseController for flow-based request-resp…
efunneko Sep 13, 2024
e16b0ca
feat: implement RequestResponseController for handling request-respon…
efunneko Sep 13, 2024
776622f
fix: import SolaceAiConnector for type checking
efunneko Sep 13, 2024
e90b136
refactor: restructure Flow class and improve code organization
efunneko Sep 13, 2024
e9ab2f5
feat: implement multiple named RequestResponseControllers per component
efunneko Sep 13, 2024
11b345b
refactor: initialize request-response controllers in ComponentBase
efunneko Sep 13, 2024
9f5817a
test: add request_response_controller functionality tests
efunneko Sep 15, 2024
03e8292
feat: finished implementation and added some tests
efunneko Sep 16, 2024
9943637
refactor: rename RequestResponseController to RequestResponseFlowCont…
efunneko Sep 16, 2024
49796c8
refactor: rename RequestResponseController to RequestResponseFlowCont…
efunneko Sep 16, 2024
3db7fa9
refactor: some name changes
efunneko Sep 16, 2024
a6c0c5d
fix: update test function names for RequestResponseFlowController
efunneko Sep 16, 2024
b836da7
refactor: more name changes
efunneko Sep 16, 2024
4ab4bfc
Ed/req_resp_examples_and_fixes (#41)
efunneko Sep 21, 2024
5161637
docs: add broker request-response configuration
efunneko Sep 22, 2024
38797b7
docs: added advanced_component_features.md
efunneko Sep 22, 2024
cefb69a
docs: add broker request-response configuration details
efunneko Sep 22, 2024
577991f
docs: add payload encoding and format to broker config
efunneko Sep 22, 2024
1e8e7b4
docs: add cache service and timer manager to advanced_component_featu…
efunneko Sep 22, 2024
bc0d67f
docs: add configuration requirement for broker request-response
efunneko Sep 22, 2024
1efbdcb
docs: update broker request-response section with configuration info
efunneko Sep 22, 2024
5319ef3
docs: a bit more detail about do_broker_request_response
efunneko Sep 22, 2024
9347eb9
docs: add link to advanced features page in table of contents
efunneko Sep 22, 2024
47c2c40
docs: add link to advanced features page
efunneko Sep 22, 2024
ae17c3d
docs: reorder table of contents in index.md
efunneko Sep 22, 2024
959c743
docs: add custom components documentation
efunneko Sep 22, 2024
44fffec
docs: Remove advanced component features from table of contents
efunneko Sep 22, 2024
51ce819
docs: clean up a double inclusion of the same section
efunneko Sep 22, 2024
f44d896
docs: small example change
efunneko Sep 22, 2024
e1f2fbb
chore: remove dead code
efunneko Sep 22, 2024
75b4b6b
chore: add some extra comments to explain some test code
efunneko Sep 22, 2024
c52920b
docs: Update description of STDIN input component
efunneko Sep 22, 2024
338f786
chore: add is_broker_request_response_enabled method
efunneko Sep 22, 2024
2da666b
Merge branch 'main' into ed/request_response_flow
efunneko Sep 22, 2024
d2ddcca
chore: Some changes after review
efunneko Sep 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions docs/advanced_component_features.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Advanced Component Features

This document describes advanced features available to custom components in the Solace AI Connector.

## Table of Contents
- [Broker Request-Response](#broker-request-response)
- [Cache Manager](#cache-manager)
- [Timer Features](#timer-features)

## Broker Request-Response

Components can perform a request and get a response from the broker using the `do_broker_request_response` method. This method supports both simple request-response and streamed responses. To use this feature, the component's configuration must include a `broker_request_response` section. For details on how to configure this section, refer to the [Broker Request-Response Configuration](configuration.md#broker-request-response-configuration) in the configuration documentation.

This feature would be used in the invoke method of a custom component. When the `do_broker_request_response` method is called, the component will send a message to the broker and then block until a response (or a series of streamed chunks) is received. This makes it very easy to call services that are available via the broker.

### Usage

```python
response = self.do_broker_request_response(message, stream=False)
```

For streamed responses:

```python
for chunk, is_last in self.do_broker_request_response(message, stream=True, streaming_complete_expression="input.payload:streaming.last_message"):
# Process each chunk
if is_last:
break
```

### Parameters

- `message`: The message to send to the broker. This must have a topic and payload.
- `stream` (optional): Boolean indicating whether to expect a streamed response. Default is False.
- `streaming_complete_expression` (optional): An expression to evaluate on each response chunk to determine if it's the last one. This is required when `stream=True`.

### Return Value

- For non-streamed responses: Returns the response message.
- For streamed responses: Returns a generator that yields tuples of (chunk, is_last). Each chunk is a fully formed message with the format of the response. `is_last` is a boolean indicating if the chunk is the last one.

## Memory Cache

The cache service provides a flexible way to store and retrieve data with optional expiration. It supports different storage backends and offers features like automatic expiry checks.

### Features

1. Multiple storage backends:
- In-memory storage
- SQLAlchemy-based storage (for persistent storage)

2. Key-value storage with metadata and expiry support
3. Automatic expiry checks in a background thread
4. Thread-safe operations

### Usage

Components can access the cache service through `self.cache_service`. Here are some common operations:

```python
# Set a value with expiry
self.cache_service.set("key", "value", expiry=300) # Expires in 300 seconds

# Get a value
value = self.cache_service.get("key")

# Delete a value
self.cache_service.delete("key")

# Get all values (including metadata and expiry)
all_data = self.cache_service.get_all()
```

### Configuration

The cache service can be configured in the main configuration file:

```yaml
cache:
backend: "memory" # or "sqlalchemy"
connection_string: "sqlite:///cache.db" # for SQLAlchemy backend
```

## Timer Features

The timer manager allows components to schedule one-time or recurring timer events. This is useful for implementing delayed actions, periodic tasks, or timeouts.

### Features

1. One-time and recurring timers
2. Customizable timer IDs for easy management
3. Optional payloads for timer events

### Usage

Components can access the timer manager through `self.timer_manager`. Here are some common operations:

```python
# Add a one-time timer
self.add_timer(delay_ms=5000, timer_id="my_timer", payload={"key": "value"})

# Add a recurring timer
self.add_timer(delay_ms=5000, timer_id="recurring_timer", interval_ms=10000, payload={"type": "recurring"})

# Cancel a timer
self.cancel_timer(timer_id="my_timer")
```

### Handling Timer Events

To handle timer events, components should implement the `handle_timer_event` method:

```python
def handle_timer_event(self, timer_data):
timer_id = timer_data["timer_id"]
payload = timer_data["payload"]
# Process the timer event
```

Timer events are automatically dispatched to the appropriate component by the timer manager.
33 changes: 32 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ Each component configuration is a dictionary with the following keys:
- `input_selection`: <dictionary> - A `source_expression` or `source_value` to use as the input to the component. Check [Expression Syntax](#expression-syntax) for more details. [Optional: If not specified, the complete previous component output will be used]
- `queue_depth`: <int> - The depth of the input queue for the component.
- `num_instances`: <int> - The number of instances of the component to run (Starts multiple threads to process messages)

- `broker_request_response`: <dictionary> - Configuration for the broker request-response functionality. [Optional]

### component_module

Expand Down Expand Up @@ -359,6 +359,37 @@ The `queue_depth` is an integer that specifies the depth of the input queue for

The `num_instances` is an integer that specifies the number of instances of the component to run. This is the number of threads that will be started to process messages from the input queue. By default, the number of instances is 1.

### Broker Request-Response Configuration

The `broker_request_response` configuration allows components to perform request-response operations with a broker. It has the following structure:

```yaml
broker_request_response:
enabled: <boolean>
broker_config:
broker_type: <string>
broker_url: <string>
broker_username: <string>
broker_password: <string>
broker_vpn: <string>
payload_encoding: <string>
payload_format: <string>
request_expiry_ms: <int>
```

- `enabled`: Set to `true` to enable broker request-response functionality for the component.
- `broker_config`: Configuration for the broker connection.
- `broker_type`: Type of the broker (e.g., "solace").
- `broker_url`: URL of the broker.
- `broker_username`: Username for broker authentication.
- `broker_password`: Password for broker authentication.
- `broker_vpn`: VPN name for the broker connection.
- `payload_encoding`: Encoding for the payload (e.g., "utf-8", "base64").
- `payload_format`: Format of the payload (e.g., "json", "text").
- `request_expiry_ms`: Expiry time for requests in milliseconds.

For more details on using this functionality, see the [Advanced Component Features](advanced_component_features.md#broker-request-response) documentation.

### Built-in components

The AI Event Connector comes with a number of built-in components that can be used to process messages. For a list of all built-in components, see the [Components](components/index.md) documentation.
Expand Down
90 changes: 90 additions & 0 deletions docs/custom_components.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Custom Components

## Purpose

Custom components provide a way to extend the functionality of the Solace AI Connector beyond what's possible with the built-in components and configuration options. Sometimes, it's easier and more efficient to add custom code than to build a complex configuration file, especially for specialized or unique processing requirements.

## Requirements of a Custom Component

To create a custom component, you need to follow these requirements:

1. **Inherit from ComponentBase**: Your custom component class should inherit from the `ComponentBase` class.

2. **Info Section**: Define an `info` dictionary with the following keys:
- `class_name`: The name of your custom component class.
- `config_parameters`: A list of dictionaries describing the configuration parameters for your component.
- `input_schema`: A dictionary describing the expected input schema.
- `output_schema`: A dictionary describing the expected output schema.

3. **Implement the `invoke` method**: This is the main method where your component's logic will be implemented.

Here's a basic template for a custom component:

```python
from solace_ai_connector.components.component_base import ComponentBase

info = {
"class_name": "MyCustomComponent",
"config_parameters": [
{
"name": "my_param",
"type": "string",
"required": True,
"description": "A custom parameter"
}
],
"input_schema": {
"type": "object",
"properties": {
"input_data": {"type": "string"}
}
},
"output_schema": {
"type": "object",
"properties": {
"output_data": {"type": "string"}
}
}
}

class MyCustomComponent(ComponentBase):
def __init__(self, **kwargs):
super().__init__(info, **kwargs)
self.my_param = self.get_config("my_param")

def invoke(self, message, data):
# Your custom logic here
result = f"{self.my_param}: {data['input_data']}"
return {"output_data": result}
```

## Overrideable Methods

While the `invoke` method is the main one you'll implement, there are several other methods you can override to customize your component's behavior:

1. `invoke(self, message, data)`: The main processing method for your component.
2. `get_next_event(self)`: Customize how your component receives events.
3. `send_message(self, message)`: Customize how your component sends messages to the next component.
4. `handle_timer_event(self, timer_data)`: Handle timer events if your component uses timers.
5. `handle_cache_expiry_event(self, timer_data)`: Handle cache expiry events if your component uses the cache service.
6. `process_pre_invoke(self, message)`: Customize preprocessing before `invoke` is called.
7. `process_post_invoke(self, result, message)`: Customize postprocessing after `invoke` is called.

## Advanced Features

Custom components can take advantage of advanced features provided by the Solace AI Connector. These include:

- Broker request-response functionality
- Cache services
- Timer management

For more information on these advanced features and how to use them in your custom components, please refer to the [Advanced Component Features](advanced_component_features.md) documentation.

By creating custom components, you can extend the Solace AI Connector to meet your specific needs while still benefiting from the framework's built-in capabilities for event processing, flow management, and integration with Solace event brokers.

## Example

See the [Tips and Tricks page](tips_and_tricks.md) for an example of creating a custom component.


[]
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This connector application makes it easy to connect your AI/ML models to Solace
- [Configuration](configuration.md)
- [Components](components/index.md)
- [Transforms](transforms/index.md)
- [Custom Components](custom_components.md)
- [Tips and Tricks](tips_and_tricks.md)
- [Guides](guides/index.md)
- [Examples](../examples/)
Expand Down
Empty file.
52 changes: 52 additions & 0 deletions examples/llm/custom_components/llm_streaming_custom_component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# A simple pass-through component - what goes in comes out

import sys

sys.path.append("src")

from solace_ai_connector.components.component_base import ComponentBase
from solace_ai_connector.common.message import Message


info = {
"class_name": "LlmStreamingCustomComponent",
"description": "Do a blocking LLM request/response",
"config_parameters": [
{
"name": "llm_request_topic",
"description": "The topic to send the request to",
"type": "string",
}
],
"input_schema": {
"type": "object",
"properties": {},
},
"output_schema": {
"type": "object",
"properties": {},
},
}


class LlmStreamingCustomComponent(ComponentBase):
def __init__(self, **kwargs):
super().__init__(info, **kwargs)
self.llm_request_topic = self.get_config("llm_request_topic")

def invoke(self, message, data):
llm_message = Message(payload=data, topic=self.llm_request_topic)
for message, last_message in self.do_broker_request_response(
llm_message,
stream=True,
streaming_complete_expression="input.payload:last_chunk",
):
text = message.get_data("input.payload:chunk")
if not text:
text = message.get_data("input.payload:content") or "no response"
if last_message:
return {"chunk": text}
self.output_streaming(message, {"chunk": text})

def output_streaming(self, message, data):
return self.process_post_invoke(data, message)
Loading
Loading