Skip to content

Commit

Permalink
Change ajax_stream to use new-line delimited JSON (#52797) (#52884)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisdavies authored Dec 12, 2019
1 parent b4a7df2 commit 8dce594
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe('ajaxStream', () => {
it('pulls items from the stream and calls the handler', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].map(m => `${m.length}:${m}`);
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'];

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
Expand All @@ -43,12 +43,34 @@ describe('ajaxStream', () => {
expect(handler).toHaveBeenCalledWith({ tis: 'fate' });
});

it('handles newlines in values', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = [
JSON.stringify({ hello: 'wo\nrld' }),
'\n',
JSON.stringify({ tis: 'fa\nte' }),
'\n',
];

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});

messages.forEach(sendText);
done();

await promise;
expect(handler).toHaveBeenCalledTimes(2);
expect(handler).toHaveBeenCalledWith({ hello: 'wo\nrld' });
expect(handler).toHaveBeenCalledWith({ tis: 'fa\nte' });
});

it('handles partial messages', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']
.map(m => `${m.length}:${m}`)
.join('');
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join('');

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
Expand Down Expand Up @@ -117,7 +139,7 @@ describe('ajaxStream', () => {
it('rejects if the payload contains invalid JSON', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ waut? }\n'].map(m => `${m.length}:${m}`).join('');
const messages = ['{ waut? }\n'].join('');

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
Expand All @@ -130,32 +152,12 @@ describe('ajaxStream', () => {
expect(await promise.then(() => true).catch(() => false)).toBeFalsy();
});

it('rejects if the delim is invalid', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = '{ "hi": "there" }';

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});

sendText(messages);
done();

expect(await promise.then(() => true).catch(({ message }) => message)).toMatch(
/invalid stream response/i
);
});

it('rejects if the handler throws', async () => {
const handler = jest.fn(() => {
throw new Error('DOH!');
});
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']
.map(m => `${m.length}:${m}`)
.join('');
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join('');

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,34 +64,19 @@ function processBatchResponseStream<T>(handler: BatchResponseHandler<T>) {
return (text: string) => {
// While there's text to process...
while (index < text.length) {
// Our messages are delimited by colon: len:json
const delim = ':';
// We're using new line-delimited JSON.
const delim = '\n';
const delimIndex = text.indexOf(delim, index);
const payloadStart = delimIndex + delim.length;

// We've got an incomplete batch length
if (delimIndex < 0) {
return;
}

const rawLen = text.slice(index, delimIndex);
const payloadLen = parseInt(rawLen, 10);
const payloadEnd = payloadStart + payloadLen;

// We've got an invalid batch message (e.g. one without a numeric length: prefix)
if (isNaN(payloadLen)) {
throw new Error(`Invalid stream response length: ${rawLen}`);
}

// We've got an incomplete batch message
if (text.length < payloadEnd) {
return;
}

const payload = JSON.parse(text.slice(payloadStart, payloadEnd));
const payload = JSON.parse(text.slice(index, delimIndex));
handler(payload);

index = payloadEnd;
index = delimIndex + 1;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,15 @@ function runServerFunctions(server: any) {

// Send the initial headers.
res.writeHead(200, {
'Content-Type': 'text/plain',
'Content-Type': 'application/x-ndjson',
Connection: 'keep-alive',
'Transfer-Encoding': 'chunked',
'Cache-Control': 'no-cache',
});

// Write a length-delimited response
const streamResult = (result: any) => {
const payload = JSON.stringify(result) + '\n';
res.write(`${payload.length}:${payload}`);
res.write(JSON.stringify(result) + '\n');
};

// Tries to run an interpreter function, and ensures a consistent error payload on failure.
Expand Down

0 comments on commit 8dce594

Please sign in to comment.