diff --git a/examples/response_stream/common/api/reducer_stream/request_body_schema.ts b/examples/response_stream/common/api/reducer_stream/request_body_schema.ts
index 58ea487a8a539..8318a411ab86a 100644
--- a/examples/response_stream/common/api/reducer_stream/request_body_schema.ts
+++ b/examples/response_stream/common/api/reducer_stream/request_body_schema.ts
@@ -13,5 +13,7 @@ export const reducerStreamRequestBodySchema = schema.object({
   simulateErrors: schema.maybe(schema.boolean()),
   /** Maximum timeout between streaming messages. */
   timeout: schema.maybe(schema.number()),
+  /** Setting to override header-derived compression. */
+  compressResponse: schema.maybe(schema.boolean()),
 });
 export type ReducerStreamRequestBodySchema = TypeOf<typeof reducerStreamRequestBodySchema>;
diff --git a/examples/response_stream/common/api/simple_string_stream/request_body_schema.ts b/examples/response_stream/common/api/simple_string_stream/request_body_schema.ts
index 8e891395e5dc8..3a9d0f4d7e225 100644
--- a/examples/response_stream/common/api/simple_string_stream/request_body_schema.ts
+++ b/examples/response_stream/common/api/simple_string_stream/request_body_schema.ts
@@ -11,6 +11,8 @@ import { schema, TypeOf } from '@kbn/config-schema';
 export const simpleStringStreamRequestBodySchema = schema.object({
   /** Maximum timeout between streaming messages. */
   timeout: schema.number(),
+  /** Setting to override header-derived compression. */
+  compressResponse: schema.maybe(schema.boolean()),
 });
 export type SimpleStringStreamRequestBodySchema = TypeOf<
   typeof simpleStringStreamRequestBodySchema
 >;
diff --git a/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx b/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx
index 0755765374330..913f8b4064440 100644
--- a/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx
+++ b/examples/response_stream/public/containers/app/pages/page_reducer_stream/index.tsx
@@ -44,13 +44,14 @@ export const PageReducerStream: FC = () => {
   const basePath = http?.basePath.get() ?? '';

   const [simulateErrors, setSimulateErrors] = useState(false);
+  const [compressResponse, setCompressResponse] = useState(true);

   const { dispatch, start, cancel, data, errors, isCancelled, isRunning } = useFetchStream<
     ApiReducerStream,
     typeof basePath
   >(
     `${basePath}/internal/response_stream/reducer_stream`,
-    { simulateErrors },
+    { compressResponse, simulateErrors },
     { reducer: reducerStreamReducer, initialState }
   );

@@ -144,6 +145,13 @@ export const PageReducerStream: FC = () => {
         onChange={(e) => setSimulateErrors(!simulateErrors)}
         compressed
       />
+      <EuiCheckbox
+        id="responseStreamCompressionCheckbox"
+        label="Toggle compression setting for response stream."
+        checked={compressResponse}
+        onChange={(e) => setCompressResponse(!compressResponse)}
+        compressed
+      />
     </Page>
   );
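Note: the checkbox above only toggles what is sent in the request body; whether the response actually ends up gzipped still depends on the `accept-encoding` negotiation inside `streamFactory` (see the package changes further below). A quick manual end-to-end check, as a sketch: the endpoint and body shape are taken from this diff, the `kbn-xsrf` header mirrors the integration tests later in the diff, and the base path is omitted for brevity.

```ts
// Request the example stream with compression explicitly disabled and
// inspect the response header. With `compressResponse: false` the
// `content-encoding` header should be absent (fetch reports it as null).
const resp = await fetch('/internal/response_stream/reducer_stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json', 'kbn-xsrf': 'stream' },
  body: JSON.stringify({ simulateErrors: false, compressResponse: false }),
});
console.log(resp.headers.get('content-encoding')); // expected: null
```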
diff --git a/examples/response_stream/public/containers/app/pages/page_simple_string_stream/index.tsx b/examples/response_stream/public/containers/app/pages/page_simple_string_stream/index.tsx
index cfa76688d9701..a305e907cfb45 100644
--- a/examples/response_stream/public/containers/app/pages/page_simple_string_stream/index.tsx
+++ b/examples/response_stream/public/containers/app/pages/page_simple_string_stream/index.tsx
@@ -6,9 +6,17 @@
  * Side Public License, v 1.
  */

-import React, { FC } from 'react';
+import React, { useState, FC } from 'react';

-import { EuiButton, EuiCallOut, EuiFlexGroup, EuiFlexItem, EuiSpacer, EuiText } from '@elastic/eui';
+import {
+  EuiButton,
+  EuiCallOut,
+  EuiCheckbox,
+  EuiFlexGroup,
+  EuiFlexItem,
+  EuiSpacer,
+  EuiText,
+} from '@elastic/eui';

 import { useFetchStream } from '@kbn/aiops-utils';

@@ -21,10 +29,15 @@ export const PageSimpleStringStream: FC = () => {
   const { core } = useDeps();
   const basePath = core.http?.basePath.get() ?? '';

+  const [compressResponse, setCompressResponse] = useState(true);
+
   const { dispatch, errors, start, cancel, data, isRunning } = useFetchStream<
     ApiSimpleStringStream,
     typeof basePath
-  >(`${basePath}/internal/response_stream/simple_string_stream`, { timeout: 500 });
+  >(`${basePath}/internal/response_stream/simple_string_stream`, {
+    compressResponse,
+    timeout: 500,
+  });

   const onClickHandler = async () => {
     if (isRunning) {
@@ -58,6 +71,14 @@ export const PageSimpleStringStream: FC = () => {
         </EuiFlexItem>
       </EuiFlexGroup>
+      <EuiCheckbox
+        id="responseStreamCompressionCheckbox"
+        label="Toggle compression setting for response stream."
+        checked={compressResponse}
+        onChange={(e) => setCompressResponse(!compressResponse)}
+        compressed
+      />
+      <EuiSpacer />
       <EuiText>
         <p>{data}</p>
       </EuiText>
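The server routes below simply forward `request.body.compressResponse` into `streamFactory`, where it is combined with a header check via `compressOverride && acceptCompression(headers)`. The `accept_compression` module itself is not part of this diff; purely as an illustration, a header check of that kind might look roughly like the following. This is a hypothetical sketch, not the actual implementation.

```ts
import type { Headers } from '@kbn/core-http-server';

// Hypothetical sketch: report whether the client advertises gzip support.
// The real `acceptCompression` lives in './accept_compression' and may
// differ in detail.
function acceptsGzip(headers: Headers): boolean {
  const acceptEncoding = headers['accept-encoding'];
  const raw = Array.isArray(acceptEncoding) ? acceptEncoding.join(',') : acceptEncoding ?? '';
  return raw
    .split(',')
    .map((token) => token.split(';')[0].trim())
    .includes('gzip');
}
```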
diff --git a/examples/response_stream/server/routes/reducer_stream.ts b/examples/response_stream/server/routes/reducer_stream.ts
index 7cc02d9b1a80f..e9fe6d02b68f6 100644
--- a/examples/response_stream/server/routes/reducer_stream.ts
+++ b/examples/response_stream/server/routes/reducer_stream.ts
@@ -31,17 +31,29 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
       const maxTimeoutMs = request.body.timeout ?? 250;
       const simulateError = request.body.simulateErrors ?? false;

+      let logMessageCounter = 1;
+
+      function logDebugMessage(msg: string) {
+        logger.debug(`Response Stream Example #${logMessageCounter}: ${msg}`);
+        logMessageCounter++;
+      }
+
+      logDebugMessage('Starting stream.');
+
       let shouldStop = false;
       request.events.aborted$.subscribe(() => {
+        logDebugMessage('aborted$ subscription trigger.');
         shouldStop = true;
       });
       request.events.completed$.subscribe(() => {
+        logDebugMessage('completed$ subscription trigger.');
         shouldStop = true;
       });

       const { end, push, responseWithHeaders } = streamFactory(
         request.headers,
-        logger
+        logger,
+        request.body.compressResponse
       );

       const entities = [
diff --git a/examples/response_stream/server/routes/single_string_stream.ts b/examples/response_stream/server/routes/single_string_stream.ts
index dd3a784314c96..fde4947746dbf 100644
--- a/examples/response_stream/server/routes/single_string_stream.ts
+++ b/examples/response_stream/server/routes/single_string_stream.ts
@@ -35,7 +35,11 @@ export const defineSimpleStringStreamRoute = (router: IRouter, logger: Logger) => {
         shouldStop = true;
       });

-      const { end, push, responseWithHeaders } = streamFactory(request.headers, logger);
+      const { end, push, responseWithHeaders } = streamFactory(
+        request.headers,
+        logger,
+        request.body.compressResponse
+      );

       const text =
         'Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is developed in Java and is dual-licensed under the source-available Server Side Public License and the Elastic license, while other parts fall under the proprietary (source-available) Elastic License. Official clients are available in Java, .NET (C#), PHP, Python, Apache Groovy, Ruby and many other languages. According to the DB-Engines ranking, Elasticsearch is the most popular enterprise search engine.';
diff --git a/x-pack/packages/ml/aiops_utils/src/stream_factory.test.ts b/x-pack/packages/ml/aiops_utils/src/stream_factory.test.ts
index 1e6d7b40b22d0..27751b7dc3fd1 100644
--- a/x-pack/packages/ml/aiops_utils/src/stream_factory.test.ts
+++ b/x-pack/packages/ml/aiops_utils/src/stream_factory.test.ts
@@ -29,7 +29,7 @@ describe('streamFactory', () => {
   let mockLogger: Logger;

   beforeEach(() => {
-    mockLogger = { error: jest.fn() } as unknown as Logger;
+    mockLogger = { debug: jest.fn(), error: jest.fn(), info: jest.fn() } as unknown as Logger;
   });

   it('should encode and receive an uncompressed string based stream', async () => {
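The `streamFactory` rewrite below is essentially an implementation of Node's standard backpressure protocol: `stream.write()` returns `false` when the internal buffer is saturated, after which writers should hold off until the `'drain'` event fires. As a stand-alone illustration of that pattern (simplified from the diff below, not the actual implementation):

```ts
import { PassThrough } from 'stream';

// Illustrative sketch of write()/'drain' handling: queue items while the
// stream is saturated and replay them once 'drain' fires.
const stream = new PassThrough();
const buffer: string[] = [];
let waitingForDrain = false;

function safePush(chunk: string) {
  if (waitingForDrain) {
    // Stream is saturated: queue instead of writing.
    buffer.push(chunk);
    return;
  }
  const writeOk = stream.write(chunk);
  if (!writeOk) {
    waitingForDrain = true;
    stream.once('drain', () => {
      waitingForDrain = false;
      // Replay queued items; stop again if the stream saturates.
      while (buffer.length > 0 && !waitingForDrain) {
        safePush(buffer.shift() as string);
      }
    });
  }
}
```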
diff --git a/x-pack/packages/ml/aiops_utils/src/stream_factory.ts b/x-pack/packages/ml/aiops_utils/src/stream_factory.ts
index fa455a04c23f1..d6e60e76efaa9 100644
--- a/x-pack/packages/ml/aiops_utils/src/stream_factory.ts
+++ b/x-pack/packages/ml/aiops_utils/src/stream_factory.ts
@@ -14,12 +14,15 @@ import type { Headers, ResponseHeaders } from '@kbn/core-http-server';

 import { acceptCompression } from './accept_compression';

-// We need this otherwise Kibana server will crash with a 'ERR_METHOD_NOT_IMPLEMENTED' error.
-class ResponseStream extends Stream.PassThrough {
-  flush() {}
-  _read() {}
+// Type guard to identify a compressed stream.
+function isCompressedStream(arg: unknown): arg is zlib.Gzip {
+  return typeof arg === 'object' && arg !== null && typeof (arg as zlib.Gzip).flush === 'function';
 }

+const FLUSH_PAYLOAD_SIZE = 4 * 1024;
+
+class UncompressedResponseStream extends Stream.PassThrough {}
+
 const DELIMITER = '\n';

 type StreamType = 'string' | 'ndjson';
@@ -27,9 +30,9 @@ type StreamType = 'string' | 'ndjson';
 interface StreamFactoryReturnType<T = unknown> {
   DELIMITER: string;
   end: () => void;
-  push: (d: T) => void;
+  push: (d: T, drain?: boolean) => void;
   responseWithHeaders: {
-    body: zlib.Gzip | ResponseStream;
+    body: zlib.Gzip | UncompressedResponseStream;
     headers?: ResponseHeaders;
   };
 }
@@ -39,11 +42,16 @@ interface StreamFactoryReturnType<T = unknown> {
  * for gzip compression depending on provided request headers.
  *
  * @param headers - Request headers.
+ * @param logger - Kibana logger.
+ * @param compressOverride - Optional flag to override header-based compression setting.
+ * @param flushFix - Adds an attribute with a random string payload to overcome buffer flushing with certain proxy configurations.
+ *
  * @returns An object with stream attributes and methods.
  */
 export function streamFactory<T = string>(
   headers: Headers,
   logger: Logger,
+  compressOverride?: boolean,
   flushFix?: boolean
 ): StreamFactoryReturnType<T>;
 /**
@@ -51,27 +59,72 @@
  * request headers. Any non-string data pushed to the stream will be streamed as NDJSON.
  *
  * @param headers - Request headers.
+ * @param logger - Kibana logger.
+ * @param compressOverride - Optional flag to override header-based compression setting.
+ * @param flushFix - Adds an attribute with a random string payload to overcome buffer flushing with certain proxy configurations.
+ *
  * @returns An object with stream attributes and methods.
  */
 export function streamFactory<T = unknown>(
   headers: Headers,
   logger: Logger,
+  compressOverride: boolean = true,
   flushFix: boolean = false
 ): StreamFactoryReturnType<T> {
   let streamType: StreamType;
-  const isCompressed = acceptCompression(headers);
+  const isCompressed = compressOverride && acceptCompression(headers);
+
+  const stream = isCompressed ? zlib.createGzip() : new UncompressedResponseStream();
+
+  // If waiting for draining of the stream, items will be added to this buffer.
+  const backPressureBuffer: T[] = [];

-  const stream = isCompressed ? zlib.createGzip() : new ResponseStream();
+  // This flag is set while the "drain" listener is active so we avoid setting multiple listeners.
+  let waitForDrain = false;
+
+  // Instead of a flag, this is an array to check whether we are waiting on any callback from writing to the stream.
+  // It needs to be an array to avoid running into race conditions.
+  const waitForCallbacks: number[] = [];
+
+  // Flag to set if the stream should be ended. Because there could be items in the backpressure buffer, we might
+  // not want to end the stream right away. Once the backpressure buffer is cleared, we'll end the stream eventually.
+  let tryToEnd = false;
+
+  function logDebugMessage(msg: string) {
+    logger.debug(`HTTP Response Stream: ${msg}`);
+  }

   function end() {
-    stream.end();
+    tryToEnd = true;
+
+    logDebugMessage(`backPressureBuffer size on end(): ${backPressureBuffer.length}`);
+    logDebugMessage(`waitForCallbacks size on end(): ${waitForCallbacks.length}`);
+
+    // Before ending the stream, we need to empty the backPressureBuffer.
+    if (backPressureBuffer.length > 0) {
+      const el = backPressureBuffer.shift();
+      if (el !== undefined) {
+        push(el, true);
+      }
+      return;
+    }
+
+    if (waitForCallbacks.length === 0) {
+      logDebugMessage('All backPressureBuffer and waitForCallbacks cleared, ending the stream.');
+      stream.end();
+    }
   }

-  function push(d: T) {
+  function push(d: T, drain = false) {
+    logDebugMessage(
+      `Push to stream. Current backPressure buffer size: ${backPressureBuffer.length}, drain flag: ${drain}`
+    );
+
     if (d === undefined) {
       logger.error('Stream chunk must not be undefined.');
       return;
     }
+
     // Initialize the stream type with the first push to the stream,
     // otherwise check the integrity of the data to be pushed.
     if (streamType === undefined) {
@@ -84,26 +137,69 @@ export function streamFactory<T = unknown>(
       return;
     }

+    if ((!drain && waitForDrain) || (!drain && backPressureBuffer.length > 0)) {
+      logDebugMessage('Adding item to backpressure buffer.');
+      backPressureBuffer.push(d);
+      return;
+    }
+
     try {
       const line =
         streamType === 'ndjson'
           ? `${JSON.stringify({
               ...d,
               // This is a temporary fix for response streaming with proxy configurations that buffer responses up to 4KB in size.
-              ...(flushFix ? { flushPayload: crypto.randomBytes(4096).toString('hex') } : {}),
+              ...(flushFix
+                ? { flushPayload: crypto.randomBytes(FLUSH_PAYLOAD_SIZE).toString('hex') }
+                : {}),
             })}${DELIMITER}`
           : d;
-      stream.write(line);
+
+      waitForCallbacks.push(1);
+      const writeOk = stream.write(line, () => {
+        waitForCallbacks.pop();
+        // Calling .flush() on a compression stream will
+        // make zlib return as much output as currently possible.
+        if (isCompressedStream(stream)) {
+          stream.flush();
+        }
+
+        if (tryToEnd && waitForCallbacks.length === 0) {
+          end();
+        }
+      });
+
+      logDebugMessage(`Ok to write to the stream again? ${writeOk}`);
+
+      if (!writeOk) {
+        logDebugMessage(`Should we add the "drain" listener?: ${!waitForDrain}`);
+        if (!waitForDrain) {
+          waitForDrain = true;
+          stream.once('drain', () => {
+            logDebugMessage(
+              'The "drain" listener triggered, we can continue pushing to the stream.'
+            );
+
+            waitForDrain = false;
+            if (backPressureBuffer.length > 0) {
+              const el = backPressureBuffer.shift();
+              if (el !== undefined) {
+                push(el, true);
+              }
+            }
+          });
+        }
+      } else if (writeOk && drain && backPressureBuffer.length > 0) {
+        logDebugMessage('Continue clearing the backpressure buffer.');
+        const el = backPressureBuffer.shift();
+        if (el !== undefined) {
+          push(el, true);
+        }
+      }
     } catch (e) {
       logger.error(`Could not serialize or stream data chunk: ${e.toString()}`);
       return;
     }
-
-    // Calling .flush() on a compression stream will
-    // make zlib return as much output as currently possible.
-    if (isCompressed) {
-      stream.flush();
-    }
   }

   const responseWithHeaders: StreamFactoryReturnType<T>['responseWithHeaders'] = {
diff --git a/x-pack/plugins/aiops/common/api/explain_log_rate_spikes/schema.ts b/x-pack/plugins/aiops/common/api/explain_log_rate_spikes/schema.ts
index f816af06f5324..a5d48462c5170 100644
--- a/x-pack/plugins/aiops/common/api/explain_log_rate_spikes/schema.ts
+++ b/x-pack/plugins/aiops/common/api/explain_log_rate_spikes/schema.ts
@@ -21,6 +21,9 @@ export const aiopsExplainLogRateSpikesSchema = schema.object({
   deviationMax: schema.number(),
   /** The index to query for log rate spikes */
   index: schema.string(),
+  /** Settings to override header-derived compression and the flush fix. */
+  compressResponse: schema.maybe(schema.boolean()),
+  flushFix: schema.maybe(schema.boolean()),
 });

 export type AiopsExplainLogRateSpikesSchema = TypeOf<typeof aiopsExplainLogRateSpikesSchema>;
diff --git a/x-pack/plugins/aiops/public/components/explain_log_rate_spikes/explain_log_rate_spikes_analysis.tsx b/x-pack/plugins/aiops/public/components/explain_log_rate_spikes/explain_log_rate_spikes_analysis.tsx
index 9949ec537b77a..74be1a0f048fc 100644
--- a/x-pack/plugins/aiops/public/components/explain_log_rate_spikes/explain_log_rate_spikes_analysis.tsx
+++ b/x-pack/plugins/aiops/public/components/explain_log_rate_spikes/explain_log_rate_spikes_analysis.tsx
@@ -95,6 +95,7 @@ export const ExplainLogRateSpikesAnalysis: FC
       timeFieldName: dataView.timeFieldName ?? '',
       index: dataView.title,
       grouping: true,
+      flushFix: true,
       ...windowParameters,
     },
     { reducer: streamReducer, initialState }
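One behavioral change in the route below is easy to miss: the keep-alive ping moves from a fixed `setInterval` (which needed `clearInterval` cleanup and fired regardless of analysis state) to a self-rescheduling `setTimeout` gated on an `isRunning` flag. Reduced to its essentials, the pattern looks like this; names mirror the diff, and the `push` callback is a stand-in:

```ts
const PING_FREQUENCY = 10000;
let isRunning = true;

// Self-rescheduling keep-alive: stops by itself once `isRunning` is
// flipped to false, so no clearInterval/clearTimeout bookkeeping is needed.
function pushPingWithTimeout(push: () => void) {
  setTimeout(() => {
    if (isRunning) {
      push();
      pushPingWithTimeout(push);
    }
  }, PING_FREQUENCY);
}
```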
diff --git a/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts b/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts
index 949b535ca16fb..67cbcce241401 100644
--- a/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts
+++ b/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts
@@ -51,6 +51,9 @@ import {
   markDuplicates,
 } from './queries/get_simple_hierarchical_tree';

+// 10s ping frequency to keep the stream alive.
+const PING_FREQUENCY = 10000;
+
 // Overall progress is a float from 0 to 1.
 const LOADED_FIELD_CANDIDATES = 0.2;
 const PROGRESS_STEP_P_VALUES = 0.5;
@@ -77,12 +80,12 @@ export const defineExplainLogRateSpikesRoute = (

       let logMessageCounter = 1;

-      function logInfoMessage(msg: string) {
-        logger.info(`Explain Log Rate Spikes #${logMessageCounter}: ${msg}`);
+      function logDebugMessage(msg: string) {
+        logger.debug(`Explain Log Rate Spikes #${logMessageCounter}: ${msg}`);
         logMessageCounter++;
       }

-      logInfoMessage('Starting analysis.');
+      logDebugMessage('Starting analysis.');

       const groupingEnabled = !!request.body.grouping;

@@ -90,15 +93,16 @@ export const defineExplainLogRateSpikesRoute = (
       const controller = new AbortController();

+      let isRunning = false;
       let loaded = 0;
       let shouldStop = false;
       request.events.aborted$.subscribe(() => {
-        logInfoMessage('aborted$ subscription trigger.');
+        logDebugMessage('aborted$ subscription trigger.');
         shouldStop = true;
         controller.abort();
       });
       request.events.completed$.subscribe(() => {
-        logInfoMessage('completed$ subscription trigger.');
+        logDebugMessage('completed$ subscription trigger.');
         shouldStop = true;
         controller.abort();
       });
@@ -107,17 +111,26 @@ export const defineExplainLogRateSpikesRoute = (
       const {
         end: streamEnd,
         push,
         responseWithHeaders,
-      } = streamFactory(request.headers, logger, true);
-
-      function pushPing() {
-        push(pingAction());
+      } = streamFactory(
+        request.headers,
+        logger,
+        request.body.compressResponse,
+        request.body.flushFix
+      );
+
+      function pushPingWithTimeout() {
+        setTimeout(() => {
+          if (isRunning) {
+            logDebugMessage('Ping message.');
+            push(pingAction());
+            pushPingWithTimeout();
+          }
+        }, PING_FREQUENCY);
       }

-      const pingInterval = setInterval(pushPing, 1000);
-
       function end() {
-        logInfoMessage('Ending analysis.');
-        clearInterval(pingInterval);
+        isRunning = false;
+        logDebugMessage('Ending analysis.');
         streamEnd();
       }
@@ -139,15 +152,16 @@ export const defineExplainLogRateSpikesRoute = (
       }

       function pushError(m: string) {
-        logInfoMessage('Push error.');
+        logDebugMessage('Push error.');
         push(addErrorAction(m));
       }

-      // Async IIFE to run the analysis while not blocking returning `responseWithHeaders`.
-      (async () => {
-        logInfoMessage('Reset.');
+      async function runAnalysis() {
+        isRunning = true;
+        logDebugMessage('Reset.');
         push(resetAction());
-        logInfoMessage('Load field candidates.');
+        pushPingWithTimeout();
+        logDebugMessage('Load field candidates.');
         push(
           updateLoadingStateAction({
             ccsWarning: false,
@@ -204,11 +218,11 @@ export const defineExplainLogRateSpikesRoute = (

         const fieldCandidatesChunks = chunk(fieldCandidates, chunkSize);

-        logInfoMessage('Fetch p-values.');
+        logDebugMessage('Fetch p-values.');

         for (const fieldCandidatesChunk of fieldCandidatesChunks) {
           chunkCount++;
-          logInfoMessage(`Fetch p-values. Chunk ${chunkCount} of ${fieldCandidatesChunks.length}`);
+          logDebugMessage(`Fetch p-values. Chunk ${chunkCount} of ${fieldCandidatesChunks.length}`);
           let pValues: Awaited<ReturnType<typeof fetchChangePointPValues>>;
           try {
             pValues = await fetchChangePointPValues(
@@ -258,7 +272,7 @@ export const defineExplainLogRateSpikesRoute = (
           );

           if (shouldStop) {
-            logInfoMessage('shouldStop fetching p-values.');
+            logDebugMessage('shouldStop fetching p-values.');

             end();
             return;
@@ -266,7 +280,7 @@ export const defineExplainLogRateSpikesRoute = (
           }
         }

         if (changePoints?.length === 0) {
-          logInfoMessage('Stopping analysis, did not find change points.');
+          logDebugMessage('Stopping analysis, did not find change points.');
           endWithUpdatedLoadingState();
           return;
         }
@@ -275,7 +289,7 @@ export const defineExplainLogRateSpikesRoute = (
           { fieldName: request.body.timeFieldName, type: KBN_FIELD_TYPES.DATE },
         ];

-        logInfoMessage('Fetch overall histogram.');
+        logDebugMessage('Fetch overall histogram.');

         let overallTimeSeries: NumericChartData | undefined;
         try {
@@ -313,7 +327,7 @@ export const defineExplainLogRateSpikesRoute = (
         }

         if (groupingEnabled) {
-          logInfoMessage('Group results.');
+          logDebugMessage('Group results.');

           push(
             updateLoadingStateAction({
@@ -498,7 +512,7 @@ export const defineExplainLogRateSpikesRoute = (

           pushHistogramDataLoadingState();

-          logInfoMessage('Fetch group histograms.');
+          logDebugMessage('Fetch group histograms.');

           await asyncForEach(changePointGroups, async (cpg) => {
             if (overallTimeSeries !== undefined) {
@@ -577,7 +591,7 @@ export const defineExplainLogRateSpikesRoute = (

         loaded += PROGRESS_STEP_HISTOGRAMS_GROUPS;

-        logInfoMessage('Fetch field/value histograms.');
+        logDebugMessage('Fetch field/value histograms.');

         // time series filtered by fields
         if (changePoints && overallTimeSeries !== undefined) {
@@ -661,7 +675,10 @@ export const defineExplainLogRateSpikesRoute = (
         }

         endWithUpdatedLoadingState();
-      })();
+      }
+
+      // Do not call this using `await` so the analysis runs asynchronously while the stream is returned right away.
+      runAnalysis();

       return response.ok(responseWithHeaders);
     }
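A small detail worth calling out in the updated tests below: the non-streaming tests use supertest, whose header map simply lacks absent headers, while the streaming tests use `fetch`, whose `Headers.get()` contract returns `null` for absent headers. That is why the same "no compression" expectation is written two different ways:

```ts
// supertest: an absent header reads as undefined
expect(resp.header['content-encoding']).to.be(undefined);
// WHATWG fetch: an absent header reads as null
expect(resp.headers.get('content-encoding')).to.be(null);
```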
diff --git a/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts b/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts
index a2e1f158a73e2..2804fc3fc3e0e 100644
--- a/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts
+++ b/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts
@@ -75,13 +75,21 @@ export default ({ getService }: FtrProviderContext) => {
       await esArchiver.unload('x-pack/test/functional/es_archives/ml/ecommerce');
     });

-    it('should return full data without streaming', async () => {
+    async function requestWithoutStreaming(body: ApiExplainLogRateSpikes['body']) {
       const resp = await supertest
         .post(`/internal/aiops/explain_log_rate_spikes`)
         .set('kbn-xsrf', 'kibana')
-        .send(requestBody)
+        .send(body)
         .expect(200);

+      // Compression is enabled by default: if `compressResponse` is undefined in
+      // the request body, the response header should include "gzip"; if it is
+      // explicitly `false`, the header should be undefined.
+      if (body.compressResponse === undefined) {
+        expect(resp.header['content-encoding']).to.be('gzip');
+      } else if (body.compressResponse === false) {
+        expect(resp.header['content-encoding']).to.be(undefined);
+      }
+
       expect(Buffer.isBuffer(resp.body)).to.be(true);

       const chunks: string[] = resp.body.toString().split('\n');
@@ -131,34 +139,64 @@ export default ({ getService }: FtrProviderContext) => {
       histograms.forEach((h, index) => {
         expect(h.histogram.length).to.be(20);
       });
+    }
+
+    it('should return full data without streaming with compression with flushFix', async () => {
+      await requestWithoutStreaming(requestBody);
+    });
+
+    it('should return full data without streaming with compression without flushFix', async () => {
+      await requestWithoutStreaming({ ...requestBody, flushFix: false });
+    });

-    it('should return data in chunks with streaming', async () => {
-      const response = await fetch(`${kibanaServerUrl}/internal/aiops/explain_log_rate_spikes`, {
+    it('should return full data without streaming without compression with flushFix', async () => {
+      await requestWithoutStreaming({ ...requestBody, compressResponse: false });
+    });
+
+    it('should return full data without streaming without compression without flushFix', async () => {
+      await requestWithoutStreaming({ ...requestBody, compressResponse: false, flushFix: false });
+    });
+
+    async function requestWithStreaming(body: ApiExplainLogRateSpikes['body']) {
+      const resp = await fetch(`${kibanaServerUrl}/internal/aiops/explain_log_rate_spikes`, {
         method: 'POST',
         headers: {
           'Content-Type': 'application/json',
           'kbn-xsrf': 'stream',
         },
-        body: JSON.stringify(requestBody),
+        body: JSON.stringify(body),
       });

-      expect(response.ok).to.be(true);
-      expect(response.status).to.be(200);
+      // Compression is enabled by default: if `compressResponse` is undefined in
+      // the request body, the response header should include "gzip"; if it is
+      // explicitly `false`, the header should be null.
+      if (body.compressResponse === undefined) {
+        expect(resp.headers.get('content-encoding')).to.be('gzip');
+      } else if (body.compressResponse === false) {
+        expect(resp.headers.get('content-encoding')).to.be(null);
+      }
+
+      expect(resp.ok).to.be(true);
+      expect(resp.status).to.be(200);

-      const stream = response.body;
+      const stream = resp.body;

       expect(stream).not.to.be(null);

       if (stream !== null) {
         const data: any[] = [];
+        let chunkCounter = 0;
+        const parseStreamCallback = (c: number) => (chunkCounter = c);

-        for await (const action of parseStream(stream)) {
+        for await (const action of parseStream(stream, parseStreamCallback)) {
           expect(action.type).not.to.be('error');
           data.push(action);
         }

+        // If streaming works correctly, we should receive more than one chunk.
+        expect(chunkCounter).to.be.greaterThan(1);
+
         expect(data.length).to.be(expected.actionsLength);
+
         const addChangePointsActions = data.filter((d) => d.type === expected.changePointFilter);
         expect(addChangePointsActions.length).to.greaterThan(0);

@@ -189,6 +227,22 @@ export default ({ getService }: FtrProviderContext) => {
           expect(h.histogram.length).to.be(20);
         });
       }
+    }
+
+    it('should return data in chunks with streaming with compression with flushFix', async () => {
+      await requestWithStreaming(requestBody);
+    });
+
+    it('should return data in chunks with streaming with compression without flushFix', async () => {
+      await requestWithStreaming({ ...requestBody, flushFix: false });
+    });
+
+    it('should return data in chunks with streaming without compression with flushFix', async () => {
+      await requestWithStreaming({ ...requestBody, compressResponse: false });
+    });
+
+    it('should return data in chunks with streaming without compression without flushFix', async () => {
+      await requestWithStreaming({ ...requestBody, compressResponse: false, flushFix: false });
+    });

     it('should return an error for non existing index without streaming', async () => {
diff --git a/x-pack/test/api_integration/apis/aiops/parse_stream.ts b/x-pack/test/api_integration/apis/aiops/parse_stream.ts
index f3da52e6024bb..70ee86bb64fd8 100644
--- a/x-pack/test/api_integration/apis/aiops/parse_stream.ts
+++ b/x-pack/test/api_integration/apis/aiops/parse_stream.ts
@@ -5,11 +5,16 @@
  * 2.0.
  */

-export async function* parseStream(stream: NodeJS.ReadableStream) {
+export async function* parseStream(
+  stream: NodeJS.ReadableStream,
+  callback?: (chunkCounter: number) => void
+) {
   let partial = '';
+  let chunkCounter = 0;

   try {
     for await (const value of stream) {
+      chunkCounter++;
       const full = `${partial}${value}`;
       const parts = full.split('\n');
       const last = parts.pop();
@@ -25,4 +30,8 @@ export async function* parseStream(stream: NodeJS.ReadableStream) {
   } catch (error) {
     yield { type: 'error', payload: error.toString() };
   }
+
+  if (typeof callback === 'function') {
+    callback(chunkCounter);
+  }
 }
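For completeness, the NDJSON framing used throughout (`DELIMITER = '\n'` in `stream_factory.ts`) can be consumed with a few lines of buffering. The sketch below mirrors `parse_stream.ts` above in a minimal, dependency-free form; it is illustrative only.

```ts
// Minimal NDJSON consumer: accumulate partial lines across chunks and
// JSON.parse each complete line as it arrives.
async function* readNdjson(stream: NodeJS.ReadableStream) {
  let partial = '';
  for await (const chunk of stream) {
    const lines = `${partial}${chunk}`.split('\n');
    partial = lines.pop() ?? '';
    for (const line of lines) {
      if (line !== '') {
        yield JSON.parse(line);
      }
    }
  }
}
```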