Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New features: inline broker request-response, temporary queues, improved docs and examples and better testing #39

Merged
merged 20 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
67e01f9
Examples Update + Code Refactor (#25)
cyrus2281 Aug 27, 2024
31adf19
Added support for temporary queue + UUID queue name (#26)
cyrus2281 Aug 28, 2024
3ede57a
Add assembly component and auto-generated documents (#27)
cyrus2281 Aug 29, 2024
c95b7f3
Added MoA Example + UUID Invoke Function (#28)
cyrus2281 Aug 29, 2024
8fa025e
Update documentation for new users + Refactored component_input & sou…
cyrus2281 Sep 3, 2024
3f33985
Fixed solace disconnection issues on shutting down (#30)
cyrus2281 Sep 3, 2024
8c9434f
Add RAG example for AI connector + delete action for vector index (#31)
cyrus2281 Sep 4, 2024
3c9b8ba
chore: Refactor make_history_start_with_user_message method (#32)
efunneko Sep 5, 2024
8f936ea
Merge branch 'SolaceLabs:main' into main
artyom-morozov Sep 6, 2024
fc26447
Keep history depth needs to be a positive integer and test refactor (…
efunneko Sep 9, 2024
113b930
Fix for anthropic example (#35)
cyrus2281 Sep 11, 2024
43821d9
Updating version dependency (#37)
cyrus2281 Sep 17, 2024
1a96bae
Fixed url and file name in getting started (#38)
cyrus2281 Sep 19, 2024
9f2891f
Add guide for RAG (#39)
cyrus2281 Sep 19, 2024
be10555
Added link to other docs from RAG guide (#40)
cyrus2281 Sep 20, 2024
2e1c1c6
chore: added a timeout setting for running component tests so that yo…
efunneko Sep 23, 2024
c2d77a4
AI-124: Add a feature to provide simple blocking broker request/respo…
efunneko Sep 23, 2024
0146c9d
feat: AI-129: add ability to specify a default value for a an environ…
efunneko Sep 24, 2024
49cf9b6
DATAGO-85484 Bump min python version
artyom-morozov Sep 26, 2024
49f0d06
Merge pull request #44 from SolaceDev/DATAGO-85484-fix-ci
artyom-morozov Sep 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ log:

shared_config:
- broker_config: &broker_connection
broker_connection_share: ${SOLACE_BROKER_URL}
broker_type: solace
broker_url: ${SOLACE_BROKER_URL}
broker_username: ${SOLACE_BROKER_USERNAME}
Expand Down
120 changes: 120 additions & 0 deletions docs/advanced_component_features.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Advanced Component Features

This document describes advanced features available to custom components in the Solace AI Connector.

## Table of Contents
- [Broker Request-Response](#broker-request-response)
- [Cache Manager](#cache-manager)
- [Timer Features](#timer-features)

## Broker Request-Response

Components can perform a request and get a response from the broker using the `do_broker_request_response` method. This method supports both simple request-response and streamed responses. To use this feature, the component's configuration must include a `broker_request_response` section. For details on how to configure this section, refer to the [Broker Request-Response Configuration](configuration.md#broker-request-response-configuration) in the configuration documentation.

This feature would be used in the invoke method of a custom component. When the `do_broker_request_response` method is called, the component will send a message to the broker and then block until a response (or a series of streamed chunks) is received. This makes it very easy to call services that are available via the broker.

### Usage

```python
response = self.do_broker_request_response(message, stream=False)
```

For streamed responses:

```python
for chunk, is_last in self.do_broker_request_response(message, stream=True, streaming_complete_expression="input.payload:streaming.last_message"):
# Process each chunk
if is_last:
break
```

### Parameters

- `message`: The message to send to the broker. This must have a topic and payload.
- `stream` (optional): Boolean indicating whether to expect a streamed response. Default is False.
- `streaming_complete_expression` (optional): An expression to evaluate on each response chunk to determine if it's the last one. This is required when `stream=True`.

### Return Value

- For non-streamed responses: Returns the response message.
- For streamed responses: Returns a generator that yields tuples of (chunk, is_last). Each chunk is a fully formed message with the format of the response. `is_last` is a boolean indicating if the chunk is the last one.

## Memory Cache

The cache service provides a flexible way to store and retrieve data with optional expiration. It supports different storage backends and offers features like automatic expiry checks.

### Features

1. Multiple storage backends:
- In-memory storage
- SQLAlchemy-based storage (for persistent storage)

2. Key-value storage with metadata and expiry support
3. Automatic expiry checks in a background thread
4. Thread-safe operations

### Usage

Components can access the cache service through `self.cache_service`. Here are some common operations:

```python
# Set a value with expiry
self.cache_service.set("key", "value", expiry=300) # Expires in 300 seconds

# Get a value
value = self.cache_service.get("key")

# Delete a value
self.cache_service.delete("key")

# Get all values (including metadata and expiry)
all_data = self.cache_service.get_all()
```

### Configuration

The cache service can be configured in the main configuration file:

```yaml
cache:
backend: "memory" # or "sqlalchemy"
connection_string: "sqlite:///cache.db" # for SQLAlchemy backend
```

## Timer Features

The timer manager allows components to schedule one-time or recurring timer events. This is useful for implementing delayed actions, periodic tasks, or timeouts.

### Features

1. One-time and recurring timers
2. Customizable timer IDs for easy management
3. Optional payloads for timer events

### Usage

Components can access the timer manager through `self.timer_manager`. Here are some common operations:

```python
# Add a one-time timer
self.add_timer(delay_ms=5000, timer_id="my_timer", payload={"key": "value"})

# Add a recurring timer
self.add_timer(delay_ms=5000, timer_id="recurring_timer", interval_ms=10000, payload={"type": "recurring"})

# Cancel a timer
self.cancel_timer(timer_id="my_timer")
```

### Handling Timer Events

To handle timer events, components should implement the `handle_timer_event` method:

```python
def handle_timer_event(self, timer_data):
timer_id = timer_data["timer_id"]
payload = timer_data["payload"]
# Process the timer event
```

Timer events are automatically dispatched to the appropriate component by the timer manager.
33 changes: 32 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ Each component configuration is a dictionary with the following keys:
- `input_selection`: <dictionary> - A `source_expression` or `source_value` to use as the input to the component. Check [Expression Syntax](#expression-syntax) for more details. [Optional: If not specified, the complete previous component output will be used]
- `queue_depth`: <int> - The depth of the input queue for the component.
- `num_instances`: <int> - The number of instances of the component to run (Starts multiple threads to process messages)

- `broker_request_response`: <dictionary> - Configuration for the broker request-response functionality. [Optional]

### component_module

Expand Down Expand Up @@ -359,6 +359,37 @@ The `queue_depth` is an integer that specifies the depth of the input queue for

The `num_instances` is an integer that specifies the number of instances of the component to run. This is the number of threads that will be started to process messages from the input queue. By default, the number of instances is 1.

### Broker Request-Response Configuration

The `broker_request_response` configuration allows components to perform request-response operations with a broker. It has the following structure:

```yaml
broker_request_response:
enabled: <boolean>
broker_config:
broker_type: <string>
broker_url: <string>
broker_username: <string>
broker_password: <string>
broker_vpn: <string>
payload_encoding: <string>
payload_format: <string>
request_expiry_ms: <int>
```

- `enabled`: Set to `true` to enable broker request-response functionality for the component.
- `broker_config`: Configuration for the broker connection.
- `broker_type`: Type of the broker (e.g., "solace").
- `broker_url`: URL of the broker.
- `broker_username`: Username for broker authentication.
- `broker_password`: Password for broker authentication.
- `broker_vpn`: VPN name for the broker connection.
- `payload_encoding`: Encoding for the payload (e.g., "utf-8", "base64").
- `payload_format`: Format of the payload (e.g., "json", "text").
- `request_expiry_ms`: Expiry time for requests in milliseconds.

For more details on using this functionality, see the [Advanced Component Features](advanced_component_features.md#broker-request-response) documentation.

### Built-in components

The AI Event Connector comes with a number of built-in components that can be used to process messages. For a list of all built-in components, see the [Components](components/index.md) documentation.
Expand Down
90 changes: 90 additions & 0 deletions docs/custom_components.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Custom Components

## Purpose

Custom components provide a way to extend the functionality of the Solace AI Connector beyond what's possible with the built-in components and configuration options. Sometimes, it's easier and more efficient to add custom code than to build a complex configuration file, especially for specialized or unique processing requirements.

## Requirements of a Custom Component

To create a custom component, you need to follow these requirements:

1. **Inherit from ComponentBase**: Your custom component class should inherit from the `ComponentBase` class.

2. **Info Section**: Define an `info` dictionary with the following keys:
- `class_name`: The name of your custom component class.
- `config_parameters`: A list of dictionaries describing the configuration parameters for your component.
- `input_schema`: A dictionary describing the expected input schema.
- `output_schema`: A dictionary describing the expected output schema.

3. **Implement the `invoke` method**: This is the main method where your component's logic will be implemented.

Here's a basic template for a custom component:

```python
from solace_ai_connector.components.component_base import ComponentBase

info = {
"class_name": "MyCustomComponent",
"config_parameters": [
{
"name": "my_param",
"type": "string",
"required": True,
"description": "A custom parameter"
}
],
"input_schema": {
"type": "object",
"properties": {
"input_data": {"type": "string"}
}
},
"output_schema": {
"type": "object",
"properties": {
"output_data": {"type": "string"}
}
}
}

class MyCustomComponent(ComponentBase):
def __init__(self, **kwargs):
super().__init__(info, **kwargs)
self.my_param = self.get_config("my_param")

def invoke(self, message, data):
# Your custom logic here
result = f"{self.my_param}: {data['input_data']}"
return {"output_data": result}
```

## Overrideable Methods

While the `invoke` method is the main one you'll implement, there are several other methods you can override to customize your component's behavior:

1. `invoke(self, message, data)`: The main processing method for your component.
2. `get_next_event(self)`: Customize how your component receives events.
3. `send_message(self, message)`: Customize how your component sends messages to the next component.
4. `handle_timer_event(self, timer_data)`: Handle timer events if your component uses timers.
5. `handle_cache_expiry_event(self, timer_data)`: Handle cache expiry events if your component uses the cache service.
6. `process_pre_invoke(self, message)`: Customize preprocessing before `invoke` is called.
7. `process_post_invoke(self, result, message)`: Customize postprocessing after `invoke` is called.

## Advanced Features

Custom components can take advantage of advanced features provided by the Solace AI Connector. These include:

- Broker request-response functionality
- Cache services
- Timer management

For more information on these advanced features and how to use them in your custom components, please refer to the [Advanced Component Features](advanced_component_features.md) documentation.

By creating custom components, you can extend the Solace AI Connector to meet your specific needs while still benefiting from the framework's built-in capabilities for event processing, flow management, and integration with Solace event brokers.

## Example

See the [Tips and Tricks page](tips_and_tricks.md) for an example of creating a custom component.


[]
4 changes: 2 additions & 2 deletions docs/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ area in the Subscriber side of the "Try me!" page.
Download the OpenAI connector example configuration file:

```sh
curl https://raw.githubusercontent.com/SolaceLabs/solace-ai-connector/main/examples/llm/openai_chat.yaml > openai_chat.yaml
curl https://raw.githubusercontent.com/SolaceLabs/solace-ai-connector/refs/heads/main/examples/llm/langchain_openai_with_history_chat.yaml > langchain_openai_with_history_chat.yaml
```

For this one, you need to also define the following additional environment variables:
Expand All @@ -94,7 +94,7 @@ pip install langchain_openai openai
Run the connector:

```sh
solace-ai-connector openai_chat.yaml
solace-ai-connector langchain_openai_with_history_chat.yaml
```

Use the "Try Me!" function on the broker's browser UI (or some other means) to publish an event like this:
Expand Down
Loading
Loading