From f81150348335a9d3faa55617bb9a8d0b429ceaca Mon Sep 17 00:00:00 2001 From: YulNaumenko Date: Mon, 16 Dec 2024 21:31:58 -0800 Subject: [PATCH] fixed streaming --- .../connector_types/inference/params.tsx | 17 ----------------- .../connector_types/inference/inference.ts | 18 ++++++++++-------- 2 files changed, 10 insertions(+), 25 deletions(-) diff --git a/x-pack/plugins/stack_connectors/public/connector_types/inference/params.tsx b/x-pack/plugins/stack_connectors/public/connector_types/inference/params.tsx index 50e8bc1773d3..4a1ae535d3af 100644 --- a/x-pack/plugins/stack_connectors/public/connector_types/inference/params.tsx +++ b/x-pack/plugins/stack_connectors/public/connector_types/inference/params.tsx @@ -14,7 +14,6 @@ import { EuiTextArea, EuiFormRow, EuiSpacer, EuiSelect } from '@elastic/eui'; import type { RuleFormParamsErrors } from '@kbn/response-ops-rule-form'; import { ActionVariable } from '@kbn/alerting-types'; import { - ChatCompleteParams, RerankParams, SparseEmbeddingParams, TextEmbeddingParams, @@ -173,22 +172,6 @@ const UnifiedCompletionParamsFields: React.FunctionComponent<{ ); }; -const CompletionParamsFields: React.FunctionComponent<{ - subActionParams: ChatCompleteParams; - errors: RuleFormParamsErrors; - editSubActionParams: (params: Partial) => void; -}> = ({ subActionParams, editSubActionParams, errors }) => { - const { input } = subActionParams; - - return ( - - ); -}; - const SparseEmbeddingParamsFields: React.FunctionComponent<{ subActionParams: SparseEmbeddingParams; errors: RuleFormParamsErrors; diff --git a/x-pack/plugins/stack_connectors/server/connector_types/inference/inference.ts b/x-pack/plugins/stack_connectors/server/connector_types/inference/inference.ts index 55640e7f45e1..4661fd1c03c2 100644 --- a/x-pack/plugins/stack_connectors/server/connector_types/inference/inference.ts +++ b/x-pack/plugins/stack_connectors/server/connector_types/inference/inference.ts @@ -31,7 +31,6 @@ import { } from 'rxjs'; import OpenAI from 'openai'; import { ChatCompletionChunk } from 'openai/resources'; -import { IncomingMessage } from 'http'; import { RerankParamsSchema, SparseEmbeddingParamsSchema, @@ -129,7 +128,7 @@ export class InferenceConnector extends SubActionConnector { ): Promise { const res = await this.performApiUnifiedCompletionStream(params); - const v = from(eventSourceStreamIntoObservable(res as Readable)).pipe( + const v = from(eventSourceStreamIntoObservable(res as unknown as Readable)).pipe( filter((line) => !!line && line !== '[DONE]'), map((line) => { return JSON.parse(line) as OpenAI.ChatCompletionChunk | { error: { message: string } }; @@ -258,7 +257,7 @@ export class InferenceConnector extends SubActionConnector { public async performApiUnifiedCompletionStream( params: UnifiedChatCompleteParams & { signal?: AbortSignal } ) { - return (await this.esClient.transport.request( + return await this.esClient.transport.request( { method: 'POST', path: `_inference/completion/${this.inferenceId}/_unified`, @@ -267,7 +266,7 @@ export class InferenceConnector extends SubActionConnector { { asStream: true, } - )) as unknown as IncomingMessage; + ); } /** @@ -291,10 +290,13 @@ export class InferenceConnector extends SubActionConnector { const res = await this.performApiUnifiedCompletionStream(params); const controller = new AbortController(); // splits the stream in two, one is used for the UI and other for token tracking - return { - consumerStream: Stream.fromSSEResponse(res as unknown as Response, controller), - tokenCountStream: Stream.fromSSEResponse(res as unknown as Response, controller), - }; + + const stream = Stream.fromSSEResponse( + { body: res } as unknown as Response, + controller + ); + const teed = stream.tee(); + return { consumerStream: teed[0], tokenCountStream: teed[1] }; // since we do not use the sub action connector request method, we need to do our own error handling } catch (e) { const errorMessage = this.getResponseErrorMessage(e);