Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Explain log rate spikes: Adds API license check. #135431

Merged
merged 9 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion examples/response_stream/common/api/reducer_stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export const API_ACTION_NAME = {
UPDATE_PROGRESS: 'update_progress',
ADD_TO_ENTITY: 'add_to_entity',
DELETE_ENTITY: 'delete_entity',
ERROR: 'error',
} as const;
export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME];

Expand Down Expand Up @@ -59,7 +60,20 @@ export function deleteEntityAction(payload: string): ApiActionDeleteEntity {
};
}

interface ApiActionError {
type: typeof API_ACTION_NAME.ERROR;
payload: string;
}

export function errorAction(payload: string): ApiActionError {
return {
type: API_ACTION_NAME.ERROR,
payload,
};
}

export type ReducerStreamApiAction =
| ApiActionUpdateProgress
| ApiActionAddToEntity
| ApiActionDeleteEntity;
| ApiActionDeleteEntity
| ApiActionError;
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ export const UI_ACTION_NAME = {
export type UiActionName = typeof UI_ACTION_NAME[keyof typeof UI_ACTION_NAME];

export interface StreamState {
errors: string[];
progress: number;
entities: Record<string, number>;
}
export const initialState: StreamState = {
errors: [],
progress: 0,
entities: {},
};
Expand Down Expand Up @@ -64,6 +66,11 @@ export function reducerStreamReducer(
...state,
entities: addToEntities,
};
case API_ACTION_NAME.ERROR:
return {
...state,
errors: [...state.errors, action.payload],
};
case UI_ACTION_NAME.RESET:
return initialState;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,20 @@ export const PageReducerStream: FC = () => {
}
};

// This is for low level errors on the stream/HTTP level.
useEffect(() => {
if (error) {
notifications.toasts.addDanger(error);
}
}, [error, notifications.toasts]);

// This is for errors on the application level
useEffect(() => {
if (data.errors.length > 0) {
notifications.toasts.addDanger(data.errors[data.errors.length - 1]);
}
}, [data.errors, notifications.toasts]);

const buttonLabel = isRunning ? 'Stop development' : 'Start development';

return (
Expand Down
13 changes: 7 additions & 6 deletions examples/response_stream/server/routes/reducer_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type { IRouter, Logger } from '@kbn/core/server';
import { streamFactory } from '@kbn/aiops-utils';

import {
errorAction,
reducerStreamRequestBodySchema,
updateProgressAction,
addToEntityAction,
Expand Down Expand Up @@ -38,8 +39,9 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
shouldStop = true;
});

const { end, error, push, responseWithHeaders } = streamFactory<ReducerStreamApiAction>(
request.headers
const { end, push, responseWithHeaders } = streamFactory<ReducerStreamApiAction>(
request.headers,
logger
);

const entities = [
Expand Down Expand Up @@ -84,18 +86,17 @@ export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
push(deleteEntityAction(randomEntity));
} else if (randomAction === 'throw-error') {
// Throw an error. It should not crash Kibana!
// It should be caught, logged and passed on as a stream error.
// It should be caught and logged to the Kibana server console.
throw new Error('There was a (simulated) server side error!');
} else if (randomAction === 'emit-error') {
// Directly emit an error to the stream, this will not be logged.
error('Error pushed to the stream');
// Emit an error as a stream action.
push(errorAction('(Simulated) error pushed to the stream'));
return;
}

pushStreamUpdate();
} catch (e) {
logger.error(e);
error(e);
}
}, Math.floor(Math.random() * maxTimeoutMs));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export const defineSimpleStringStreamRoute = (router: IRouter, logger: Logger) =
shouldStop = true;
});

const { end, error, push, responseWithHeaders } = streamFactory(request.headers);
const { end, push, responseWithHeaders } = streamFactory(request.headers, logger);

const text =
'Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is developed in Java and is dual-licensed under the source-available Server Side Public License and the Elastic license, while other parts fall under the proprietary (source-available) Elastic License. Official clients are available in Java, .NET (C#), PHP, Python, Apache Groovy, Ruby and many other languages. According to the DB-Engines ranking, Elasticsearch is the most popular enterprise search engine.';
Expand All @@ -62,7 +62,7 @@ export const defineSimpleStringStreamRoute = (router: IRouter, logger: Logger) =
end();
}
} catch (e) {
error(`There was an error: ${e.toString()}`);
logger.error(`There was an error: ${e.toString()}`);
}
}

Expand Down
84 changes: 49 additions & 35 deletions x-pack/packages/ml/aiops_utils/src/lib/stream_factory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import * as zlib from 'zlib';

import type { Logger } from '@kbn/logging';

import { streamFactory } from './stream_factory';

interface MockItem {
Expand All @@ -24,8 +26,14 @@ const mockItem2: MockItem = {
};

describe('streamFactory', () => {
let mockLogger: Logger;

beforeEach(() => {
mockLogger = { error: jest.fn() } as unknown as Logger;
});

it('should encode and receive an uncompressed string based stream', async () => {
const { end, push, responseWithHeaders } = streamFactory({});
const { end, push, responseWithHeaders } = streamFactory({}, mockLogger);

push('push1');
push('push2');
Expand All @@ -41,7 +49,7 @@ describe('streamFactory', () => {
});

it('should encode and receive an uncompressed NDJSON based stream', async () => {
const { DELIMITER, end, push, responseWithHeaders } = streamFactory<MockItem>({});
const { DELIMITER, end, push, responseWithHeaders } = streamFactory<MockItem>({}, mockLogger);

push(mockItem1);
push(mockItem2);
Expand Down Expand Up @@ -74,9 +82,12 @@ describe('streamFactory', () => {
// without the need for additional custom code.
it('should encode and receive a compressed string based stream', (done) => {
(async () => {
const { end, push, responseWithHeaders } = streamFactory({
'accept-encoding': 'gzip',
});
const { end, push, responseWithHeaders } = streamFactory(
{
'accept-encoding': 'gzip',
},
mockLogger
);

push('push1');
push('push2');
Expand Down Expand Up @@ -104,9 +115,12 @@ describe('streamFactory', () => {

it('should encode and receive a compressed NDJSON based stream', (done) => {
(async () => {
const { DELIMITER, end, push, responseWithHeaders } = streamFactory<MockItem>({
'accept-encoding': 'gzip',
});
const { DELIMITER, end, push, responseWithHeaders } = streamFactory<MockItem>(
{
'accept-encoding': 'gzip',
},
mockLogger
);

push(mockItem1);
push(mockItem2);
Expand Down Expand Up @@ -140,49 +154,49 @@ describe('streamFactory', () => {
})();
});

it('should throw when a string based stream receives a non-string chunk', async () => {
const { push } = streamFactory({});
it('should log an error when a string based stream receives a non-string chunk', async () => {
const { push } = streamFactory({}, mockLogger);

// First push initializes the stream as string based.
expect(() => {
push('push1');
}).not.toThrow();
push('push1');
expect(mockLogger.error).toHaveBeenCalledTimes(0);

// Second push is again a string and should not throw.
expect(() => {
push('push2');
}).not.toThrow();
push('push2');
expect(mockLogger.error).toHaveBeenCalledTimes(0);

// Third push is not a string and should trigger an error.
expect(() => {
push({ myObject: 'push3' } as unknown as string);
}).toThrow('Must not push non-string chunks to a string based stream.');
push({ myObject: 'push3' } as unknown as string);
expect(mockLogger.error).toHaveBeenCalledTimes(1);
expect(mockLogger.error).toHaveBeenCalledWith(
'Must not push non-string chunks to a string based stream.'
);
});

it('should throw when an NDJSON based stream receives a string chunk', async () => {
const { push } = streamFactory<MockItem>({});
it('should log an error when an NDJSON based stream receives a string chunk', async () => {
const { push } = streamFactory<MockItem>({}, mockLogger);

// First push initializes the stream as NDJSON based.
expect(() => {
push(mockItem1);
}).not.toThrow();
push(mockItem1);
expect(mockLogger.error).toHaveBeenCalledTimes(0);

// Second push is again a valid object and should not throw.
expect(() => {
push(mockItem1);
}).not.toThrow();
push(mockItem1);
expect(mockLogger.error).toHaveBeenCalledTimes(0);

// Third push is a string and should trigger an error.
expect(() => {
push('push3' as unknown as MockItem);
}).toThrow('Must not push raw string chunks to an NDJSON based stream.');
push('push3' as unknown as MockItem);
expect(mockLogger.error).toHaveBeenCalledTimes(1);
expect(mockLogger.error).toHaveBeenCalledWith(
'Must not push raw string chunks to an NDJSON based stream.'
);
});

it('should throw for undefined as push value', async () => {
const { push } = streamFactory({});
it('should log an error for undefined as push value', async () => {
const { push } = streamFactory({}, mockLogger);

expect(() => {
push(undefined as unknown as string);
}).toThrow('Stream chunk must not be undefined.');
push(undefined as unknown as string);
expect(mockLogger.error).toHaveBeenCalledTimes(1);
expect(mockLogger.error).toHaveBeenCalledWith('Stream chunk must not be undefined.');
});
});
28 changes: 16 additions & 12 deletions x-pack/packages/ml/aiops_utils/src/lib/stream_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import { Stream } from 'stream';
import * as zlib from 'zlib';

import type { Logger } from '@kbn/logging';

import { acceptCompression } from './accept_compression';

/**
Expand All @@ -30,7 +32,6 @@ type StreamType = 'string' | 'ndjson';
interface StreamFactoryReturnType<T = unknown> {
DELIMITER: string;
end: () => void;
error: (errorText: string) => void;
push: (d: T) => void;
responseWithHeaders: {
body: zlib.Gzip | ResponseStream;
Expand All @@ -47,50 +48,53 @@ interface StreamFactoryReturnType<T = unknown> {
* @param headers - Request headers.
* @returns An object with stream attributes and methods.
*/
export function streamFactory<T = string>(headers: Headers): StreamFactoryReturnType<T>;
export function streamFactory<T = string>(
headers: Headers,
logger: Logger
): StreamFactoryReturnType<T>;
/**
* Sets up a response stream with support for gzip compression depending on provided
* request headers. Any non-string data pushed to the stream will be stream as NDJSON.
*
* @param headers - Request headers.
* @returns An object with stream attributes and methods.
*/
export function streamFactory<T = unknown>(headers: Headers): StreamFactoryReturnType<T> {
export function streamFactory<T = unknown>(
headers: Headers,
logger: Logger
): StreamFactoryReturnType<T> {
let streamType: StreamType;
const isCompressed = acceptCompression(headers);

const stream = isCompressed ? zlib.createGzip() : new ResponseStream();

function error(errorText: string) {
stream.emit('error', errorText);
}

function end() {
stream.end();
}

function push(d: T) {
if (d === undefined) {
error('Stream chunk must not be undefined.');
logger.error('Stream chunk must not be undefined.');
return;
}
// Initialize the stream type with the first push to the stream,
// otherwise check the integrity of the data to be pushed.
if (streamType === undefined) {
streamType = typeof d === 'string' ? 'string' : 'ndjson';
} else if (streamType === 'string' && typeof d !== 'string') {
error('Must not push non-string chunks to a string based stream.');
logger.error('Must not push non-string chunks to a string based stream.');
return;
} else if (streamType === 'ndjson' && typeof d === 'string') {
error('Must not push raw string chunks to an NDJSON based stream.');
logger.error('Must not push raw string chunks to an NDJSON based stream.');
return;
}

try {
const line = typeof d !== 'string' ? `${JSON.stringify(d)}${DELIMITER}` : d;
stream.write(line);
} catch (e) {
error(`Could not serialize or stream data chunk: ${e.toString()}`);
logger.error(`Could not serialize or stream data chunk: ${e.toString()}`);
return;
}

// Calling .flush() on a compression stream will
Expand All @@ -111,5 +115,5 @@ export function streamFactory<T = unknown>(headers: Headers): StreamFactoryRetur
: {}),
};

return { DELIMITER, end, error, push, responseWithHeaders };
return { DELIMITER, end, push, responseWithHeaders };
}
Loading