Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.4] [ML] Explain Log Rate Spikes: Fix error handling. (#137947) #138115

Merged
merged 1 commit into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/response_stream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ The request's headers get passed on to automatically identify if compression is
On the client, the custom hook is used like this:

```ts
const { error, start, cancel, data, isRunning } = useFetchStream<
const { errors, start, cancel, data, isRunning } = useFetchStream<
ApiSimpleStringStream, typeof basePath
>(`${basePath}/internal/response_stream/simple_string_stream`);
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export const PageReducerStream: FC = () => {

const [simulateErrors, setSimulateErrors] = useState(false);

const { dispatch, start, cancel, data, error, isCancelled, isRunning } = useFetchStream<
const { dispatch, start, cancel, data, errors, isCancelled, isRunning } = useFetchStream<
ApiReducerStream,
typeof basePath
>(
Expand All @@ -65,13 +65,15 @@ export const PageReducerStream: FC = () => {
}
};

// TODO This approach needs to be adapted as it might miss when error messages arrive bulk.
// This is for low level errors on the stream/HTTP level.
useEffect(() => {
if (error) {
notifications.toasts.addDanger(error);
if (errors.length > 0) {
notifications.toasts.addDanger(errors[errors.length - 1]);
}
}, [error, notifications.toasts]);
}, [errors, notifications.toasts]);

// TODO This approach needs to be adapted as it might miss when error messages arrive bulk.
// This is for errors on the application level
useEffect(() => {
if (data.errors.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export const PageSimpleStringStream: FC = () => {
const { core } = useDeps();
const basePath = core.http?.basePath.get() ?? '';

const { dispatch, error, start, cancel, data, isRunning } = useFetchStream<
const { dispatch, errors, start, cancel, data, isRunning } = useFetchStream<
ApiSimpleStringStream,
typeof basePath
>(`${basePath}/internal/response_stream/simple_string_stream`, { timeout: 500 });
Expand Down Expand Up @@ -61,9 +61,17 @@ export const PageSimpleStringStream: FC = () => {
<EuiText>
<p>{data}</p>
</EuiText>
{error && (
{errors.length > 0 && (
<EuiCallOut title="Sorry, there was an error" color="danger" iconType="alert">
<p>{error}</p>
{errors.length === 1 ? (
<p>{errors[0]}</p>
) : (
<ul>
{errors.map((e, i) => (
<li key={i}>{e}</li>
))}
</ul>
)}{' '}
</EuiCallOut>
)}
</Page>
Expand Down
29 changes: 18 additions & 11 deletions x-pack/packages/ml/aiops_utils/src/fetch_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,24 @@ export async function* fetchStream<I extends UseFetchStreamParamsDefault, BasePa
): AsyncGenerator<
[GeneratorError, ReducerAction<I['reducer']> | Array<ReducerAction<I['reducer']>> | undefined]
> {
const stream = await fetch(endpoint, {
signal: abortCtrl.current.signal,
method: 'POST',
headers: {
// This refers to the format of the request body,
// not the response, which will be a uint8array Buffer.
'Content-Type': 'application/json',
'kbn-xsrf': 'stream',
},
...(Object.keys(body).length > 0 ? { body: JSON.stringify(body) } : {}),
});
let stream: Response;

try {
stream = await fetch(endpoint, {
signal: abortCtrl.current.signal,
method: 'POST',
headers: {
// This refers to the format of the request body,
// not the response, which will be a uint8array Buffer.
'Content-Type': 'application/json',
'kbn-xsrf': 'stream',
},
...(Object.keys(body).length > 0 ? { body: JSON.stringify(body) } : {}),
});
} catch (error) {
yield [error.toString(), undefined];
return;
}

if (!stream.ok) {
yield [`Error ${stream.status}: ${stream.statusText}`, undefined];
Expand Down
16 changes: 10 additions & 6 deletions x-pack/packages/ml/aiops_utils/src/use_fetch_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ interface UseFetchStreamReturnType<Data, Action> {
cancel: () => void;
data: Data;
dispatch: Dispatch<Action>;
error: string | undefined;
errors: string[];
isCancelled: boolean;
isRunning: boolean;
start: () => Promise<void>;
Expand Down Expand Up @@ -76,7 +76,7 @@ export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath e
body: I['body'],
options?: { reducer: I['reducer']; initialState: ReducerState<I['reducer']> }
): UseFetchStreamReturnType<ReducerState<I['reducer']>, ReducerAction<I['reducer']>> {
const [error, setError] = useState<string | undefined>();
const [errors, setErrors] = useState<string[]>([]);
const [isCancelled, setIsCancelled] = useState(false);
const [isRunning, setIsRunning] = useState(false);

Expand All @@ -87,13 +87,17 @@ export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath e

const abortCtrl = useRef(new AbortController());

const addError = (error: string) => {
setErrors((prevErrors) => [...prevErrors, error]);
};

const start = async () => {
if (isRunning) {
setError('Restart not supported yet.');
addError('Restart not supported yet.');
return;
}

setError(undefined);
setErrors([]);
setIsRunning(true);
setIsCancelled(false);

Expand All @@ -104,7 +108,7 @@ export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath e
BasePath
>(endpoint, abortCtrl, body, options !== undefined)) {
if (fetchStreamError !== null) {
setError(fetchStreamError);
addError(fetchStreamError);
} else if (actions.length > 0) {
dispatch(actions as ReducerAction<I['reducer']>);
}
Expand All @@ -128,7 +132,7 @@ export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath e
cancel,
data,
dispatch,
error,
errors,
isCancelled,
isRunning,
start,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type { ChangePoint, ChangePointHistogram } from '@kbn/ml-agg-utils';
export const API_ACTION_NAME = {
ADD_CHANGE_POINTS: 'add_change_points',
ADD_CHANGE_POINTS_HISTOGRAM: 'add_change_points_histogram',
ERROR: 'error',
ADD_ERROR: 'add_error',
RESET: 'reset',
UPDATE_LOADING_STATE: 'update_loading_state',
} as const;
Expand Down Expand Up @@ -44,14 +44,14 @@ export function addChangePointsHistogramAction(
};
}

interface ApiActionError {
type: typeof API_ACTION_NAME.ERROR;
interface ApiActionAddError {
type: typeof API_ACTION_NAME.ADD_ERROR;
payload: string;
}

export function errorAction(payload: ApiActionError['payload']): ApiActionError {
export function addErrorAction(payload: ApiActionAddError['payload']): ApiActionAddError {
return {
type: API_ACTION_NAME.ERROR,
type: API_ACTION_NAME.ADD_ERROR,
payload,
};
}
Expand Down Expand Up @@ -85,6 +85,6 @@ export function updateLoadingStateAction(
export type AiopsExplainLogRateSpikesApiAction =
| ApiActionAddChangePoints
| ApiActionAddChangePointsHistogram
| ApiActionError
| ApiActionAddError
| ApiActionReset
| ApiActionUpdateLoadingState;
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
export {
addChangePointsAction,
addChangePointsHistogramAction,
errorAction,
addErrorAction,
resetAction,
updateLoadingStateAction,
API_ACTION_NAME,
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/aiops/common/api/stream_reducer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ describe('streamReducer', () => {
loaded: 50,
loadingState: 'Loaded 50%',
changePoints: [],
errors: [],
});
});

Expand Down
4 changes: 4 additions & 0 deletions x-pack/plugins/aiops/common/api/stream_reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import { API_ACTION_NAME, AiopsExplainLogRateSpikesApiAction } from './explain_l
interface StreamState {
ccsWarning: boolean;
changePoints: ChangePoint[];
errors: string[];
loaded: number;
loadingState: string;
}

export const initialState: StreamState = {
ccsWarning: false,
changePoints: [],
errors: [],
loaded: 0,
loadingState: '',
};
Expand All @@ -45,6 +47,8 @@ export function streamReducer(
return cp;
});
return { ...state, changePoints };
case API_ACTION_NAME.ADD_ERROR:
return { ...state, errors: [...state.errors, action.payload] };
case API_ACTION_NAME.RESET:
return initialState;
case API_ACTION_NAME.UPDATE_LOADING_STATE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
import React, { useEffect, useMemo, useState, FC } from 'react';
import { isEqual } from 'lodash';

import { EuiEmptyPrompt } from '@elastic/eui';
import { EuiCallOut, EuiEmptyPrompt, EuiSpacer, EuiText } from '@elastic/eui';

import type { DataView } from '@kbn/data-views-plugin/public';
import { ProgressControls } from '@kbn/aiops-components';
import { useFetchStream } from '@kbn/aiops-utils';
import type { WindowParameters } from '@kbn/aiops-utils';
import { i18n } from '@kbn/i18n';
import { FormattedMessage } from '@kbn/i18n-react';
import type { ChangePoint } from '@kbn/ml-agg-utils';
import type { Query } from '@kbn/es-query';
Expand Down Expand Up @@ -59,10 +60,13 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
WindowParameters | undefined
>();

const { cancel, start, data, isRunning, error } = useFetchStream<
ApiExplainLogRateSpikes,
typeof basePath
>(
const {
cancel,
start,
data,
isRunning,
errors: streamErrors,
} = useFetchStream<ApiExplainLogRateSpikes, typeof basePath>(
`${basePath}/internal/aiops/explain_log_rate_spikes`,
{
start: earliest,
Expand All @@ -76,11 +80,7 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
{ reducer: streamReducer, initialState }
);

useEffect(() => {
setCurrentAnalysisWindowParameters(windowParameters);
start();
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);
const errors = useMemo(() => [...streamErrors, ...data.errors], [streamErrors, data.errors]);

// Start handler clears possibly hovered or pinned
// change points on analysis refresh.
Expand All @@ -96,6 +96,12 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
start();
}

useEffect(() => {
setCurrentAnalysisWindowParameters(windowParameters);
start();
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);

const shouldRerunAnalysis = useMemo(
() =>
currentAnalysisWindowParameters !== undefined &&
Expand All @@ -115,6 +121,7 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
onCancel={cancel}
shouldRerunAnalysis={shouldRerunAnalysis}
/>
<EuiSpacer size="xs" />
{!isRunning && !showSpikeAnalysisTable && (
<EuiEmptyPrompt
title={
Expand All @@ -136,11 +143,37 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
}
/>
)}
{errors.length > 0 && (
<>
<EuiCallOut
title={i18n.translate('xpack.aiops.analysis.errorCallOutTitle', {
defaultMessage:
'The following {errorCount, plural, one {error} other {errors}} occurred running the analysis.',
values: { errorCount: errors.length },
})}
color="warning"
iconType="alert"
size="s"
>
<EuiText size="s">
{errors.length === 1 ? (
<p>{errors[0]}</p>
) : (
<ul>
{errors.map((e, i) => (
<li key={i}>{e}</li>
))}
</ul>
)}
</EuiText>
</EuiCallOut>
<EuiSpacer size="xs" />
</>
)}
{showSpikeAnalysisTable && (
<SpikeAnalysisTable
changePoints={data.changePoints}
loading={isRunning}
error={error}
onPinnedChangePoint={onPinnedChangePoint}
onSelectedChangePoint={onSelectedChangePoint}
selectedChangePoint={selectedChangePoint}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ const DEFAULT_SORT_DIRECTION = 'asc';

interface SpikeAnalysisTableProps {
changePoints: ChangePoint[];
error?: string;
loading: boolean;
onPinnedChangePoint?: (changePoint: ChangePoint | null) => void;
onSelectedChangePoint?: (changePoint: ChangePoint | null) => void;
Expand All @@ -43,7 +42,6 @@ interface SpikeAnalysisTableProps {

export const SpikeAnalysisTable: FC<SpikeAnalysisTableProps> = ({
changePoints,
error,
loading,
onPinnedChangePoint,
onSelectedChangePoint,
Expand Down Expand Up @@ -218,7 +216,6 @@ export const SpikeAnalysisTable: FC<SpikeAnalysisTableProps> = ({
onChange={onChange}
pagination={pagination}
loading={false}
error={error}
sorting={sorting as EuiTableSortingType<ChangePoint>}
rowProps={(changePoint) => {
return {
Expand Down
6 changes: 3 additions & 3 deletions x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
addChangePointsAction,
addChangePointsHistogramAction,
aiopsExplainLogRateSpikesSchema,
errorAction,
addErrorAction,
resetAction,
updateLoadingStateAction,
AiopsExplainLogRateSpikesApiAction,
Expand Down Expand Up @@ -112,7 +112,7 @@ export const defineExplainLogRateSpikesRoute = (
try {
fieldCandidates = await fetchFieldCandidates(client, request.body);
} catch (e) {
push(errorAction(e.toString()));
push(addErrorAction(e.toString()));
end();
return;
}
Expand Down Expand Up @@ -154,7 +154,7 @@ export const defineExplainLogRateSpikesRoute = (
try {
pValues = await fetchChangePointPValues(client, request.body, fieldCandidatesChunk);
} catch (e) {
push(errorAction(e.toString()));
push(addErrorAction(e.toString()));
end();
return;
}
Expand Down
Loading