[ML] Explain Log Rate Spikes: Fix error handling. (#137947)
- Errors on the application level were not correctly surfaced in the UI. This PR fixes that by combining "transport" stream errors and application errors in a callout above the analysis results table.
- This also fixes the problem where a partly populated results table would turn empty again when an error was passed to the error prop of EUI's table. We now keep the table on display and show the errors above it in the callout.
walterra authored Aug 4, 2022
1 parent bfe35cc commit 70efbf0
Showing 13 changed files with 106 additions and 50 deletions.
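At a high level, the fix merges the two error sources (stream/HTTP-level errors reported by the fetch hook and application-level errors accumulated in the reducer state) into a single list rendered in a callout above the results table. A simplified sketch of that idea, using illustrative component and prop names rather than the exact code from the diff below:

```tsx
// Simplified sketch, not the actual component: both error sources are merged
// and rendered in one EuiCallOut so the results table can stay on display.
import React, { useMemo, FC } from 'react';
import { EuiCallOut } from '@elastic/eui';

interface AnalysisErrorCallOutProps {
  // Errors reported by the useFetchStream hook (stream/HTTP level).
  streamErrors: string[];
  // Errors collected in the reducer state (application level).
  applicationErrors: string[];
}

export const AnalysisErrorCallOut: FC<AnalysisErrorCallOutProps> = ({
  streamErrors,
  applicationErrors,
}) => {
  const errors = useMemo(
    () => [...streamErrors, ...applicationErrors],
    [streamErrors, applicationErrors]
  );

  if (errors.length === 0) {
    return null;
  }

  return (
    <EuiCallOut title="The analysis reported errors" color="warning" iconType="alert" size="s">
      <ul>
        {errors.map((e, i) => (
          <li key={i}>{e}</li>
        ))}
      </ul>
    </EuiCallOut>
  );
};
```

The actual change additionally stops passing an error prop to SpikeAnalysisTable, so a partly populated table stays visible while the callout reports what went wrong.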
2 changes: 1 addition & 1 deletion examples/response_stream/README.md
@@ -21,7 +21,7 @@ The request's headers get passed on to automatically identify if compression is
On the client, the custom hook is used like this:

```ts
const { error, start, cancel, data, isRunning } = useFetchStream<
const { errors, start, cancel, data, isRunning } = useFetchStream<
ApiSimpleStringStream, typeof basePath
>(`${basePath}/internal/response_stream/simple_string_stream`);
```
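For the reducer-based variant of the hook (used by the example page shown next), the call additionally takes a request body and a reducer with its initial state. A rough sketch under the assumption that ApiReducerStream, reducerStreamReducer, and initialState are exported by the example plugin's shared code; the import path, endpoint, and body below are illustrative:

```tsx
// Rough sketch only: the import path, endpoint, and request body are
// assumptions, not verified against the example plugin.
import React, { FC } from 'react';
import { useFetchStream } from '@kbn/aiops-utils';
import type { ApiReducerStream } from '../../common/api';
import { reducerStreamReducer, initialState } from '../../common/api';

export const ReducerStreamDemo: FC<{ basePath: string }> = ({ basePath }) => {
  const { start, cancel, data, errors, isRunning } = useFetchStream<
    ApiReducerStream,
    typeof basePath
  >(
    `${basePath}/internal/response_stream/reducer_stream`,
    { timeout: 1000 },
    { reducer: reducerStreamReducer, initialState }
  );

  return (
    <>
      <button onClick={() => (isRunning ? cancel() : start())}>
        {isRunning ? 'Cancel' : 'Start'}
      </button>
      {/* `data` is the reducer state built up from streamed actions. */}
      <pre>{JSON.stringify(data, null, 2)}</pre>
      {/* `errors` is now a string[] instead of a single string. */}
      {errors.map((e, i) => (
        <p key={i}>{e}</p>
      ))}
    </>
  );
};
```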
@@ -45,7 +45,7 @@ export const PageReducerStream: FC = () => {

const [simulateErrors, setSimulateErrors] = useState(false);

const { dispatch, start, cancel, data, error, isCancelled, isRunning } = useFetchStream<
const { dispatch, start, cancel, data, errors, isCancelled, isRunning } = useFetchStream<
ApiReducerStream,
typeof basePath
>(
@@ -65,13 +65,15 @@ }
}
};

// TODO This approach needs to be adapted as it might miss when error messages arrive in bulk.
// This is for low level errors on the stream/HTTP level.
useEffect(() => {
if (error) {
notifications.toasts.addDanger(error);
if (errors.length > 0) {
notifications.toasts.addDanger(errors[errors.length - 1]);
}
}, [error, notifications.toasts]);
}, [errors, notifications.toasts]);

// TODO This approach needs to be adapted as it might miss when error messages arrive in bulk.
// This is for errors on the application level
useEffect(() => {
if (data.errors.length > 0) {
@@ -21,7 +21,7 @@ export const PageSimpleStringStream: FC = () => {
const { core } = useDeps();
const basePath = core.http?.basePath.get() ?? '';

const { dispatch, error, start, cancel, data, isRunning } = useFetchStream<
const { dispatch, errors, start, cancel, data, isRunning } = useFetchStream<
ApiSimpleStringStream,
typeof basePath
>(`${basePath}/internal/response_stream/simple_string_stream`, { timeout: 500 });
@@ -61,9 +61,17 @@ export const PageSimpleStringStream: FC = () => {
<EuiText>
<p>{data}</p>
</EuiText>
{error && (
{errors.length > 0 && (
<EuiCallOut title="Sorry, there was an error" color="danger" iconType="alert">
<p>{error}</p>
{errors.length === 1 ? (
<p>{errors[0]}</p>
) : (
<ul>
{errors.map((e, i) => (
<li key={i}>{e}</li>
))}
</ul>
)}{' '}
</EuiCallOut>
)}
</Page>
29 changes: 18 additions & 11 deletions x-pack/packages/ml/aiops_utils/src/fetch_stream.ts
@@ -44,17 +44,24 @@ export async function* fetchStream<I extends UseFetchStreamParamsDefault, BasePa
): AsyncGenerator<
[GeneratorError, ReducerAction<I['reducer']> | Array<ReducerAction<I['reducer']>> | undefined]
> {
const stream = await fetch(endpoint, {
signal: abortCtrl.current.signal,
method: 'POST',
headers: {
// This refers to the format of the request body,
// not the response, which will be a uint8array Buffer.
'Content-Type': 'application/json',
'kbn-xsrf': 'stream',
},
...(Object.keys(body).length > 0 ? { body: JSON.stringify(body) } : {}),
});
let stream: Response;

try {
stream = await fetch(endpoint, {
signal: abortCtrl.current.signal,
method: 'POST',
headers: {
// This refers to the format of the request body,
// not the response, which will be a uint8array Buffer.
'Content-Type': 'application/json',
'kbn-xsrf': 'stream',
},
...(Object.keys(body).length > 0 ? { body: JSON.stringify(body) } : {}),
});
} catch (error) {
yield [error.toString(), undefined];
return;
}

if (!stream.ok) {
yield [`Error ${stream.status}: ${stream.statusText}`, undefined];
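The change above wraps the initial fetch call so that network-level failures are yielded into the stream rather than thrown at the consumer. A minimal, self-contained illustration of that pattern, with illustrative names and endpoint rather than the actual package code:

```ts
// Minimal illustration of the "yield errors instead of throwing" pattern used
// by fetchStream above (function names and endpoint are illustrative only).
type StreamItem<T> = [string | null, T | undefined];

async function* fetchAsTuples<T>(endpoint: string): AsyncGenerator<StreamItem<T>> {
  let response: Response;
  try {
    response = await fetch(endpoint, { method: 'POST' });
  } catch (error) {
    // Network-level failures are surfaced as data, not thrown past the consumer.
    yield [String(error), undefined];
    return;
  }

  if (!response.ok) {
    yield [`Error ${response.status}: ${response.statusText}`, undefined];
    return;
  }

  yield [null, (await response.json()) as T];
}

// The consumer collects errors and payloads in a single loop.
async function run() {
  const errors: string[] = [];
  for await (const [error, payload] of fetchAsTuples<{ value: number }>('/api/example')) {
    if (error !== null) {
      errors.push(error);
    } else if (payload !== undefined) {
      console.log(payload.value);
    }
  }
  console.log('errors:', errors);
}

void run();
```

Keeping errors in-band means the consumer needs no try/catch of its own and can accumulate them alongside any partial results it already received.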
16 changes: 10 additions & 6 deletions x-pack/packages/ml/aiops_utils/src/use_fetch_stream.ts
@@ -41,7 +41,7 @@ interface UseFetchStreamReturnType<Data, Action> {
cancel: () => void;
data: Data;
dispatch: Dispatch<Action>;
error: string | undefined;
errors: string[];
isCancelled: boolean;
isRunning: boolean;
start: () => Promise<void>;
@@ -76,7 +76,7 @@ export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath e
body: I['body'],
options?: { reducer: I['reducer']; initialState: ReducerState<I['reducer']> }
): UseFetchStreamReturnType<ReducerState<I['reducer']>, ReducerAction<I['reducer']>> {
const [error, setError] = useState<string | undefined>();
const [errors, setErrors] = useState<string[]>([]);
const [isCancelled, setIsCancelled] = useState(false);
const [isRunning, setIsRunning] = useState(false);

@@ -87,13 +87,17 @@ export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath e

const abortCtrl = useRef(new AbortController());

const addError = (error: string) => {
setErrors((prevErrors) => [...prevErrors, error]);
};

const start = async () => {
if (isRunning) {
setError('Restart not supported yet.');
addError('Restart not supported yet.');
return;
}

setError(undefined);
setErrors([]);
setIsRunning(true);
setIsCancelled(false);

@@ -104,7 +108,7 @@ export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath e
BasePath
>(endpoint, abortCtrl, body, options !== undefined)) {
if (fetchStreamError !== null) {
setError(fetchStreamError);
addError(fetchStreamError);
} else if (actions.length > 0) {
dispatch(actions as ReducerAction<I['reducer']>);
}
@@ -128,7 +132,7 @@ export function useFetchStream<I extends UseFetchStreamParamsDefault, BasePath e
cancel,
data,
dispatch,
error,
errors,
isCancelled,
isRunning,
start,
@@ -10,7 +10,7 @@ import type { ChangePoint, ChangePointHistogram } from '@kbn/ml-agg-utils';
export const API_ACTION_NAME = {
ADD_CHANGE_POINTS: 'add_change_points',
ADD_CHANGE_POINTS_HISTOGRAM: 'add_change_points_histogram',
ERROR: 'error',
ADD_ERROR: 'add_error',
RESET: 'reset',
UPDATE_LOADING_STATE: 'update_loading_state',
} as const;
@@ -44,14 +44,14 @@ export function addChangePointsHistogramAction(
};
}

interface ApiActionError {
type: typeof API_ACTION_NAME.ERROR;
interface ApiActionAddError {
type: typeof API_ACTION_NAME.ADD_ERROR;
payload: string;
}

export function errorAction(payload: ApiActionError['payload']): ApiActionError {
export function addErrorAction(payload: ApiActionAddError['payload']): ApiActionAddError {
return {
type: API_ACTION_NAME.ERROR,
type: API_ACTION_NAME.ADD_ERROR,
payload,
};
}
@@ -85,6 +85,6 @@ export function updateLoadingStateAction(
export type AiopsExplainLogRateSpikesApiAction =
| ApiActionAddChangePoints
| ApiActionAddChangePointsHistogram
| ApiActionError
| ApiActionAddError
| ApiActionReset
| ApiActionUpdateLoadingState;
@@ -8,7 +8,7 @@
export {
addChangePointsAction,
addChangePointsHistogramAction,
errorAction,
addErrorAction,
resetAction,
updateLoadingStateAction,
API_ACTION_NAME,
1 change: 1 addition & 0 deletions x-pack/plugins/aiops/common/api/stream_reducer.test.ts
@@ -24,6 +24,7 @@ describe('streamReducer', () => {
loaded: 50,
loadingState: 'Loaded 50%',
changePoints: [],
errors: [],
});
});

4 changes: 4 additions & 0 deletions x-pack/plugins/aiops/common/api/stream_reducer.ts
@@ -12,13 +12,15 @@ import { API_ACTION_NAME, AiopsExplainLogRateSpikesApiAction } from './explain_l
interface StreamState {
ccsWarning: boolean;
changePoints: ChangePoint[];
errors: string[];
loaded: number;
loadingState: string;
}

export const initialState: StreamState = {
ccsWarning: false,
changePoints: [],
errors: [],
loaded: 0,
loadingState: '',
};
@@ -45,6 +47,8 @@ export function streamReducer(
return cp;
});
return { ...state, changePoints };
case API_ACTION_NAME.ADD_ERROR:
return { ...state, errors: [...state.errors, action.payload] };
case API_ACTION_NAME.RESET:
return initialState;
case API_ACTION_NAME.UPDATE_LOADING_STATE:
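Taken together, the renamed add_error action and the new reducer case above mean application-level errors simply accumulate in state.errors. A small standalone sketch; the import paths are assumed to point at the modules in x-pack/plugins/aiops/common/api:

```ts
// Standalone sketch: errors pushed as add_error actions accumulate in
// state.errors (import paths and error strings are assumptions).
import { streamReducer, initialState } from './stream_reducer';
import { addErrorAction } from './explain_log_rate_spikes';

let state = initialState;
state = streamReducer(state, addErrorAction('Failed to fetch field candidates.'));
state = streamReducer(state, addErrorAction('Failed to fetch p-values.'));

// state.errors is now:
// ['Failed to fetch field candidates.', 'Failed to fetch p-values.']
console.log(state.errors);
```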
@@ -8,12 +8,13 @@
import React, { useEffect, useMemo, useState, FC } from 'react';
import { isEqual } from 'lodash';

import { EuiEmptyPrompt } from '@elastic/eui';
import { EuiCallOut, EuiEmptyPrompt, EuiSpacer, EuiText } from '@elastic/eui';

import type { DataView } from '@kbn/data-views-plugin/public';
import { ProgressControls } from '@kbn/aiops-components';
import { useFetchStream } from '@kbn/aiops-utils';
import type { WindowParameters } from '@kbn/aiops-utils';
import { i18n } from '@kbn/i18n';
import { FormattedMessage } from '@kbn/i18n-react';
import type { ChangePoint } from '@kbn/ml-agg-utils';
import type { Query } from '@kbn/es-query';
@@ -59,10 +60,13 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
WindowParameters | undefined
>();

const { cancel, start, data, isRunning, error } = useFetchStream<
ApiExplainLogRateSpikes,
typeof basePath
>(
const {
cancel,
start,
data,
isRunning,
errors: streamErrors,
} = useFetchStream<ApiExplainLogRateSpikes, typeof basePath>(
`${basePath}/internal/aiops/explain_log_rate_spikes`,
{
start: earliest,
@@ -76,11 +80,7 @@ export const ExplainLogRateSpikesAnalysis: FC<ExplainLogRateSpikesAnalysisProps>
{ reducer: streamReducer, initialState }
);

useEffect(() => {
setCurrentAnalysisWindowParameters(windowParameters);
start();
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);
const errors = useMemo(() => [...streamErrors, ...data.errors], [streamErrors, data.errors]);

// Start handler clears possibly hovered or pinned
// change points on analysis refresh.
@@ -96,6 +96,12 @@
start();
}

useEffect(() => {
setCurrentAnalysisWindowParameters(windowParameters);
start();
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);

const shouldRerunAnalysis = useMemo(
() =>
currentAnalysisWindowParameters !== undefined &&
@@ -115,6 +121,7 @@
onCancel={cancel}
shouldRerunAnalysis={shouldRerunAnalysis}
/>
<EuiSpacer size="xs" />
{!isRunning && !showSpikeAnalysisTable && (
<EuiEmptyPrompt
title={
@@ -136,11 +143,37 @@
}
/>
)}
{errors.length > 0 && (
<>
<EuiCallOut
title={i18n.translate('xpack.aiops.analysis.errorCallOutTitle', {
defaultMessage:
'The following {errorCount, plural, one {error} other {errors}} occurred running the analysis.',
values: { errorCount: errors.length },
})}
color="warning"
iconType="alert"
size="s"
>
<EuiText size="s">
{errors.length === 1 ? (
<p>{errors[0]}</p>
) : (
<ul>
{errors.map((e, i) => (
<li key={i}>{e}</li>
))}
</ul>
)}
</EuiText>
</EuiCallOut>
<EuiSpacer size="xs" />
</>
)}
{showSpikeAnalysisTable && (
<SpikeAnalysisTable
changePoints={data.changePoints}
loading={isRunning}
error={error}
onPinnedChangePoint={onPinnedChangePoint}
onSelectedChangePoint={onSelectedChangePoint}
selectedChangePoint={selectedChangePoint}
@@ -34,7 +34,6 @@ const DEFAULT_SORT_DIRECTION = 'asc';

interface SpikeAnalysisTableProps {
changePoints: ChangePoint[];
error?: string;
loading: boolean;
onPinnedChangePoint?: (changePoint: ChangePoint | null) => void;
onSelectedChangePoint?: (changePoint: ChangePoint | null) => void;
@@ -43,7 +42,6 @@ interface SpikeAnalysisTableProps {

export const SpikeAnalysisTable: FC<SpikeAnalysisTableProps> = ({
changePoints,
error,
loading,
onPinnedChangePoint,
onSelectedChangePoint,
@@ -218,7 +216,6 @@ export const SpikeAnalysisTable: FC<SpikeAnalysisTableProps> = ({
onChange={onChange}
pagination={pagination}
loading={false}
error={error}
sorting={sorting as EuiTableSortingType<ChangePoint>}
rowProps={(changePoint) => {
return {
6 changes: 3 additions & 3 deletions x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts
@@ -21,7 +21,7 @@ import {
addChangePointsAction,
addChangePointsHistogramAction,
aiopsExplainLogRateSpikesSchema,
errorAction,
addErrorAction,
resetAction,
updateLoadingStateAction,
AiopsExplainLogRateSpikesApiAction,
@@ -112,7 +112,7 @@ export const defineExplainLogRateSpikesRoute = (
try {
fieldCandidates = await fetchFieldCandidates(client, request.body);
} catch (e) {
push(errorAction(e.toString()));
push(addErrorAction(e.toString()));
end();
return;
}
@@ -154,7 +154,7 @@ export const defineExplainLogRateSpikesRoute = (
try {
pValues = await fetchChangePointPValues(client, request.body, fieldCandidatesChunk);
} catch (e) {
push(errorAction(e.toString()));
push(addErrorAction(e.toString()));
end();
return;
}
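On the server, each asynchronous step follows the same pattern: catch the failure, push an add_error action so the client can display it, end the stream, and bail out. A hedged sketch of that pattern factored into a helper; push and end are assumed to come from the route's streaming setup, and the import path is an assumption rather than the route's actual import:

```ts
// Hypothetical helper capturing the repeated error-handling pattern in the
// route above; not part of the actual change.
import { addErrorAction } from '../../common/api/explain_log_rate_spikes';
import type { AiopsExplainLogRateSpikesApiAction } from '../../common/api/explain_log_rate_spikes';

async function runStepOrEndStream<T>(
  step: () => Promise<T>,
  push: (action: AiopsExplainLogRateSpikesApiAction) => void,
  end: () => void
): Promise<T | undefined> {
  try {
    return await step();
  } catch (e) {
    // Surface the failure to the client as an application-level error ...
    push(addErrorAction(String(e)));
    // ... and terminate the stream so the UI stops reporting it as running.
    end();
    return undefined;
  }
}

// Usage sketch inside the route handler:
// const fieldCandidates = await runStepOrEndStream(
//   () => fetchFieldCandidates(client, request.body),
//   push,
//   end
// );
// if (fieldCandidates === undefined) return;
```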