Skip to content

Commit

Permalink
chore(eventstream-serde-node): add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AllanZhengYP committed Feb 14, 2020
1 parent 0fb5441 commit c6dea2a
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 0 deletions.
1 change: 1 addition & 0 deletions packages/eventstream-serde-node/.npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ tsconfig.test.json
*.fixture.js
*.fixture.d.ts
*.fixture.js.map
*.fixture.ts
5 changes: 5 additions & 0 deletions packages/eventstream-serde-node/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
const base = require("../../jest.config.base.js");

module.exports = {
...base
};
1 change: 1 addition & 0 deletions packages/eventstream-serde-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"tslib": "^1.8.0"
},
"devDependencies": {
"@aws-sdk/util-utf8-node": "^1.0.0-alpha.2",
"@types/jest": "^24.0.12",
"typescript": "~3.4.0",
"jest": "^24.7.1"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { EventDeserializerStream } from "./EventDeserializerStream";

describe("EventDeserializerStream", () => {
it("throws when deserializer throws an error", done => {
const deserStream = new EventDeserializerStream({
deserializer: message => {
throw new Error("error event");
}
});
deserStream.on("error", error => {
expect(error).toBeDefined();
expect(error.message).toEqual("error event");
done();
});
deserStream.write({});
});
});
146 changes: 146 additions & 0 deletions packages/eventstream-serde-node/src/EventMessageChunkerStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import { EventMessageChunkerStream } from "./EventMessageChunkerStream";
import {
recordEventMessage,
statsEventMessage,
endEventMessage
} from "./fixtures/event.fixture";
import { MockEventMessageSource } from "./fixtures/MockEventMessageSource.fixture";

describe("EventMessageChunkerStream", () => {
it("splits payloads into individual messages", done => {
const messages = [];
const mockMessages = [
recordEventMessage,
statsEventMessage,
endEventMessage
];
const mockStream = new MockEventMessageSource({
messages: mockMessages,
emitSize: 100
});
const chunkerStream = new EventMessageChunkerStream();
mockStream.pipe(chunkerStream);
chunkerStream.on("data", msg => {
messages.push(msg);
});
chunkerStream.on("end", function() {
expect(messages.length).toBe(3);
done();
});
});

it("splits payloads in correct order", done => {
const messages: Array<any> = [];
const mockMessages = [
recordEventMessage,
statsEventMessage,
recordEventMessage,
endEventMessage
];
const mockStream = new MockEventMessageSource({
messages: mockMessages,
emitSize: 100
});
const chunkerStream = new EventMessageChunkerStream();
mockStream.pipe(chunkerStream);
chunkerStream.on("data", msg => {
messages.push(msg);
});
chunkerStream.on("end", function() {
expect(messages.length).toBe(4);
for (let i = 0; i < mockMessages.length; i++) {
expect(messages[i].toString("base64")).toEqual(
mockMessages[i].toString("base64")
);
}
done();
});
});

it("splits payloads when received all at once", done => {
const messages = [];
const mockMessages = [
recordEventMessage,
statsEventMessage,
endEventMessage
];
const mockStream = new MockEventMessageSource({
messages: mockMessages,
emitSize: mockMessages.reduce((prev, cur) => {
return prev + cur.length;
}, 0)
});
const chunkerStream = new EventMessageChunkerStream();
mockStream.pipe(chunkerStream);
chunkerStream.on("data", msg => {
messages.push(msg);
});
chunkerStream.on("end", function() {
expect(messages.length).toBe(3);
done();
});
});

it("splits payloads when total event message length spans multiple chunks", done => {
const messages = [];
const mockMessages = [
recordEventMessage,
statsEventMessage,
endEventMessage
];
const mockStream = new MockEventMessageSource({
messages: mockMessages,
emitSize: 1
});
const chunkerStream = new EventMessageChunkerStream();
mockStream.pipe(chunkerStream);
chunkerStream.on("data", msg => {
messages.push(msg);
});
chunkerStream.on("end", function() {
expect(messages.length).toBe(3);
done();
});
});

it("splits payloads when total event message length spans 2 chunks", done => {
const messages = [];
const mockMessages = [
recordEventMessage,
statsEventMessage,
endEventMessage
];
const mockStream = new MockEventMessageSource({
messages: mockMessages,
emitSize: recordEventMessage.length + 2
});
const chunkerStream = new EventMessageChunkerStream();
mockStream.pipe(chunkerStream);
chunkerStream.on("data", msg => {
messages.push(msg);
});
chunkerStream.on("end", function() {
expect(messages.length).toBe(3);
done();
});
});

it("sends an error if an event message is truncated", done => {
const responseMessage = Buffer.concat([
recordEventMessage,
statsEventMessage,
endEventMessage
]);
const mockStream = new MockEventMessageSource({
messages: [responseMessage.slice(0, responseMessage.length - 4)],
emitSize: 10
});

const chunkerStream = new EventMessageChunkerStream();
mockStream.pipe(chunkerStream);
chunkerStream.on("error", err => {
expect(err.message).toEqual("Truncated event message received.");
done();
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { fromUtf8, toUtf8 } from "@aws-sdk/util-utf8-node";
import { EventStreamMarshaller } from "@aws-sdk/eventstream-marshaller";
import { MessageUnmarshallerStream } from "./MessageUnmarshallerStream";
import {
recordEventMessage,
statsEventMessage,
endEventMessage
} from "./fixtures/event.fixture";
import { Message } from "@aws-sdk/types";

describe("MessageUnmarshallerStream", () => {
it("emits parsed message on data", done => {
const expectedMessages: Array<Message> = [
{
headers: {
":content-type": {
type: "string",
value: "application/octet-stream"
},
":event-type": { type: "string", value: "Records" },
":message-type": { type: "string", value: "event" }
},
body: new Uint8Array(
Buffer.from(
`1,Foo,When life gives you foo...\n2,Bar,make Bar!\n3,Fizz,Sometimes paired with...\n4,Buzz,the infamous Buzz!\n`
)
)
},
{
headers: {
":content-type": {
type: "string",
value: "text/xml"
},
":event-type": { type: "string", value: "Stats" },
":message-type": { type: "string", value: "event" }
},
body: new Uint8Array(
Buffer.from(
'<Stats xmlns=""><BytesScanned>126</BytesScanned><BytesProcessed>126</BytesProcessed><BytesReturned>107</BytesReturned></Stats>'
)
)
},
{
headers: {
":event-type": { type: "string", value: "End" },
":message-type": { type: "string", value: "event" }
},
body: new Uint8Array()
}
];

const unmarshallerStream = new MessageUnmarshallerStream({
eventMarshaller: new EventStreamMarshaller(toUtf8, fromUtf8)
});

const messages: Array<Message> = [];
unmarshallerStream.on("data", msg => {
messages.push(msg[Object.keys(msg)[0]]);
});
unmarshallerStream.on("end", () => {
for (let i = 1; i < messages.length; i++) {
expect(messages[i]).toEqual(expectedMessages[i]);
}
done();
});

unmarshallerStream.write(recordEventMessage);
unmarshallerStream.write(statsEventMessage);
unmarshallerStream.write(endEventMessage);
unmarshallerStream.end();
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { Readable, ReadableOptions } from "stream";

export interface MockEventMessageSourceOptions extends ReadableOptions {
messages: Array<Buffer>;
emitSize: number;
throwError?: Error;
}

export class MockEventMessageSource extends Readable {
private readonly data: Buffer;
private readonly emitSize: number;
private readonly throwError?: Error;
private readCount = 0;
constructor(options: MockEventMessageSourceOptions) {
super(options);
this.data = Buffer.concat(options.messages);
this.emitSize = options.emitSize;
this.throwError = options.throwError;
}

_read() {
const self = this;
if (this.readCount === this.data.length) {
if (this.throwError) {
process.nextTick(function() {
self.emit("error", new Error("Throwing an error!"));
});
return;
} else {
this.push(null);
return;
}
}

const bytesLeft = this.data.length - this.readCount;
const numBytesToSend = Math.min(bytesLeft, this.emitSize);

const chunk = this.data.slice(
this.readCount,
this.readCount + numBytesToSend
);
this.readCount += numBytesToSend;
this.push(chunk);
}
}
14 changes: 14 additions & 0 deletions packages/eventstream-serde-node/src/fixtures/event.fixture.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export const recordEventMessage = Buffer.from(
"AAAA0AAAAFX31gVLDTptZXNzYWdlLXR5cGUHAAVldmVudAs6ZXZlbnQtdHlwZQcAB1JlY29yZHMNOmNvbnRlbnQtdHlwZQcAGGFwcGxpY2F0aW9uL29jdGV0LXN0cmVhbTEsRm9vLFdoZW4gbGlmZSBnaXZlcyB5b3UgZm9vLi4uCjIsQmFyLG1ha2UgQmFyIQozLEZpenosU29tZXRpbWVzIHBhaXJlZCB3aXRoLi4uCjQsQnV6eix0aGUgaW5mYW1vdXMgQnV6eiEKzxKeSw==",
"base64"
);

export const statsEventMessage = Buffer.from(
"AAAA0QAAAEM+YpmqDTptZXNzYWdlLXR5cGUHAAVldmVudAs6ZXZlbnQtdHlwZQcABVN0YXRzDTpjb250ZW50LXR5cGUHAAh0ZXh0L3htbDxTdGF0cyB4bWxucz0iIj48Qnl0ZXNTY2FubmVkPjEyNjwvQnl0ZXNTY2FubmVkPjxCeXRlc1Byb2Nlc3NlZD4xMjY8L0J5dGVzUHJvY2Vzc2VkPjxCeXRlc1JldHVybmVkPjEwNzwvQnl0ZXNSZXR1cm5lZD48L1N0YXRzPiJ0pLk=",
"base64"
);

export const endEventMessage = Buffer.from(
"AAAAOAAAACjBxoTUDTptZXNzYWdlLXR5cGUHAAVldmVudAs6ZXZlbnQtdHlwZQcAA0VuZM+X05I=",
"base64"
);

0 comments on commit c6dea2a

Please sign in to comment.