diff --git a/x-pack/packages/ml/aiops_utils/src/lib/stream_factory.test.ts b/x-pack/packages/ml/aiops_utils/src/lib/stream_factory.test.ts index 53cd5f332f805..a0c5212244ad6 100644 --- a/x-pack/packages/ml/aiops_utils/src/lib/stream_factory.test.ts +++ b/x-pack/packages/ml/aiops_utils/src/lib/stream_factory.test.ts @@ -7,6 +7,8 @@ import * as zlib from 'zlib'; +import type { Logger } from '@kbn/logging'; + import { streamFactory } from './stream_factory'; interface MockItem { @@ -24,8 +26,14 @@ const mockItem2: MockItem = { }; describe('streamFactory', () => { + let mockLogger: Logger; + + beforeEach(() => { + mockLogger = { error: jest.fn() } as unknown as Logger; + }); + it('should encode and receive an uncompressed string based stream', async () => { - const { end, push, responseWithHeaders } = streamFactory({}); + const { end, push, responseWithHeaders } = streamFactory({}, mockLogger); push('push1'); push('push2'); @@ -41,7 +49,7 @@ describe('streamFactory', () => { }); it('should encode and receive an uncompressed NDJSON based stream', async () => { - const { DELIMITER, end, push, responseWithHeaders } = streamFactory({}); + const { DELIMITER, end, push, responseWithHeaders } = streamFactory({}, mockLogger); push(mockItem1); push(mockItem2); @@ -74,9 +82,12 @@ describe('streamFactory', () => { // without the need for additional custom code. it('should encode and receive a compressed string based stream', (done) => { (async () => { - const { end, push, responseWithHeaders } = streamFactory({ - 'accept-encoding': 'gzip', - }); + const { end, push, responseWithHeaders } = streamFactory( + { + 'accept-encoding': 'gzip', + }, + mockLogger + ); push('push1'); push('push2'); @@ -104,9 +115,12 @@ describe('streamFactory', () => { it('should encode and receive a compressed NDJSON based stream', (done) => { (async () => { - const { DELIMITER, end, push, responseWithHeaders } = streamFactory({ - 'accept-encoding': 'gzip', - }); + const { DELIMITER, end, push, responseWithHeaders } = streamFactory( + { + 'accept-encoding': 'gzip', + }, + mockLogger + ); push(mockItem1); push(mockItem2); @@ -140,49 +154,49 @@ describe('streamFactory', () => { })(); }); - it('should throw when a string based stream receives a non-string chunk', async () => { - const { push } = streamFactory({}); + it('should log an error when a string based stream receives a non-string chunk', async () => { + const { push } = streamFactory({}, mockLogger); // First push initializes the stream as string based. - expect(() => { - push('push1'); - }).not.toThrow(); + push('push1'); + expect(mockLogger.error).toHaveBeenCalledTimes(0); // Second push is again a string and should not throw. - expect(() => { - push('push2'); - }).not.toThrow(); + push('push2'); + expect(mockLogger.error).toHaveBeenCalledTimes(0); // Third push is not a string and should trigger an error. - expect(() => { - push({ myObject: 'push3' } as unknown as string); - }).toThrow('Must not push non-string chunks to a string based stream.'); + push({ myObject: 'push3' } as unknown as string); + expect(mockLogger.error).toHaveBeenCalledTimes(1); + expect(mockLogger.error).toHaveBeenCalledWith( + 'Must not push non-string chunks to a string based stream.' + ); }); - it('should throw when an NDJSON based stream receives a string chunk', async () => { - const { push } = streamFactory({}); + it('should log an error when an NDJSON based stream receives a string chunk', async () => { + const { push } = streamFactory({}, mockLogger); // First push initializes the stream as NDJSON based. - expect(() => { - push(mockItem1); - }).not.toThrow(); + push(mockItem1); + expect(mockLogger.error).toHaveBeenCalledTimes(0); // Second push is again a valid object and should not throw. - expect(() => { - push(mockItem1); - }).not.toThrow(); + push(mockItem1); + expect(mockLogger.error).toHaveBeenCalledTimes(0); // Third push is a string and should trigger an error. - expect(() => { - push('push3' as unknown as MockItem); - }).toThrow('Must not push raw string chunks to an NDJSON based stream.'); + push('push3' as unknown as MockItem); + expect(mockLogger.error).toHaveBeenCalledTimes(1); + expect(mockLogger.error).toHaveBeenCalledWith( + 'Must not push raw string chunks to an NDJSON based stream.' + ); }); - it('should throw for undefined as push value', async () => { - const { push } = streamFactory({}); + it('should log an error for undefined as push value', async () => { + const { push } = streamFactory({}, mockLogger); - expect(() => { - push(undefined as unknown as string); - }).toThrow('Stream chunk must not be undefined.'); + push(undefined as unknown as string); + expect(mockLogger.error).toHaveBeenCalledTimes(1); + expect(mockLogger.error).toHaveBeenCalledWith('Stream chunk must not be undefined.'); }); }); diff --git a/x-pack/packages/ml/aiops_utils/src/lib/stream_factory.ts b/x-pack/packages/ml/aiops_utils/src/lib/stream_factory.ts index 291890138ea40..9df9702eb0870 100644 --- a/x-pack/packages/ml/aiops_utils/src/lib/stream_factory.ts +++ b/x-pack/packages/ml/aiops_utils/src/lib/stream_factory.ts @@ -8,6 +8,8 @@ import { Stream } from 'stream'; import * as zlib from 'zlib'; +import type { Logger } from '@kbn/logging'; + import { acceptCompression } from './accept_compression'; /** @@ -30,7 +32,6 @@ type StreamType = 'string' | 'ndjson'; interface StreamFactoryReturnType { DELIMITER: string; end: () => void; - error: (errorText: string) => void; push: (d: T) => void; responseWithHeaders: { body: zlib.Gzip | ResponseStream; @@ -47,7 +48,10 @@ interface StreamFactoryReturnType { * @param headers - Request headers. * @returns An object with stream attributes and methods. */ -export function streamFactory(headers: Headers): StreamFactoryReturnType; +export function streamFactory( + headers: Headers, + logger: Logger +): StreamFactoryReturnType; /** * Sets up a response stream with support for gzip compression depending on provided * request headers. Any non-string data pushed to the stream will be stream as NDJSON. @@ -55,23 +59,22 @@ export function streamFactory(headers: Headers): StreamFactoryReturn * @param headers - Request headers. * @returns An object with stream attributes and methods. */ -export function streamFactory(headers: Headers): StreamFactoryReturnType { +export function streamFactory( + headers: Headers, + logger: Logger +): StreamFactoryReturnType { let streamType: StreamType; const isCompressed = acceptCompression(headers); const stream = isCompressed ? zlib.createGzip() : new ResponseStream(); - function error(errorText: string) { - stream.emit('error', errorText); - } - function end() { stream.end(); } function push(d: T) { if (d === undefined) { - error('Stream chunk must not be undefined.'); + logger.error('Stream chunk must not be undefined.'); return; } // Initialize the stream type with the first push to the stream, @@ -79,10 +82,10 @@ export function streamFactory(headers: Headers): StreamFactoryRetur if (streamType === undefined) { streamType = typeof d === 'string' ? 'string' : 'ndjson'; } else if (streamType === 'string' && typeof d !== 'string') { - error('Must not push non-string chunks to a string based stream.'); + logger.error('Must not push non-string chunks to a string based stream.'); return; } else if (streamType === 'ndjson' && typeof d === 'string') { - error('Must not push raw string chunks to an NDJSON based stream.'); + logger.error('Must not push raw string chunks to an NDJSON based stream.'); return; } @@ -90,7 +93,8 @@ export function streamFactory(headers: Headers): StreamFactoryRetur const line = typeof d !== 'string' ? `${JSON.stringify(d)}${DELIMITER}` : d; stream.write(line); } catch (e) { - error(`Could not serialize or stream data chunk: ${e.toString()}`); + logger.error(`Could not serialize or stream data chunk: ${e.toString()}`); + return; } // Calling .flush() on a compression stream will @@ -111,5 +115,5 @@ export function streamFactory(headers: Headers): StreamFactoryRetur : {}), }; - return { DELIMITER, end, error, push, responseWithHeaders }; + return { DELIMITER, end, push, responseWithHeaders }; } diff --git a/x-pack/plugins/aiops/common/api/explain_log_rate_spikes.ts b/x-pack/plugins/aiops/common/api/explain_log_rate_spikes/actions.ts similarity index 64% rename from x-pack/plugins/aiops/common/api/explain_log_rate_spikes.ts rename to x-pack/plugins/aiops/common/api/explain_log_rate_spikes/actions.ts index 096851a31bdb0..cd6eb734da503 100644 --- a/x-pack/plugins/aiops/common/api/explain_log_rate_spikes.ts +++ b/x-pack/plugins/aiops/common/api/explain_log_rate_spikes/actions.ts @@ -5,29 +5,11 @@ * 2.0. */ -import { schema, TypeOf } from '@kbn/config-schema'; - -import type { ChangePoint } from '../types'; - -export const aiopsExplainLogRateSpikesSchema = schema.object({ - start: schema.number(), - end: schema.number(), - kuery: schema.string(), - timeFieldName: schema.string(), - includeFrozen: schema.maybe(schema.boolean()), - /** Analysis selection time ranges */ - baselineMin: schema.number(), - baselineMax: schema.number(), - deviationMin: schema.number(), - deviationMax: schema.number(), - /** The index to query for log rate spikes */ - index: schema.string(), -}); - -export type AiopsExplainLogRateSpikesSchema = TypeOf; +import type { ChangePoint } from '../../types'; export const API_ACTION_NAME = { ADD_CHANGE_POINTS: 'add_change_points', + ERROR: 'error', UPDATE_LOADING_STATE: 'update_loading_state', } as const; export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME]; @@ -37,7 +19,7 @@ interface ApiActionAddChangePoints { payload: ChangePoint[]; } -export function addChangePoints( +export function addChangePointsAction( payload: ApiActionAddChangePoints['payload'] ): ApiActionAddChangePoints { return { @@ -46,6 +28,18 @@ export function addChangePoints( }; } +interface ApiActionError { + type: typeof API_ACTION_NAME.ERROR; + payload: string; +} + +export function errorAction(payload: ApiActionError['payload']): ApiActionError { + return { + type: API_ACTION_NAME.ERROR, + payload, + }; +} + interface ApiActionUpdateLoadingState { type: typeof API_ACTION_NAME.UPDATE_LOADING_STATE; payload: { @@ -66,4 +60,5 @@ export function updateLoadingStateAction( export type AiopsExplainLogRateSpikesApiAction = | ApiActionAddChangePoints + | ApiActionError | ApiActionUpdateLoadingState; diff --git a/x-pack/plugins/aiops/common/api/explain_log_rate_spikes/index.ts b/x-pack/plugins/aiops/common/api/explain_log_rate_spikes/index.ts new file mode 100644 index 0000000000000..eb37d6e489be7 --- /dev/null +++ b/x-pack/plugins/aiops/common/api/explain_log_rate_spikes/index.ts @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export { + addChangePointsAction, + errorAction, + updateLoadingStateAction, + API_ACTION_NAME, +} from './actions'; +export type { AiopsExplainLogRateSpikesApiAction } from './actions'; + +export { aiopsExplainLogRateSpikesSchema } from './schema'; +export type { AiopsExplainLogRateSpikesSchema } from './schema'; diff --git a/x-pack/plugins/aiops/common/api/explain_log_rate_spikes/schema.ts b/x-pack/plugins/aiops/common/api/explain_log_rate_spikes/schema.ts new file mode 100644 index 0000000000000..9c4928d310701 --- /dev/null +++ b/x-pack/plugins/aiops/common/api/explain_log_rate_spikes/schema.ts @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { schema, TypeOf } from '@kbn/config-schema'; + +export const aiopsExplainLogRateSpikesSchema = schema.object({ + start: schema.number(), + end: schema.number(), + kuery: schema.string(), + timeFieldName: schema.string(), + includeFrozen: schema.maybe(schema.boolean()), + /** Analysis selection time ranges */ + baselineMin: schema.number(), + baselineMax: schema.number(), + deviationMin: schema.number(), + deviationMax: schema.number(), + /** The index to query for log rate spikes */ + index: schema.string(), +}); + +export type AiopsExplainLogRateSpikesSchema = TypeOf; diff --git a/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts b/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts index ac96937342961..23b1d7a8af0dc 100644 --- a/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts +++ b/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts @@ -7,13 +7,15 @@ import { chunk } from 'lodash'; -import type { IRouter, Logger } from '@kbn/core/server'; +import type { IRouter } from '@kbn/core/server'; +import type { Logger } from '@kbn/logging'; import type { DataRequestHandlerContext } from '@kbn/data-plugin/server'; import { streamFactory } from '@kbn/aiops-utils'; import { - addChangePoints, + addChangePointsAction, aiopsExplainLogRateSpikesSchema, + errorAction, updateLoadingStateAction, AiopsExplainLogRateSpikesApiAction, } from '../../common/api/explain_log_rate_spikes'; @@ -43,7 +45,7 @@ export const defineExplainLogRateSpikesRoute = ( }, async (context, request, response) => { if (!license.isActivePlatinumLicense) { - response.forbidden(); + return response.forbidden(); } const client = (await context.core).elasticsearch.client.asCurrentUser; @@ -62,7 +64,8 @@ export const defineExplainLogRateSpikesRoute = ( }); const { end, push, responseWithHeaders } = streamFactory( - request.headers + request.headers, + logger ); // Async IIFE to run the analysis while not blocking returning `responseWithHeaders`. @@ -75,7 +78,14 @@ export const defineExplainLogRateSpikesRoute = ( }) ); - const { fieldCandidates } = await fetchFieldCandidates(client, request.body); + let fieldCandidates: Awaited>; + try { + fieldCandidates = await fetchFieldCandidates(client, request.body); + } catch (e) { + push(errorAction(e.toString())); + end(); + return; + } if (fieldCandidates.length > 0) { loaded += LOADED_FIELD_CANDIDATES; @@ -103,11 +113,14 @@ export const defineExplainLogRateSpikesRoute = ( const fieldCandidatesChunks = chunk(fieldCandidates, chunkSize); for (const fieldCandidatesChunk of fieldCandidatesChunks) { - const { changePoints: pValues } = await fetchChangePointPValues( - client, - request.body, - fieldCandidatesChunk - ); + let pValues: Awaited>; + try { + pValues = await fetchChangePointPValues(client, request.body, fieldCandidatesChunk); + } catch (e) { + push(errorAction(e.toString())); + end(); + return; + } if (pValues.length > 0) { pValues.forEach((d) => { @@ -118,7 +131,7 @@ export const defineExplainLogRateSpikesRoute = ( loaded += (1 / fieldCandidatesChunks.length) * PROGRESS_STEP_P_VALUES; if (pValues.length > 0) { - push(addChangePoints(pValues)); + push(addChangePointsAction(pValues)); } push( updateLoadingStateAction({ diff --git a/x-pack/plugins/aiops/server/routes/queries/fetch_change_point_p_values.ts b/x-pack/plugins/aiops/server/routes/queries/fetch_change_point_p_values.ts index 0504d98ea0ab8..f8b088b76a713 100644 --- a/x-pack/plugins/aiops/server/routes/queries/fetch_change_point_p_values.ts +++ b/x-pack/plugins/aiops/server/routes/queries/fetch_change_point_p_values.ts @@ -90,7 +90,7 @@ export const fetchChangePointPValues = async ( esClient: ElasticsearchClient, params: AiopsExplainLogRateSpikesSchema, fieldNames: string[] -) => { +): Promise => { const result: ChangePoint[] = []; for (const fieldName of fieldNames) { @@ -119,7 +119,5 @@ export const fetchChangePointPValues = async ( } } - return { - changePoints: uniqBy(result, (d) => `${d.fieldName},${d.fieldValue}`), - }; + return uniqBy(result, (d) => `${d.fieldName},${d.fieldValue}`); }; diff --git a/x-pack/plugins/aiops/server/routes/queries/fetch_field_candidates.ts b/x-pack/plugins/aiops/server/routes/queries/fetch_field_candidates.ts index df92f370fd175..78d154435c4a6 100644 --- a/x-pack/plugins/aiops/server/routes/queries/fetch_field_candidates.ts +++ b/x-pack/plugins/aiops/server/routes/queries/fetch_field_candidates.ts @@ -45,7 +45,7 @@ export const getRandomDocsRequest = ( export const fetchFieldCandidates = async ( esClient: ElasticsearchClient, params: AiopsExplainLogRateSpikesSchema -): Promise<{ fieldCandidates: string[] }> => { +): Promise => { const { index } = params; // Get all supported fields const respMapping = await esClient.fieldCaps({ @@ -78,7 +78,5 @@ export const fetchFieldCandidates = async ( } }); - return { - fieldCandidates: [...finalFieldCandidates], - }; + return [...finalFieldCandidates]; }; diff --git a/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts b/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts index 6d5789224a683..be65531da0b02 100644 --- a/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts +++ b/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts @@ -36,7 +36,10 @@ export default ({ getService }: FtrProviderContext) => { const expected = { chunksLength: 7, actionsLength: 6, + noIndexChunksLength: 3, + noIndexActionsLength: 2, actionFilter: 'add_change_points', + errorFilter: 'error', changePoints: [ { fieldName: 'day_of_week', @@ -167,5 +170,41 @@ export default ({ getService }: FtrProviderContext) => { }); } }); + + it('should return an error for non existing index without streaming', async () => { + const resp = await supertest + .post(`/internal/aiops/explain_log_rate_spikes`) + .set('kbn-xsrf', 'kibana') + .send({ + ...requestBody, + index: 'does_not_exist', + }) + .expect(200); + + const chunks: string[] = resp.body.toString().split('\n'); + + expect(chunks.length).to.be(expected.noIndexChunksLength); + + const lastChunk = chunks.pop(); + expect(lastChunk).to.be(''); + + let data: any[] = []; + + expect(() => { + data = chunks.map((c) => JSON.parse(c)); + }).not.to.throwError(); + + expect(data.length).to.be(expected.noIndexActionsLength); + data.forEach((d) => { + expect(typeof d.type).to.be('string'); + }); + + const errorActions = data.filter((d) => d.type === expected.errorFilter); + expect(errorActions.length).to.be(1); + + expect(errorActions[0].payload).to.be( + 'ResponseError: index_not_found_exception: [index_not_found_exception] Reason: no such index [does_not_exist]' + ); + }); }); }; diff --git a/x-pack/test/api_integration/apis/aiops/index.ts b/x-pack/test/api_integration/apis/aiops/index.ts index 24a0391cf4877..1c80923e0c4f2 100644 --- a/x-pack/test/api_integration/apis/aiops/index.ts +++ b/x-pack/test/api_integration/apis/aiops/index.ts @@ -11,7 +11,7 @@ import { FtrProviderContext } from '../../ftr_provider_context'; export default function ({ loadTestFile }: FtrProviderContext) { describe('AIOps', function () { - this.tags(['aiops']); + this.tags(['aiops', 'walterra']); if (AIOPS_ENABLED) { loadTestFile(require.resolve('./explain_log_rate_spikes'));