From 3441e19d1f61d19f7dc50484ca7a33a1acd8ed84 Mon Sep 17 00:00:00 2001 From: Allan Zheng Date: Thu, 13 Feb 2020 11:31:43 -0800 Subject: [PATCH] feat: throw error/exception event and terminate event stream --- .../src/getDeserializingStream.ts | 8 ++++++-- .../src/getEventMessageStream.ts | 17 ++++++++++++++--- .../src/MessageUnmarshallerStream.ts | 11 +++++++++-- packages/eventstream-serde-node/src/utils.ts | 4 ++++ 4 files changed, 33 insertions(+), 7 deletions(-) diff --git a/packages/eventstream-serde-browser/src/getDeserializingStream.ts b/packages/eventstream-serde-browser/src/getDeserializingStream.ts index 642c7050eeb3..78c050f25eaf 100644 --- a/packages/eventstream-serde-browser/src/getDeserializingStream.ts +++ b/packages/eventstream-serde-browser/src/getDeserializingStream.ts @@ -14,8 +14,12 @@ export function getDeserializingStream( return; } - controller.enqueue(await deserializer(value)); - push(); + try { + controller.enqueue(await deserializer(value)); + push(); + } catch (e) { + controller.error(e); + } }); } diff --git a/packages/eventstream-serde-browser/src/getEventMessageStream.ts b/packages/eventstream-serde-browser/src/getEventMessageStream.ts index cb9e98dec804..799158a0dc17 100644 --- a/packages/eventstream-serde-browser/src/getEventMessageStream.ts +++ b/packages/eventstream-serde-browser/src/getEventMessageStream.ts @@ -17,7 +17,16 @@ export function getEventMessageStream( const message = eventMarshaller.unmarshall(value); const { value: messageType } = message.headers[":message-type"]; - if (messageType === "exception") { + if (messageType === "error") { + // Unmodeled exception in event + const unmodeledError = new Error( + (message.headers[":error-message"].value as string) || + "UnknownError" + ); + unmodeledError.name = message.headers[":error-code"] + .value as string; + controller.error(unmodeledError); + } else if (messageType === "exception") { // throw this.exceptionsDeserializer(message); controller.enqueue({ [message.headers[":exception-type"].value as string]: message @@ -27,8 +36,10 @@ export function getEventMessageStream( [message.headers[":event-type"].value as string]: message }); } else { - throw Error( - `Unrecognizable event type: ${message.headers[":event-type"].value}` + controller.error( + new Error( + `Unrecognizable event type: ${message.headers[":event-type"].value}` + ) ); } push(); diff --git a/packages/eventstream-serde-node/src/MessageUnmarshallerStream.ts b/packages/eventstream-serde-node/src/MessageUnmarshallerStream.ts index 77f81ac88d54..727fb83d0ec2 100644 --- a/packages/eventstream-serde-node/src/MessageUnmarshallerStream.ts +++ b/packages/eventstream-serde-node/src/MessageUnmarshallerStream.ts @@ -28,8 +28,15 @@ export class MessageUnmarshallerStream extends Transform { try { const message = this.eventMarshaller.unmarshall(chunk); const { value: messageType } = message.headers[":message-type"]; - if (messageType === "exception") { - // throw this.exceptionsDeserializer(message); + if (messageType === "error") { + // Unmodeled exception in event + const unmodeledError = new Error( + (message.headers[":error-message"].value as string) || "UnknownError" + ); + unmodeledError.name = message.headers[":error-code"].value as string; + throw unmodeledError; + } else if (messageType === "exception") { + // For modeled exception, push it to deserializer and throw after deserializing this.push({ [message.headers[":exception-type"].value as string]: message }); diff --git a/packages/eventstream-serde-node/src/utils.ts b/packages/eventstream-serde-node/src/utils.ts index d2d7d1b667b7..b57be2125126 100644 --- a/packages/eventstream-serde-node/src/utils.ts +++ b/packages/eventstream-serde-node/src/utils.ts @@ -21,6 +21,10 @@ export function getSignatureBinary(signature: string): Uint8Array { export async function* ReadabletoIterable( readStream: Readable ): AsyncIterable { + if (typeof readStream[Symbol.asyncIterator] === "function") { + // use the experimental feature if available. + throw readStream; + } let streamEnded = false; let generationEnded = false; const records = new Array();