[ML] Add support for gzip compressed streams.
walterra committed May 16, 2022
1 parent 8492e81 commit c8eaa0a
Showing 7 changed files with 258 additions and 51 deletions.
42 changes: 42 additions & 0 deletions x-pack/plugins/aiops/server/lib/accept_compression.test.ts
@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { acceptCompression } from './accept_compression';

describe('acceptCompression', () => {
it('should return false for empty headers', () => {
expect(acceptCompression({})).toBe(false);
});
it('should return false for other header containing gzip as string', () => {
expect(acceptCompression({ 'other-header': 'gzip, other' })).toBe(false);
});
it('should return false for other header containing gzip as array', () => {
expect(acceptCompression({ 'other-header': ['gzip', 'other'] })).toBe(false);
});
it('should return true for upper-case header containing gzip as string', () => {
expect(acceptCompression({ 'Accept-Encoding': 'gzip, other' })).toBe(true);
});
it('should return true for lower-case header containing gzip as string', () => {
expect(acceptCompression({ 'accept-encoding': 'gzip, other' })).toBe(true);
});
it('should return true for upper-case header containing gzip as array', () => {
expect(acceptCompression({ 'Accept-Encoding': ['gzip', 'other'] })).toBe(true);
});
it('should return true for lower-case header containing gzip as array', () => {
expect(acceptCompression({ 'accept-encoding': ['gzip', 'other'] })).toBe(true);
});
it('should return true for mixed headers containing gzip as string', () => {
expect(
acceptCompression({ 'accept-encoding': 'gzip, other', 'other-header': 'other-value' })
).toBe(true);
});
it('should return true for mixed headers containing gzip as array', () => {
expect(
acceptCompression({ 'accept-encoding': ['gzip', 'other'], 'other-header': 'other-value' })
).toBe(true);
});
});
38 changes: 38 additions & 0 deletions x-pack/plugins/aiops/server/lib/accept_compression.ts
@@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { Headers } from '@kbn/core/server';

/**
 * Returns whether the given request headers include an `accept-encoding`
 * header (case-insensitive) that lists `gzip`.
 */
export function acceptCompression(headers: Headers): boolean {
  let compressed = false;

  // An `accept-encoding` value can be a comma separated string or an array of such strings.
  function containsGzip(s: string): boolean {
    return s
      .split(',')
      .map((d) => d.trim())
      .includes('gzip');
  }

  Object.keys(headers).forEach((key) => {
    if (key.toLocaleLowerCase() === 'accept-encoding') {
      const acceptEncoding = headers[key];

      if (typeof acceptEncoding === 'string') {
        compressed = containsGzip(acceptEncoding);
      } else if (Array.isArray(acceptEncoding)) {
        for (const ae of acceptEncoding) {
          if (containsGzip(ae)) {
            compressed = true;
            break;
          }
        }
      }
    }
  });

  return compressed;
}
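For reference, a minimal sketch of how the helper behaves with typical header shapes (the header values here are illustrative, not taken from the commit):

import { acceptCompression } from './accept_compression';

// Comma separated string value, as most browsers send it.
acceptCompression({ 'accept-encoding': 'gzip, deflate, br' }); // true

// Array value, e.g. when a proxy forwarded the header more than once.
acceptCompression({ 'Accept-Encoding': ['deflate', 'gzip'] }); // true

// No accept-encoding header present at all.
acceptCompression({ 'other-header': 'gzip' }); // false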
106 changes: 106 additions & 0 deletions x-pack/plugins/aiops/server/lib/stream_factory.test.ts
@@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import zlib from 'zlib';

import { loggerMock, MockedLogger } from '@kbn/logging-mocks';

import { API_ENDPOINT } from '../../common/api';
import type { ApiEndpointActions } from '../../common/api';

import { streamFactory } from './stream_factory';

type Action = ApiEndpointActions['/internal/aiops/explain_log_rate_spikes'];

const mockItem1: Action = {
type: 'add_fields',
payload: ['clientip'],
};
const mockItem2: Action = {
type: 'add_fields',
payload: ['referer'],
};

describe('streamFactory', () => {
let mockLogger: MockedLogger;

beforeEach(() => {
mockLogger = loggerMock.create();
});

it('should encode and receive an uncompressed stream', async () => {
const { DELIMITER, end, push, responseWithHeaders, stream } = streamFactory<
typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES
>(mockLogger, {});

push(mockItem1);
push(mockItem2);
end();

let streamResult = '';
for await (const chunk of stream) {
streamResult += chunk.toString('utf8');
}

const streamItems = streamResult.split(DELIMITER);
const lastItem = streamItems.pop();

const parsedItems = streamItems.map((d) => JSON.parse(d));

expect(responseWithHeaders.headers).toBe(undefined);
expect(parsedItems).toHaveLength(2);
expect(parsedItems[0]).toStrictEqual(mockItem1);
expect(parsedItems[1]).toStrictEqual(mockItem2);
expect(lastItem).toBe('');
});

// Because zlib.gunzip's API expects a callback, we need to use `done` here
// to signal when all assertions have run. However, a test callback cannot be
// both `async` and accept `done`. That's why we're using an "async IIFE"
// pattern inside the test callback so we can still use async/await for the
// `for await()` part. Note that the unzipping here is done just to decode
// the stream for the test's assertions. In actual usage, the browser on the
// client side automatically takes care of unzipping without the need for
// additional custom code.
it('should encode and receive a compressed stream', (done) => {
(async () => {
const { DELIMITER, end, push, responseWithHeaders, stream } = streamFactory<
typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES
>(mockLogger, { 'accept-encoding': 'gzip' });

push(mockItem1);
push(mockItem2);
end();

const chunks = [];
for await (const chunk of stream) {
chunks.push(chunk);
}

const buffer = Buffer.concat(chunks);

zlib.gunzip(buffer, function (err, decoded) {
expect(err).toBe(null);

const streamResult = decoded.toString('utf8');

const streamItems = streamResult.split(DELIMITER);
const lastItem = streamItems.pop();

const parsedItems = streamItems.map((d) => JSON.parse(d));

expect(responseWithHeaders.headers).toStrictEqual({ 'content-encoding': 'gzip' });
expect(parsedItems).toHaveLength(2);
expect(parsedItems[0]).toStrictEqual(mockItem1);
expect(parsedItems[1]).toStrictEqual(mockItem2);
expect(lastItem).toBe('');

done();
});
})();
});
});
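As an aside, the `done` callback pattern above could be avoided by promisifying `zlib.gunzip`. A sketch of that alternative style (an assumption, not part of this commit):

import util from 'util';
import zlib from 'zlib';

const gunzip = util.promisify(zlib.gunzip);

it('should encode and receive a compressed stream (promisified variant)', async () => {
  const { DELIMITER, end, push, stream } = streamFactory<
    typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES
  >(mockLogger, { 'accept-encoding': 'gzip' });

  push(mockItem1);
  push(mockItem2);
  end();

  const chunks: Buffer[] = [];
  for await (const chunk of stream) {
    chunks.push(chunk);
  }

  // With a promise based gunzip the whole test stays `async`,
  // so no `done` callback and no IIFE are needed.
  const decoded = await gunzip(Buffer.concat(chunks));
  const parsedItems = decoded
    .toString('utf8')
    .split(DELIMITER)
    .filter((d) => d !== '')
    .map((d) => JSON.parse(d));

  expect(parsedItems).toStrictEqual([mockItem1, mockItem2]);
});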
62 changes: 62 additions & 0 deletions x-pack/plugins/aiops/server/lib/stream_factory.ts
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { Stream } from 'stream';
import zlib from 'zlib';

import type { Headers, Logger } from '@kbn/core/server';

import { ApiEndpoint, ApiEndpointActions } from '../../common/api';

import { acceptCompression } from './accept_compression';

// We need this; otherwise the Kibana server will crash with an 'ERR_METHOD_NOT_IMPLEMENTED' error.
class ResponseStream extends Stream.PassThrough {
flush() {}
_read() {}
}

const DELIMITER = '\n';

/**
 * Sets up a response stream. If the request headers accept gzip, the returned
 * stream compresses its output and `responseWithHeaders` includes the matching
 * `content-encoding: gzip` header.
 */
export function streamFactory<T extends ApiEndpoint>(logger: Logger, headers: Headers) {
const isCompressed = acceptCompression(headers);

const stream = isCompressed ? zlib.createGzip() : new ResponseStream();

function push(d: ApiEndpointActions[T]) {
try {
const line = JSON.stringify(d);
stream.write(`${line}${DELIMITER}`);

// Calling .flush() on a compression stream will
// make zlib return as much output as currently possible.
if (isCompressed) {
stream.flush();
}
} catch (error) {
logger.error('Could not serialize or stream a message.');
logger.error(error);
}
}

function end() {
stream.end();
}

const responseWithHeaders = {
body: stream,
...(isCompressed
? {
headers: {
'content-encoding': 'gzip',
},
}
: {}),
};

return { DELIMITER, end, push, responseWithHeaders, stream };
}
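To round out the picture, here is a minimal client-side sketch of consuming such a stream with the Fetch API. The endpoint path matches the one used in the tests above; the request payload and handler wiring are assumptions, since client code isn't part of this commit. As the test comments note, the browser transparently gunzips the response, so the reader only needs to split on the newline delimiter:

// Minimal client-side sketch (assumed usage, not part of this commit).
async function consumeStream(onAction: (action: unknown) => void) {
  const response = await fetch('/internal/aiops/explain_log_rate_spikes', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ index: 'my-index' }), // hypothetical request payload
  });

  if (response.body === null) return;

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;

    // The browser has already decompressed the gzip encoded body at this point.
    buffer += decoder.decode(value, { stream: true });

    // Complete lines are parseable JSON items; keep any trailing partial line.
    const lines = buffer.split('\n');
    buffer = lines.pop() ?? '';
    for (const line of lines) {
      if (line !== '') onAction(JSON.parse(line));
    }
  }
}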
11 changes: 5 additions & 6 deletions x-pack/plugins/aiops/server/routes/example_stream.ts
@@ -15,7 +15,7 @@ import {
} from '../../common/api/example_stream';
import { API_ENDPOINT } from '../../common/api';

-import { streamFactory } from './stream_factory';
+import { streamFactory } from '../lib/stream_factory';

export const defineExampleStreamRoute = (router: IRouter, logger: Logger) => {
router.post(
@@ -37,8 +37,9 @@ export const defineExampleStreamRoute = (router: IRouter, logger: Logger) => {
shouldStop = true;
});

-const { DELIMITER, end, push, stream } =
-  streamFactory<typeof API_ENDPOINT.EXAMPLE_STREAM>(logger);
+const { DELIMITER, end, push, responseWithHeaders, stream } = streamFactory<
+  typeof API_ENDPOINT.EXAMPLE_STREAM
+>(logger, request.headers);

const entities = [
'kimchy',
@@ -102,9 +103,7 @@ export const defineExampleStreamRoute = (router: IRouter, logger: Logger) => {
// Do not call this using `await` so it runs asynchronously while we already return the stream.
pushStreamUpdate();

-return response.ok({
-  body: stream,
-});
+return response.ok(responseWithHeaders);
}
);
};
11 changes: 5 additions & 6 deletions x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts
@@ -16,7 +16,7 @@ import {
} from '../../common/api/explain_log_rate_spikes';
import { API_ENDPOINT } from '../../common/api';

-import { streamFactory } from './stream_factory';
+import { streamFactory } from '../lib/stream_factory';

export const defineExplainLogRateSpikesRoute = (
router: IRouter<DataRequestHandlerContext>,
@@ -60,8 +60,9 @@ export const defineExplainLogRateSpikesRoute = (
const doc = res.rawResponse.hits.hits.pop();
const fields = Object.keys(doc?._source ?? {});

-const { end, push, stream } =
-  streamFactory<typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES>(logger);
+const { end, push, responseWithHeaders } = streamFactory<
+  typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES
+>(logger, request.headers);

async function pushField() {
setTimeout(() => {
@@ -83,9 +84,7 @@

pushField();

-return response.ok({
-  body: stream,
-});
+return response.ok(responseWithHeaders);
}
);
};
39 changes: 0 additions & 39 deletions x-pack/plugins/aiops/server/routes/stream_factory.ts

This file was deleted.
