Skip to content

Commit

Permalink
fixes for eventsource-parser
Browse files Browse the repository at this point in the history
  • Loading branch information
legrego committed Nov 18, 2024
1 parent 01d07f4 commit 9676052
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ export function createObservableFromHttpResponse<T extends ServerSentEvent = nev
}

return new Observable<T>((subscriber) => {
const parser = createParser((event) => {
if (event.type === 'event')
const parser = createParser({
onEvent: (event) => {
try {
const data = JSON.parse(event.data);
if (event.event === 'error') {
Expand All @@ -48,6 +48,7 @@ export function createObservableFromHttpResponse<T extends ServerSentEvent = nev
} catch (error) {
subscriber.error(createSSEInternalError(`Failed to parse JSON`));
}
},
});

const readStream = async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ export function createObservableFromHttpResponse(
}

return new Observable<string>((subscriber) => {
const parser = createParser((event) => {
if (event.type === 'event') {
const parser = createParser({
onEvent: (event) => {
subscriber.next(event.data);
}
},
});

const readStream = async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import { Observable } from 'rxjs';

export function eventSourceStreamIntoObservable(readable: Readable) {
return new Observable<string>((subscriber) => {
const parser = createParser((event) => {
if (event.type === 'event') {
const parser = createParser({
onEvent: (event) => {
subscriber.next(event.data);
}
},
});

async function processStream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ describe('observableIntoEventSourceStream', () => {

const events: Array<Record<string, any>> = [];

const parser = createParser((event) => {
if (event.type === 'event') {
const parser = createParser({
onEvent: (event) => {
events.push(JSON.parse(event.data));
}
},
});

chunks.forEach((chunk) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import { Observable } from 'rxjs';

export function eventsourceStreamIntoObservable(readable: Readable) {
return new Observable<string>((subscriber) => {
const parser = createParser((event) => {
if (event.type === 'event') {
const parser = createParser({
onEvent: (event) => {
subscriber.next(event.data);
}
},
});

async function processStream() {
Expand Down

0 comments on commit 9676052

Please sign in to comment.