migrationsv2: handle 413 errors and log the request details for unexpected ES failures (#108213) (#108658)

* Log the failing request and response code when an action throws a response error

* Provide useful log message when migrations fail due to ES 413 Request Entity Too Large

* Don't log request body for unexpected ES request failures

* Fix types

* CR feedback: fix order of ES request debug log

Co-authored-by: Kibana Machine <[email protected]>
# Conflicts:
#	src/core/server/elasticsearch/client/configure_client.ts
#	src/core/server/elasticsearch/index.ts
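The heart of the change is a small error-mapping pattern: instead of letting a 413 from Elasticsearch escape as a thrown exception, the bulk-index action converts it into a typed Either.left value that the migration state machine can route to a FATAL state with an actionable message. A minimal standalone sketch of that pattern (not the Kibana source; the names mirror the bulk-overwrite diff below):

// Sketch: translate an HTTP 413 ResponseError into a tagged left value.
import * as Either from 'fp-ts/lib/Either';
import { errors as esErrors } from '@elastic/elasticsearch';

interface RequestEntityTooLargeException {
  type: 'request_entity_too_large_exception';
}

const catchRequestEntityTooLarge = (
  error: unknown
): Either.Either<RequestEntityTooLargeException, never> => {
  if (error instanceof esErrors.ResponseError && error.statusCode === 413) {
    return Either.left({ type: 'request_entity_too_large_exception' as const });
  }
  // Anything else remains an unexpected failure for other handlers.
  throw error;
};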
rudolf authored Aug 16, 2021
1 parent 7fdc9ab commit 7fb9fbf
Showing 10 changed files with 144 additions and 38 deletions.
40 changes: 30 additions & 10 deletions src/core/server/elasticsearch/client/configure_client.ts
@@ -45,11 +45,13 @@ function ensureString(body: RequestBody): string {
   return JSON.stringify(body);
 }
 
-function getErrorMessage(error: ApiError, event: RequestEvent): string {
+/**
+ * Returns a debug message from an Elasticsearch error in the following format:
+ * [error type] error reason
+ */
+export function getErrorMessage(error: ApiError): string {
   if (error instanceof errors.ResponseError) {
-    return `${getResponseMessage(event)} [${event.body?.error?.type}]: ${
-      event.body?.error?.reason ?? error.message
-    }`;
+    return `[${error.meta.body?.error?.type}]: ${error.meta.body?.error?.reason ?? error.message}`;
   }
   return `[${error.name}]: ${error.message}`;
 }
@@ -58,26 +60,44 @@ function getErrorMessage(error: ApiError, event: RequestEvent): string {
  * returns a string in format:
  *
  * status code
- * URL
+ * method URL
  * request body
  *
  * so it could be copy-pasted into the Dev console
  */
 function getResponseMessage(event: RequestEvent): string {
-  const params = event.meta.request.params;
+  const errorMeta = getRequestDebugMeta(event);
+  const body = errorMeta.body ? `\n${errorMeta.body}` : '';
+  return `${errorMeta.statusCode}\n${errorMeta.method} ${errorMeta.url}${body}`;
+}
+
+/**
+ * Returns stringified debug information from an Elasticsearch request event
+ * useful for logging in case of an unexpected failure.
+ */
+export function getRequestDebugMeta(
+  event: RequestEvent
+): { url: string; body: string; statusCode: number | null; method: string } {
+  const params = event.meta.request.params;
   // definition is wrong, `params.querystring` can be either a string or an object
   const querystring = convertQueryString(params.querystring);
-  const url = `${params.path}${querystring ? `?${querystring}` : ''}`;
-  const body = params.body ? `\n${ensureString(params.body)}` : '';
-  return `${event.statusCode}\n${params.method} ${url}${body}`;
+  return {
+    url: `${params.path}${querystring ? `?${querystring}` : ''}`,
+    body: params.body ? `${ensureString(params.body)}` : '',
+    method: params.method,
+    statusCode: event.statusCode,
+  };
 }
 
 const addLogging = (client: Client, logger: Logger) => {
   client.on('response', (error, event) => {
     if (event) {
       if (error) {
-        logger.debug(getErrorMessage(error, event));
+        if (error instanceof errors.ResponseError) {
+          logger.debug(`${getResponseMessage(event)} ${getErrorMessage(error)}`);
+        } else {
+          logger.debug(getErrorMessage(error));
+        }
       } else {
         logger.debug(getResponseMessage(event));
       }
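For illustration, a hedged sketch of how the newly exported helpers might be consumed outside the client factory (the 'response' hook and event shape come from @elastic/elasticsearch; the import path and log line are assumed examples, not captured output — the diffs below re-export the helpers through the elasticsearch service):

// Sketch: logging a failed request with the exported helpers.
// `esClient` is assumed to be an @elastic/elasticsearch Client instance.
import { getRequestDebugMeta, getErrorMessage } from './configure_client';

esClient.on('response', (error, event) => {
  if (event && error) {
    const meta = getRequestDebugMeta(event);
    // e.g. "413 POST /.kibana/_bulk [request_entity_too_large_exception]: ..."
    console.debug(`${meta.statusCode} ${meta.method} ${meta.url} ${getErrorMessage(error)}`);
  }
});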
2 changes: 1 addition & 1 deletion src/core/server/elasticsearch/client/index.ts
@@ -20,5 +20,5 @@ export type { IScopedClusterClient } from './scoped_cluster_client';
 export type { ElasticsearchClientConfig } from './client_config';
 export { ClusterClient } from './cluster_client';
 export type { IClusterClient, ICustomClusterClient } from './cluster_client';
-export { configureClient } from './configure_client';
+export { configureClient, getRequestDebugMeta, getErrorMessage } from './configure_client';
 export { retryCallCluster, migrationRetryCallCluster } from './retry_call_cluster';
1 change: 1 addition & 0 deletions src/core/server/elasticsearch/index.ts
@@ -34,3 +34,4 @@ export type {
   GetResponse,
   DeleteDocumentResponse,
 } from './client';
+export { getRequestDebugMeta, getErrorMessage } from './client';
@@ -8,7 +8,7 @@
 
 import * as Either from 'fp-ts/lib/Either';
 import * as TaskEither from 'fp-ts/lib/TaskEither';
-import type { estypes } from '@elastic/elasticsearch';
+import { errors as esErrors, estypes } from '@elastic/elasticsearch';
 import { ElasticsearchClient } from '../../../elasticsearch';
 import type { SavedObjectsRawDoc } from '../../serialization';
 import {
@@ -17,7 +17,7 @@ import {
 } from './catch_retryable_es_client_errors';
 import { isWriteBlockException } from './es_errors';
 import { WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE } from './constants';
-import type { TargetIndexHadWriteBlock } from './index';
+import type { TargetIndexHadWriteBlock, RequestEntityTooLargeException } from './index';
 
 /** @internal */
 export interface BulkOverwriteTransformedDocumentsParams {
@@ -37,7 +37,7 @@ export const bulkOverwriteTransformedDocuments = ({
   transformedDocs,
   refresh = false,
 }: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither<
-  RetryableEsClientError | TargetIndexHadWriteBlock,
+  RetryableEsClientError | TargetIndexHadWriteBlock | RequestEntityTooLargeException,
   'bulk_index_succeeded'
 > => () => {
   return client
@@ -90,5 +90,12 @@
         throw new Error(JSON.stringify(errors));
       }
     })
+    .catch((error) => {
+      if (error instanceof esErrors.ResponseError && error.statusCode === 413) {
+        return Either.left({ type: 'request_entity_too_large_exception' as const });
+      } else {
+        throw error;
+      }
+    })
     .catch(catchRetryableEsClientErrors);
 };
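For readers less familiar with fp-ts: the action returns a TaskEither, i.e. a lazy () => Promise<Either<E, A>>. A hedged sketch of how a caller might branch on the new left value (mirroring the integration tests further down; the surrounding variables and the fp-ts import are assumed to be in scope as above):

// Sketch: invoking the task and narrowing its result.
const task = bulkOverwriteTransformedDocuments({ client, index, transformedDocs });
const res = await task();
if (Either.isLeft(res)) {
  if (res.left.type === 'request_entity_too_large_exception') {
    // The batch exceeded Elasticsearch's request size limit; the model
    // transitions to FATAL with an actionable message (see model.ts below).
  }
} else {
  // res.right === 'bulk_index_succeeded'
}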
5 changes: 5 additions & 0 deletions src/core/server/saved_objects/migrationsv2/actions/index.ts
@@ -120,6 +120,10 @@ export interface TargetIndexHadWriteBlock {
   type: 'target_index_had_write_block';
 }
 
+export interface RequestEntityTooLargeException {
+  type: 'request_entity_too_large_exception';
+}
+
 /** @internal */
 export interface AcknowledgeResponse {
   acknowledged: boolean;
@@ -136,6 +140,7 @@ export interface ActionErrorTypeMap {
   alias_not_found_exception: AliasNotFound;
   remove_index_not_a_concrete_index: RemoveIndexNotAConcreteIndex;
   documents_transform_failed: DocumentsTransformFailed;
+  request_entity_too_large_exception: RequestEntityTooLargeException;
 }
 
 /**
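The string tag on each error interface is what ActionErrorTypeMap keys on, and model.ts narrows left values with isLeftTypeof. A hedged sketch of how such a guard can be typed (the real implementation lives elsewhere in migrationsv2 and may well differ):

// Sketch: a type guard keyed on ActionErrorTypeMap.
function isLeftTypeof<T extends keyof ActionErrorTypeMap>(
  left: { type: string },
  type: T
): left is ActionErrorTypeMap[T] {
  return left.type === type;
}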
@@ -46,6 +46,12 @@ import { TaskEither } from 'fp-ts/lib/TaskEither';
 
 const { startES } = kbnTestServer.createTestServers({
   adjustTimeout: (t: number) => jest.setTimeout(t),
+  settings: {
+    es: {
+      license: 'basic',
+      esArgs: ['http.max_content_length=10Kb'],
+    },
+  },
 });
 let esServer: kbnTestServer.TestElasticsearchUtils;
 
@@ -1471,11 +1477,11 @@ describe('migration actions', () => {
       });
 
       await expect(task()).resolves.toMatchInlineSnapshot(`
-        Object {
-          "_tag": "Right",
-          "right": "bulk_index_succeeded",
-        }
-      `);
+                Object {
+                  "_tag": "Right",
+                  "right": "bulk_index_succeeded",
+                }
+              `);
     });
     it('resolves right even if there were some version_conflict_engine_exception', async () => {
       const existingDocs = ((await searchForOutdatedDocuments(client, {
@@ -1500,7 +1506,7 @@
         }
       `);
     });
-    it('resolves left if there are write_block errors', async () => {
+    it('resolves left target_index_had_write_block if there are write_block errors', async () => {
       const newDocs = ([
         { _source: { title: 'doc 5' } },
         { _source: { title: 'doc 6' } },
@@ -1514,13 +1520,34 @@
           refresh: 'wait_for',
         })()
       ).resolves.toMatchInlineSnapshot(`
-        Object {
-          "_tag": "Left",
-          "left": Object {
-            "type": "target_index_had_write_block",
-          },
-        }
-      `);
+                Object {
+                  "_tag": "Left",
+                  "left": Object {
+                    "type": "target_index_had_write_block",
+                  },
+                }
+              `);
     });
+    it('resolves left request_entity_too_large_exception when the payload is too large', async () => {
+      const newDocs = new Array(10000).fill({
+        _source: {
+          title:
+            "how do I create a document that's large enough to exceed the limits without typing long sentences",
+        },
+      }) as SavedObjectsRawDoc[];
+      const task = bulkOverwriteTransformedDocuments({
+        client,
+        index: 'existing_index_with_docs',
+        transformedDocs: newDocs,
+      });
+      await expect(task()).resolves.toMatchInlineSnapshot(`
+                Object {
+                  "_tag": "Left",
+                  "left": Object {
+                    "type": "request_entity_too_large_exception",
+                  },
+                }
+              `);
+    });
   });
 });
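The test server lowers Elasticsearch's request size limit so that a 413 can be provoked with a modest payload. For reference, a hedged sketch of the equivalent standalone setting, using the test's value rather than anything production-appropriate:

# elasticsearch.yml (assumption: the same setting the test passes via esArgs)
http.max_content_length: 10kb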
@@ -353,6 +353,9 @@ describe('migrationsStateActionMachine', () => {
       next: () => {
         throw new ResponseError(
           elasticsearchClientMock.createApiResponse({
+            meta: {
+              request: { options: {}, id: '', params: { method: 'POST', path: '/mock' } },
+            } as any,
             body: {
               error: {
                 type: 'snapshot_in_progress_exception',
@@ -365,14 +368,14 @@
         client: esClient,
       })
     ).rejects.toMatchInlineSnapshot(
-      `[Error: Unable to complete saved object migrations for the [.my-so-index] index. Please check the health of your Elasticsearch cluster and try again. Error: [snapshot_in_progress_exception]: Cannot delete indices that are being snapshotted]`
+      `[Error: Unable to complete saved object migrations for the [.my-so-index] index. Please check the health of your Elasticsearch cluster and try again. Unexpected Elasticsearch ResponseError: statusCode: 200, method: POST, url: /mock error: [snapshot_in_progress_exception]: Cannot delete indices that are being snapshotted,]`
     );
     expect(loggingSystemMock.collect(mockLogger)).toMatchInlineSnapshot(`
       Object {
         "debug": Array [],
         "error": Array [
           Array [
-            "[.my-so-index] [snapshot_in_progress_exception]: Cannot delete indices that are being snapshotted",
+            "[.my-so-index] Unexpected Elasticsearch ResponseError: statusCode: 200, method: POST, url: /mock error: [snapshot_in_progress_exception]: Cannot delete indices that are being snapshotted,",
           ],
           Array [
             "[.my-so-index] migration failed, dumping execution log:",
@@ -10,6 +10,7 @@ import { errors as EsErrors } from '@elastic/elasticsearch';
 import * as Option from 'fp-ts/lib/Option';
 import { Logger, LogMeta } from '../../logging';
 import type { ElasticsearchClient } from '../../elasticsearch';
+import { getErrorMessage, getRequestDebugMeta } from '../../elasticsearch';
 import { Model, Next, stateActionMachine } from './state_action_machine';
 import { cleanup } from './migrations_state_machine_cleanup';
 import { State } from './types';
@@ -196,16 +197,19 @@ export async function migrationStateActionMachine({
   } catch (e) {
     await cleanup(client, executionLog, lastState);
     if (e instanceof EsErrors.ResponseError) {
-      logger.error(
-        logMessagePrefix + `[${e.body?.error?.type}]: ${e.body?.error?.reason ?? e.message}`
-      );
+      // Log the failed request. This is very similar to the
+      // elasticsearch-service's debug logs, but we log everything in single
+      // line until we have sub-ms resolution in our cloud logs. Because this
+      // is error level logs, we're also more careful and don't log the request
+      // body since this can very likely have sensitive saved objects.
+      const req = getRequestDebugMeta(e.meta);
+      const failedRequestMessage = `Unexpected Elasticsearch ResponseError: statusCode: ${
+        req.statusCode
+      }, method: ${req.method}, url: ${req.url} error: ${getErrorMessage(e)},`;
+      logger.error(logMessagePrefix + failedRequestMessage);
       dumpExecutionLog(logger, logMessagePrefix, executionLog);
       throw new Error(
-        `Unable to complete saved object migrations for the [${
-          initialState.indexPrefix
-        }] index. Please check the health of your Elasticsearch cluster and try again. Error: [${
-          e.body?.error?.type
-        }]: ${e.body?.error?.reason ?? e.message}`
+        `Unable to complete saved object migrations for the [${initialState.indexPrefix}] index. Please check the health of your Elasticsearch cluster and try again. ${failedRequestMessage}`
       );
     } else {
       logger.error(e);
21 changes: 21 additions & 0 deletions src/core/server/saved_objects/migrationsv2/model/model.test.ts
@@ -1154,6 +1154,16 @@ describe('migrations v2 model', () => {
       expect(newState.retryCount).toEqual(0);
       expect(newState.retryDelay).toEqual(0);
     });
+    test('REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> FATAL if action returns left request_entity_too_large_exception', () => {
+      const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'> = Either.left({
+        type: 'request_entity_too_large_exception',
+      });
+      const newState = model(reindexSourceToTempIndexBulkState, res) as FatalState;
+      expect(newState.controlState).toEqual('FATAL');
+      expect(newState.reason).toMatchInlineSnapshot(
+        `"While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Try to use smaller batches by changing the Kibana 'migrations.batchSize' configuration option and restarting Kibana."`
+      );
+    });
     test('REINDEX_SOURCE_TO_TEMP_INDEX_BULK should throw a throwBadResponse error if action failed', () => {
       const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'> = Either.left({
         type: 'retryable_es_client_error',
@@ -1532,6 +1542,17 @@
       expect(newState.retryCount).toEqual(1);
       expect(newState.retryDelay).toEqual(2000);
     });
+
+    test('TRANSFORMED_DOCUMENTS_BULK_INDEX -> FATAL if action returns left request_entity_too_large_exception', () => {
+      const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.left({
+        type: 'request_entity_too_large_exception',
+      });
+      const newState = model(transformedDocumentsBulkIndexState, res) as FatalState;
+      expect(newState.controlState).toEqual('FATAL');
+      expect(newState.reason).toMatchInlineSnapshot(
+        `"While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Try to use smaller batches by changing the Kibana 'migrations.batchSize' configuration option and restarting Kibana."`
+      );
+    });
   });
 
   describe('UPDATE_TARGET_MAPPINGS', () => {
20 changes: 19 additions & 1 deletion src/core/server/saved_objects/migrationsv2/model/model.ts
@@ -540,6 +540,12 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
         ...stateP,
         controlState: 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT',
       };
+    } else if (isLeftTypeof(res.left, 'request_entity_too_large_exception')) {
+      return {
+        ...stateP,
+        controlState: 'FATAL',
+        reason: `While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Try to use smaller batches by changing the Kibana 'migrations.batchSize' configuration option and restarting Kibana.`,
+      };
     }
     throwBadResponse(stateP, res.left);
   }
@@ -709,7 +715,19 @@
       hasTransformedDocs: true,
     };
   } else {
-    throwBadResponse(stateP, res as never);
+    if (isLeftTypeof(res.left, 'request_entity_too_large_exception')) {
+      return {
+        ...stateP,
+        controlState: 'FATAL',
+        reason: `While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Try to use smaller batches by changing the Kibana 'migrations.batchSize' configuration option and restarting Kibana.`,
+      };
+    } else if (isLeftTypeof(res.left, 'target_index_had_write_block')) {
+      // we fail on this error since the target index will only have a write
+      // block if a newer version of Kibana started an upgrade
+      throwBadResponse(stateP, res.left as never);
+    } else {
+      throwBadResponse(stateP, res.left);
+    }
   }
 } else if (stateP.controlState === 'UPDATE_TARGET_MAPPINGS') {
   const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
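The FATAL reason string points operators at the migrations.batchSize setting. A hedged kibana.yml sketch (the value is illustrative only; the default in this Kibana lineage was, to my understanding, 1000 — a smaller batch keeps each bulk request body under Elasticsearch's http.max_content_length):

# kibana.yml
# Reduce documents per bulk request during saved object migrations.
# Illustrative value — tune to your document sizes.
migrations.batchSize: 250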
