AI-95: Enhance request/response handling for streaming LLM access (#69)
* Changes for request/response for streaming LLM access

* Updated with main

* update

---------

Co-authored-by: Edward Funnekotter <[email protected]>
cyrus2281 and efunneko authored Nov 26, 2024
1 parent f4677f9 commit 6e1e3fc
Showing 12 changed files with 50 additions and 71 deletions.
4 changes: 2 additions & 2 deletions docs/components/broker_request_response.md
@@ -17,7 +17,7 @@ component_config:
payload_format: <string>
response_topic_prefix: <string>
response_topic_suffix: <string>
reply_queue_prefix: <string>
response_queue_prefix: <string>
request_expiry_ms: <integer>
streaming: <string>
streaming_complete_expression: <string>
@@ -34,7 +34,7 @@ component_config:
| payload_format | False | json | Format for the payload (json, yaml, text) |
| response_topic_prefix | False | reply | Prefix for reply topics |
| response_topic_suffix | False | | Suffix for reply topics |
| reply_queue_prefix | False | reply-queue | Prefix for reply queues |
| response_queue_prefix | False | reply-queue | Prefix for reply queues |
| request_expiry_ms | False | 60000 | Expiry time for cached requests in milliseconds |
| streaming | False | | The response will arrive in multiple pieces. If True, the streaming_complete_expression must be set and will be used to determine when the last piece has arrived. |
| streaming_complete_expression | False | | The source expression to determine when the last piece of a streaming response has arrived. |
6 changes: 0 additions & 6 deletions docs/components/langchain_chat_model_with_history.md
@@ -19,9 +19,6 @@ component_config:
history_module: <string>
history_class: <string>
history_config: <object>
stream_to_flow: <string>
llm_mode: <string>
stream_batch_size: <string>
set_response_uuid_in_user_properties: <boolean>
```
@@ -38,9 +35,6 @@ component_config:
| history_module | False | langchain_community.chat_message_histories | The module that contains the history class. Default: 'langchain_community.chat_message_histories' |
| history_class | False | ChatMessageHistory | The class to use for the history. Default: 'ChatMessageHistory' |
| history_config | False | | The configuration for the history class. |
| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. |
| llm_mode | False | | The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. |
| stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. |
| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. |
8 changes: 8 additions & 0 deletions docs/components/litellm_chat_model.md
@@ -20,6 +20,10 @@ component_config:
history_max_time: <string>
history_max_turns: <string>
history_max_time: <string>
stream_to_flow: <string>
stream_to_next_component: <string>
llm_mode: <string>
stream_batch_size: <string>
```
| Parameter | Required | Default | Description |
@@ -36,6 +40,10 @@ component_config:
| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) |
| history_max_turns | False | 10 | Maximum number of conversation turns to keep in history |
| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) |
| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. |
| stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. |
| llm_mode | False | none | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. |
| stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. |
## Component Input Schema
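For illustration, here is a minimal sketch of how the litellm streaming parameters added above fit together, written as the Python dict a component_config would translate to. The flow name "stream_output" is a hypothetical example, and stream_to_flow / stream_to_next_component remain mutually exclusive as documented.

```python
# Hypothetical component_config for a litellm chat component (sketch only).
# "stream_output" is an example flow name, not something defined by this commit.
litellm_streaming_config = {
    "llm_mode": "stream",            # stream results instead of waiting for the full response
    "stream_batch_size": 15,         # minimum number of words per streamed chunk
    "stream_to_flow": "stream_output",
    # "stream_to_next_component": True,  # mutually exclusive with stream_to_flow
}
```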
8 changes: 8 additions & 0 deletions docs/components/litellm_embeddings.md
@@ -20,6 +20,10 @@ component_config:
history_max_time: <string>
history_max_turns: <string>
history_max_time: <string>
stream_to_flow: <string>
stream_to_next_component: <string>
llm_mode: <string>
stream_batch_size: <string>
```
| Parameter | Required | Default | Description |
@@ -36,6 +40,10 @@ component_config:
| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) |
| history_max_turns | False | 10 | Maximum number of conversation turns to keep in history |
| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) |
| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. |
| stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. |
| llm_mode | False | none | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. |
| stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. |
## Component Input Schema
1 change: 0 additions & 1 deletion examples/llm/anthropic_chat.yaml
@@ -68,7 +68,6 @@ flows:
model: ${MODEL_NAME}
temperature: 0.01
llm_mode: stream
allow_overwrite_llm_mode: true
stream_to_flow: stream_output
input_transforms:
- type: copy
10 changes: 10 additions & 0 deletions src/solace_ai_connector/components/component_base.py
@@ -303,6 +303,16 @@ def setup_broker_request_response(self):
"broker_config": broker_config,
"request_expiry_ms": request_expiry_ms,
}

if "response_topic_prefix" in self.broker_request_response_config:
rrc_config["response_topic_prefix"] = self.broker_request_response_config[
"response_topic_prefix"
]
if "response_queue_prefix" in self.broker_request_response_config:
rrc_config["response_queue_prefix"] = self.broker_request_response_config[
"response_queue_prefix"
]

self.broker_request_response_controller = RequestResponseFlowController(
config=rrc_config, connector=self.connector
)
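A rough sketch of what the new pass-through does, assuming a broker_request_response block on the component configuration; the values below are illustrative examples, not defaults introduced by this commit.

```python
# Illustrative broker_request_response settings a component might carry.
# setup_broker_request_response() now forwards the two optional prefixes
# into the RequestResponseFlowController config when they are present.
broker_request_response_config = {
    "request_expiry_ms": 60000,
    "response_topic_prefix": "reply",        # optional, forwarded if set
    "response_queue_prefix": "reply-queue",  # optional, forwarded if set
}

rrc_config = {
    "broker_config": {},  # broker connection details omitted in this sketch
    "request_expiry_ms": broker_request_response_config["request_expiry_ms"],
}
for key in ("response_topic_prefix", "response_queue_prefix"):
    if key in broker_request_response_config:
        rrc_config[key] = broker_request_response_config[key]
```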
@@ -45,11 +45,6 @@
"required": False,
"description": "The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response.",
},
{
"name": "allow_overwrite_llm_mode",
"required": False,
"description": "Whether to allow the llm_mode to be overwritten by the `stream` from the input message.",
},
{
"name": "stream_to_flow",
"required": False,
@@ -115,7 +110,6 @@ class LangChainChatModelBase(LangChainBase):
def __init__(self, info, **kwargs):
super().__init__(info, **kwargs)
self.llm_mode = self.get_config("llm_mode", "none")
self.allow_overwrite_llm_mode = self.get_config("allow_overwrite_llm_mode")
self.stream_to_flow = self.get_config("stream_to_flow", "")
self.stream_batch_size = self.get_config("stream_batch_size", 15)

@@ -140,22 +134,14 @@ def invoke(self, message, data):

session_id = data.get("session_id", None)
clear_history = data.get("clear_history", False)
stream = data.get("stream")

should_stream = self.llm_mode == "stream"
if (
self.allow_overwrite_llm_mode
and stream is not None
and isinstance(stream, bool)
):
should_stream = stream
stream = data.get("stream", self.llm_mode == "stream")

llm_res = self.invoke_model(
message,
messages,
session_id=session_id,
clear_history=clear_history,
stream=should_stream,
stream=stream,
)

res_format = self.get_config("llm_response_format", "text")
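The net effect of this hunk is that an explicit stream flag in the incoming data wins and llm_mode only supplies the default. A standalone sketch of that resolution, with invented names:

```python
def resolve_stream(data: dict, llm_mode: str) -> bool:
    """Return True when the model call should stream.

    An explicit boolean "stream" key in the request data takes precedence;
    otherwise fall back to whether the component's llm_mode is "stream".
    """
    return data.get("stream", llm_mode == "stream")


assert resolve_stream({}, "stream") is True                   # default from llm_mode
assert resolve_stream({"stream": False}, "stream") is False   # explicit opt-out
assert resolve_stream({"stream": True}, "none") is True       # explicit opt-in per request
```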
@@ -3,7 +3,6 @@
import litellm

from ....component_base import ComponentBase
from .....common.message import Message
from .....common.log import log

litellm_info_base = {
@@ -60,10 +59,6 @@ def init(self):
self.set_response_uuid_in_user_properties = self.get_config(
"set_response_uuid_in_user_properties"
)
if self.stream_to_flow and self.stream_to_next_component:
raise ValueError(
"stream_to_flow and stream_to_next_component are mutually exclusive"
)
self.router = None

def init_load_balancer(self):
@@ -97,11 +97,6 @@
),
"default": "none",
},
{
"name": "allow_overwrite_llm_mode",
"required": False,
"description": "Whether to allow the llm_mode to be overwritten by the `stream` from the input message.",
},
{
"name": "stream_batch_size",
"required": False,
@@ -119,23 +114,19 @@ def __init__(self, info, **kwargs):
self.stream_to_flow = self.get_config("stream_to_flow")
self.stream_to_next_component = self.get_config("stream_to_next_component")
self.llm_mode = self.get_config("llm_mode")
self.allow_overwrite_llm_mode = self.get_config("allow_overwrite_llm_mode")
self.stream_batch_size = self.get_config("stream_batch_size")

if self.stream_to_flow and self.stream_to_next_component:
raise ValueError(
"stream_to_flow and stream_to_next_component are mutually exclusive"
)

def invoke(self, message, data):
"""invoke the model"""
messages = data.get("messages", [])
stream = data.get("stream")

should_stream = self.llm_mode == "stream"
if (
self.allow_overwrite_llm_mode
and stream is not None
and isinstance(stream, bool)
):
should_stream = stream
stream = data.get("stream", self.llm_mode == "stream")

if should_stream:
if stream:
return self.invoke_stream(message, messages)
else:
return self.invoke_non_stream(messages)
23 changes: 4 additions & 19 deletions src/solace_ai_connector/components/general/llm/openai/openai_chat_model_base.py
100755 → 100644
@@ -62,11 +62,6 @@
),
"default": "none",
},
{
"name": "allow_overwrite_llm_mode",
"required": False,
"description": "Whether to allow the llm_mode to be overwritten by the `stream` from the input message.",
},
{
"name": "stream_batch_size",
"required": False,
@@ -104,7 +99,8 @@
},
"stream": {
"type": "boolean",
"description": "Whether to stream the response - overwrites llm_mode",
"description": "Whether to stream the response. It is is not provided, it will default to the value of llm_mode.",
"required": False,
},
},
"required": ["messages"],
@@ -154,9 +150,7 @@ def init(self):
self.stream_to_flow = self.get_config("stream_to_flow")
self.stream_to_next_component = self.get_config("stream_to_next_component")
self.llm_mode = self.get_config("llm_mode")
self.allow_overwrite_llm_mode = self.get_config("allow_overwrite_llm_mode")
self.stream_batch_size = self.get_config("stream_batch_size")
self.response_format = self.get_config("response_format", "text")
self.set_response_uuid_in_user_properties = self.get_config(
"set_response_uuid_in_user_properties"
)
@@ -167,21 +161,13 @@ def init(self):

def invoke(self, message, data):
messages = data.get("messages", [])
stream = data.get("stream")
stream = data.get("stream", self.llm_mode == "stream")

client = OpenAI(
api_key=self.get_config("api_key"), base_url=self.get_config("base_url")
)

should_stream = self.llm_mode == "stream"
if (
self.allow_overwrite_llm_mode
and stream is not None
and isinstance(stream, bool)
):
should_stream = stream

if should_stream:
if stream:
return self.invoke_stream(client, message, messages)
else:
max_retries = 3
@@ -191,7 +177,6 @@ def invoke(self, message, data):
messages=messages,
model=self.model,
temperature=self.temperature,
response_format={"type": self.response_format},
)
return {"content": response.choices[0].message.content}
except Exception as e:
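Taken together with the schema change above, a request payload can now opt in or out of streaming per message. A hedged example of the two shapes; the message contents are invented:

```python
# Example inputs for the OpenAI chat component after this change (illustrative only).
non_streaming_request = {
    "messages": [{"role": "user", "content": "Summarize the release notes."}],
    # no "stream" key: falls back to the component's llm_mode setting
}

streaming_request = {
    "messages": [{"role": "user", "content": "Summarize the release notes."}],
    "stream": True,  # explicit per-request override of llm_mode
}
```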
@@ -73,7 +73,7 @@
"default": "",
},
{
"name": "reply_queue_prefix",
"name": "response_queue_prefix",
"required": False,
"description": "Prefix for reply queues",
"default": "reply-queue",
@@ -168,11 +168,11 @@ def __init__(self, **kwargs):
self.response_topic_suffix = ensure_slash_on_start(
self.get_config("response_topic_suffix")
)
self.reply_queue_prefix = ensure_slash_on_end(
self.get_config("reply_queue_prefix")
self.response_queue_prefix = ensure_slash_on_end(
self.get_config("response_queue_prefix")
)
self.requestor_id = str(uuid.uuid4())
self.reply_queue_name = f"{self.reply_queue_prefix}{self.requestor_id}"
self.reply_queue_name = f"{self.response_queue_prefix}{self.requestor_id}"
self.response_topic = f"{self.response_topic_prefix}{self.requestor_id}{self.response_topic_suffix}"
self.response_thread = None
self.streaming = self.get_config("streaming")
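The renamed prefix feeds straight into the per-requestor queue and topic names. A small sketch of that composition, using the documented defaults and a random requestor id; the slash normalization done by ensure_slash_on_end / ensure_slash_on_start is simplified here.

```python
import uuid

# Sketch of how the per-requestor reply queue and response topic are built
# (prefix and suffix values are the documented defaults, with slashes added by hand).
response_queue_prefix = "reply-queue/"
response_topic_prefix = "reply/"
response_topic_suffix = ""

requestor_id = str(uuid.uuid4())
reply_queue_name = f"{response_queue_prefix}{requestor_id}"
response_topic = f"{response_topic_prefix}{requestor_id}{response_topic_suffix}"
```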
@@ -30,6 +30,7 @@

# This is a very basic component which will be stitched onto the final component in the flow
class RequestResponseControllerOuputComponent:

def __init__(self, controller):
self.controller = controller

@@ -39,6 +40,7 @@ def enqueue(self, event):

# This is the main class that will be used to send messages to a flow and receive the response
class RequestResponseFlowController:

def __init__(self, config: Dict[str, Any], connector):
self.config = config
self.connector = connector
@@ -55,14 +57,15 @@ def __init__(self, config: Dict[str, Any], connector):
self.flow.run()

def create_broker_request_response_flow(self):
self.broker_config["request_expiry_ms"] = self.request_expiry_ms
full_config = self.broker_config.copy()
full_config.update(self.config)
config = {
"name": "_internal_broker_request_response_flow",
"components": [
{
"component_name": "_internal_broker_request_response",
"component_module": "broker_request_response",
"component_config": self.broker_config,
"component_config": full_config,
}
],
}
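The controller now layers its own settings over a copy of the broker configuration instead of mutating broker_config in place. A simplified sketch of that merge, with invented broker values:

```python
# Simplified version of the merge performed in create_broker_request_response_flow().
# dict.update() means controller-level keys win on any collision with broker_config.
broker_config = {
    "broker_type": "solace",               # example values, not from this commit
    "broker_url": "tcp://localhost:55555",
}
controller_config = {
    "request_expiry_ms": 60000,
    "response_queue_prefix": "reply-queue",
}

full_config = broker_config.copy()
full_config.update(controller_config)

flow_config = {
    "name": "_internal_broker_request_response_flow",
    "components": [
        {
            "component_name": "_internal_broker_request_response",
            "component_module": "broker_request_response",
            "component_config": full_config,
        }
    ],
}
```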