Skip to content

Commit

Permalink
[bfetch] compress stream chunks (#97994)
Browse files Browse the repository at this point in the history
* Move inspector adapter integration into search source

* docs and ts

* Move other bucket to search source

* test ts + delete unused tabilfy function

* hierarchical param in aggconfig.
ts improvements
more inspector tests

* fix jest

* separate inspect
more tests

* jest

* inspector

* Error handling and more tests

* put the fun in functional tests

* delete client side legacy msearch code

* ts

* override to sync search in search source

* delete more legacy code

* ts

* delete moarrrr

* deflate bfetch chunks

* update tests
use only zlib

* ts

* extract getInflatedResponse

* tests

* Use fflate in attempt to reduce package size

* use node streams, fflate and hex encoding.

* DISABLE_SEARCH_COMPRESSION UI Settings
Use base64 and async compression

* i18n

* Code review
Use custom header for compression
Promisify once

* use custom headers

* Update jest

* fix tests

* code review, baby!

* integration

* tests

* limit

* limit

* limit

Co-authored-by: Kibana Machine <[email protected]>
  • Loading branch information
lizozom and kibanamachine authored Jun 1, 2021
1 parent a622cd5 commit 842bb69
Show file tree
Hide file tree
Showing 26 changed files with 504 additions and 52 deletions.
1 change: 1 addition & 0 deletions .i18nrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"console": "src/plugins/console",
"core": "src/core",
"discover": "src/plugins/discover",
"bfetch": "src/plugins/bfetch",
"dashboard": "src/plugins/dashboard",
"data": "src/plugins/data",
"embeddableApi": "src/plugins/embeddable",
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@
"expiry-js": "0.1.7",
"extract-zip": "^2.0.1",
"fast-deep-equal": "^3.1.1",
"fflate": "^0.6.9",
"file-saver": "^1.3.8",
"file-type": "^10.9.0",
"focus-trap-react": "^3.1.1",
Expand Down
2 changes: 1 addition & 1 deletion packages/kbn-optimizer/limits.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pageLoadAssetSize:
alerting: 106936
apm: 64385
apmOss: 18996
bfetch: 41874
bfetch: 51874
canvas: 1066647
charts: 195358
cloud: 21076
Expand Down
2 changes: 2 additions & 0 deletions packages/kbn-ui-shared-deps/entry.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ export const Theme = require('./theme.ts');
export const Lodash = require('lodash');
export const LodashFp = require('lodash/fp');

export const Fflate = require('fflate/esm/browser');

// runtime deps which don't need to be copied across all bundles
export const TsLib = require('tslib');
export const KbnAnalytics = require('@kbn/analytics');
Expand Down
1 change: 1 addition & 0 deletions packages/kbn-ui-shared-deps/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ exports.externals = {
'@elastic/eui/dist/eui_theme_dark.json': '__kbnSharedDeps__.Theme.euiDarkVars',
lodash: '__kbnSharedDeps__.Lodash',
'lodash/fp': '__kbnSharedDeps__.LodashFp',
fflate: '__kbnSharedDeps__.Fflate',

/**
* runtime deps which don't need to be copied across all bundles
Expand Down
5 changes: 5 additions & 0 deletions src/plugins/bfetch/common/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,8 @@ export interface BatchResponseItem<Result extends object, Error extends ErrorLik
result?: Result;
error?: Error;
}

export interface BatchItemWrapper {
compressed: boolean;
payload: string;
}
9 changes: 9 additions & 0 deletions src/plugins/bfetch/common/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export const DISABLE_BFETCH_COMPRESSION = 'bfetch:disableCompression';
1 change: 1 addition & 0 deletions src/plugins/bfetch/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ export * from './util';
export * from './streaming';
export * from './buffer';
export * from './batch';
export * from './constants';
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import { createStreamingBatchedFunction } from './create_streaming_batched_function';
import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming';
import { AbortError, defer, of } from '../../../kibana_utils/public';
import { Subject } from 'rxjs';
import { Subject, of as rxof } from 'rxjs';

const flushPromises = () => new Promise((resolve) => setImmediate(resolve));

const getPromiseState = (promise: Promise<unknown>): Promise<'resolved' | 'rejected' | 'pending'> =>
Promise.race<'resolved' | 'rejected' | 'pending'>([
Expand Down Expand Up @@ -52,6 +54,7 @@ describe('createStreamingBatchedFunction()', () => {
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
compressionDisabled$: rxof(true),
});
expect(typeof fn).toBe('function');
});
Expand All @@ -61,6 +64,7 @@ describe('createStreamingBatchedFunction()', () => {
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
compressionDisabled$: rxof(true),
});
const res = fn({});
expect(typeof res.then).toBe('function');
Expand All @@ -74,6 +78,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

expect(fetchStreaming).toHaveBeenCalledTimes(0);
Expand All @@ -93,6 +98,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

expect(fetchStreaming).toHaveBeenCalledTimes(0);
Expand All @@ -107,6 +113,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

fn({ foo: 'bar' });
Expand All @@ -125,6 +132,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

fn({ foo: 'bar' });
Expand All @@ -146,14 +154,18 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ foo: 'bar' });
await flushPromises();
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ baz: 'quix' });
await flushPromises();
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ full: 'yep' });
await flushPromises();
expect(fetchStreaming).toHaveBeenCalledTimes(1);
});

Expand All @@ -164,6 +176,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const abortController = new AbortController();
Expand All @@ -186,11 +199,13 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

fn({ a: '1' });
fn({ b: '2' });
fn({ c: '3' });
await flushPromises();

expect(fetchStreaming.mock.calls[0][0]).toMatchObject({
url: '/test',
Expand All @@ -209,13 +224,16 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

fn({ a: '1' });
fn({ b: '2' });
fn({ c: '3' });
await flushPromises();
expect(fetchStreaming).toHaveBeenCalledTimes(1);
fn({ d: '4' });
await flushPromises();
await new Promise((r) => setTimeout(r, 6));
expect(fetchStreaming).toHaveBeenCalledTimes(2);
});
Expand All @@ -229,6 +247,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = fn({ a: '1' });
Expand All @@ -246,8 +265,11 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

await flushPromises();

const promise1 = fn({ a: '1' });
const promise2 = fn({ b: '2' });
const promise3 = fn({ c: '3' });
Expand Down Expand Up @@ -287,6 +309,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = fn({ a: '1' });
Expand Down Expand Up @@ -314,13 +337,28 @@ describe('createStreamingBatchedFunction()', () => {
expect(await promise3).toEqual({ foo: 'bar 2' });
});

test('compression is false by default', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
flushOnMaxItems: 1,
fetchStreaming,
});

fn({ a: '1' });

const dontCompress = await fetchStreaming.mock.calls[0][0].compressionDisabled$.toPromise();
expect(dontCompress).toBe(false);
});

test('resolves falsy results', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = fn({ a: '1' });
Expand Down Expand Up @@ -362,6 +400,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise = fn({ a: '1' });
Expand Down Expand Up @@ -390,6 +429,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = of(fn({ a: '1' }));
Expand Down Expand Up @@ -442,6 +482,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const abortController = new AbortController();
Expand Down Expand Up @@ -471,6 +512,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const abortController = new AbortController();
Expand Down Expand Up @@ -509,6 +551,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = of(fn({ a: '1' }));
Expand Down Expand Up @@ -539,6 +582,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = of(fn({ a: '1' }));
Expand Down Expand Up @@ -576,6 +620,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = of(fn({ a: '1' }));
Expand Down Expand Up @@ -608,6 +653,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = of(fn({ a: '1' }));
Expand Down Expand Up @@ -644,7 +690,9 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
await flushPromises();

const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
* Side Public License, v 1.
*/

import { Observable, of } from 'rxjs';
import { AbortError, abortSignalToPromise, defer } from '../../../kibana_utils/public';
import {
ItemBufferParams,
TimedItemBufferParams,
createBatchedFunction,
BatchResponseItem,
ErrorLike,
normalizeError,
} from '../../common';
import { fetchStreaming, split } from '../streaming';
import { normalizeError } from '../../common';
import { fetchStreaming } from '../streaming';
import { BatchedFunc, BatchItem } from './types';

export interface BatchedFunctionProtocolError extends ErrorLike {
Expand Down Expand Up @@ -47,6 +47,11 @@ export interface StreamingBatchedFunctionParams<Payload, Result> {
* before sending the batch request.
*/
maxItemAge?: TimedItemBufferParams<any>['maxItemAge'];

/**
* Disabled zlib compression of response chunks.
*/
compressionDisabled$?: Observable<boolean>;
}

/**
Expand All @@ -64,6 +69,7 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
fetchStreaming: fetchStreamingInjected = fetchStreaming,
flushOnMaxItems = 25,
maxItemAge = 10,
compressionDisabled$ = of(false),
} = params;
const [fn] = createBatchedFunction({
onCall: (payload: Payload, signal?: AbortSignal) => {
Expand Down Expand Up @@ -119,6 +125,7 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
body: JSON.stringify({ batch }),
method: 'POST',
signal: abortController.signal,
compressionDisabled$,
});

const handleStreamError = (error: any) => {
Expand All @@ -127,10 +134,10 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
for (const { future } of items) future.reject(normalizedError);
};

stream.pipe(split('\n')).subscribe({
stream.subscribe({
next: (json: string) => {
try {
const response = JSON.parse(json) as BatchResponseItem<Result, ErrorLike>;
const response = JSON.parse(json);
if (response.error) {
items[response.id].future.reject(response.error);
} else if (response.result !== undefined) {
Expand Down
12 changes: 12 additions & 0 deletions src/plugins/bfetch/public/batching/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export {
createStreamingBatchedFunction,
StreamingBatchedFunctionParams,
} from './create_streaming_batched_function';
Loading

0 comments on commit 842bb69

Please sign in to comment.