fixing merge issue #46

Status: Closed. Wants to merge 3 commits.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
@@ -17,9 +17,13 @@ jobs:
uses: SolaceDev/solace-public-workflows/.github/workflows/hatch_ci.yml@main
with:
min-python-version: "3.10"
whitesource_product_name: "solaceai"
secrets:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
SONAR_HOST_URL: ${{ vars.SONAR_HOST_URL }}
WHITESOURCE_API_KEY: ${{ secrets.WHITESOURCE_API_KEY }}
MANIFEST_AWS_ACCESS_KEY_ID: ${{ secrets.MANIFEST_READ_ONLY_AWS_ACCESS_KEY_ID }}
MANIFEST_AWS_SECRET_ACCESS_KEY: ${{ secrets.MANIFEST_READ_ONLY_AWS_SECRET_ACCESS_KEY }}
structure-test:
name: Test Docker Image Structure
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions docs/components/broker_request_response.md
@@ -17,7 +17,7 @@ component_config:
payload_format: <string>
response_topic_prefix: <string>
response_topic_suffix: <string>
reply_queue_prefix: <string>
response_queue_prefix: <string>
request_expiry_ms: <integer>
streaming: <string>
streaming_complete_expression: <string>
@@ -34,7 +34,7 @@ component_config:
| payload_format | False | json | Format for the payload (json, yaml, text) |
| response_topic_prefix | False | reply | Prefix for reply topics |
| response_topic_suffix | False | | Suffix for reply topics |
| reply_queue_prefix | False | reply-queue | Prefix for reply queues |
| response_queue_prefix | False | reply-queue | Prefix for reply queues |
| request_expiry_ms | False | 60000 | Expiry time for cached requests in milliseconds |
| streaming | False | | The response will arrive in multiple pieces. If True, the streaming_complete_expression must be set and will be used to determine when the last piece has arrived. |
| streaming_complete_expression | False | | The source expression to determine when the last piece of a streaming response has arrived. |
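
This diff renames `reply_queue_prefix` to `response_queue_prefix` so the queue option matches the `response_topic_*` naming. The sketch below is illustrative only: it shows how a requestor might combine the three prefixes/suffixes from the table into per-instance response destinations. The naming scheme and the `uuid`-based instance id are assumptions, not the connector's actual internals.

```python
# Illustrative sketch, not connector code: compose response topic/queue names
# from the documented prefixes. Defaults mirror the table above.
import uuid


def build_response_destinations(
    response_topic_prefix: str = "reply",
    response_topic_suffix: str = "",
    response_queue_prefix: str = "reply-queue",
) -> tuple[str, str]:
    """Return a (topic, queue) pair for one requestor instance (assumed scheme)."""
    instance_id = uuid.uuid4().hex  # unique per requestor; an assumption for the sketch
    topic = "/".join(p for p in (response_topic_prefix, instance_id, response_topic_suffix) if p)
    queue = f"{response_queue_prefix}-{instance_id}"
    return topic, queue


print(build_response_destinations())
```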
6 changes: 0 additions & 6 deletions docs/components/langchain_chat_model_with_history.md
@@ -19,9 +19,6 @@ component_config:
history_module: <string>
history_class: <string>
history_config: <object>
stream_to_flow: <string>
llm_mode: <string>
stream_batch_size: <string>
set_response_uuid_in_user_properties: <boolean>
```

@@ -38,9 +35,6 @@ component_config:
| history_module | False | langchain_community.chat_message_histories | The module that contains the history class. Default: 'langchain_community.chat_message_histories' |
| history_class | False | ChatMessageHistory | The class to use for the history. Default: 'ChatMessageHistory' |
| history_config | False | | The configuration for the history class. |
| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. |
| llm_mode | False | | The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. |
| stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. |
| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. |


8 changes: 8 additions & 0 deletions docs/components/litellm_chat_model.md
@@ -20,6 +20,10 @@ component_config:
history_max_time: <string>
history_max_turns: <string>
history_max_time: <string>
stream_to_flow: <string>
stream_to_next_component: <string>
llm_mode: <string>
stream_batch_size: <string>
```

| Parameter | Required | Default | Description |
@@ -36,6 +40,10 @@ component_config:
| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) |
| history_max_turns | False | 10 | Maximum number of conversation turns to keep in history |
| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) |
| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. |
| stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. |
| llm_mode | False | none | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. |
| stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. |
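
The table above documents that `stream_to_flow` and `stream_to_next_component` are mutually exclusive and that one of them is needed for `llm_mode='stream'`. The following sketch only restates those documented constraints as a validation helper; the function name and error handling are hypothetical, not part of the connector.

```python
# Hypothetical validation sketch of the documented streaming options.
# Parameter names and defaults come from the table above; the checks are illustrative.
def validate_streaming_config(config: dict) -> dict:
    llm_mode = config.get("llm_mode", "none")
    stream_to_flow = config.get("stream_to_flow", "")
    stream_to_next_component = bool(config.get("stream_to_next_component", False))
    stream_batch_size = int(config.get("stream_batch_size", 15))

    if stream_to_flow and stream_to_next_component:
        raise ValueError("stream_to_flow and stream_to_next_component are mutually exclusive")
    if llm_mode == "stream" and not (stream_to_flow or stream_to_next_component):
        raise ValueError("llm_mode='stream' requires stream_to_flow or stream_to_next_component")

    return {
        "llm_mode": llm_mode,
        "stream_to_flow": stream_to_flow,
        "stream_to_next_component": stream_to_next_component,
        "stream_batch_size": stream_batch_size,
    }


# Example: streaming to a named flow, as in the anthropic_chat.yaml example below.
validate_streaming_config({"llm_mode": "stream", "stream_to_flow": "stream_output"})
```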


## Component Input Schema
8 changes: 8 additions & 0 deletions docs/components/litellm_embeddings.md
@@ -20,6 +20,10 @@ component_config:
history_max_time: <string>
history_max_turns: <string>
history_max_time: <string>
stream_to_flow: <string>
stream_to_next_component: <string>
llm_mode: <string>
stream_batch_size: <string>
```

| Parameter | Required | Default | Description |
@@ -36,6 +40,10 @@ component_config:
| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) |
| history_max_turns | False | 10 | Maximum number of conversation turns to keep in history |
| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) |
| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. |
| stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. |
| llm_mode | False | none | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. |
| stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. |


## Component Input Schema
30 changes: 28 additions & 2 deletions examples/llm/anthropic_chat.yaml
@@ -5,7 +5,8 @@
#
# The input message has the following schema:
# {
# "text": "<question or request as text>"
# "query": "<question or request as text>",
# "stream": false
# }
#
# It will then send an event back to Solace with the topic: `demo/question/response`
@@ -66,17 +67,22 @@ flows:
base_url: ${ANTHROPIC_API_ENDPOINT}
model: ${MODEL_NAME}
temperature: 0.01
llm_mode: stream
stream_to_flow: stream_output
input_transforms:
- type: copy
source_expression: |
template:You are a helpful AI assistant. Please help with the user's request below:
<user-question>
{{text://input.payload:text}}
{{text://input.payload:query}}
</user-question>
dest_expression: user_data.llm_input:messages.0.content
- type: copy
source_expression: static:user
dest_expression: user_data.llm_input:messages.0.role
- type: copy
source_expression: input.payload:stream
dest_expression: user_data.llm_input:stream
input_selection:
source_expression: user_data.llm_input

@@ -97,3 +103,23 @@ flows:
dest_expression: user_data.output:topic
input_selection:
source_expression: user_data.output

- name: stream_output
components:
# Send response back to broker
- component_name: send_response
component_module: broker_output
component_config:
<<: *broker_connection
payload_encoding: utf-8
payload_format: json
copy_user_properties: true
input_transforms:
- type: copy
source_expression: input.payload
dest_expression: user_data.output:payload
- type: copy
source_value: demo/question/stream
dest_expression: user_data.output:topic
input_selection:
source_expression: user_data.output
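
With this change the example expects a `query` field plus a `stream` flag in the request payload, and streamed chunks are republished on `demo/question/stream` by the new `stream_output` flow. The snippet below is a hypothetical client-side payload matching that schema; the request topic is not visible in this diff, so it is left as a placeholder.

```python
# Hypothetical request payload for the updated example flow.
# Only the payload keys ("query", "stream") and the response topics
# (demo/question/response, demo/question/stream) come from the example above;
# the request topic is a placeholder.
import json

payload = {
    "query": "What is the capital of France?",
    "stream": True,  # True routes batched chunks through the new stream_output flow
}

print(json.dumps(payload))  # publish this to the flow's request topic
```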
10 changes: 10 additions & 0 deletions src/solace_ai_connector/components/component_base.py
@@ -303,6 +303,16 @@ def setup_broker_request_response(self):
"broker_config": broker_config,
"request_expiry_ms": request_expiry_ms,
}

if "response_topic_prefix" in self.broker_request_response_config:
rrc_config["response_topic_prefix"] = self.broker_request_response_config[
"response_topic_prefix"
]
if "response_queue_prefix" in self.broker_request_response_config:
rrc_config["response_queue_prefix"] = self.broker_request_response_config[
"response_queue_prefix"
]

self.broker_request_response_controller = RequestResponseFlowController(
config=rrc_config, connector=self.connector
)
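
The new lines copy `response_topic_prefix` and `response_queue_prefix` into `rrc_config` only when they are present, so the controller's own defaults apply otherwise. The helper below is a standalone illustration of that copy-if-present pattern; it is not part of the connector.

```python
# Standalone sketch of the "copy only if present" pattern used above.
def merge_optional_keys(source: dict, target: dict, keys: tuple[str, ...]) -> dict:
    for key in keys:
        if key in source:  # otherwise leave the controller's default untouched
            target[key] = source[key]
    return target


rrc_config = {"broker_config": {}, "request_expiry_ms": 60000}
request_response_config = {
    "response_topic_prefix": "reply",
    "response_queue_prefix": "reply-queue",
}
merge_optional_keys(
    request_response_config,
    rrc_config,
    ("response_topic_prefix", "response_queue_prefix"),
)
print(rrc_config)
```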
@@ -1,6 +1,8 @@
# This is a wrapper around all the LangChain chat models
# The configuration will control dynamic loading of the chat models
from uuid import uuid4
from copy import deepcopy
from collections import namedtuple
from .langchain_chat_model_base import (
LangChainChatModelBase,
info_base,
@@ -17,6 +19,48 @@ def __init__(self, **kwargs):
super().__init__(info, **kwargs)

def invoke_model(
self, input_message, messages, session_id=None, clear_history=False
self,
input_message,
messages,
session_id=None,
clear_history=False,
stream=False,
):
return self.component.invoke(messages)
if not stream:
return self.component.invoke(messages)

aggregate_result = ""
current_batch = ""
response_uuid = str(uuid4())
first_chunk = True

for chunk in self.component.stream(messages):
aggregate_result += chunk.content
current_batch += chunk.content
if len(current_batch) >= self.stream_batch_size:
if self.stream_to_flow:
self.send_streaming_message(
input_message,
current_batch,
aggregate_result,
response_uuid,
first_chunk,
)
current_batch = ""
first_chunk = False

if self.stream_to_flow:
self.send_streaming_message(
input_message,
current_batch,
aggregate_result,
response_uuid,
first_chunk,
True,
)

result = namedtuple("Result", ["content", "response_uuid"])(
aggregate_result, response_uuid
)

return result
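
The streaming branch above accumulates chunk content, emits a batch once the accumulated batch text reaches `stream_batch_size`, and flushes the remainder as the last chunk. The sketch below reproduces that batching loop with a fake chunk source so it can run on its own; the payload dictionary mirrors the fields sent by `send_streaming_message`, and, like the diff, the size check counts characters of accumulated text.

```python
# Self-contained sketch of the batching loop above, using plain strings instead
# of LangChain chunks. Not connector code; the yielded dict mirrors the fields
# produced by send_streaming_message.
from uuid import uuid4


def stream_in_batches(chunks, batch_size=15):
    aggregate, batch = "", ""
    response_uuid = str(uuid4())
    first = True
    for chunk in chunks:
        aggregate += chunk
        batch += chunk
        if len(batch) >= batch_size:
            yield {"chunk": batch, "content": aggregate,
                   "response_uuid": response_uuid,
                   "first_chunk": first, "last_chunk": False}
            batch, first = "", False
    # final flush: whatever is left is marked as the last chunk
    yield {"chunk": batch, "content": aggregate,
           "response_uuid": response_uuid,
           "first_chunk": first, "last_chunk": True}


for message in stream_in_batches(["The ", "quick ", "brown ", "fox ", "jumps."]):
    print(message)
```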
@@ -5,6 +5,7 @@
from abc import abstractmethod
from langchain_core.output_parsers import JsonOutputParser

from .....common.message import Message
from .....common.utils import get_obj_text
from langchain.schema.messages import (
HumanMessage,
@@ -39,6 +40,23 @@
"description": "Model specific configuration for the chat model. "
"See documentation for valid parameter names.",
},
{
"name": "llm_mode",
"required": False,
"description": "The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response.",
},
{
"name": "stream_to_flow",
"required": False,
"description": "Name the flow to stream the output to - this must be configured for llm_mode='stream'.",
"default": "",
},
{
"name": "stream_batch_size",
"required": False,
"description": "The minimum number of words in a single streaming result. Default: 15.",
"default": 15,
},
{
"name": "llm_response_format",
"required": False,
@@ -88,10 +106,17 @@


class LangChainChatModelBase(LangChainBase):

def __init__(self, info, **kwargs):
super().__init__(info, **kwargs)
self.llm_mode = self.get_config("llm_mode", "none")
self.stream_to_flow = self.get_config("stream_to_flow", "")
self.stream_batch_size = self.get_config("stream_batch_size", 15)

def invoke(self, message, data):
messages = []

for item in data["messages"]:
for item in data.get("messages"):
if item["role"] == "system":
messages.append(SystemMessage(content=item["content"]))
elif item["role"] == "user" or item["role"] == "human":
@@ -109,9 +134,14 @@ def invoke(self, message, data):

session_id = data.get("session_id", None)
clear_history = data.get("clear_history", False)
stream = data.get("stream", self.llm_mode == "stream")

llm_res = self.invoke_model(
message, messages, session_id=session_id, clear_history=clear_history
message,
messages,
session_id=session_id,
clear_history=clear_history,
stream=stream,
)

res_format = self.get_config("llm_response_format", "text")
@@ -134,6 +164,32 @@ def invoke(self, message, data):

@abstractmethod
def invoke_model(
self, input_message, messages, session_id=None, clear_history=False
self,
input_message,
messages,
session_id=None,
clear_history=False,
stream=False,
):
pass

def send_streaming_message(
self,
input_message,
chunk,
aggregate_result,
response_uuid,
first_chunk=False,
last_chunk=False,
):
message = Message(
payload={
"chunk": chunk,
"content": aggregate_result,
"response_uuid": response_uuid,
"first_chunk": first_chunk,
"last_chunk": last_chunk,
},
user_properties=input_message.get_user_properties(),
)
self.send_to_flow(self.stream_to_flow, message)
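
`send_streaming_message` publishes payloads carrying `chunk`, `content` (the running aggregate), `response_uuid`, `first_chunk`, and `last_chunk`. The sketch below shows one way a downstream flow could reassemble the full response from those fields; the reassembly logic is an assumption for illustration, not code from the connector.

```python
# Illustrative consumer for the payload shape built in send_streaming_message.
def reassemble(payloads):
    responses = {}
    for p in payloads:
        # "content" already holds the aggregate, so the latest value is enough
        responses[p["response_uuid"]] = p["content"]
        if p["last_chunk"]:
            return responses.pop(p["response_uuid"])
    return None  # stream ended without a last_chunk marker


chunks = [
    {"chunk": "Hello, ", "content": "Hello, ", "response_uuid": "abc",
     "first_chunk": True, "last_chunk": False},
    {"chunk": "world!", "content": "Hello, world!", "response_uuid": "abc",
     "first_chunk": False, "last_chunk": True},
]
print(reassemble(chunks))  # -> "Hello, world!"
```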