Skip to content

Commit

Permalink
fixed streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
YulNaumenko committed Dec 17, 2024
1 parent a70051b commit f811503
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import { EuiTextArea, EuiFormRow, EuiSpacer, EuiSelect } from '@elastic/eui';
import type { RuleFormParamsErrors } from '@kbn/response-ops-rule-form';
import { ActionVariable } from '@kbn/alerting-types';
import {
ChatCompleteParams,
RerankParams,
SparseEmbeddingParams,
TextEmbeddingParams,
Expand Down Expand Up @@ -173,22 +172,6 @@ const UnifiedCompletionParamsFields: React.FunctionComponent<{
);
};

/**
 * Param fields for the chat-complete sub-action: renders the shared
 * {@link InferenceInput} control bound to the sub-action's `input` value.
 * Behavior-identical restyle; props contract is unchanged.
 */
const CompletionParamsFields: React.FunctionComponent<{
  subActionParams: ChatCompleteParams;
  errors: RuleFormParamsErrors;
  editSubActionParams: (params: Partial<InferenceActionParams['subActionParams']>) => void;
}> = (props) => {
  // Pull the current input off the sub-action params; edits and validation
  // errors are forwarded straight through to the shared input component.
  const { subActionParams, editSubActionParams, errors } = props;

  return (
    <InferenceInput
      input={subActionParams.input}
      editSubActionParams={editSubActionParams}
      inputError={errors.input as string}
    />
  );
};

const SparseEmbeddingParamsFields: React.FunctionComponent<{
subActionParams: SparseEmbeddingParams;
errors: RuleFormParamsErrors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import {
} from 'rxjs';
import OpenAI from 'openai';
import { ChatCompletionChunk } from 'openai/resources';
import { IncomingMessage } from 'http';
import {
RerankParamsSchema,
SparseEmbeddingParamsSchema,
Expand Down Expand Up @@ -129,7 +128,7 @@ export class InferenceConnector extends SubActionConnector<Config, Secrets> {
): Promise<UnifiedChatCompleteResponse> {
const res = await this.performApiUnifiedCompletionStream(params);

const v = from(eventSourceStreamIntoObservable(res as Readable)).pipe(
const v = from(eventSourceStreamIntoObservable(res as unknown as Readable)).pipe(
filter((line) => !!line && line !== '[DONE]'),
map((line) => {
return JSON.parse(line) as OpenAI.ChatCompletionChunk | { error: { message: string } };
Expand Down Expand Up @@ -258,7 +257,7 @@ export class InferenceConnector extends SubActionConnector<Config, Secrets> {
public async performApiUnifiedCompletionStream(
params: UnifiedChatCompleteParams & { signal?: AbortSignal }
) {
return (await this.esClient.transport.request<UnifiedChatCompleteResponse>(
return await this.esClient.transport.request<UnifiedChatCompleteResponse>(
{
method: 'POST',
path: `_inference/completion/${this.inferenceId}/_unified`,
Expand All @@ -267,7 +266,7 @@ export class InferenceConnector extends SubActionConnector<Config, Secrets> {
{
asStream: true,
}
)) as unknown as IncomingMessage;
);
}

/**
Expand All @@ -291,10 +290,13 @@ export class InferenceConnector extends SubActionConnector<Config, Secrets> {
const res = await this.performApiUnifiedCompletionStream(params);
const controller = new AbortController();
// splits the stream in two, one is used for the UI and other for token tracking
return {
consumerStream: Stream.fromSSEResponse(res as unknown as Response, controller),
tokenCountStream: Stream.fromSSEResponse(res as unknown as Response, controller),
};

const stream = Stream.fromSSEResponse<ChatCompletionChunk>(
{ body: res } as unknown as Response,
controller
);
const teed = stream.tee();
return { consumerStream: teed[0], tokenCountStream: teed[1] };
// since we do not use the sub action connector request method, we need to do our own error handling
} catch (e) {
const errorMessage = this.getResponseErrorMessage(e);
Expand Down

0 comments on commit f811503

Please sign in to comment.