From 967605265e4a45da6a872fd81e8495793a17476d Mon Sep 17 00:00:00 2001 From: Larry Gregory Date: Mon, 18 Nov 2024 13:38:00 -0500 Subject: [PATCH] fixes for eventsource-parser --- .../src/create_observable_from_http_response.ts | 5 +++-- .../public/util/create_observable_from_http_response.ts | 6 +++--- .../server/util/event_source_stream_into_observable.ts | 6 +++--- .../server/util/observable_into_event_source_stream.test.ts | 6 +++--- .../service/util/eventsource_stream_into_observable.ts | 6 +++--- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/packages/kbn-sse-utils-client/src/create_observable_from_http_response.ts b/packages/kbn-sse-utils-client/src/create_observable_from_http_response.ts index 814a528535c75..cf9ef8ee30a7b 100644 --- a/packages/kbn-sse-utils-client/src/create_observable_from_http_response.ts +++ b/packages/kbn-sse-utils-client/src/create_observable_from_http_response.ts @@ -29,8 +29,8 @@ export function createObservableFromHttpResponse((subscriber) => { - const parser = createParser((event) => { - if (event.type === 'event') + const parser = createParser({ + onEvent: (event) => { try { const data = JSON.parse(event.data); if (event.event === 'error') { @@ -48,6 +48,7 @@ export function createObservableFromHttpResponse { diff --git a/x-pack/plugins/inference/public/util/create_observable_from_http_response.ts b/x-pack/plugins/inference/public/util/create_observable_from_http_response.ts index 862986ce1c73a..d7520a86d6922 100644 --- a/x-pack/plugins/inference/public/util/create_observable_from_http_response.ts +++ b/x-pack/plugins/inference/public/util/create_observable_from_http_response.ts @@ -26,10 +26,10 @@ export function createObservableFromHttpResponse( } return new Observable((subscriber) => { - const parser = createParser((event) => { - if (event.type === 'event') { + const parser = createParser({ + onEvent: (event) => { subscriber.next(event.data); - } + }, }); const readStream = async () => { diff --git a/x-pack/plugins/inference/server/util/event_source_stream_into_observable.ts b/x-pack/plugins/inference/server/util/event_source_stream_into_observable.ts index cad0a8e84d6a7..42844632aa03b 100644 --- a/x-pack/plugins/inference/server/util/event_source_stream_into_observable.ts +++ b/x-pack/plugins/inference/server/util/event_source_stream_into_observable.ts @@ -11,10 +11,10 @@ import { Observable } from 'rxjs'; export function eventSourceStreamIntoObservable(readable: Readable) { return new Observable((subscriber) => { - const parser = createParser((event) => { - if (event.type === 'event') { + const parser = createParser({ + onEvent: (event) => { subscriber.next(event.data); - } + }, }); async function processStream() { diff --git a/x-pack/plugins/inference/server/util/observable_into_event_source_stream.test.ts b/x-pack/plugins/inference/server/util/observable_into_event_source_stream.test.ts index 8ece214c27599..a815b22854118 100644 --- a/x-pack/plugins/inference/server/util/observable_into_event_source_stream.test.ts +++ b/x-pack/plugins/inference/server/util/observable_into_event_source_stream.test.ts @@ -72,10 +72,10 @@ describe('observableIntoEventSourceStream', () => { const events: Array> = []; - const parser = createParser((event) => { - if (event.type === 'event') { + const parser = createParser({ + onEvent: (event) => { events.push(JSON.parse(event.data)); - } + }, }); chunks.forEach((chunk) => { diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/util/eventsource_stream_into_observable.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/util/eventsource_stream_into_observable.ts index 5ff332128f8ac..b2426d8e4eb5d 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/util/eventsource_stream_into_observable.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/util/eventsource_stream_into_observable.ts @@ -14,10 +14,10 @@ import { Observable } from 'rxjs'; export function eventsourceStreamIntoObservable(readable: Readable) { return new Observable((subscriber) => { - const parser = createParser((event) => { - if (event.type === 'event') { + const parser = createParser({ + onEvent: (event) => { subscriber.next(event.data); - } + }, }); async function processStream() {