From a6de455c2a820cf24178dc3a7117288488768480 Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Sun, 14 Jul 2024 21:50:26 -0400 Subject: [PATCH 1/4] Better streaming support from chat_model_with_history --- .../langchain_chat_model_with_history.py | 46 +++++++++++++++---- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py b/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py index 02ea596..4e776c7 100644 --- a/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py +++ b/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py @@ -3,6 +3,7 @@ import threading from collections import namedtuple from copy import deepcopy +from uuid import uuid4 from langchain_core.chat_history import BaseChatMessageHistory from langchain_core.runnables.history import RunnableWithMessageHistory @@ -161,6 +162,8 @@ def invoke_model( aggregate_result = "" current_batch = "" + response_uuid = str(uuid4()) + first_chunk = True for chunk in runnable.stream( {"input": human_message}, config={ @@ -172,25 +175,50 @@ def invoke_model( if len(current_batch.split()) >= self.stream_batch_size: if self.stream_to_flow: self.send_streaming_message( - input_message, current_batch, aggregate_result + input_message, + current_batch, + aggregate_result, + response_uuid, + first_chunk, ) current_batch = "" + first_chunk = False - if current_batch: - if self.stream_to_flow: - self.send_streaming_message( - input_message, current_batch, aggregate_result - ) + if self.stream_to_flow: + self.send_streaming_message( + input_message, + current_batch, + aggregate_result, + response_uuid, + first_chunk, + True, + ) - result = namedtuple("Result", ["content"])(aggregate_result) + result = namedtuple("Result", ["content", "uuid"])( + aggregate_result, response_uuid + ) self.prune_large_message_from_history(session_id) return result - def send_streaming_message(self, input_message, chunk, aggregate_result): + def send_streaming_message( + self, + input_message, + chunk, + aggregate_result, + response_uuid, + first_chunk=False, + last_chunk=False, + ): message = Message( - payload={"chunk": chunk, "aggregate_result": aggregate_result}, + payload={ + "chunk": chunk, + "aggregate_result": aggregate_result, + "response_uuid": response_uuid, + "first_chunk": first_chunk, + "last_chunk": last_chunk, + }, user_properties=input_message.get_user_properties(), ) self.send_to_flow(self.stream_to_flow, message) From 70f8643d0f6746d9f8405786621fe515ca5d5a08 Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Wed, 17 Jul 2024 07:10:44 -0400 Subject: [PATCH 2/4] Save the response_uuid into the message's user_properties (optionally) --- .../langchain_chat_model_with_history.md | 2 ++ docs/components/timer_input.md | 2 +- .../langchain_chat_model_with_history.py | 19 ++++++++++++++++--- .../components/inputs_outputs/timer_input.py | 4 +++- 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/docs/components/langchain_chat_model_with_history.md b/docs/components/langchain_chat_model_with_history.md index 04ef74b..8838811 100644 --- a/docs/components/langchain_chat_model_with_history.md +++ b/docs/components/langchain_chat_model_with_history.md @@ -21,6 +21,7 @@ component_config: stream_to_flow: llm_mode: stream_batch_size: + set_response_uuid_in_user_properties: ``` | Parameter | Required | Default | Description | @@ -38,6 +39,7 @@ 
component_config: | stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. | | llm_mode | False | | The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | | stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. | +| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. | ## Component Input Schema diff --git a/docs/components/timer_input.md b/docs/components/timer_input.md index 0b35cbc..e411a6f 100644 --- a/docs/components/timer_input.md +++ b/docs/components/timer_input.md @@ -22,5 +22,5 @@ component_config: ## Component Output Schema ``` - + ``` diff --git a/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py b/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py index 4e776c7..da5904d 100644 --- a/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py +++ b/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py @@ -87,6 +87,13 @@ "description": "The minimum number of words in a single streaming result. Default: 15.", "default": 15, }, + { + "name": "set_response_uuid_in_user_properties", + "required": False, + "description": "Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response.", + "default": False, + "type": "boolean", + }, ] ) info["input_schema"]["properties"]["session_id"] = { @@ -115,6 +122,9 @@ def __init__(self, **kwargs): self.stream_to_flow = self.get_config("stream_to_flow", "") self.llm_mode = self.get_config("llm_mode", "none") self.stream_batch_size = self.get_config("stream_batch_size", 15) + self.set_response_uuid_in_user_properties = self.get_config( + "set_response_uuid_in_user_properties", False + ) def invoke_model( self, input_message, messages, session_id=None, clear_history=False @@ -198,6 +208,12 @@ def invoke_model( aggregate_result, response_uuid ) + # Put the response_uuid into the input_message user_properties + if self.set_response_uuid_in_user_properties: + user_properties = input_message.get_user_properties() + user_properties["response_uuid"] = response_uuid + input_message.set_user_properties(user_properties) + self.prune_large_message_from_history(session_id) return result @@ -233,9 +249,6 @@ def create_history(self): ) config = self.get_config("history_config", {}) history = self.create_component(config, history_class) - # memory = ConversationTokenBufferMemory( - # chat_memory=history, llm=self.component, max_token_limit=history_max_tokens - # ) return history def get_history(self, session_id: str) -> BaseChatMessageHistory: diff --git a/src/solace_ai_connector/components/inputs_outputs/timer_input.py b/src/solace_ai_connector/components/inputs_outputs/timer_input.py index dc2821e..62b87fe 100644 --- a/src/solace_ai_connector/components/inputs_outputs/timer_input.py +++ b/src/solace_ai_connector/components/inputs_outputs/timer_input.py @@ -29,7 +29,9 @@ "required": False, }, ], - "output_schema": "any", + "output_schema": { + "type": "None", + }, } From 5495009a8bc8dff1af359db59929962503de9a1a Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Wed, 17 Jul 2024 07:15:18 -0400 Subject: [PATCH 3/4] A bit more help text 
in the docs --- docs/components/langchain_chat_model_with_history.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/components/langchain_chat_model_with_history.md b/docs/components/langchain_chat_model_with_history.md index 8838811..100e429 100644 --- a/docs/components/langchain_chat_model_with_history.md +++ b/docs/components/langchain_chat_model_with_history.md @@ -39,7 +39,7 @@ component_config: | stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. | | llm_mode | False | | The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | | stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. | -| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. | +| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. | ## Component Input Schema From d5df0cfd349c65d6b1c385bf5db88800f5e11d9b Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Mon, 22 Jul 2024 12:54:12 -0400 Subject: [PATCH 4/4] More cleanup for better streaming --- src/solace_ai_connector/components/component_base.py | 11 ++++++++++- .../langchain/langchain_chat_model_with_history.py | 6 ------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py index aef1aa3..1783a07 100644 --- a/src/solace_ai_connector/components/component_base.py +++ b/src/solace_ai_connector/components/component_base.py @@ -46,6 +46,7 @@ def __init__(self, module_info, **kwargs): self.need_acknowledgement = False self.stop_thread_event = threading.Event() self.current_message = None + self.current_message_has_been_discarded = False self.log_identifier = f"[{self.instance_name}.{self.flow_name}.{self.name}] " @@ -159,9 +160,13 @@ def process_message(self, message): self.trace_data(data) # Invoke the component + self.current_message_has_been_discarded = False result = self.invoke(message, data) - if result is not None: + if self.current_message_has_been_discarded: + # Call the message acknowledgements + message.call_acknowledgements() + elif result is not None: # Do all the things we need to do after invoking the component # Note that there are times where we don't want to # send the message to the next component @@ -193,6 +198,10 @@ def process_post_invoke(self, result, message): self.current_message = message self.send_message(message) + def discard_current_message(self): + # If the message is to be discarded, we need to acknowledge any previous components + self.current_message_has_been_discarded = True + def get_acknowledgement_callback(self): # This should be overridden by the component if it needs to acknowledge messages return None diff --git a/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py b/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py index da5904d..e9988db 100644 --- a/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py +++ b/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py @@ -208,12 +208,6 @@ def 
invoke_model( aggregate_result, response_uuid ) - # Put the response_uuid into the input_message user_properties - if self.set_response_uuid_in_user_properties: - user_properties = input_message.get_user_properties() - user_properties["response_uuid"] = response_uuid - input_message.set_user_properties(user_properties) - self.prune_large_message_from_history(session_id) return result
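
For illustration only (not part of the patch series): a downstream consumer of the new streaming payload built by send_streaming_message() — the keys chunk, aggregate_result, response_uuid, first_chunk and last_chunk come from PATCH 1; the class and payload values below are assumptions, a minimal sketch of how interleaved streams could be correlated and reassembled.

```
from typing import Optional


class StreamingResponseAggregator:
    """Accumulates streamed chunks per response_uuid (illustrative only)."""

    def __init__(self):
        self._chunks = {}  # response_uuid -> list of chunk strings

    def on_payload(self, payload: dict) -> Optional[str]:
        """Feed one streaming payload; return the full text on the last chunk."""
        uuid = payload["response_uuid"]
        if payload.get("first_chunk"):
            self._chunks[uuid] = []
        self._chunks.setdefault(uuid, []).append(payload["chunk"])
        if payload.get("last_chunk"):
            self._chunks.pop(uuid, None)
            # aggregate_result already carries the full response text,
            # so prefer it over re-joining the collected chunks.
            return payload["aggregate_result"]
        return None


# Example usage with payloads shaped like the ones sent in PATCH 1
# (the text and uuid values are made up):
aggregator = StreamingResponseAggregator()
for payload in [
    {"chunk": "Hello ", "aggregate_result": "Hello ",
     "response_uuid": "abc", "first_chunk": True, "last_chunk": False},
    {"chunk": "world", "aggregate_result": "Hello world",
     "response_uuid": "abc", "first_chunk": False, "last_chunk": True},
]:
    full = aggregator.on_payload(payload)
    if full is not None:
        print(full)  # -> "Hello world"
```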
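Also for illustration: PATCH 2 optionally copies the response_uuid into the input message's user_properties, so a component that receives the full (non-streamed) response can tie it back to the chunks collected from the streaming flow. The function and the streamed_results store below are hypothetical; only get_user_properties() and the "response_uuid" key are taken from the patch.

```
def correlate_full_response(full_response_message, streamed_results: dict):
    """Match a completed response to text accumulated from the streaming flow.

    Assumes `full_response_message` exposes get_user_properties() like the
    connector's Message class used in the patch, and that `streamed_results`
    maps response_uuid -> accumulated text.
    """
    response_uuid = full_response_message.get_user_properties().get("response_uuid")
    return streamed_results.get(response_uuid)
```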
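Finally, a sketch of how the discard_current_message() hook added in PATCH 4 might be used: when a component calls it from invoke(), process_message() acknowledges the message instead of forwarding a result. The import path, base-class name, and the shape of `data` are assumptions inferred from the file layout and the streaming payload; component registration and config wiring are omitted.

```
from solace_ai_connector.components.component_base import ComponentBase


class LastChunkOnlyFilter(ComponentBase):
    """Forwards only the final streaming chunk; drops intermediate ones."""

    def invoke(self, message, data):
        # `data` is assumed to be the payload dict built by
        # send_streaming_message() in PATCH 1.
        if not data.get("last_chunk"):
            # Intermediate chunk: discard it. process_message() will call the
            # message acknowledgements and send nothing downstream.
            self.discard_current_message()
            return None
        # Final chunk: emit the aggregated response, tagged with its uuid so
        # it can be correlated with the earlier streamed chunks.
        return {
            "response_uuid": data["response_uuid"],
            "content": data["aggregate_result"],
        }
```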