From d084e01a6e607a38545c37bac5b11b5c887aa082 Mon Sep 17 00:00:00 2001
From: Chris Davies <github@christophilus.com>
Date: Fri, 1 Mar 2019 10:29:59 -0500
Subject: [PATCH] Add http/1 response streaming to Canvas batches (#32027)

---
 .../src/public/batched_fetch.js               |  38 ++--
 .../src/public/batched_fetch.test.js          |  72 +++++++
 .../kbn-interpreter/src/public/interpreter.js |   4 +-
 .../src/public/interpreter.test.js            |  37 ++--
 .../interpreter/public/interpreter.js         |   8 +-
 .../server/routes/server_functions.js         |  75 ++++---
 .../ui/public/ajax_stream/ajax_stream.test.ts | 199 ++++++++++++++++++
 .../ui/public/ajax_stream/ajax_stream.ts      | 167 +++++++++++++++
 src/legacy/ui/public/ajax_stream/index.ts     |  33 +++
 9 files changed, 570 insertions(+), 63 deletions(-)
 create mode 100644 packages/kbn-interpreter/src/public/batched_fetch.test.js
 create mode 100644 src/legacy/ui/public/ajax_stream/ajax_stream.test.ts
 create mode 100644 src/legacy/ui/public/ajax_stream/ajax_stream.ts
 create mode 100644 src/legacy/ui/public/ajax_stream/index.ts

diff --git a/packages/kbn-interpreter/src/public/batched_fetch.js b/packages/kbn-interpreter/src/public/batched_fetch.js
index 20fd0d37b5ef..414b1443f5b8 100644
--- a/packages/kbn-interpreter/src/public/batched_fetch.js
+++ b/packages/kbn-interpreter/src/public/batched_fetch.js
@@ -23,7 +23,7 @@ import { FUNCTIONS_URL } from './consts';
  * Create a function which executes an Expression function on the
  * server as part of a larger batch of executions.
  */
-export function batchedFetch({ kfetch, serialize, ms = 10 }) {
+export function batchedFetch({ ajaxStream, serialize, ms = 10 }) {
   // Uniquely identifies each function call in a batch operation
   // so that the appropriate promise can be resolved / rejected later.
   let id = 0;
@@ -42,7 +42,7 @@ export function batchedFetch({ kfetch, serialize, ms = 10 }) {
   };
 
   const runBatch = () => {
-    processBatch(kfetch, batch);
+    processBatch(ajaxStream, batch);
     reset();
   };
 
@@ -70,14 +70,15 @@ export function batchedFetch({ kfetch, serialize, ms = 10 }) {
 function createFuture() {
   let resolve;
   let reject;
+  const promise = new Promise((res, rej) => {
+    resolve = res;
+    reject = rej;
+  });
 
   return {
-    resolve(val) { return resolve(val); },
-    reject(val) { return reject(val); },
-    promise: new Promise((res, rej) => {
-      resolve = res;
-      reject = rej;
-    }),
+    resolve,
+    reject,
+    promise,
   };
 }
 
@@ -85,22 +86,21 @@ function createFuture() {
  * Runs the specified batch of functions on the server, then resolves
  * the related promises.
  */
-async function processBatch(kfetch, batch) {
+async function processBatch(ajaxStream, batch) {
   try {
-    const { results } = await kfetch({
-      pathname: FUNCTIONS_URL,
-      method: 'POST',
+    await ajaxStream({
+      url: FUNCTIONS_URL,
       body: JSON.stringify({
         functions: Object.values(batch).map(({ request }) => request),
       }),
-    });
+      onResponse({ id, statusCode, result }) {
+        const { future } = batch[id];
 
-    results.forEach(({ id, result }) => {
-      const { future } = batch[id];
-      if (result.statusCode && result.err) {
-        future.reject(result);
-      } else {
-        future.resolve(result);
+        if (statusCode >= 400) {
+          future.reject(result);
+        } else {
+          future.resolve(result);
+        }
       }
     });
   } catch (err) {
diff --git a/packages/kbn-interpreter/src/public/batched_fetch.test.js b/packages/kbn-interpreter/src/public/batched_fetch.test.js
new file mode 100644
index 000000000000..f1c04e9de6d0
--- /dev/null
+++ b/packages/kbn-interpreter/src/public/batched_fetch.test.js
@@ -0,0 +1,72 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { batchedFetch } from './batched_fetch';
+
+const serialize = (o) => JSON.stringify(o);
+
+describe('batchedFetch', () => {
+  it('resolves the correct promise', async () => {
+    const ajaxStream = jest.fn(async ({ body, onResponse }) => {
+      const { functions } = JSON.parse(body);
+      functions.map(({ id, functionName, context, args }) => onResponse({
+        id,
+        statusCode: 200,
+        result: `${functionName}${context}${args}`,
+      }));
+    });
+
+    const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 });
+
+    const result = await Promise.all([
+      ajax({ functionName: 'a', context: 1, args: 'aaa' }),
+      ajax({ functionName: 'b', context: 2, args: 'bbb' }),
+    ]);
+
+    expect(result).toEqual([
+      'a1aaa',
+      'b2bbb',
+    ]);
+  });
+
+  it('rejects responses whose statusCode is >= 300', async () => {
+    const ajaxStream = jest.fn(async ({ body, onResponse }) => {
+      const { functions } = JSON.parse(body);
+      functions.map(({ id, functionName, context, args }) => onResponse({
+        id,
+        statusCode: context,
+        result: context >= 400 ? { err: {} } : `${functionName}${context}${args}`,
+      }));
+    });
+
+    const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 });
+
+    const result = await Promise.all([
+      ajax({ functionName: 'a', context: 500, args: 'aaa' }).catch(() => 'fail'),
+      ajax({ functionName: 'b', context: 400, args: 'bbb' }).catch(() => 'fail'),
+      ajax({ functionName: 'c', context: 200, args: 'ccc' }),
+    ]);
+
+    expect(result).toEqual([
+      'fail',
+      'fail',
+      'c200ccc'
+    ]);
+  });
+});
\ No newline at end of file
diff --git a/packages/kbn-interpreter/src/public/interpreter.js b/packages/kbn-interpreter/src/public/interpreter.js
index 9e695864e1f4..02adc19f7810 100644
--- a/packages/kbn-interpreter/src/public/interpreter.js
+++ b/packages/kbn-interpreter/src/public/interpreter.js
@@ -23,11 +23,11 @@ import { createHandlers } from './create_handlers';
 import { batchedFetch } from './batched_fetch';
 import { FUNCTIONS_URL } from './consts';
 
-export async function initializeInterpreter(kfetch, typesRegistry, functionsRegistry) {
+export async function initializeInterpreter({ kfetch, ajaxStream, typesRegistry, functionsRegistry }) {
   const serverFunctionList = await kfetch({ pathname: FUNCTIONS_URL });
   const types = typesRegistry.toJS();
   const { serialize } = serializeProvider(types);
-  const batch = batchedFetch({ kfetch, serialize });
+  const batch = batchedFetch({ ajaxStream, serialize });
 
   // For every sever-side function, register a client-side
   // function that matches its definition, but which simply
diff --git a/packages/kbn-interpreter/src/public/interpreter.test.js b/packages/kbn-interpreter/src/public/interpreter.test.js
index 34eb3578ec35..8593d0793a1b 100644
--- a/packages/kbn-interpreter/src/public/interpreter.test.js
+++ b/packages/kbn-interpreter/src/public/interpreter.test.js
@@ -35,26 +35,21 @@ jest.mock('./create_handlers', () => ({
 describe('kbn-interpreter/interpreter', () => {
   it('loads server-side functions', async () => {
     const kfetch = jest.fn(async () => ({}));
+    const ajaxStream = jest.fn(async () => ({}));
 
-    await initializeInterpreter(kfetch, { toJS: () => ({}) }, ({ register: () => {} }));
+    await initializeInterpreter({
+      kfetch,
+      ajaxStream,
+      typesRegistry: { toJS: () => ({}) },
+      functionsRegistry: ({ register: () => {} }),
+    });
 
     expect(kfetch).toHaveBeenCalledTimes(1);
     expect(kfetch).toHaveBeenCalledWith({ pathname: FUNCTIONS_URL });
   });
 
   it('registers client-side functions that pass through to the server', async () => {
-    const kfetch = jest.fn(async ({ method }) => {
-      if (method === 'POST') {
-        return {
-          results: [{
-            id: 1,
-            result: {
-              hello: 'world',
-            },
-          }],
-        };
-      }
-
+    const kfetch = jest.fn(async () => {
       return {
         hello: { name: 'hello' },
         world: { name: 'world' },
@@ -62,8 +57,16 @@ describe('kbn-interpreter/interpreter', () => {
     });
 
     const register = jest.fn();
+    const ajaxStream = jest.fn(async ({ onResponse }) => {
+      onResponse({ id: 1, result: { hello: 'world' } });
+    });
 
-    await initializeInterpreter(kfetch, { toJS: () => ({}) }, ({ register }));
+    await initializeInterpreter({
+      kfetch,
+      ajaxStream,
+      typesRegistry: { toJS: () => ({}) },
+      functionsRegistry: ({ register }),
+    });
 
     expect(register).toHaveBeenCalledTimes(2);
 
@@ -81,9 +84,9 @@ describe('kbn-interpreter/interpreter', () => {
 
     expect(result).toEqual({ hello: 'world' });
 
-    expect(kfetch).toHaveBeenCalledWith({
-      pathname: FUNCTIONS_URL,
-      method: 'POST',
+    expect(ajaxStream).toHaveBeenCalledWith({
+      url: FUNCTIONS_URL,
+      onResponse: expect.any(Function),
       body: JSON.stringify({
         functions: [{
           id: 1,
diff --git a/src/legacy/core_plugins/interpreter/public/interpreter.js b/src/legacy/core_plugins/interpreter/public/interpreter.js
index 1ec5b00d3952..b26f46f7a27e 100644
--- a/src/legacy/core_plugins/interpreter/public/interpreter.js
+++ b/src/legacy/core_plugins/interpreter/public/interpreter.js
@@ -20,6 +20,7 @@
 import { register } from '@kbn/interpreter/common';
 import { initializeInterpreter, registries } from '@kbn/interpreter/public';
 import { kfetch } from 'ui/kfetch';
+import { ajaxStream } from 'ui/ajax_stream';
 import { functions } from './functions';
 import { visualization } from './renderers/visualization';
 
@@ -32,7 +33,12 @@ let _resolve;
 let _interpreterPromise;
 
 const initialize = async () => {
-  initializeInterpreter(kfetch, registries.types, registries.browserFunctions).then(interpreter => {
+  initializeInterpreter({
+    kfetch,
+    ajaxStream,
+    typesRegistry: registries.types,
+    functionsRegistry: registries.browserFunctions,
+  }).then(interpreter => {
     _resolve({ interpreter });
   });
 };
diff --git a/src/legacy/core_plugins/interpreter/server/routes/server_functions.js b/src/legacy/core_plugins/interpreter/server/routes/server_functions.js
index 07b3fabad6af..cdd552e258ca 100644
--- a/src/legacy/core_plugins/interpreter/server/routes/server_functions.js
+++ b/src/legacy/core_plugins/interpreter/server/routes/server_functions.js
@@ -65,37 +65,64 @@ function runServerFunctions(server) {
       const handlers = await createHandlers(req, server);
       const { functions } = req.payload;
 
-      // Process each function individually, and bundle up respones / errors into
-      // the format expected by the front-end batcher.
-      const results = await Promise.all(functions.map(async ({ id, ...fnCall }) => {
-        const result = await runFunction(server, handlers, fnCall)
-          .catch(err => {
-            if (Boom.isBoom(err)) {
-              return { err, statusCode: err.statusCode, message: err.output.payload };
-            }
-            return { err: 'Internal Server Error', statusCode: 500, message: 'See server logs for details.' };
-          });
-
-        if (result == null) {
-          const { functionName } = fnCall;
-          return {
-            id,
-            result: {
-              err: `No result from '${functionName}'`,
-              statusCode: 500,
-              message: `Function '${functionName}' did not return anything`
-            }
-          };
+      // Grab the raw Node response object.
+      const res = req.raw.res;
+
+      // Tell Hapi not to manage the response https://github.com/hapijs/hapi/issues/3884
+      req._isReplied = true;
+
+      // Send the initial headers.
+      res.writeHead(200, {
+        'Content-Type': 'text/plain',
+        'Connection': 'keep-alive',
+        'Transfer-Encoding': 'chunked',
+        'Cache-Control': 'no-cache',
+      });
+
+      // Write a length-delimited response
+      const streamResult = (result) => {
+        const payload = JSON.stringify(result) + '\n';
+        res.write(`${payload.length}:${payload}`);
+      };
+
+      // Tries to run an interpreter function, and ensures a consistent error payload on failure.
+      const tryFunction = async (id, fnCall) => {
+        try {
+          const result = await runFunction(server, handlers, fnCall);
+
+          if (result != null) {
+            return { id, statusCode: 200, result };
+          }
+
+          return batchError(id, `Function ${fnCall.functionName} did not return anything.`);
+        } catch (err) {
+          if (Boom.isBoom(err)) {
+            return batchError(id, err.output.payload, err.statusCode);
+          }
+          return batchError(id, 'See server logs for details.');
         }
+      };
 
-        return { id, result };
-      }));
+      // Process each function individually, and stream the responses back to the client
+      await Promise.all(functions.map(({ id, ...fnCall }) => tryFunction(id, fnCall).then(streamResult)));
 
-      return { results };
+      // All of the responses have been written, so we can close the response.
+      res.end();
     },
   });
 }
 
+/**
+ * A helper function for bundling up errors.
+ */
+function batchError(id, message, statusCode = 500) {
+  return {
+    id,
+    statusCode,
+    result: { statusCode, message },
+  };
+}
+
 /**
  * Register the endpoint that returns the list of server-only functions.
  * @param {*} server - The Kibana server
diff --git a/src/legacy/ui/public/ajax_stream/ajax_stream.test.ts b/src/legacy/ui/public/ajax_stream/ajax_stream.test.ts
new file mode 100644
index 000000000000..755473d1ee23
--- /dev/null
+++ b/src/legacy/ui/public/ajax_stream/ajax_stream.test.ts
@@ -0,0 +1,199 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { ajaxStream, XMLHttpRequestLike } from './ajax_stream';
+
+// tslint:disable-next-line:no-empty
+function noop() {}
+
+describe('ajaxStream', () => {
+  it('pulls items from the stream and calls the handler', async () => {
+    const handler = jest.fn(() => ({}));
+    const { req, sendText, done } = mockRequest();
+    const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].map(m => `${m.length}:${m}`);
+
+    const promise = ajaxStream('', {}, req, {
+      url: '/test/endpoint',
+      onResponse: handler,
+    });
+
+    sendText(messages[0]);
+    sendText(messages[1]);
+    done();
+
+    await promise;
+    expect(handler).toHaveBeenCalledTimes(2);
+    expect(handler).toHaveBeenCalledWith({ hello: 'world' });
+    expect(handler).toHaveBeenCalledWith({ tis: 'fate' });
+  });
+
+  it('handles partial messages', async () => {
+    const handler = jest.fn(() => ({}));
+    const { req, sendText, done } = mockRequest();
+    const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']
+      .map(m => `${m.length}:${m}`)
+      .join('');
+
+    const promise = ajaxStream('', {}, req, {
+      url: '/test/endpoint',
+      onResponse: handler,
+    });
+
+    for (const s of messages) {
+      sendText(s);
+    }
+    done();
+
+    await promise;
+    expect(handler).toHaveBeenCalledTimes(2);
+    expect(handler).toHaveBeenCalledWith({ hello: 'world' });
+    expect(handler).toHaveBeenCalledWith({ tis: 'fate' });
+  });
+
+  it('sends the request', async () => {
+    const handler = jest.fn(() => ({}));
+    const { req, done } = mockRequest();
+
+    const promise = ajaxStream('mehBasePath', { a: 'b' }, req, {
+      url: '/test/endpoint',
+      onResponse: handler,
+      body: 'whatup',
+      headers: { foo: 'bar' },
+    });
+
+    done();
+
+    await promise;
+    expect(req.open).toHaveBeenCalledWith('POST', 'mehBasePath/test/endpoint');
+    expect(req.setRequestHeader).toHaveBeenCalledWith('foo', 'bar');
+    expect(req.setRequestHeader).toHaveBeenCalledWith('a', 'b');
+    expect(req.send).toHaveBeenCalledWith('whatup');
+  });
+
+  it('rejects if network failure', async () => {
+    const handler = jest.fn(() => ({}));
+    const { req, done } = mockRequest();
+
+    const promise = ajaxStream('', {}, req, {
+      url: '/test/endpoint',
+      onResponse: handler,
+      body: 'whatup',
+    });
+
+    done(0);
+    expect(await promise.then(() => true).catch(() => false)).toBeFalsy();
+  });
+
+  it('rejects if http status error', async () => {
+    const handler = jest.fn(() => ({}));
+    const { req, done } = mockRequest();
+
+    const promise = ajaxStream('', {}, req, {
+      url: '/test/endpoint',
+      onResponse: handler,
+      body: 'whatup',
+    });
+
+    done(400);
+    expect(await promise.then(() => true).catch(() => false)).toBeFalsy();
+  });
+
+  it('rejects if the payload contains invalid JSON', async () => {
+    const handler = jest.fn(() => ({}));
+    const { req, sendText, done } = mockRequest();
+    const messages = ['{ waut? }\n'].map(m => `${m.length}:${m}`).join('');
+
+    const promise = ajaxStream('', {}, req, {
+      url: '/test/endpoint',
+      onResponse: handler,
+    });
+
+    sendText(messages);
+    done();
+
+    expect(await promise.then(() => true).catch(() => false)).toBeFalsy();
+  });
+
+  it('rejects if the delim is invalid', async () => {
+    const handler = jest.fn(() => ({}));
+    const { req, sendText, done } = mockRequest();
+    const messages = '{ "hi": "there" }';
+
+    const promise = ajaxStream('', {}, req, {
+      url: '/test/endpoint',
+      onResponse: handler,
+    });
+
+    sendText(messages);
+    done();
+
+    expect(await promise.then(() => true).catch(({ message }) => message)).toMatch(
+      /invalid stream response/i
+    );
+  });
+
+  it('rejects if the handler throws', async () => {
+    const handler = jest.fn(() => {
+      throw new Error('DOH!');
+    });
+    const { req, sendText, done } = mockRequest();
+    const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']
+      .map(m => `${m.length}:${m}`)
+      .join('');
+
+    const promise = ajaxStream('', {}, req, {
+      url: '/test/endpoint',
+      onResponse: handler,
+    });
+
+    sendText(messages);
+    done();
+
+    expect(await promise.then(() => true).catch(({ message }) => message)).toMatch(/doh!/i);
+  });
+});
+
+function mockRequest() {
+  const req: XMLHttpRequestLike = {
+    onprogress: noop,
+    onreadystatechange: noop,
+    open: jest.fn(),
+    readyState: 0,
+    responseText: '',
+    send: jest.fn(),
+    setRequestHeader: jest.fn(),
+    abort: jest.fn(),
+    status: 0,
+    withCredentials: false,
+  };
+
+  return {
+    req,
+    sendText(text: string) {
+      req.responseText += text;
+      req.onreadystatechange();
+      req.onprogress();
+    },
+    done(status = 200) {
+      req.status = status;
+      req.readyState = 4;
+      req.onreadystatechange();
+    },
+  };
+}
diff --git a/src/legacy/ui/public/ajax_stream/ajax_stream.ts b/src/legacy/ui/public/ajax_stream/ajax_stream.ts
new file mode 100644
index 000000000000..74e10e2a271b
--- /dev/null
+++ b/src/legacy/ui/public/ajax_stream/ajax_stream.ts
@@ -0,0 +1,167 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { once } from 'lodash';
+
+/**
+ * This file contains the client-side logic for processing a streaming AJAX response.
+ * This allows things like request batching to process individual batch item results
+ * as soon as the server sends them, instead of waiting for the entire response before
+ * client-side processing can begin.
+ *
+ * The server sends responses in this format: {length}:{json}, for example:
+ *
+ * 18:{"hello":"world"}\n16:{"hello":"you"}\n
+ */
+
+// T is the response payload (the JSON), and we don't really
+// care what it's type / shape is.
+export type BatchResponseHandler<T> = (result: T) => void;
+
+export interface BatchOpts<T> {
+  url: string;
+  onResponse: BatchResponseHandler<T>;
+  method?: string;
+  body?: string;
+  headers?: { [k: string]: string };
+}
+
+// The subset of XMLHttpRequest that we use
+export interface XMLHttpRequestLike {
+  abort: () => void;
+  onreadystatechange: any;
+  onprogress: any;
+  open: (method: string, url: string) => void;
+  readyState: number;
+  responseText: string;
+  send: (body?: string) => void;
+  setRequestHeader: (header: string, value: string) => void;
+  status: number;
+  withCredentials: boolean;
+}
+
+// Create a function which, when successively passed streaming response text,
+// calls a handler callback with each response in the batch.
+function processBatchResponseStream<T>(handler: BatchResponseHandler<T>) {
+  let index = 0;
+
+  return (text: string) => {
+    // While there's text to process...
+    while (index < text.length) {
+      // Our messages are delimited by colon: len:json
+      const delim = ':';
+      const delimIndex = text.indexOf(delim, index);
+      const payloadStart = delimIndex + delim.length;
+
+      // We've got an incomplete batch length
+      if (delimIndex < 0) {
+        return;
+      }
+
+      const rawLen = text.slice(index, delimIndex);
+      const payloadLen = parseInt(rawLen, 10);
+      const payloadEnd = payloadStart + payloadLen;
+
+      // We've got an invalid batch message (e.g. one without a numeric length: prefix)
+      if (isNaN(payloadLen)) {
+        throw new Error(`Invalid stream response length: ${rawLen}`);
+      }
+
+      // We've got an incomplete batch message
+      if (text.length < payloadEnd) {
+        return;
+      }
+
+      const payload = JSON.parse(text.slice(payloadStart, payloadEnd));
+      handler(payload);
+
+      index = payloadEnd;
+    }
+  };
+}
+
+/**
+ * Sends an AJAX request to the server, and processes the result as a
+ * streaming HTTP/1 response.
+ *
+ * @param basePath - The Kibana basepath
+ * @param defaultHeaders - The default HTTP headers to be sent with each request
+ * @param req - The XMLHttpRequest
+ * @param opts - The request options
+ * @returns A promise which resolves when the entire batch response has been processed.
+ */
+export function ajaxStream<T>(
+  basePath: string,
+  defaultHeaders: { [k: string]: string },
+  req: XMLHttpRequestLike,
+  opts: BatchOpts<T>
+) {
+  return new Promise((resolve, reject) => {
+    const { url, method, headers } = opts;
+
+    // There are several paths by which the promise may resolve or reject. We wrap this
+    // in "once" as a safeguard against cases where we attempt more than one call. (e.g.
+    // a batch handler fails, so we reject the promise, but then new data comes in for
+    // a subsequent batch item)
+    const complete = once((err: Error | undefined = undefined) =>
+      err ? reject(err) : resolve(req)
+    );
+
+    // Begin the request
+    req.open(method || 'POST', `${basePath}/${url.replace(/^\//, '')}`);
+    req.withCredentials = true;
+
+    // Set the HTTP headers
+    Object.entries(Object.assign({}, defaultHeaders, headers)).forEach(([k, v]) =>
+      req.setRequestHeader(k, v)
+    );
+
+    const batchHandler = processBatchResponseStream(opts.onResponse);
+    const processBatch = () => {
+      try {
+        batchHandler(req.responseText);
+      } catch (err) {
+        req.abort();
+        complete(err);
+      }
+    };
+
+    req.onprogress = processBatch;
+
+    req.onreadystatechange = () => {
+      // Older browsers don't support onprogress, so we need
+      // to call this here, too. It's safe to call this multiple
+      // times even for the same progress event.
+      processBatch();
+
+      // 4 is the magic number that means the request is done
+      if (req.readyState === 4) {
+        // 0 indicates a network failure. 400+ messages are considered server errors
+        if (req.status === 0 || req.status >= 400) {
+          complete(new Error(`Batch request failed with status ${req.status}`));
+        } else {
+          complete();
+        }
+      }
+    };
+
+    // Send the payload to the server
+    req.send(opts.body);
+  });
+}
diff --git a/src/legacy/ui/public/ajax_stream/index.ts b/src/legacy/ui/public/ajax_stream/index.ts
new file mode 100644
index 000000000000..9c5f1832f9c5
--- /dev/null
+++ b/src/legacy/ui/public/ajax_stream/index.ts
@@ -0,0 +1,33 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import chrome from 'ui/chrome';
+import { metadata } from 'ui/metadata';
+import { ajaxStream as ajax, BatchOpts } from './ajax_stream';
+
+const defaultHeaders = {
+  'Content-Type': 'application/json',
+  'kbn-version': metadata.version,
+};
+
+export { BatchOpts } from './ajax_stream';
+
+export function ajaxStream<T>(opts: BatchOpts<T>) {
+  return ajax(chrome.getBasePath(), defaultHeaders, new XMLHttpRequest(), opts);
+}