added retries for streaming #172

Merged: 1 commit, Apr 20, 2024
38 changes: 35 additions & 3 deletions libs/vertexai/langchain_google_vertexai/chat_models.py
@@ -426,6 +426,7 @@ def _completion_with_retry(
     generation_method: Callable,
     *,
     max_retries: int,
+    check_stream_response_for_candidates: bool = False,
     run_manager: Optional[CallbackManagerForLLMRun] = None,
     **kwargs: Any,
 ) -> Any:
@@ -437,6 +438,14 @@ def _completion_with_retry(
     @retry_decorator
     def _completion_with_retry_inner(generation_method: Callable, **kwargs: Any) -> Any:
         response = generation_method(**kwargs)
+        if kwargs.get("stream") and check_stream_response_for_candidates:
+            chunks = list(response)
+            for chunk in chunks:
+                if not chunk.candidates:
+                    raise ValueError("Got 0 candidates from generations.")
+            return iter(chunks)
+        if kwargs.get("stream"):
+            return response
         if len(response.candidates):
             return response
         else:
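A note on the check_stream_response_for_candidates branch above: list(response) drains the whole stream before anything is returned, which is exactly what lets the ValueError fire (and the retry decorator re-invoke the call) before the caller has consumed any partial output. A minimal standalone sketch of the same validate-then-replay pattern (Chunk and validate_stream are illustrative names, not part of this PR):

    from dataclasses import dataclass, field
    from typing import Iterator, List

    @dataclass
    class Chunk:
        # Stand-in for a Vertex AI response chunk; only the field we inspect.
        candidates: List[str] = field(default_factory=list)

    def validate_stream(stream: Iterator[Chunk]) -> Iterator[Chunk]:
        # Buffer the entire stream; trades time-to-first-chunk for the
        # ability to raise (and retry) before yielding anything.
        chunks = list(stream)
        for chunk in chunks:
            if not chunk.candidates:
                raise ValueError("Got 0 candidates from generations.")
        return iter(chunks)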
@@ -449,6 +458,7 @@ async def _acompletion_with_retry(
     generation_method: Callable,
     *,
     max_retries: int,
+    check_stream_response_for_candidates: bool = False,
     run_manager: Optional[CallbackManagerForLLMRun] = None,
     **kwargs: Any,
 ) -> Any:
@@ -462,6 +472,14 @@ async def _completion_with_retry_inner(
         generation_method: Callable, **kwargs: Any
     ) -> Any:
         response = await generation_method(**kwargs)
+        if kwargs.get("stream") and check_stream_response_for_candidates:
+            chunks = list(response)
+            for chunk in chunks:
+                if not chunk.candidates:
+                    raise ValueError("Got 0 candidates from generations.")
+            return iter(chunks)
+        if kwargs.get("stream"):
+            return response
         if len(response.candidates):
             return response
         else:
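One subtlety in this async variant: list(response) only works if the awaited generation method resolves to a synchronous iterable. If it instead resolves to an async iterator, list() raises TypeError, and buffering requires an async comprehension. A hedged sketch of that alternative (drain_async is a hypothetical helper, not code from this PR):

    from typing import AsyncIterator, Iterator, List, TypeVar

    T = TypeVar("T")

    async def drain_async(stream: AsyncIterator[T]) -> Iterator[T]:
        # list(stream) would raise TypeError on an async iterator;
        # an async comprehension is the idiomatic way to buffer one.
        chunks: List[T] = [chunk async for chunk in stream]
        return iter(chunks)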
@@ -484,6 +502,10 @@ class ChatVertexAI(_VertexAICommon, BaseChatModel):
     """[Deprecated] Since new Gemini models support setting a System Message,
     setting this parameter to True is discouraged.
     """
+    check_stream_response_for_candidates: bool = False
+    """Retrieves all chunks from the streaming response and checks that each one
+    has candidates; if any chunk has none, the call is retried. Note that this
+    buffers the full response first, making streaming mode essentially useless."""
 
     @classmethod
     def is_lc_serializable(self) -> bool:
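With the new class attribute in place, the behavior is opt-in per model instance. A minimal usage sketch (the model name and prompt are placeholders):

    from langchain_google_vertexai import ChatVertexAI

    llm = ChatVertexAI(
        model_name="gemini-pro",  # placeholder; use any available Gemini model
        max_retries=3,
        # Buffer each streamed response and retry if any chunk lacks candidates.
        check_stream_response_for_candidates=True,
    )

    for chunk in llm.stream("Tell me a joke about streaming APIs."):
        print(chunk.content, end="", flush=True)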
@@ -711,7 +733,13 @@ def _stream(
         client, contents = self._gemini_client_and_contents(messages)
         params = self._gemini_params(stop=stop, stream=True, **kwargs)
         with telemetry.tool_context_manager(self._user_agent):
-            response_iter = client.generate_content(contents, **params, stream=True)
+            response_iter = _completion_with_retry(
+                client.generate_content,
+                max_retries=self.max_retries,
+                contents=contents,
+                check_stream_response_for_candidates=self.check_stream_response_for_candidates,
+                **params,
+            )
         for response_chunk in response_iter:
             chunk = self._gemini_chunk_to_generation_chunk(response_chunk)
             if run_manager and isinstance(chunk.message.content, str):
@@ -758,8 +786,12 @@ async def _astream(
         client, contents = self._gemini_client_and_contents(messages)
         params = self._gemini_params(stop=stop, stream=True, **kwargs)
         with telemetry.tool_context_manager(self._user_agent):
-            async for response_chunk in await client.generate_content_async(
-                contents, **params, stream=True
+            async for response_chunk in await _acompletion_with_retry(
+                client.generate_content_async,
+                max_retries=self.max_retries,
+                contents=contents,
+                check_stream_response_for_candidates=self.check_stream_response_for_candidates,
+                **params,
             ):
                 chunk = self._gemini_chunk_to_generation_chunk(response_chunk)
                 if run_manager and isinstance(chunk.message.content, str):
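To see the retry mechanics end to end, here is a simplified, self-contained simulation of what this PR adds: a flaky generation method whose first streamed response contains an empty-candidate chunk, wrapped in a bare retry loop that stands in for the library's retry decorator (all names here are illustrative):

    from typing import Any, Callable, Iterator, List

    class FakeChunk:
        def __init__(self, candidates: List[str]) -> None:
            self.candidates = candidates

    calls = {"n": 0}

    def flaky_generate(**kwargs: Any) -> Iterator[FakeChunk]:
        # First call yields a chunk with no candidates; later calls succeed.
        calls["n"] += 1
        if calls["n"] == 1:
            return iter([FakeChunk([])])
        return iter([FakeChunk(["ok"]), FakeChunk(["done"])])

    def completion_with_retry(
        method: Callable, max_retries: int, **kwargs: Any
    ) -> Iterator[FakeChunk]:
        for _ in range(max_retries):
            chunks = list(method(**kwargs))
            if all(chunk.candidates for chunk in chunks):
                return iter(chunks)
        raise ValueError("Got 0 candidates from generations.")

    for chunk in completion_with_retry(flaky_generate, max_retries=3, stream=True):
        print(chunk.candidates)  # prints ['ok'] then ['done']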