Skip to content

Commit

Permalink
feat: throw error/exception event and terminate event stream
Browse files Browse the repository at this point in the history
  • Loading branch information
AllanZhengYP committed Feb 14, 2020
1 parent c6dea2a commit 3441e19
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ export function getDeserializingStream(
return;
}

controller.enqueue(await deserializer(value));
push();
try {
controller.enqueue(await deserializer(value));
push();
} catch (e) {
controller.error(e);
}
});
}

Expand Down
17 changes: 14 additions & 3 deletions packages/eventstream-serde-browser/src/getEventMessageStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@ export function getEventMessageStream(

const message = eventMarshaller.unmarshall(value);
const { value: messageType } = message.headers[":message-type"];
if (messageType === "exception") {
if (messageType === "error") {
// Unmodeled exception in event
const unmodeledError = new Error(
(message.headers[":error-message"].value as string) ||
"UnknownError"
);
unmodeledError.name = message.headers[":error-code"]
.value as string;
controller.error(unmodeledError);
} else if (messageType === "exception") {
// throw this.exceptionsDeserializer(message);
controller.enqueue({
[message.headers[":exception-type"].value as string]: message
Expand All @@ -27,8 +36,10 @@ export function getEventMessageStream(
[message.headers[":event-type"].value as string]: message
});
} else {
throw Error(
`Unrecognizable event type: ${message.headers[":event-type"].value}`
controller.error(
new Error(
`Unrecognizable event type: ${message.headers[":event-type"].value}`
)
);
}
push();
Expand Down
11 changes: 9 additions & 2 deletions packages/eventstream-serde-node/src/MessageUnmarshallerStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,15 @@ export class MessageUnmarshallerStream extends Transform {
try {
const message = this.eventMarshaller.unmarshall(chunk);
const { value: messageType } = message.headers[":message-type"];
if (messageType === "exception") {
// throw this.exceptionsDeserializer(message);
if (messageType === "error") {
// Unmodeled exception in event
const unmodeledError = new Error(
(message.headers[":error-message"].value as string) || "UnknownError"
);
unmodeledError.name = message.headers[":error-code"].value as string;
throw unmodeledError;
} else if (messageType === "exception") {
// For modeled exception, push it to deserializer and throw after deserializing
this.push({
[message.headers[":exception-type"].value as string]: message
});
Expand Down
4 changes: 4 additions & 0 deletions packages/eventstream-serde-node/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ export function getSignatureBinary(signature: string): Uint8Array {
export async function* ReadabletoIterable<T>(
readStream: Readable
): AsyncIterable<T> {
if (typeof readStream[Symbol.asyncIterator] === "function") {
// use the experimental feature if available.
throw readStream;
}
let streamEnded = false;
let generationEnded = false;
const records = new Array<T>();
Expand Down

0 comments on commit 3441e19

Please sign in to comment.