Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Interpreter] Change ajax_stream to use new-line delimited JSON #52797

Merged
merged 1 commit into from
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe('ajaxStream', () => {
it('pulls items from the stream and calls the handler', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].map(m => `${m.length}:${m}`);
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'];

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
Expand All @@ -43,12 +43,34 @@ describe('ajaxStream', () => {
expect(handler).toHaveBeenCalledWith({ tis: 'fate' });
});

it('handles newlines in values', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = [
JSON.stringify({ hello: 'wo\nrld' }),
'\n',
JSON.stringify({ tis: 'fa\nte' }),
'\n',
];

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});

messages.forEach(sendText);
done();

await promise;
expect(handler).toHaveBeenCalledTimes(2);
expect(handler).toHaveBeenCalledWith({ hello: 'wo\nrld' });
expect(handler).toHaveBeenCalledWith({ tis: 'fa\nte' });
});

it('handles partial messages', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']
.map(m => `${m.length}:${m}`)
.join('');
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join('');

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
Expand Down Expand Up @@ -117,7 +139,7 @@ describe('ajaxStream', () => {
it('rejects if the payload contains invalid JSON', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ waut? }\n'].map(m => `${m.length}:${m}`).join('');
const messages = ['{ waut? }\n'].join('');

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
Expand All @@ -130,32 +152,12 @@ describe('ajaxStream', () => {
expect(await promise.then(() => true).catch(() => false)).toBeFalsy();
});

it('rejects if the delim is invalid', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = '{ "hi": "there" }';

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});

sendText(messages);
done();

expect(await promise.then(() => true).catch(({ message }) => message)).toMatch(
/invalid stream response/i
);
});

it('rejects if the handler throws', async () => {
const handler = jest.fn(() => {
throw new Error('DOH!');
});
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']
.map(m => `${m.length}:${m}`)
.join('');
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join('');

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,34 +64,19 @@ function processBatchResponseStream<T>(handler: BatchResponseHandler<T>) {
return (text: string) => {
// While there's text to process...
while (index < text.length) {
// Our messages are delimited by colon: len:json
const delim = ':';
// We're using new line-delimited JSON.
const delim = '\n';
const delimIndex = text.indexOf(delim, index);
const payloadStart = delimIndex + delim.length;

// We've got an incomplete batch length
if (delimIndex < 0) {
return;
}

const rawLen = text.slice(index, delimIndex);
const payloadLen = parseInt(rawLen, 10);
const payloadEnd = payloadStart + payloadLen;

// We've got an invalid batch message (e.g. one without a numeric length: prefix)
if (isNaN(payloadLen)) {
throw new Error(`Invalid stream response length: ${rawLen}`);
}

// We've got an incomplete batch message
if (text.length < payloadEnd) {
return;
}

const payload = JSON.parse(text.slice(payloadStart, payloadEnd));
const payload = JSON.parse(text.slice(index, delimIndex));
handler(payload);

index = payloadEnd;
index = delimIndex + 1;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,15 @@ function runServerFunctions(server: any) {

// Send the initial headers.
res.writeHead(200, {
'Content-Type': 'text/plain',
'Content-Type': 'application/x-ndjson',
Connection: 'keep-alive',
'Transfer-Encoding': 'chunked',
'Cache-Control': 'no-cache',
});

// Write a length-delimited response
const streamResult = (result: any) => {
const payload = JSON.stringify(result) + '\n';
res.write(`${payload.length}:${payload}`);
res.write(JSON.stringify(result) + '\n');
};

// Tries to run an interpreter function, and ensures a consistent error payload on failure.
Expand Down