From 81918b35c1ed55fe3e94397a1155a7793ecee124 Mon Sep 17 00:00:00 2001 From: Liza Katz Date: Wed, 30 Jun 2021 20:53:47 +0100 Subject: [PATCH] [bfetch] compress stream chunks (#97994) (#103961) * Move inspector adapter integration into search source * docs and ts * Move other bucket to search source * test ts + delete unused tabilfy function * hierarchical param in aggconfig. ts improvements more inspector tests * fix jest * separate inspect more tests * jest * inspector * Error handling and more tests * put the fun in functional tests * delete client side legacy msearch code * ts * override to sync search in search source * delete more legacy code * ts * delete moarrrr * deflate bfetch chunks * update tests use only zlib * ts * extract getInflatedResponse * tests * Use fflate in attempt to reduce package size * use node streams, fflate and hex encoding. * DISABLE_SEARCH_COMPRESSION UI Settings Use base64 and async compression * i18n * Code review Use custom header for compression Promisify once * use custom headers * Update jest * fix tests * code review, baby! * integration * tests * limit * limit * limit Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> # Conflicts: # packages/kbn-optimizer/limits.yml # test/api_integration/apis/search/bsearch.ts --- .i18nrc.json | 1 + package.json | 1 + packages/kbn-optimizer/limits.yml | 2 +- packages/kbn-ui-shared-deps/entry.js | 2 + packages/kbn-ui-shared-deps/index.js | 1 + src/plugins/bfetch/common/batch.ts | 5 + src/plugins/bfetch/common/constants.ts | 9 ++ src/plugins/bfetch/common/index.ts | 1 + .../create_streaming_batched_function.test.ts | 50 +++++++- .../create_streaming_batched_function.ts | 17 ++- src/plugins/bfetch/public/batching/index.ts | 12 ++ src/plugins/bfetch/public/plugin.ts | 34 ++++-- .../public/streaming/fetch_streaming.test.ts | 112 +++++++++++++++++- .../public/streaming/fetch_streaming.ts | 49 ++++++-- src/plugins/bfetch/public/streaming/index.ts | 1 + .../public/streaming/inflate_response.ts | 15 +++ src/plugins/bfetch/server/plugin.ts | 32 ++++- .../streaming/create_compressed_stream.ts | 59 +++++++++ .../bfetch/server/streaming/create_stream.ts | 23 ++++ src/plugins/bfetch/server/streaming/index.ts | 2 + src/plugins/bfetch/server/ui_settings.ts | 29 +++++ .../server/collectors/management/schema.ts | 4 + .../server/collectors/management/types.ts | 1 + src/plugins/telemetry/schema/oss_plugins.json | 6 + test/api_integration/apis/search/bsearch.ts | 83 ++++++++++--- yarn.lock | 5 + 26 files changed, 504 insertions(+), 52 deletions(-) create mode 100644 src/plugins/bfetch/common/constants.ts create mode 100644 src/plugins/bfetch/public/batching/index.ts create mode 100644 src/plugins/bfetch/public/streaming/inflate_response.ts create mode 100644 src/plugins/bfetch/server/streaming/create_compressed_stream.ts create mode 100644 src/plugins/bfetch/server/streaming/create_stream.ts create mode 100644 src/plugins/bfetch/server/ui_settings.ts diff --git a/.i18nrc.json b/.i18nrc.json index efbb5ecc0194e..26694461b3b22 100644 --- a/.i18nrc.json +++ b/.i18nrc.json @@ -4,6 +4,7 @@ "console": "src/plugins/console", "core": "src/core", "discover": "src/plugins/discover", + "bfetch": "src/plugins/bfetch", "dashboard": "src/plugins/dashboard", "data": "src/plugins/data", "embeddableApi": "src/plugins/embeddable", diff --git a/package.json b/package.json index c41d1241130c1..16e999e82bc74 100644 --- a/package.json +++ b/package.json @@ -214,6 +214,7 @@ "expiry-js": "0.1.7", "extract-zip": "^2.0.1", "fast-deep-equal": "^3.1.1", + "fflate": "^0.6.9", "file-saver": "^1.3.8", "file-type": "^10.9.0", "focus-trap-react": "^3.1.1", diff --git a/packages/kbn-optimizer/limits.yml b/packages/kbn-optimizer/limits.yml index 3b7b7db888339..25266984022db 100644 --- a/packages/kbn-optimizer/limits.yml +++ b/packages/kbn-optimizer/limits.yml @@ -4,7 +4,7 @@ pageLoadAssetSize: apm: 64385 apmOss: 18996 beatsManagement: 188135 - bfetch: 41874 + bfetch: 51874 canvas: 1065624 charts: 195358 cloud: 21076 diff --git a/packages/kbn-ui-shared-deps/entry.js b/packages/kbn-ui-shared-deps/entry.js index 4029ce28faf5b..d3755ed7c5f29 100644 --- a/packages/kbn-ui-shared-deps/entry.js +++ b/packages/kbn-ui-shared-deps/entry.js @@ -44,6 +44,8 @@ export const Theme = require('./theme.ts'); export const Lodash = require('lodash'); export const LodashFp = require('lodash/fp'); +export const Fflate = require('fflate/esm/browser'); + // runtime deps which don't need to be copied across all bundles export const TsLib = require('tslib'); export const KbnAnalytics = require('@kbn/analytics'); diff --git a/packages/kbn-ui-shared-deps/index.js b/packages/kbn-ui-shared-deps/index.js index 62ddb09d25add..877bf3df6c039 100644 --- a/packages/kbn-ui-shared-deps/index.js +++ b/packages/kbn-ui-shared-deps/index.js @@ -52,6 +52,7 @@ exports.externals = { '@elastic/eui/dist/eui_theme_dark.json': '__kbnSharedDeps__.Theme.euiDarkVars', lodash: '__kbnSharedDeps__.Lodash', 'lodash/fp': '__kbnSharedDeps__.LodashFp', + fflate: '__kbnSharedDeps__.Fflate', /** * runtime deps which don't need to be copied across all bundles diff --git a/src/plugins/bfetch/common/batch.ts b/src/plugins/bfetch/common/batch.ts index a84d94b541ae5..59b012751c66d 100644 --- a/src/plugins/bfetch/common/batch.ts +++ b/src/plugins/bfetch/common/batch.ts @@ -19,3 +19,8 @@ export interface BatchResponseItem new Promise((resolve) => setImmediate(resolve)); const getPromiseState = (promise: Promise): Promise<'resolved' | 'rejected' | 'pending'> => Promise.race<'resolved' | 'rejected' | 'pending'>([ @@ -52,6 +54,7 @@ describe('createStreamingBatchedFunction()', () => { const fn = createStreamingBatchedFunction({ url: '/test', fetchStreaming, + compressionDisabled$: rxof(true), }); expect(typeof fn).toBe('function'); }); @@ -61,6 +64,7 @@ describe('createStreamingBatchedFunction()', () => { const fn = createStreamingBatchedFunction({ url: '/test', fetchStreaming, + compressionDisabled$: rxof(true), }); const res = fn({}); expect(typeof res.then).toBe('function'); @@ -74,6 +78,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); expect(fetchStreaming).toHaveBeenCalledTimes(0); @@ -93,6 +98,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); expect(fetchStreaming).toHaveBeenCalledTimes(0); @@ -107,6 +113,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); fn({ foo: 'bar' }); @@ -125,6 +132,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); fn({ foo: 'bar' }); @@ -146,14 +154,18 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); expect(fetchStreaming).toHaveBeenCalledTimes(0); fn({ foo: 'bar' }); + await flushPromises(); expect(fetchStreaming).toHaveBeenCalledTimes(0); fn({ baz: 'quix' }); + await flushPromises(); expect(fetchStreaming).toHaveBeenCalledTimes(0); fn({ full: 'yep' }); + await flushPromises(); expect(fetchStreaming).toHaveBeenCalledTimes(1); }); @@ -164,6 +176,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); const abortController = new AbortController(); @@ -186,11 +199,13 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); fn({ a: '1' }); fn({ b: '2' }); fn({ c: '3' }); + await flushPromises(); expect(fetchStreaming.mock.calls[0][0]).toMatchObject({ url: '/test', @@ -209,13 +224,16 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); fn({ a: '1' }); fn({ b: '2' }); fn({ c: '3' }); + await flushPromises(); expect(fetchStreaming).toHaveBeenCalledTimes(1); fn({ d: '4' }); + await flushPromises(); await new Promise((r) => setTimeout(r, 6)); expect(fetchStreaming).toHaveBeenCalledTimes(2); }); @@ -229,6 +247,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); const promise1 = fn({ a: '1' }); @@ -246,8 +265,11 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); + await flushPromises(); + const promise1 = fn({ a: '1' }); const promise2 = fn({ b: '2' }); const promise3 = fn({ c: '3' }); @@ -287,6 +309,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); const promise1 = fn({ a: '1' }); @@ -314,6 +337,20 @@ describe('createStreamingBatchedFunction()', () => { expect(await promise3).toEqual({ foo: 'bar 2' }); }); + test('compression is false by default', async () => { + const { fetchStreaming } = setup(); + const fn = createStreamingBatchedFunction({ + url: '/test', + flushOnMaxItems: 1, + fetchStreaming, + }); + + fn({ a: '1' }); + + const dontCompress = await fetchStreaming.mock.calls[0][0].compressionDisabled$.toPromise(); + expect(dontCompress).toBe(false); + }); + test('resolves falsy results', async () => { const { fetchStreaming, stream } = setup(); const fn = createStreamingBatchedFunction({ @@ -321,6 +358,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); const promise1 = fn({ a: '1' }); @@ -362,6 +400,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); const promise = fn({ a: '1' }); @@ -390,6 +429,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); const promise1 = of(fn({ a: '1' })); @@ -442,6 +482,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); const abortController = new AbortController(); @@ -471,6 +512,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); const abortController = new AbortController(); @@ -509,6 +551,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); const promise1 = of(fn({ a: '1' })); @@ -539,6 +582,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); const promise1 = of(fn({ a: '1' })); @@ -576,6 +620,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); const promise1 = of(fn({ a: '1' })); @@ -608,6 +653,7 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); const promise1 = of(fn({ a: '1' })); @@ -644,7 +690,9 @@ describe('createStreamingBatchedFunction()', () => { fetchStreaming, maxItemAge: 5, flushOnMaxItems: 3, + compressionDisabled$: rxof(true), }); + await flushPromises(); const promise1 = of(fn({ a: '1' })); const promise2 = of(fn({ a: '2' })); diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts index 2d81331f10a88..d5f955f517d13 100644 --- a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts @@ -6,16 +6,16 @@ * Side Public License, v 1. */ +import { Observable, of } from 'rxjs'; import { AbortError, abortSignalToPromise, defer } from '../../../kibana_utils/public'; import { ItemBufferParams, TimedItemBufferParams, createBatchedFunction, - BatchResponseItem, ErrorLike, + normalizeError, } from '../../common'; -import { fetchStreaming, split } from '../streaming'; -import { normalizeError } from '../../common'; +import { fetchStreaming } from '../streaming'; import { BatchedFunc, BatchItem } from './types'; export interface BatchedFunctionProtocolError extends ErrorLike { @@ -47,6 +47,11 @@ export interface StreamingBatchedFunctionParams { * before sending the batch request. */ maxItemAge?: TimedItemBufferParams['maxItemAge']; + + /** + * Disabled zlib compression of response chunks. + */ + compressionDisabled$?: Observable; } /** @@ -64,6 +69,7 @@ export const createStreamingBatchedFunction = ( fetchStreaming: fetchStreamingInjected = fetchStreaming, flushOnMaxItems = 25, maxItemAge = 10, + compressionDisabled$ = of(false), } = params; const [fn] = createBatchedFunction({ onCall: (payload: Payload, signal?: AbortSignal) => { @@ -119,6 +125,7 @@ export const createStreamingBatchedFunction = ( body: JSON.stringify({ batch }), method: 'POST', signal: abortController.signal, + compressionDisabled$, }); const handleStreamError = (error: any) => { @@ -127,10 +134,10 @@ export const createStreamingBatchedFunction = ( for (const { future } of items) future.reject(normalizedError); }; - stream.pipe(split('\n')).subscribe({ + stream.subscribe({ next: (json: string) => { try { - const response = JSON.parse(json) as BatchResponseItem; + const response = JSON.parse(json); if (response.error) { items[response.id].future.reject(response.error); } else if (response.result !== undefined) { diff --git a/src/plugins/bfetch/public/batching/index.ts b/src/plugins/bfetch/public/batching/index.ts new file mode 100644 index 0000000000000..115fd84cbe979 --- /dev/null +++ b/src/plugins/bfetch/public/batching/index.ts @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export { + createStreamingBatchedFunction, + StreamingBatchedFunctionParams, +} from './create_streaming_batched_function'; diff --git a/src/plugins/bfetch/public/plugin.ts b/src/plugins/bfetch/public/plugin.ts index ed97d468eec0b..f97a91a0e70d3 100644 --- a/src/plugins/bfetch/public/plugin.ts +++ b/src/plugins/bfetch/public/plugin.ts @@ -7,12 +7,11 @@ */ import { CoreStart, PluginInitializerContext, CoreSetup, Plugin } from 'src/core/public'; +import { from, Observable, of } from 'rxjs'; +import { switchMap } from 'rxjs/operators'; import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './streaming'; -import { removeLeadingSlash } from '../common'; -import { - createStreamingBatchedFunction, - StreamingBatchedFunctionParams, -} from './batching/create_streaming_batched_function'; +import { DISABLE_BFETCH_COMPRESSION, removeLeadingSlash } from '../common'; +import { createStreamingBatchedFunction, StreamingBatchedFunctionParams } from './batching'; import { BatchedFunc } from './batching/types'; // eslint-disable-next-line @@ -43,12 +42,23 @@ export class BfetchPublicPlugin constructor(private readonly initializerContext: PluginInitializerContext) {} - public setup(core: CoreSetup, plugins: BfetchPublicSetupDependencies): BfetchPublicSetup { + public setup( + core: CoreSetup, + plugins: BfetchPublicSetupDependencies + ): BfetchPublicSetup { const { version } = this.initializerContext.env.packageInfo; const basePath = core.http.basePath.get(); - const fetchStreaming = this.fetchStreaming(version, basePath); - const batchedFunction = this.batchedFunction(fetchStreaming); + const compressionDisabled$ = from(core.getStartServices()).pipe( + switchMap((deps) => { + return of(deps[0]); + }), + switchMap((coreStart) => { + return coreStart.uiSettings.get$(DISABLE_BFETCH_COMPRESSION); + }) + ); + const fetchStreaming = this.fetchStreaming(version, basePath, compressionDisabled$); + const batchedFunction = this.batchedFunction(fetchStreaming, compressionDisabled$); this.contract = { fetchStreaming, @@ -66,7 +76,8 @@ export class BfetchPublicPlugin private fetchStreaming = ( version: string, - basePath: string + basePath: string, + compressionDisabled$: Observable ): BfetchPublicSetup['fetchStreaming'] => (params) => fetchStreamingStatic({ ...params, @@ -76,13 +87,16 @@ export class BfetchPublicPlugin 'kbn-version': version, ...(params.headers || {}), }, + compressionDisabled$, }); private batchedFunction = ( - fetchStreaming: BfetchPublicContract['fetchStreaming'] + fetchStreaming: BfetchPublicContract['fetchStreaming'], + compressionDisabled$: Observable ): BfetchPublicContract['batchedFunction'] => (params) => createStreamingBatchedFunction({ ...params, + compressionDisabled$, fetchStreaming: params.fetchStreaming || fetchStreaming, }); } diff --git a/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts b/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts index e804b3ea94227..a5d066f6d9a24 100644 --- a/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts +++ b/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts @@ -8,6 +8,15 @@ import { fetchStreaming } from './fetch_streaming'; import { mockXMLHttpRequest } from '../test_helpers/xhr'; +import { of } from 'rxjs'; +import { promisify } from 'util'; +import { deflate } from 'zlib'; +const pDeflate = promisify(deflate); + +const compressResponse = async (resp: any) => { + const gzipped = await pDeflate(JSON.stringify(resp)); + return gzipped.toString('base64'); +}; const tick = () => new Promise((resolve) => setTimeout(resolve, 1)); @@ -21,6 +30,7 @@ test('returns XHR request', () => { setup(); const { xhr } = fetchStreaming({ url: 'http://example.com', + compressionDisabled$: of(true), }); expect(typeof xhr.readyState).toBe('number'); }); @@ -29,6 +39,7 @@ test('returns stream', () => { setup(); const { stream } = fetchStreaming({ url: 'http://example.com', + compressionDisabled$: of(true), }); expect(typeof stream.subscribe).toBe('function'); }); @@ -37,6 +48,7 @@ test('promise resolves when request completes', async () => { const env = setup(); const { stream } = fetchStreaming({ url: 'http://example.com', + compressionDisabled$: of(true), }); let resolved = false; @@ -65,10 +77,90 @@ test('promise resolves when request completes', async () => { expect(resolved).toBe(true); }); -test('streams incoming text as it comes through', async () => { +test('promise resolves when compressed request completes', async () => { + const env = setup(); + const { stream } = fetchStreaming({ + url: 'http://example.com', + compressionDisabled$: of(false), + }); + + let resolved = false; + let result; + stream.toPromise().then((r) => { + resolved = true; + result = r; + }); + + await tick(); + expect(resolved).toBe(false); + + const msg = { foo: 'bar' }; + + // Whole message in a response + (env.xhr as any).responseText = `${await compressResponse(msg)}\n`; + env.xhr.onprogress!({} as any); + + await tick(); + expect(resolved).toBe(false); + + (env.xhr as any).readyState = 4; + (env.xhr as any).status = 200; + env.xhr.onreadystatechange!({} as any); + + await tick(); + expect(resolved).toBe(true); + expect(result).toStrictEqual(JSON.stringify(msg)); +}); + +test('promise resolves when compressed chunked request completes', async () => { const env = setup(); const { stream } = fetchStreaming({ url: 'http://example.com', + compressionDisabled$: of(false), + }); + + let resolved = false; + let result; + stream.toPromise().then((r) => { + resolved = true; + result = r; + }); + + await tick(); + expect(resolved).toBe(false); + + const msg = { veg: 'tomato' }; + const msgToCut = await compressResponse(msg); + const part1 = msgToCut.substr(0, 3); + + // Message and a half in a response + (env.xhr as any).responseText = part1; + env.xhr.onprogress!({} as any); + + await tick(); + expect(resolved).toBe(false); + + // Half a message in a response + (env.xhr as any).responseText = `${msgToCut}\n`; + env.xhr.onprogress!({} as any); + + await tick(); + expect(resolved).toBe(false); + + (env.xhr as any).readyState = 4; + (env.xhr as any).status = 200; + env.xhr.onreadystatechange!({} as any); + + await tick(); + expect(resolved).toBe(true); + expect(result).toStrictEqual(JSON.stringify(msg)); +}); + +test('streams incoming text as it comes through, according to separators', async () => { + const env = setup(); + const { stream } = fetchStreaming({ + url: 'http://example.com', + compressionDisabled$: of(true), }); const spy = jest.fn(); @@ -80,16 +172,22 @@ test('streams incoming text as it comes through', async () => { (env.xhr as any).responseText = 'foo'; env.xhr.onprogress!({} as any); + await tick(); + expect(spy).toHaveBeenCalledTimes(0); + + (env.xhr as any).responseText = 'foo\nbar'; + env.xhr.onprogress!({} as any); + await tick(); expect(spy).toHaveBeenCalledTimes(1); expect(spy).toHaveBeenCalledWith('foo'); - (env.xhr as any).responseText = 'foo\nbar'; + (env.xhr as any).responseText = 'foo\nbar\n'; env.xhr.onprogress!({} as any); await tick(); expect(spy).toHaveBeenCalledTimes(2); - expect(spy).toHaveBeenCalledWith('\nbar'); + expect(spy).toHaveBeenCalledWith('bar'); (env.xhr as any).readyState = 4; (env.xhr as any).status = 200; @@ -103,6 +201,7 @@ test('completes stream observable when request finishes', async () => { const env = setup(); const { stream } = fetchStreaming({ url: 'http://example.com', + compressionDisabled$: of(true), }); const spy = jest.fn(); @@ -127,6 +226,7 @@ test('completes stream observable when aborted', async () => { const { stream } = fetchStreaming({ url: 'http://example.com', signal: abort.signal, + compressionDisabled$: of(true), }); const spy = jest.fn(); @@ -152,6 +252,7 @@ test('promise throws when request errors', async () => { const env = setup(); const { stream } = fetchStreaming({ url: 'http://example.com', + compressionDisabled$: of(true), }); const spy = jest.fn(); @@ -178,6 +279,7 @@ test('stream observable errors when request errors', async () => { const env = setup(); const { stream } = fetchStreaming({ url: 'http://example.com', + compressionDisabled$: of(true), }); const spy = jest.fn(); @@ -210,6 +312,7 @@ test('sets custom headers', async () => { 'Content-Type': 'text/plain', Authorization: 'Bearer 123', }, + compressionDisabled$: of(true), }); expect(env.xhr.setRequestHeader).toHaveBeenCalledWith('Content-Type', 'text/plain'); @@ -223,6 +326,7 @@ test('uses credentials', async () => { fetchStreaming({ url: 'http://example.com', + compressionDisabled$: of(true), }); expect(env.xhr.withCredentials).toBe(true); @@ -238,6 +342,7 @@ test('opens XHR request and sends specified body', async () => { url: 'http://elastic.co', method: 'GET', body: 'foobar', + compressionDisabled$: of(true), }); expect(env.xhr.open).toHaveBeenCalledTimes(1); @@ -250,6 +355,7 @@ test('uses POST request method by default', async () => { const env = setup(); fetchStreaming({ url: 'http://elastic.co', + compressionDisabled$: of(true), }); expect(env.xhr.open).toHaveBeenCalledWith('POST', 'http://elastic.co'); }); diff --git a/src/plugins/bfetch/public/streaming/fetch_streaming.ts b/src/plugins/bfetch/public/streaming/fetch_streaming.ts index d68e4d01b44f5..1af35ef68fb85 100644 --- a/src/plugins/bfetch/public/streaming/fetch_streaming.ts +++ b/src/plugins/bfetch/public/streaming/fetch_streaming.ts @@ -6,7 +6,11 @@ * Side Public License, v 1. */ +import { Observable, of } from 'rxjs'; +import { map, share, switchMap } from 'rxjs/operators'; +import { inflateResponse } from '.'; import { fromStreamingXhr } from './from_streaming_xhr'; +import { split } from './split'; export interface FetchStreamingParams { url: string; @@ -14,6 +18,7 @@ export interface FetchStreamingParams { method?: 'GET' | 'POST'; body?: string; signal?: AbortSignal; + compressionDisabled$?: Observable; } /** @@ -26,23 +31,49 @@ export function fetchStreaming({ method = 'POST', body = '', signal, + compressionDisabled$ = of(false), }: FetchStreamingParams) { const xhr = new window.XMLHttpRequest(); - // Begin the request - xhr.open(method, url); - xhr.withCredentials = true; + const msgStream = compressionDisabled$.pipe( + switchMap((compressionDisabled) => { + // Begin the request + xhr.open(method, url); + xhr.withCredentials = true; - // Set the HTTP headers - Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v)); + if (!compressionDisabled) { + headers['X-Chunk-Encoding'] = 'deflate'; + } - const stream = fromStreamingXhr(xhr, signal); + // Set the HTTP headers + Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v)); - // Send the payload to the server - xhr.send(body); + const stream = fromStreamingXhr(xhr, signal); + + // Send the payload to the server + xhr.send(body); + + // Return a stream of chunked decompressed messages + return stream.pipe( + split('\n'), + map((msg) => { + return compressionDisabled ? msg : inflateResponse(msg); + }) + ); + }), + share() + ); + + // start execution + const msgStreamSub = msgStream.subscribe({ + error: (e) => {}, + complete: () => { + msgStreamSub.unsubscribe(); + }, + }); return { xhr, - stream, + stream: msgStream, }; } diff --git a/src/plugins/bfetch/public/streaming/index.ts b/src/plugins/bfetch/public/streaming/index.ts index afb442feffb29..545cae87aa3d6 100644 --- a/src/plugins/bfetch/public/streaming/index.ts +++ b/src/plugins/bfetch/public/streaming/index.ts @@ -9,3 +9,4 @@ export * from './split'; export * from './from_streaming_xhr'; export * from './fetch_streaming'; +export { inflateResponse } from './inflate_response'; diff --git a/src/plugins/bfetch/public/streaming/inflate_response.ts b/src/plugins/bfetch/public/streaming/inflate_response.ts new file mode 100644 index 0000000000000..73cb52285987c --- /dev/null +++ b/src/plugins/bfetch/public/streaming/inflate_response.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { unzlibSync, strFromU8 } from 'fflate'; + +export function inflateResponse(response: string) { + const buff = Buffer.from(response, 'base64'); + const unzip = unzlibSync(buff); + return strFromU8(unzip); +} diff --git a/src/plugins/bfetch/server/plugin.ts b/src/plugins/bfetch/server/plugin.ts index 18f0813260f03..7fd46e2f6cc44 100644 --- a/src/plugins/bfetch/server/plugin.ts +++ b/src/plugins/bfetch/server/plugin.ts @@ -16,6 +16,7 @@ import type { RouteMethod, RequestHandler, RequestHandlerContext, + StartServicesAccessor, } from 'src/core/server'; import { schema } from '@kbn/config-schema'; import { Subject } from 'rxjs'; @@ -28,7 +29,8 @@ import { normalizeError, } from '../common'; import { StreamingRequestHandler } from './types'; -import { createNDJSONStream } from './streaming'; +import { createStream } from './streaming'; +import { getUiSettings } from './ui_settings'; // eslint-disable-next-line export interface BfetchServerSetupDependencies {} @@ -112,9 +114,19 @@ export class BfetchServerPlugin public setup(core: CoreSetup, plugins: BfetchServerSetupDependencies): BfetchServerSetup { const logger = this.initializerContext.logger.get(); const router = core.http.createRouter(); - const addStreamingResponseRoute = this.addStreamingResponseRoute({ router, logger }); + + core.uiSettings.register(getUiSettings()); + + const addStreamingResponseRoute = this.addStreamingResponseRoute({ + getStartServices: core.getStartServices, + router, + logger, + }); const addBatchProcessingRoute = this.addBatchProcessingRoute(addStreamingResponseRoute); - const createStreamingRequestHandler = this.createStreamingRequestHandler({ logger }); + const createStreamingRequestHandler = this.createStreamingRequestHandler({ + getStartServices: core.getStartServices, + logger, + }); return { addBatchProcessingRoute, @@ -129,10 +141,16 @@ export class BfetchServerPlugin public stop() {} + private getCompressionDisabled(request: KibanaRequest) { + return request.headers['x-chunk-encoding'] !== 'deflate'; + } + private addStreamingResponseRoute = ({ + getStartServices, router, logger, }: { + getStartServices: StartServicesAccessor; router: ReturnType; logger: Logger; }): BfetchServerSetup['addStreamingResponseRoute'] => (path, handler) => { @@ -146,9 +164,10 @@ export class BfetchServerPlugin async (context, request, response) => { const handlerInstance = handler(request); const data = request.body; + const compressionDisabled = this.getCompressionDisabled(request); return response.ok({ headers: streamingHeaders, - body: createNDJSONStream(handlerInstance.getResponseStream(data), logger), + body: createStream(handlerInstance.getResponseStream(data), logger, compressionDisabled), }); } ); @@ -156,17 +175,20 @@ export class BfetchServerPlugin private createStreamingRequestHandler = ({ logger, + getStartServices, }: { logger: Logger; + getStartServices: StartServicesAccessor; }): BfetchServerSetup['createStreamingRequestHandler'] => (streamHandler) => async ( context, request, response ) => { const response$ = await streamHandler(context, request); + const compressionDisabled = this.getCompressionDisabled(request); return response.ok({ headers: streamingHeaders, - body: createNDJSONStream(response$, logger), + body: createStream(response$, logger, compressionDisabled), }); }; diff --git a/src/plugins/bfetch/server/streaming/create_compressed_stream.ts b/src/plugins/bfetch/server/streaming/create_compressed_stream.ts new file mode 100644 index 0000000000000..6814ed1dd7955 --- /dev/null +++ b/src/plugins/bfetch/server/streaming/create_compressed_stream.ts @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { promisify } from 'util'; +import { Observable } from 'rxjs'; +import { catchError, concatMap, finalize } from 'rxjs/operators'; +import { Logger } from 'src/core/server'; +import { Stream, PassThrough } from 'stream'; +import { constants, deflate } from 'zlib'; + +const delimiter = '\n'; +const pDeflate = promisify(deflate); + +async function zipMessageToStream(output: PassThrough, message: string) { + return new Promise(async (resolve, reject) => { + try { + const gzipped = await pDeflate(message, { + flush: constants.Z_SYNC_FLUSH, + }); + output.write(gzipped.toString('base64')); + output.write(delimiter); + resolve(undefined); + } catch (err) { + reject(err); + } + }); +} + +export const createCompressedStream = ( + results: Observable, + logger: Logger +): Stream => { + const output = new PassThrough(); + + const sub = results + .pipe( + concatMap((message: Response) => { + const strMessage = JSON.stringify(message); + return zipMessageToStream(output, strMessage); + }), + catchError((e) => { + logger.error('Could not serialize or stream a message.'); + logger.error(e); + throw e; + }), + finalize(() => { + output.end(); + sub.unsubscribe(); + }) + ) + .subscribe(); + + return output; +}; diff --git a/src/plugins/bfetch/server/streaming/create_stream.ts b/src/plugins/bfetch/server/streaming/create_stream.ts new file mode 100644 index 0000000000000..7d6981294341b --- /dev/null +++ b/src/plugins/bfetch/server/streaming/create_stream.ts @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { Logger } from 'kibana/server'; +import { Stream } from 'stream'; +import { Observable } from 'rxjs'; +import { createCompressedStream } from './create_compressed_stream'; +import { createNDJSONStream } from './create_ndjson_stream'; + +export function createStream( + response$: Observable, + logger: Logger, + compressionDisabled: boolean +): Stream { + return compressionDisabled + ? createNDJSONStream(response$, logger) + : createCompressedStream(response$, logger); +} diff --git a/src/plugins/bfetch/server/streaming/index.ts b/src/plugins/bfetch/server/streaming/index.ts index 2c31cc329295d..dfd472b5034a1 100644 --- a/src/plugins/bfetch/server/streaming/index.ts +++ b/src/plugins/bfetch/server/streaming/index.ts @@ -7,3 +7,5 @@ */ export * from './create_ndjson_stream'; +export * from './create_compressed_stream'; +export * from './create_stream'; diff --git a/src/plugins/bfetch/server/ui_settings.ts b/src/plugins/bfetch/server/ui_settings.ts new file mode 100644 index 0000000000000..cf7b13a9af182 --- /dev/null +++ b/src/plugins/bfetch/server/ui_settings.ts @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { i18n } from '@kbn/i18n'; +import { UiSettingsParams } from 'src/core/server'; +import { schema } from '@kbn/config-schema'; +import { DISABLE_BFETCH_COMPRESSION } from '../common'; + +export function getUiSettings(): Record> { + return { + [DISABLE_BFETCH_COMPRESSION]: { + name: i18n.translate('bfetch.disableBfetchCompression', { + defaultMessage: 'Disable Batch Compression', + }), + value: false, + description: i18n.translate('bfetch.disableBfetchCompressionDesc', { + defaultMessage: + 'Disable batch compression. This allows you to debug individual requests, but increases response size.', + }), + schema: schema.boolean(), + category: [], + }, + }; +} diff --git a/src/plugins/kibana_usage_collection/server/collectors/management/schema.ts b/src/plugins/kibana_usage_collection/server/collectors/management/schema.ts index 68ebdbb3f35f2..70e689c9fa37c 100644 --- a/src/plugins/kibana_usage_collection/server/collectors/management/schema.ts +++ b/src/plugins/kibana_usage_collection/server/collectors/management/schema.ts @@ -392,6 +392,10 @@ export const stackManagementSchema: MakeSchemaFrom = { type: 'long', _meta: { description: 'Non-default value of setting.' }, }, + 'bfetch:disableCompression': { + type: 'boolean', + _meta: { description: 'Non-default value of setting.' }, + }, 'visualization:visualize:legacyChartsLibrary': { type: 'boolean', _meta: { description: 'Non-default value of setting.' }, diff --git a/src/plugins/kibana_usage_collection/server/collectors/management/types.ts b/src/plugins/kibana_usage_collection/server/collectors/management/types.ts index 7fe3746630c9e..591d9bcf0aa42 100644 --- a/src/plugins/kibana_usage_collection/server/collectors/management/types.ts +++ b/src/plugins/kibana_usage_collection/server/collectors/management/types.ts @@ -23,6 +23,7 @@ export interface UsageStats { /** * non-sensitive settings */ + 'bfetch:disableCompression': boolean; 'autocomplete:useTimeRange': boolean; 'search:timeout': number; 'visualization:visualize:legacyChartsLibrary': boolean; diff --git a/src/plugins/telemetry/schema/oss_plugins.json b/src/plugins/telemetry/schema/oss_plugins.json index 670068def2225..ce0d3b0ebc108 100644 --- a/src/plugins/telemetry/schema/oss_plugins.json +++ b/src/plugins/telemetry/schema/oss_plugins.json @@ -8271,6 +8271,12 @@ "description": "Non-default value of setting." } }, + "bfetch:disableCompression": { + "type": "boolean", + "_meta": { + "description": "Non-default value of setting." + } + }, "visualization:visualize:legacyChartsLibrary": { "type": "boolean", "_meta": { diff --git a/test/api_integration/apis/search/bsearch.ts b/test/api_integration/apis/search/bsearch.ts index 89fe606857c1c..b9d3f236cf1a9 100644 --- a/test/api_integration/apis/search/bsearch.ts +++ b/test/api_integration/apis/search/bsearch.ts @@ -8,15 +8,18 @@ import expect from '@kbn/expect'; import request from 'superagent'; +import { inflateResponse } from '../../../../src/plugins/bfetch/public/streaming'; import { FtrProviderContext } from '../../ftr_provider_context'; import { painlessErrReq } from './painless_err_req'; import { verifyErrorResponse } from './verify_error'; -function parseBfetchResponse(resp: request.Response): Array> { +function parseBfetchResponse(resp: request.Response, compressed: boolean = false) { return resp.text .trim() .split('\n') - .map((item) => JSON.parse(item)); + .map((item) => { + return JSON.parse(compressed ? inflateResponse(item) : item); + }); } export default function ({ getService }: FtrProviderContext) { @@ -26,29 +29,69 @@ export default function ({ getService }: FtrProviderContext) { describe('bsearch', () => { describe('post', () => { it('should return 200 a single response', async () => { - const resp = await supertest.post(`/internal/bsearch`).send({ - batch: [ - { - request: { - params: { - body: { - query: { - match_all: {}, + const resp = await supertest + .post(`/internal/bsearch`) + .set({ 'X-Chunk-Encoding': '' }) + .send({ + batch: [ + { + request: { + params: { + index: '.kibana', + body: { + query: { + match_all: {}, + }, }, }, }, + options: { + strategy: 'es', + }, }, - }, - ], - }); + ], + }); + + const jsonBody = parseBfetchResponse(resp); + + expect(resp.status).to.be(200); + expect(jsonBody[0].id).to.be(0); + expect(jsonBody[0].result.isPartial).to.be(false); + expect(jsonBody[0].result.isRunning).to.be(false); + expect(jsonBody[0].result).to.have.property('rawResponse'); + }); + + it('should return 200 a single response from compressed', async () => { + const resp = await supertest + .post(`/internal/bsearch`) + .set({ 'X-Chunk-Encoding': 'deflate' }) + .send({ + batch: [ + { + request: { + params: { + index: '.kibana', + body: { + query: { + match_all: {}, + }, + }, + }, + }, + options: { + strategy: 'es', + }, + }, + ], + }); - const jsonBody = JSON.parse(resp.text); + const jsonBody = parseBfetchResponse(resp, true); expect(resp.status).to.be(200); - expect(jsonBody.id).to.be(0); - expect(jsonBody.result.isPartial).to.be(false); - expect(jsonBody.result.isRunning).to.be(false); - expect(jsonBody.result).to.have.property('rawResponse'); + expect(jsonBody[0].id).to.be(0); + expect(jsonBody[0].result.isPartial).to.be(false); + expect(jsonBody[0].result.isRunning).to.be(false); + expect(jsonBody[0].result).to.have.property('rawResponse'); }); it('should return a batch of successful resposes', async () => { @@ -57,6 +100,7 @@ export default function ({ getService }: FtrProviderContext) { { request: { params: { + index: '.kibana', body: { query: { match_all: {}, @@ -68,6 +112,7 @@ export default function ({ getService }: FtrProviderContext) { { request: { params: { + index: '.kibana', body: { query: { match_all: {}, @@ -95,6 +140,7 @@ export default function ({ getService }: FtrProviderContext) { { request: { params: { + index: '.kibana', body: { query: { match_all: {}, @@ -121,6 +167,7 @@ export default function ({ getService }: FtrProviderContext) { batch: [ { request: { + index: '.kibana', indexType: 'baad', params: { body: { diff --git a/yarn.lock b/yarn.lock index cfafa0f8febda..bbe887ad573d0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -13432,6 +13432,11 @@ fetch-mock@^7.3.9: path-to-regexp "^2.2.1" whatwg-url "^6.5.0" +fflate@^0.6.9: + version "0.6.9" + resolved "https://registry.yarnpkg.com/fflate/-/fflate-0.6.9.tgz#fb369b30792a03ff7274e174f3b36e51292d3f99" + integrity sha512-hmAdxNHub7fw36hX7BHiuAO0uekp6ufY2sjxBXWxIf0sw5p7tnS9GVrdM4D12SDYQUHVpiC50fPBYPTjOzRU2Q== + figgy-pudding@^3.5.1: version "3.5.1" resolved "https://registry.yarnpkg.com/figgy-pudding/-/figgy-pudding-3.5.1.tgz#862470112901c727a0e495a80744bd5baa1d6790"