[ML] Explain Log Rate Spikes: Fix uncompressed streams and backpressure handling. (#142970) (#144396)

- Adds `compressResponse` and `flushFix` flags to the request body so clients can override the compression settings inferred from request headers.
- Updates the developer examples with a toggle to run requests with compression enabled or disabled.
- Adds support for backpressure handling for response streams.
- The backpressure update includes a fix for a bug where uncompressed streams would never start streaming to the client.
- The analysis endpoint for Explain Log Rate Spikes now pushes a ping every 10 seconds to keep the stream alive (a sketch of the idea follows this list).
- Integration tests were updated to test both uncompressed and compressed streaming.
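The keep-alive ping itself is implemented in one of the files not shown below. As a rough sketch of the idea, assuming the `push`/`end` API returned by `streamFactory` (the interval constant and the `{ type: 'ping' }` action shape are illustrative, not taken from the commit):

```ts
// Sketch: push a tiny ping action on an interval so that idle streams are
// not torn down by proxies or client timeouts. `push` and `end` are assumed
// to come from streamFactory(request.headers, logger).
const PING_INTERVAL_MS = 10 * 1000;

const pingTimer = setInterval(() => {
  push({ type: 'ping' }); // hypothetical action shape
}, PING_INTERVAL_MS);

function finishStream() {
  // Clear the timer before ending, otherwise the interval would keep
  // pushing to an already ended stream.
  clearInterval(pingTimer);
  end();
}
```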

(cherry picked from commit b38bbbc)
walterra authored Nov 2, 2022
1 parent ed16c6c commit 455704e
Showing 13 changed files with 291 additions and 62 deletions.
@@ -13,5 +13,7 @@ export const reducerStreamRequestBodySchema = schema.object({
simulateErrors: schema.maybe(schema.boolean()),
/** Maximum timeout between streaming messages. */
timeout: schema.maybe(schema.number()),
/** Setting to override header-derived compression */
compressResponse: schema.maybe(schema.boolean()),
});
export type ReducerStreamRequestBodySchema = TypeOf<typeof reducerStreamRequestBodySchema>;
@@ -11,6 +11,8 @@ import { schema, TypeOf } from '@kbn/config-schema';
export const simpleStringStreamRequestBodySchema = schema.object({
/** Maximum timeout between streaming messages. */
timeout: schema.number(),
/** Setting to override header-derived compression */
compressResponse: schema.maybe(schema.boolean()),
});
export type SimpleStringStreamRequestBodySchema = TypeOf<
typeof simpleStringStreamRequestBodySchema
@@ -44,13 +44,14 @@ export const PageReducerStream: FC = () => {
const basePath = http?.basePath.get() ?? '';

const [simulateErrors, setSimulateErrors] = useState(false);
const [compressResponse, setCompressResponse] = useState(true);

const { dispatch, start, cancel, data, errors, isCancelled, isRunning } = useFetchStream<
ApiReducerStream,
typeof basePath
>(
`${basePath}/internal/response_stream/reducer_stream`,
{ simulateErrors },
{ compressResponse, simulateErrors },
{ reducer: reducerStreamReducer, initialState }
);

@@ -144,6 +145,13 @@ export const PageReducerStream: FC = () => {
onChange={(e) => setSimulateErrors(!simulateErrors)}
compressed
/>
<EuiCheckbox
id="responseStreamCompressionCheckbox"
label="Toggle compression setting for response stream."
checked={compressResponse}
onChange={(e) => setCompressResponse(!compressResponse)}
compressed
/>
</EuiText>
</Page>
);
@@ -6,9 +6,17 @@
* Side Public License, v 1.
*/

import React, { FC } from 'react';
import React, { useState, FC } from 'react';

import { EuiButton, EuiCallOut, EuiFlexGroup, EuiFlexItem, EuiSpacer, EuiText } from '@elastic/eui';
import {
EuiButton,
EuiCallOut,
EuiCheckbox,
EuiFlexGroup,
EuiFlexItem,
EuiSpacer,
EuiText,
} from '@elastic/eui';

import { useFetchStream } from '@kbn/aiops-utils';

@@ -21,10 +29,15 @@ export const PageSimpleStringStream: FC = () => {
const { core } = useDeps();
const basePath = core.http?.basePath.get() ?? '';

const [compressResponse, setCompressResponse] = useState(true);

const { dispatch, errors, start, cancel, data, isRunning } = useFetchStream<
ApiSimpleStringStream,
typeof basePath
>(`${basePath}/internal/response_stream/simple_string_stream`, { timeout: 500 });
>(`${basePath}/internal/response_stream/simple_string_stream`, {
compressResponse,
timeout: 500,
});

const onClickHandler = async () => {
if (isRunning) {
@@ -58,6 +71,14 @@ export const PageSimpleStringStream: FC = () => {
</EuiFlexItem>
</EuiFlexGroup>
<EuiSpacer />
<EuiCheckbox
id="responseStreamCompressionCheckbox"
label="Toggle compression setting for response stream."
checked={compressResponse}
onChange={(e) => setCompressResponse(!compressResponse)}
compressed
/>
<EuiSpacer />
<EuiText>
<p>{data}</p>
</EuiText>
14 changes: 13 additions & 1 deletion examples/response_stream/server/routes/reducer_stream.ts
@@ -31,17 +31,29 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
const maxTimeoutMs = request.body.timeout ?? 250;
const simulateError = request.body.simulateErrors ?? false;

let logMessageCounter = 1;

function logDebugMessage(msg: string) {
logger.debug(`Response Stream Example #${logMessageCounter}: ${msg}`);
logMessageCounter++;
}

logDebugMessage('Starting stream.');

let shouldStop = false;
request.events.aborted$.subscribe(() => {
logDebugMessage('aborted$ subscription trigger.');
shouldStop = true;
});
request.events.completed$.subscribe(() => {
logDebugMessage('completed$ subscription trigger.');
shouldStop = true;
});

const { end, push, responseWithHeaders } = streamFactory<ReducerStreamApiAction>(
request.headers,
logger
logger,
request.body.compressResponse
);

const entities = [
@@ -35,7 +35,11 @@ export const defineSimpleStringStreamRoute = (router: IRouter, logger: Logger) =
shouldStop = true;
});

const { end, push, responseWithHeaders } = streamFactory(request.headers, logger);
const { end, push, responseWithHeaders } = streamFactory(
request.headers,
logger,
request.body.compressResponse
);

const text =
'Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is developed in Java and is dual-licensed under the source-available Server Side Public License and the Elastic license, while other parts fall under the proprietary (source-available) Elastic License. Official clients are available in Java, .NET (C#), PHP, Python, Apache Groovy, Ruby and many other languages. According to the DB-Engines ranking, Elasticsearch is the most popular enterprise search engine.';
2 changes: 1 addition & 1 deletion x-pack/packages/ml/aiops_utils/src/stream_factory.test.ts
@@ -29,7 +29,7 @@ describe('streamFactory', () => {
let mockLogger: Logger;

beforeEach(() => {
mockLogger = { error: jest.fn() } as unknown as Logger;
mockLogger = { debug: jest.fn(), error: jest.fn(), info: jest.fn() } as unknown as Logger;
});

it('should encode and receive an uncompressed string based stream', async () => {
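Building on the updated mock logger above and the new `streamFactory` signature in the next file, a test for the compression override might look roughly like this (a sketch only: the test name and expected output are illustrative, not from the commit):

```ts
it('should stream uncompressed when compression is overridden off', (done) => {
  // Headers alone would enable gzip; the third argument overrides that.
  const { end, push, responseWithHeaders } = streamFactory<string>(
    { 'accept-encoding': 'gzip' },
    mockLogger,
    false // compressOverride
  );

  let streamedData = '';
  responseWithHeaders.body.on('data', (chunk) => {
    streamedData += chunk.toString();
  });
  responseWithHeaders.body.on('end', () => {
    expect(streamedData).toBe('helloworld');
    done();
  });

  push('hello');
  push('world');
  end();
});
```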
132 changes: 114 additions & 18 deletions x-pack/packages/ml/aiops_utils/src/stream_factory.ts
@@ -14,22 +14,25 @@ import type { Headers, ResponseHeaders } from '@kbn/core-http-server';

import { acceptCompression } from './accept_compression';

// We need this otherwise Kibana server will crash with a 'ERR_METHOD_NOT_IMPLEMENTED' error.
class ResponseStream extends Stream.PassThrough {
flush() {}
_read() {}
// Type guard to identify a compressed stream.
function isCompressedStream(arg: unknown): arg is zlib.Gzip {
  return typeof arg === 'object' && arg !== null && typeof (arg as zlib.Gzip).flush === 'function';
}

const FLUSH_PAYLOAD_SIZE = 4 * 1024;

class UncompressedResponseStream extends Stream.PassThrough {}

const DELIMITER = '\n';

type StreamType = 'string' | 'ndjson';

interface StreamFactoryReturnType<T = unknown> {
DELIMITER: string;
end: () => void;
push: (d: T) => void;
push: (d: T, drain?: boolean) => void;
responseWithHeaders: {
body: zlib.Gzip | ResponseStream;
body: zlib.Gzip | UncompressedResponseStream;
headers?: ResponseHeaders;
};
}
@@ -39,39 +42,89 @@ interface StreamFactoryReturnType<T = unknown> {
* for gzip compression depending on provided request headers.
*
* @param headers - Request headers.
* @param logger - Kibana logger.
* @param compressOverride - Optional flag to override the header-based compression setting.
* @param flushFix - Adds an attribute with a random string payload to each chunk to overcome response buffering in certain proxy configurations.
*
* @returns An object with stream attributes and methods.
*/
export function streamFactory<T = string>(
headers: Headers,
logger: Logger,
compressOverride?: boolean,
flushFix?: boolean
): StreamFactoryReturnType<T>;
/**
* Sets up a response stream with support for gzip compression depending on provided
* request headers. Any non-string data pushed to the stream will be streamed as NDJSON.
*
* @param headers - Request headers.
* @param logger - Kibana logger.
* @param compressOverride - Optional flag to override the header-based compression setting.
* @param flushFix - Adds an attribute with a random string payload to each chunk to overcome response buffering in certain proxy configurations.
*
* @returns An object with stream attributes and methods.
*/
export function streamFactory<T = unknown>(
headers: Headers,
logger: Logger,
compressOverride: boolean = true,
flushFix: boolean = false
): StreamFactoryReturnType<T> {
let streamType: StreamType;
const isCompressed = acceptCompression(headers);
const isCompressed = compressOverride && acceptCompression(headers);

const stream = isCompressed ? zlib.createGzip() : new UncompressedResponseStream();

// If waiting for draining of the stream, items will be added to this buffer.
const backPressureBuffer: T[] = [];

const stream = isCompressed ? zlib.createGzip() : new ResponseStream();
// Flag will be set when the "drain" listener is active so we can avoid setting multiple listeners.
let waitForDrain = false;

// Instead of a boolean flag, this is an array that tracks whether we are still waiting on any callbacks from writing to the stream.
// It needs to be an array to avoid race conditions when multiple writes are in flight.
const waitForCallbacks: number[] = [];

// Flag to set if the stream should be ended. Because there could be items in the backpressure buffer, we might
// not want to end the stream right away. Once the backpressure buffer is cleared, we'll end the stream eventually.
let tryToEnd = false;

function logDebugMessage(msg: string) {
logger.debug(`HTTP Response Stream: ${msg}`);
}

function end() {
stream.end();
tryToEnd = true;

logDebugMessage(`backPressureBuffer size on end(): ${backPressureBuffer.length}`);
logDebugMessage(`waitForCallbacks size on end(): ${waitForCallbacks.length}`);

// Before ending the stream, we need to empty the backPressureBuffer
if (backPressureBuffer.length > 0) {
const el = backPressureBuffer.shift();
if (el !== undefined) {
push(el, true);
}
return;
}

if (waitForCallbacks.length === 0) {
logDebugMessage('All backPressureBuffer and waitForCallbacks cleared, ending the stream.');
stream.end();
}
}

function push(d: T) {
function push(d: T, drain = false) {
logDebugMessage(
`Push to stream. Current backPressure buffer size: ${backPressureBuffer.length}, drain flag: ${drain}`
);

if (d === undefined) {
logger.error('Stream chunk must not be undefined.');
return;
}

// Initialize the stream type with the first push to the stream,
// otherwise check the integrity of the data to be pushed.
if (streamType === undefined) {
@@ -84,26 +137,69 @@ export function streamFactory<T = unknown>(
return;
}

if ((!drain && waitForDrain) || (!drain && backPressureBuffer.length > 0)) {
logDebugMessage('Adding item to backpressure buffer.');
backPressureBuffer.push(d);
return;
}

try {
const line =
streamType === 'ndjson'
? `${JSON.stringify({
...d,
// This is a temporary fix for response streaming with proxy configurations that buffer responses up to 4KB in size.
...(flushFix ? { flushPayload: crypto.randomBytes(4096).toString('hex') } : {}),
...(flushFix
? { flushPayload: crypto.randomBytes(FLUSH_PAYLOAD_SIZE).toString('hex') }
: {}),
})}${DELIMITER}`
: d;
stream.write(line);

waitForCallbacks.push(1);
const writeOk = stream.write(line, () => {
waitForCallbacks.pop();
// Calling .flush() on a compression stream will
// make zlib return as much output as currently possible.
if (isCompressedStream(stream)) {
stream.flush();
}

if (tryToEnd && waitForCallbacks.length === 0) {
end();
}
});

logDebugMessage(`Ok to write to the stream again? ${writeOk}`);

if (!writeOk) {
logDebugMessage(`Should we add the "drain" listener?: ${!waitForDrain}`);
if (!waitForDrain) {
waitForDrain = true;
stream.once('drain', () => {
logDebugMessage(
'The "drain" listener triggered, we can continue pushing to the stream.'
);

waitForDrain = false;
if (backPressureBuffer.length > 0) {
const el = backPressureBuffer.shift();
if (el !== undefined) {
push(el, true);
}
}
});
}
} else if (writeOk && drain && backPressureBuffer.length > 0) {
logDebugMessage('Continue clearing the backpressure buffer.');
const el = backPressureBuffer.shift();
if (el !== undefined) {
push(el, true);
}
}
} catch (e) {
logger.error(`Could not serialize or stream data chunk: ${e.toString()}`);
return;
}

// Calling .flush() on a compression stream will
// make zlib return as much output as currently possible.
if (isCompressed) {
stream.flush();
}
}

const responseWithHeaders: StreamFactoryReturnType['responseWithHeaders'] = {
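The backpressure handling above follows the standard Node.js `write()`/`'drain'` contract. Stripped of the Kibana-specific bookkeeping (pending write callbacks, deferred `end()`), the core pattern reduces to roughly this self-contained sketch:

```ts
import { PassThrough } from 'stream';

const stream = new PassThrough();
const backPressureBuffer: string[] = [];
let waitForDrain = false;

function safeWrite(chunk: string) {
  if (waitForDrain) {
    // The stream's internal buffer is full; queue instead of writing.
    backPressureBuffer.push(chunk);
    return;
  }

  // write() returns false once the internal buffer exceeds highWaterMark.
  if (!stream.write(chunk)) {
    waitForDrain = true;
    stream.once('drain', () => {
      waitForDrain = false;
      // Replay queued chunks; stop again if the buffer fills back up.
      while (backPressureBuffer.length > 0 && !waitForDrain) {
        safeWrite(backPressureBuffer.shift() as string);
      }
    });
  }
}
```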
@@ -21,6 +21,9 @@ export const aiopsExplainLogRateSpikesSchema = schema.object({
deviationMax: schema.number(),
/** The index to query for log rate spikes */
index: schema.string(),
/** Settings to override header-derived compression and the flush fix */
compressResponse: schema.maybe(schema.boolean()),
flushFix: schema.maybe(schema.boolean()),
});

export type AiopsExplainLogRateSpikesSchema = TypeOf<typeof aiopsExplainLogRateSpikesSchema>;
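For illustration, a request body accepted by this schema might look as follows (the values are invented, and schema fields elided above this hunk are omitted):

```ts
// Illustrative only: exercises the two new override fields.
const exampleBody = {
  index: 'my-logs-*',
  deviationMax: 1669000000000,
  compressResponse: false, // opt out of gzip even if headers would allow it
  flushFix: true, // pad each NDJSON line to work around ~4KB proxy buffering
};
```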
@@ -95,6 +95,7 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
timeFieldName: dataView.timeFieldName ?? '',
index: dataView.title,
grouping: true,
flushFix: true,
...windowParameters,
},
{ reducer: streamReducer, initialState }
