From 019eb09ced1f3a7f9be842d1c59da1fc476e68cd Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Mon, 13 Mar 2023 13:18:17 -0400 Subject: [PATCH 01/15] Use batchSize config for update_by_query in updateAndPickupMappings --- .../src/actions/pickup_updated_mappings.test.ts | 2 +- .../src/actions/pickup_updated_mappings.ts | 6 +++--- .../src/actions/update_and_pickup_mappings.test.ts | 2 ++ .../src/actions/update_and_pickup_mappings.ts | 4 +++- .../src/next.ts | 1 + .../migrations/group3/actions/actions.test.ts | 13 +++++++++---- 6 files changed, 19 insertions(+), 9 deletions(-) diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/pickup_updated_mappings.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/pickup_updated_mappings.test.ts index 41253ebcd1ed2..9d2c1092e6363 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/pickup_updated_mappings.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/pickup_updated_mappings.test.ts @@ -29,7 +29,7 @@ describe('pickupUpdatedMappings', () => { elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) ); it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = pickupUpdatedMappings(client, 'my_index'); + const task = pickupUpdatedMappings(client, 'my_index', 1000); try { await task(); } catch (e) { diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/pickup_updated_mappings.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/pickup_updated_mappings.ts index 0e34857b4f208..8b6205cb7cc6f 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/pickup_updated_mappings.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/pickup_updated_mappings.ts @@ -13,7 +13,6 @@ import { catchRetryableEsClientErrors, type RetryableEsClientError, } from './catch_retryable_es_client_errors'; -import { BATCH_SIZE } from './constants'; export interface UpdateByQueryResponse { taskId: string; @@ -35,7 +34,8 @@ export interface UpdateByQueryResponse { export const pickupUpdatedMappings = ( client: ElasticsearchClient, - index: string + index: string, + batchSize: number ): TaskEither.TaskEither => () => { return client @@ -46,7 +46,7 @@ export const pickupUpdatedMappings = allow_no_indices: false, index, // How many documents to update per batch - scroll_size: BATCH_SIZE, + scroll_size: batchSize, // force a refresh so that we can query the updated index immediately // after the operation completes refresh: true, diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_and_pickup_mappings.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_and_pickup_mappings.test.ts index da243af9a7ebc..6b227ea2ef66a 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_and_pickup_mappings.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_and_pickup_mappings.test.ts @@ -36,6 +36,7 @@ describe('updateAndPickupMappings', () => { client, index: 'new_index', mappings: { properties: {} }, + batchSize: 1000, }); try { await task(); @@ -65,6 +66,7 @@ describe('updateAndPickupMappings', () => { }, }, }, + batchSize: 1000, }); try { await task(); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_and_pickup_mappings.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_and_pickup_mappings.ts index 653a90746dea0..58fd65c9718d0 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_and_pickup_mappings.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_and_pickup_mappings.ts @@ -28,6 +28,7 @@ export interface UpdateAndPickupMappingsParams { client: ElasticsearchClient; index: string; mappings: IndexMapping; + batchSize: number; } /** * Updates an index's mappings and runs an pickupUpdatedMappings task so that the mapping @@ -37,6 +38,7 @@ export const updateAndPickupMappings = ({ client, index, mappings, + batchSize, }: UpdateAndPickupMappingsParams): TaskEither.TaskEither< RetryableEsClientError, UpdateAndPickupMappingsResponse @@ -74,7 +76,7 @@ export const updateAndPickupMappings = ({ return pipe( putMappingTask, TaskEither.chain((res) => { - return pickupUpdatedMappings(client, index); + return pickupUpdatedMappings(client, index, batchSize); }) ); }; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts index 8cebce9995900..6bc8e105f76ae 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts @@ -172,6 +172,7 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra client, index: state.targetIndex, mappings: omit(state.targetIndexMappings, ['_meta']), // ._meta property will be updated on a later step + batchSize: state.batchSize, }), UPDATE_TARGET_MAPPINGS_WAIT_FOR_TASK: (state: UpdateTargetMappingsWaitForTaskState) => Actions.waitForPickupUpdatedMappingsTask({ diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts index 64592be985719..fb2bf61175500 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts @@ -1351,7 +1351,8 @@ describe('migration actions', () => { it('rejects if there are failures', async () => { const res = (await pickupUpdatedMappings( client, - 'existing_index_with_write_block' + 'existing_index_with_write_block', + 1000 )()) as Either.Right; const task = waitForPickupUpdatedMappingsTask({ @@ -1370,7 +1371,8 @@ describe('migration actions', () => { it('rejects if there is an error', async () => { const res = (await pickupUpdatedMappings( client, - 'no_such_index' + 'no_such_index', + 1000 )()) as Either.Right; const task = waitForPickupUpdatedMappingsTask({ @@ -1385,7 +1387,8 @@ describe('migration actions', () => { it('resolves left wait_for_task_completion_timeout when the task does not complete within the timeout', async () => { const res = (await pickupUpdatedMappings( client, - '.kibana_1' + '.kibana_1', + 1000 )()) as Either.Right; const task = waitForPickupUpdatedMappingsTask({ @@ -1408,7 +1411,8 @@ describe('migration actions', () => { it('resolves right when successful', async () => { const res = (await pickupUpdatedMappings( client, - 'existing_index_with_docs' + 'existing_index_with_docs', + 1000 )()) as Either.Right; const task = waitForPickupUpdatedMappingsTask({ @@ -1471,6 +1475,7 @@ describe('migration actions', () => { title: { type: 'text' }, }, }, + batchSize: 1000, })(); expect(Either.isRight(res)).toBe(true); const taskId = (res as Either.Right).right.taskId; From ae9cdc96831b2d4cf51c6c55ff0eeecd41a36762 Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Wed, 15 Mar 2023 07:20:36 -0400 Subject: [PATCH 02/15] Add batchSize to ZDT --- .../src/zdt/context/create_context.ts | 1 + .../src/zdt/context/types.ts | 2 ++ .../src/zdt/next.ts | 1 + .../src/zdt/test_helpers/context.ts | 1 + 4 files changed, 5 insertions(+) diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/create_context.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/create_context.ts index 7a660ea470443..14326fc7b6fc7 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/create_context.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/create_context.ts @@ -37,5 +37,6 @@ export const createContext = ({ maxRetryAttempts: migrationConfig.retryAttempts, migrationDocLinks: docLinks.links.kibanaUpgradeSavedObjects, deletedTypes: REMOVED_TYPES, + batchSize: migrationConfig.batchSize, }; }; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/types.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/types.ts index 5b6d4b2fe27e9..dc76dbc4e0ca9 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/types.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/types.ts @@ -38,4 +38,6 @@ export interface MigratorContext { readonly typeRegistry: ISavedObjectTypeRegistry; /** List of types that are no longer registered */ readonly deletedTypes: string[]; + /** The number of documents to process at a time */ + batchSize: number; } diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/next.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/next.ts index a85e9bfde6b56..bb139fc375188 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/next.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/next.ts @@ -51,6 +51,7 @@ export const nextActionMap = (context: MigratorContext) => { client, index: state.currentIndex, mappings: { properties: state.additiveMappingChanges }, + batchSize: context.batchSize, }), UPDATE_INDEX_MAPPINGS_WAIT_FOR_TASK: (state: UpdateIndexMappingsWaitForTaskState) => Actions.waitForPickupUpdatedMappingsTask({ diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/test_helpers/context.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/test_helpers/context.ts index faf9f9c89c9f5..6ed068461e2a3 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/test_helpers/context.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/test_helpers/context.ts @@ -39,6 +39,7 @@ export const createContextMock = ( typeRegistry, serializer: serializerMock.create(), deletedTypes: ['deleted-type'], + batchSize: 1000, ...parts, }; }; From e7d328e4e7830e958324266901f74063f1ed2ef1 Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Fri, 12 May 2023 15:35:39 +0200 Subject: [PATCH 03/15] Migrations: dynamically adjust batchSize when reading --- .../src/actions/index.ts | 4 +- .../src/actions/read_with_pit.ts | 22 +++++++++- .../src/model/helpers.ts | 9 ++++ .../src/model/model.ts | 43 ++++++++++++++++++- .../src/state.ts | 7 ++- .../src/test.json | 20 +++++++++ 6 files changed, 100 insertions(+), 5 deletions(-) create mode 100644 packages/core/saved-objects/core-saved-objects-migration-server-internal/src/test.json diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts index 9080e2ce93dbe..d9ff25022d9e9 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts @@ -54,7 +54,7 @@ import { pickupUpdatedMappings } from './pickup_updated_mappings'; export type { OpenPitResponse, OpenPitParams } from './open_pit'; export { openPit } from './open_pit'; -export type { ReadWithPit, ReadWithPitParams } from './read_with_pit'; +export type { ReadWithPit, ReadWithPitParams, EsResponseTooLargeError } from './read_with_pit'; export { readWithPit } from './read_with_pit'; export type { ClosePitParams } from './close_pit'; @@ -108,6 +108,7 @@ export { import type { UnknownDocsFound } from './check_for_unknown_docs'; import type { IncompatibleClusterRoutingAllocation } from './initialize_action'; import { ClusterShardLimitExceeded } from './create_index'; +import { EsResponseTooLargeError } from './read_with_pit'; export type { CheckForUnknownDocsParams, @@ -175,6 +176,7 @@ export interface ActionErrorTypeMap { index_not_green_timeout: IndexNotGreenTimeout; index_not_yellow_timeout: IndexNotYellowTimeout; cluster_shard_limit_exceeded: ClusterShardLimitExceeded; + es_response_too_large: EsResponseTooLargeError; } /** diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts index 1d0303947c1b6..a808963a47bfa 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts @@ -9,6 +9,7 @@ import * as Either from 'fp-ts/lib/Either'; import * as TaskEither from 'fp-ts/lib/TaskEither'; import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import { errors as EsErrors } from '@elastic/elasticsearch'; import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server'; import { @@ -34,6 +35,10 @@ export interface ReadWithPitParams { seqNoPrimaryTerm?: boolean; } +export interface EsResponseTooLargeError { + type: 'es_response_too_large'; +} + /* * Requests documents from the index using PIT mechanism. * */ @@ -45,7 +50,10 @@ export const readWithPit = batchSize, searchAfter, seqNoPrimaryTerm, - }: ReadWithPitParams): TaskEither.TaskEither => + }: ReadWithPitParams): TaskEither.TaskEither< + RetryableEsClientError | EsResponseTooLargeError, + ReadWithPit + > => () => { return client .search({ @@ -93,5 +101,17 @@ export const readWithPit = totalHits, }); }) + .catch((e) => { + if ( + e instanceof EsErrors.RequestAbortedError && + e.message.match(/The content length \(\d+\) is bigger than the maximum/) != null + ) { + return Either.left({ + type: 'es_response_too_large' as const, + }); + } else { + throw e; + } + }) .catch(catchRetryableEsClientErrors); }; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts index c8d8884c980cd..e2f17d0212d5c 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts @@ -17,6 +17,7 @@ import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server'; import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal'; import type { AliasAction, FetchIndexResponse } from '../actions'; import type { BulkIndexOperationTuple } from './create_batches'; +import { OutdatedDocumentsSearchRead } from '../state'; /** @internal */ export type Aliases = Partial>; @@ -285,3 +286,11 @@ export function getMigrationType({ */ export const getTempIndexName = (indexPrefix: string, kibanaVersion: string): string => `${indexPrefix}_${kibanaVersion}_reindex_temp`; + +/** Increase batchSize by 20% until a maximum of defaultBatchSize */ +export const increaseBatchSize = (stateP: OutdatedDocumentsSearchRead) => { + const increasedBatchSize = stateP.batchSize * 1.2; + return increasedBatchSize > stateP.defaultBatchSize + ? stateP.defaultBatchSize + : increasedBatchSize; +}; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts index 54657310912f8..13510b9538d7c 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts @@ -43,6 +43,7 @@ import { versionMigrationCompleted, buildRemoveAliasActions, MigrationType, + increaseBatchSize, } from './helpers'; import { buildTempIndexMap, createBatches } from './create_batches'; import type { MigrationLog } from '../types'; @@ -875,7 +876,25 @@ export const model = (currentState: State, resW: ResponseType): }; } } else { - throwBadResponse(stateP, res); + const left = res.left; + if (isTypeof(left, 'es_response_too_large')) { + return { + ...stateP, + batchSize: stateP.batchSize / 2, + controlState: 'REINDEX_SOURCE_TO_TEMP_READ', + logs: [ + ...stateP.logs, + { + level: 'warning', + message: `Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half to ${ + stateP.batchSize / 2 + }.`, + }, + ], + }; + } else { + throwBadResponse(stateP, left); + } } } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT') { const res = resW as ExcludeRetryableEsError>; @@ -1175,11 +1194,31 @@ export const model = (currentState: State, resW: ResponseType): // and can proceed to the next step return { ...stateP, + // We succeeded in reading this batch, so increase the default batch + // size up to the default + batchSize: increaseBatchSize(stateP), controlState: 'OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT', }; } } else { - throwBadResponse(stateP, res); + const left = res.left; + if (isTypeof(left, 'es_response_too_large')) { + return { + ...stateP, + batchSize: stateP.batchSize / 2, + controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', + logs: [ + ...stateP.logs, + { + level: 'warning', + message: + 'Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half.', + }, + ], + }; + } else { + throwBadResponse(stateP, left); + } } } else if (stateP.controlState === 'OUTDATED_DOCUMENTS_TRANSFORM') { const res = resW as ExcludeRetryableEsError>; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts index 6a483da04a399..20b848b261c89 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts @@ -63,7 +63,6 @@ export interface BaseState extends ControlState { * max_retry_time = 11.7 minutes */ readonly retryAttempts: number; - /** * The number of documents to process in each batch. This determines the * maximum number of documents that will be read and written in a single @@ -83,6 +82,12 @@ export interface BaseState extends ControlState { * When writing batches, we limit the number of documents in a batch * (batchSize) as well as the size of the batch in bytes (maxBatchSizeBytes). */ + readonly defaultBatchSize: number; + /** + * The number of documents to process in each batch. Under most circumstances + * batchSize == defaultBatchSize. But if we fail to read a batch because of a + * Nodejs `RangeError` we'll temporarily half `batchSize` and retry. + */ readonly batchSize: number; /** * When writing batches, limits the batch size in bytes to ensure that we diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/test.json b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/test.json new file mode 100644 index 0000000000000..a1eccb7b316a5 --- /dev/null +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/test.json @@ -0,0 +1,20 @@ +{ + "settings": { + "index": { + "number_of_shards": "1", + "auto_expand_replicas": "0-1" + } + }, + "mappings": { + "dynamic": false, + "properties": { + "migrationVersion": { + "dynamic": "true", + "type": "object" + }, + "type": { + "type": "keyword" + } + } + } + } \ No newline at end of file From a570e3a440b6b2be7b366fd6506153eb4a66232b Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Fri, 12 May 2023 16:20:45 +0200 Subject: [PATCH 04/15] Fixes and improve logging --- .../src/initial_state.ts | 1 + .../src/model/helpers.ts | 6 ++++-- .../src/model/model.ts | 12 +++++++----- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.ts index 6daa8887d5b72..1206deae8ab26 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.ts @@ -153,6 +153,7 @@ export const createInitialState = ({ retryDelay: 0, retryAttempts: migrationsConfig.retryAttempts, batchSize: migrationsConfig.batchSize, + defaultBatchSize: migrationsConfig.batchSize, maxBatchSizeBytes: migrationsConfig.maxBatchSizeBytes.getValueInBytes(), discardUnknownObjects: migrationsConfig.discardUnknownObjects === kibanaVersion, discardCorruptObjects: migrationsConfig.discardCorruptObjects === kibanaVersion, diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts index e2f17d0212d5c..7a0005d6ad99d 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts @@ -17,7 +17,7 @@ import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server'; import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal'; import type { AliasAction, FetchIndexResponse } from '../actions'; import type { BulkIndexOperationTuple } from './create_batches'; -import { OutdatedDocumentsSearchRead } from '../state'; +import { OutdatedDocumentsSearchRead, ReindexSourceToTempRead } from '../state'; /** @internal */ export type Aliases = Partial>; @@ -288,7 +288,9 @@ export const getTempIndexName = (indexPrefix: string, kibanaVersion: string): st `${indexPrefix}_${kibanaVersion}_reindex_temp`; /** Increase batchSize by 20% until a maximum of defaultBatchSize */ -export const increaseBatchSize = (stateP: OutdatedDocumentsSearchRead) => { +export const increaseBatchSize = ( + stateP: OutdatedDocumentsSearchRead | ReindexSourceToTempRead +) => { const increasedBatchSize = stateP.batchSize * 1.2; return increasedBatchSize > stateP.defaultBatchSize ? stateP.defaultBatchSize diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts index 13510b9538d7c..20040ebb77264 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts @@ -834,6 +834,8 @@ export const model = (currentState: State, resW: ResponseType): lastHitSortValue: res.right.lastHitSortValue, progress, logs, + // We succeeded in reading this batch, so increase the batch size for the next request. + batchSize: increaseBatchSize(stateP), }; } else { // we don't have any more outdated documents and need to either fail or move on to updating the target mappings. @@ -1158,6 +1160,8 @@ export const model = (currentState: State, resW: ResponseType): lastHitSortValue: res.right.lastHitSortValue, progress, logs, + // We succeeded in reading this batch, so increase the batch size for the next request. + batchSize: increaseBatchSize(stateP), }; } else { // we don't have any more outdated documents and need to either fail or move on to updating the target mappings. @@ -1194,9 +1198,6 @@ export const model = (currentState: State, resW: ResponseType): // and can proceed to the next step return { ...stateP, - // We succeeded in reading this batch, so increase the default batch - // size up to the default - batchSize: increaseBatchSize(stateP), controlState: 'OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT', }; } @@ -1211,8 +1212,9 @@ export const model = (currentState: State, resW: ResponseType): ...stateP.logs, { level: 'warning', - message: - 'Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half.', + message: `Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half to ${ + stateP.batchSize / 2 + }.`, }, ], }; From 7e761acc513a90dc97a90dbe7bfebe41f1be206e Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Fri, 12 May 2023 16:52:57 +0200 Subject: [PATCH 05/15] model.test.ts unit tests --- .../src/model/model.test.ts | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts index 969a7704e4e75..41e68ab3f634a 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts @@ -86,6 +86,7 @@ describe('migrations v2 model', () => { retryDelay: 0, retryAttempts: 15, batchSize: 1000, + defaultBatchSize: 1000, maxBatchSizeBytes: 1e8, discardUnknownObjects: false, discardCorruptObjects: false, @@ -1832,6 +1833,8 @@ describe('migrations v2 model', () => { expect(newState.lastHitSortValue).toBe(lastHitSortValue); expect(newState.progress.processed).toBe(undefined); expect(newState.progress.total).toBe(1); + expect(newState.defaultBatchSize).toBe(1000); + expect(newState.batchSize).toBe(1000); // don't increase batchsize above default expect(newState.logs).toMatchInlineSnapshot(` Array [ Object { @@ -1842,6 +1845,40 @@ describe('migrations v2 model', () => { `); }); + it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM increases batchSize if < defaultBatchSize', () => { + const outdatedDocuments = [{ _id: '1', _source: { type: 'vis' } }]; + const lastHitSortValue = [123456]; + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.right({ + outdatedDocuments, + lastHitSortValue, + totalHits: 1, + }); + const newState = model({ ...state, batchSize: 500 }, res) as ReindexSourceToTempTransform; + expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_TRANSFORM'); + expect(newState.batchSize).toBe(600); + expect(newState.defaultBatchSize).toBe(1000); + }); + + it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_READ if left es_response_too_large', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.left({ + type: 'es_response_too_large', + }); + const newState = model(state, res) as ReindexSourceToTempRead; + expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_READ'); + expect(newState.lastHitSortValue).toBe(undefined); // lastHitSortValue should not be set + expect(newState.progress.processed).toBe(undefined); // don't increment progress + expect(newState.batchSize).toBe(500); // halves the batch size + expect(newState.defaultBatchSize).toBe(1000); // leaves defaultBatchSize unchanged + expect(newState.logs).toMatchInlineSnapshot(` + Array [ + Object { + "level": "warning", + "message": "Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half to 500.", + }, + ] + `); + }); + it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT if no outdated documents to reindex', () => { const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.right({ outdatedDocuments: [], @@ -2304,6 +2341,8 @@ describe('migrations v2 model', () => { expect(newState.lastHitSortValue).toBe(lastHitSortValue); expect(newState.progress.processed).toBe(undefined); expect(newState.progress.total).toBe(10); + expect(newState.defaultBatchSize).toBe(1000); + expect(newState.batchSize).toBe(1000); // don't increase batchsize above default expect(newState.logs).toMatchInlineSnapshot(` Array [ Object { @@ -2345,6 +2384,40 @@ describe('migrations v2 model', () => { `); }); + it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_TRANSFORM increases batchSize if < defaultBatchSize', () => { + const outdatedDocuments = [{ _id: '1', _source: { type: 'vis' } }]; + const lastHitSortValue = [123456]; + const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.right({ + outdatedDocuments, + lastHitSortValue, + totalHits: 1, + }); + const newState = model({ ...state, batchSize: 500 }, res) as ReindexSourceToTempTransform; + expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_TRANSFORM'); + expect(newState.batchSize).toBe(600); + expect(newState.defaultBatchSize).toBe(1000); + }); + + it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_SEARCH_READ if left es_response_too_large', () => { + const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.left({ + type: 'es_response_too_large', + }); + const newState = model(state, res) as ReindexSourceToTempRead; + expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_SEARCH_READ'); + expect(newState.lastHitSortValue).toBe(undefined); // lastHitSortValue should not be set + expect(newState.progress.processed).toBe(undefined); // don't increment progress + expect(newState.batchSize).toBe(500); // halves the batch size + expect(newState.defaultBatchSize).toBe(1000); // leaves defaultBatchSize unchanged + expect(newState.logs).toMatchInlineSnapshot(` + Array [ + Object { + "level": "warning", + "message": "Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half to 500.", + }, + ] + `); + }); + it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT if no outdated documents to transform', () => { const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.right({ outdatedDocuments: [], From 61cdf9cb232f86ec327ca496fa37ab24da64a3dd Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Fri, 12 May 2023 17:08:20 +0200 Subject: [PATCH 06/15] Unit tests --- ...grations_state_action_machine.test.ts.snap | 6 +++++ .../src/actions/read_with_pit.test.ts | 27 +++++++++++++++++++ .../src/initial_state.test.ts | 1 + 3 files changed, 34 insertions(+) diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/__snapshots__/migrations_state_action_machine.test.ts.snap b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/__snapshots__/migrations_state_action_machine.test.ts.snap index 1a73eb3c2f1fa..dff12867ac1c2 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/__snapshots__/migrations_state_action_machine.test.ts.snap +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/__snapshots__/migrations_state_action_machine.test.ts.snap @@ -23,6 +23,7 @@ Object { ], "controlState": "LEGACY_REINDEX", "currentAlias": ".my-so-index", + "defaultBatchSize": 1000, "discardCorruptObjects": false, "discardUnknownObjects": false, "excludeFromUpgradeFilterHooks": Object {}, @@ -246,6 +247,7 @@ Object { ], "controlState": "LEGACY_DELETE", "currentAlias": ".my-so-index", + "defaultBatchSize": 1000, "discardCorruptObjects": false, "discardUnknownObjects": false, "excludeFromUpgradeFilterHooks": Object {}, @@ -473,6 +475,7 @@ Object { ], "controlState": "LEGACY_DELETE", "currentAlias": ".my-so-index", + "defaultBatchSize": 1000, "discardCorruptObjects": false, "discardUnknownObjects": false, "excludeFromUpgradeFilterHooks": Object {}, @@ -704,6 +707,7 @@ Object { ], "controlState": "DONE", "currentAlias": ".my-so-index", + "defaultBatchSize": 1000, "discardCorruptObjects": false, "discardUnknownObjects": false, "excludeFromUpgradeFilterHooks": Object {}, @@ -971,6 +975,7 @@ Object { ], "controlState": "LEGACY_DELETE", "currentAlias": ".my-so-index", + "defaultBatchSize": 1000, "discardCorruptObjects": false, "discardUnknownObjects": false, "excludeFromUpgradeFilterHooks": Object {}, @@ -1205,6 +1210,7 @@ Object { ], "controlState": "FATAL", "currentAlias": ".my-so-index", + "defaultBatchSize": 1000, "discardCorruptObjects": false, "discardUnknownObjects": false, "excludeFromUpgradeFilterHooks": Object {}, diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.test.ts index d2f7a3ab3c3d7..acd331bfcca5e 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.test.ts @@ -52,6 +52,33 @@ describe('readWithPit', () => { }); }); + it('returns left es_response_too_large when client throws RequestAbortedError', async () => { + // Create a mock client that rejects all methods with a RequestAbortedError + // response. + const retryableError = new EsErrors.RequestAbortedError( + 'The content length (536870889) is bigger than the maximum allow string (536870888)' + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + + const task = readWithPit({ + client, + pitId: 'pitId', + query: { match_all: {} }, + batchSize: 10_000, + }); + try { + await task(); + } catch (e) { + /** ignore */ + } + await expect(task()).resolves.toEqual({ + _tag: 'Left', + left: { type: 'es_response_too_large' }, + }); + }); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { // Create a mock client that rejects all methods with a 503 status code // response. diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.test.ts index 3ee201605f1ac..2ea881c2cea4e 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.test.ts @@ -76,6 +76,7 @@ describe('createInitialState', () => { "batchSize": 1000, "controlState": "INIT", "currentAlias": ".kibana_task_manager", + "defaultBatchSize": 1000, "discardCorruptObjects": false, "discardUnknownObjects": false, "excludeFromUpgradeFilterHooks": Object {}, From e3f11bc6aab0894574bccf46bd0ebac1de4efc18 Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Tue, 16 May 2023 14:32:17 +0200 Subject: [PATCH 07/15] E2E & integration tests --- .../src/saved_objects_config.ts | 1 + .../index.ts | 1 + ...grations_state_action_machine.test.ts.snap | 6 + .../src/actions/index.ts | 7 +- .../src/actions/read_with_pit.test.ts | 32 ++--- .../src/actions/read_with_pit.ts | 56 +++++---- .../src/initial_state.test.ts | 2 + .../src/initial_state.ts | 1 + .../src/kibana_migrator.test.ts | 7 +- .../migrations_state_action_machine.test.ts | 1 + .../src/model/helpers.ts | 2 +- .../src/model/model.test.ts | 4 +- .../src/model/model.ts | 14 +-- .../src/next.ts | 1 + .../src/state.ts | 6 + .../migrations/group3/actions/actions.test.ts | 39 +++++- .../migrations/group3/read_batch_size.test.ts | 118 ++++++++++++++++++ 17 files changed, 240 insertions(+), 58 deletions(-) create mode 100644 src/core/server/integration_tests/saved_objects/migrations/group3/read_batch_size.test.ts diff --git a/packages/core/saved-objects/core-saved-objects-base-server-internal/src/saved_objects_config.ts b/packages/core/saved-objects/core-saved-objects-base-server-internal/src/saved_objects_config.ts index 8af470de187f0..15822c7faef44 100644 --- a/packages/core/saved-objects/core-saved-objects-base-server-internal/src/saved_objects_config.ts +++ b/packages/core/saved-objects/core-saved-objects-base-server-internal/src/saved_objects_config.ts @@ -16,6 +16,7 @@ const migrationSchema = schema.object({ }), batchSize: schema.number({ defaultValue: 1_000 }), maxBatchSizeBytes: schema.byteSize({ defaultValue: '100mb' }), // 100mb is the default http.max_content_length Elasticsearch config value + maxReadBatchSizeBytes: schema.byteSize({ defaultValue: 536870888, max: 536870888 }), discardUnknownObjects: schema.maybe( schema.string({ validate: (value: string) => diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/index.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/index.ts index 61856a30cfc10..8fe3c8b7833c6 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/index.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/index.ts @@ -49,6 +49,7 @@ export type { ReindexResponse, UpdateByQueryResponse, UpdateAndPickupMappingsResponse, + EsResponseTooLargeError, } from './src/actions'; export { isClusterShardLimitExceeded, diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/__snapshots__/migrations_state_action_machine.test.ts.snap b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/__snapshots__/migrations_state_action_machine.test.ts.snap index ec7b459573180..a67009f13dc17 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/__snapshots__/migrations_state_action_machine.test.ts.snap +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/__snapshots__/migrations_state_action_machine.test.ts.snap @@ -168,6 +168,7 @@ Object { }, ], "maxBatchSizeBytes": 100000000, + "maxReadBatchSizeBytes": 536870888, "migrationDocLinks": Object { "clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded", "repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail", @@ -395,6 +396,7 @@ Object { }, ], "maxBatchSizeBytes": 100000000, + "maxReadBatchSizeBytes": 536870888, "migrationDocLinks": Object { "clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded", "repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail", @@ -626,6 +628,7 @@ Object { }, ], "maxBatchSizeBytes": 100000000, + "maxReadBatchSizeBytes": 536870888, "migrationDocLinks": Object { "clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded", "repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail", @@ -861,6 +864,7 @@ Object { }, ], "maxBatchSizeBytes": 100000000, + "maxReadBatchSizeBytes": 536870888, "migrationDocLinks": Object { "clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded", "repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail", @@ -1116,6 +1120,7 @@ Object { }, ], "maxBatchSizeBytes": 100000000, + "maxReadBatchSizeBytes": 536870888, "migrationDocLinks": Object { "clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded", "repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail", @@ -1354,6 +1359,7 @@ Object { }, ], "maxBatchSizeBytes": 100000000, + "maxReadBatchSizeBytes": 536870888, "migrationDocLinks": Object { "clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded", "repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail", diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts index d9ff25022d9e9..5edc900b68e19 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts @@ -54,7 +54,7 @@ import { pickupUpdatedMappings } from './pickup_updated_mappings'; export type { OpenPitResponse, OpenPitParams } from './open_pit'; export { openPit } from './open_pit'; -export type { ReadWithPit, ReadWithPitParams, EsResponseTooLargeError } from './read_with_pit'; +export type { ReadWithPit, ReadWithPitParams } from './read_with_pit'; export { readWithPit } from './read_with_pit'; export type { ClosePitParams } from './close_pit'; @@ -108,7 +108,6 @@ export { import type { UnknownDocsFound } from './check_for_unknown_docs'; import type { IncompatibleClusterRoutingAllocation } from './initialize_action'; import { ClusterShardLimitExceeded } from './create_index'; -import { EsResponseTooLargeError } from './read_with_pit'; export type { CheckForUnknownDocsParams, @@ -154,6 +153,10 @@ export interface RequestEntityTooLargeException { type: 'request_entity_too_large_exception'; } +export interface EsResponseTooLargeError { + type: 'es_response_too_large'; +} + /** @internal */ export interface AcknowledgeResponse { acknowledged: boolean; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.test.ts index acd331bfcca5e..3092561c7ba0b 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.test.ts @@ -32,24 +32,28 @@ describe('readWithPit', () => { pitId: 'pitId', query: { match_all: {} }, batchSize: 10_000, + maxResponseSizeBytes: 100_000, })(); expect(client.search).toHaveBeenCalledTimes(1); - expect(client.search).toHaveBeenCalledWith({ - allow_partial_search_results: false, - pit: { - id: 'pitId', - keep_alive: '10m', - }, - query: { - match_all: {}, + expect(client.search).toHaveBeenCalledWith( + { + allow_partial_search_results: false, + pit: { + id: 'pitId', + keep_alive: '10m', + }, + query: { + match_all: {}, + }, + search_after: undefined, + seq_no_primary_term: undefined, + size: 10000, + sort: '_shard_doc:asc', + track_total_hits: true, }, - search_after: undefined, - seq_no_primary_term: undefined, - size: 10000, - sort: '_shard_doc:asc', - track_total_hits: true, - }); + { maxResponseSize: 100_000 } + ); }); it('returns left es_response_too_large when client throws RequestAbortedError', async () => { diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts index a808963a47bfa..dc182af1737e9 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts @@ -17,6 +17,7 @@ import { type RetryableEsClientError, } from './catch_retryable_es_client_errors'; import { DEFAULT_PIT_KEEP_ALIVE } from './open_pit'; +import { EsResponseTooLargeError } from '.'; /** @internal */ export interface ReadWithPit { @@ -33,10 +34,7 @@ export interface ReadWithPitParams { batchSize: number; searchAfter?: number[]; seqNoPrimaryTerm?: boolean; -} - -export interface EsResponseTooLargeError { - type: 'es_response_too_large'; + maxResponseSizeBytes?: number; } /* @@ -50,35 +48,39 @@ export const readWithPit = batchSize, searchAfter, seqNoPrimaryTerm, + maxResponseSizeBytes, }: ReadWithPitParams): TaskEither.TaskEither< RetryableEsClientError | EsResponseTooLargeError, ReadWithPit > => () => { return client - .search({ - seq_no_primary_term: seqNoPrimaryTerm, - // Fail if the index being searched doesn't exist or is closed - // allow_no_indices: false, - // By default ES returns a 200 with partial results if there are shard - // request timeouts or shard failures which can lead to data loss for - // migrations - allow_partial_search_results: false, - // Sort fields are required to use searchAfter so we sort by the - // natural order of the index which is the most efficient option - // as order is not important for the migration - sort: '_shard_doc:asc', - pit: { id: pitId, keep_alive: DEFAULT_PIT_KEEP_ALIVE }, - size: batchSize, - search_after: searchAfter, - /** - * We want to know how many documents we need to process so we can log the progress. - * But we also want to increase the performance of these requests, - * so we ask ES to report the total count only on the first request (when searchAfter does not exist) - */ - track_total_hits: typeof searchAfter === 'undefined', - query, - }) + .search( + { + seq_no_primary_term: seqNoPrimaryTerm, + // Fail if the index being searched doesn't exist or is closed + // allow_no_indices: false, + // By default ES returns a 200 with partial results if there are shard + // request timeouts or shard failures which can lead to data loss for + // migrations + allow_partial_search_results: false, + // Sort fields are required to use searchAfter so we sort by the + // natural order of the index which is the most efficient option + // as order is not important for the migration + sort: '_shard_doc:asc', + pit: { id: pitId, keep_alive: DEFAULT_PIT_KEEP_ALIVE }, + size: batchSize, + search_after: searchAfter, + /** + * We want to know how many documents we need to process so we can log the progress. + * But we also want to increase the performance of these requests, + * so we ask ES to report the total count only on the first request (when searchAfter does not exist) + */ + track_total_hits: typeof searchAfter === 'undefined', + query, + }, + { maxResponseSize: maxResponseSizeBytes } + ) .then((body) => { const totalHits = typeof body.hits.total === 'number' diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.test.ts index a9cc52cae6f84..64b274ce2be66 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.test.ts @@ -25,6 +25,7 @@ const migrationsConfig = { retryAttempts: 15, batchSize: 1000, maxBatchSizeBytes: ByteSizeValue.parse('100mb'), + maxReadBatchSizeBytes: ByteSizeValue.parse('500mb'), } as unknown as SavedObjectsMigrationConfigType; const createInitialStateCommonParams = { @@ -216,6 +217,7 @@ describe('createInitialState', () => { "legacyIndex": ".kibana_task_manager", "logs": Array [], "maxBatchSizeBytes": 104857600, + "maxReadBatchSizeBytes": 524288000, "migrationDocLinks": Object { "clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded", "repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail", diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.ts index 9f6efdbeae0aa..3b04579929d30 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.ts @@ -153,6 +153,7 @@ export const createInitialState = ({ batchSize: migrationsConfig.batchSize, defaultBatchSize: migrationsConfig.batchSize, maxBatchSizeBytes: migrationsConfig.maxBatchSizeBytes.getValueInBytes(), + maxReadBatchSizeBytes: migrationsConfig.maxReadBatchSizeBytes.getValueInBytes(), discardUnknownObjects: migrationsConfig.discardUnknownObjects === kibanaVersion, discardCorruptObjects: migrationsConfig.discardCorruptObjects === kibanaVersion, logs: [], diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator.test.ts index 24559a87f20bb..ac3d18817b09f 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator.test.ts @@ -262,9 +262,9 @@ describe('KibanaMigrator', () => { const migrator = new KibanaMigrator(options); migrator.prepareMigrations(); await expect(migrator.runMigrations()).rejects.toMatchInlineSnapshot(` - [Error: Unable to complete saved object migrations for the [.my-index] index. Error: Reindex failed with the following error: - {"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}] - `); + [Error: Unable to complete saved object migrations for the [.my-index] index. Error: Reindex failed with the following error: + {"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}] + `); expect(loggingSystemMock.collect(options.logger).error[0][0]).toMatchInlineSnapshot(` [Error: Reindex failed with the following error: {"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}] @@ -552,6 +552,7 @@ const mockOptions = () => { algorithm: 'v2', batchSize: 20, maxBatchSizeBytes: ByteSizeValue.parse('20mb'), + maxReadBatchSizeBytes: new ByteSizeValue(536870888), pollInterval: 20000, scrollDuration: '10m', skip: false, diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/migrations_state_action_machine.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/migrations_state_action_machine.test.ts index a14cd9aa27081..90b07dc71f8c0 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/migrations_state_action_machine.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/migrations_state_action_machine.test.ts @@ -50,6 +50,7 @@ describe('migrationsStateActionMachine', () => { algorithm: 'v2', batchSize: 1000, maxBatchSizeBytes: new ByteSizeValue(1e8), + maxReadBatchSizeBytes: new ByteSizeValue(536870888), pollInterval: 0, scrollDuration: '0s', skip: false, diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts index 7a0005d6ad99d..6d502ecfb0b10 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts @@ -291,7 +291,7 @@ export const getTempIndexName = (indexPrefix: string, kibanaVersion: string): st export const increaseBatchSize = ( stateP: OutdatedDocumentsSearchRead | ReindexSourceToTempRead ) => { - const increasedBatchSize = stateP.batchSize * 1.2; + const increasedBatchSize = Math.floor(stateP.batchSize * 1.2); return increasedBatchSize > stateP.defaultBatchSize ? stateP.defaultBatchSize : increasedBatchSize; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts index 41e68ab3f634a..f61941cab8b4a 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts @@ -1873,7 +1873,7 @@ describe('migrations v2 model', () => { Array [ Object { "level": "warning", - "message": "Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half to 500.", + "message": "Read a batch that exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 500.", }, ] `); @@ -2412,7 +2412,7 @@ describe('migrations v2 model', () => { Array [ Object { "level": "warning", - "message": "Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half to 500.", + "message": "Read a batch that exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 500.", }, ] `); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts index 20040ebb77264..e64f43f053d81 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts @@ -880,17 +880,16 @@ export const model = (currentState: State, resW: ResponseType): } else { const left = res.left; if (isTypeof(left, 'es_response_too_large')) { + const batchSize = Math.floor(stateP.batchSize / 2); return { ...stateP, - batchSize: stateP.batchSize / 2, + batchSize, controlState: 'REINDEX_SOURCE_TO_TEMP_READ', logs: [ ...stateP.logs, { level: 'warning', - message: `Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half to ${ - stateP.batchSize / 2 - }.`, + message: `Read a batch that exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to ${batchSize}.`, }, ], }; @@ -1204,17 +1203,16 @@ export const model = (currentState: State, resW: ResponseType): } else { const left = res.left; if (isTypeof(left, 'es_response_too_large')) { + const batchSize = Math.floor(stateP.batchSize / 2); return { ...stateP, - batchSize: stateP.batchSize / 2, + batchSize, controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', logs: [ ...stateP.logs, { level: 'warning', - message: `Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half to ${ - stateP.batchSize / 2 - }.`, + message: `Read a batch that exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to ${batchSize}.`, }, ], }; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts index c12b0b5a2f41a..af02c9996d856 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts @@ -219,6 +219,7 @@ export const nextActionMap = ( query: state.outdatedDocumentsQuery, batchSize: state.batchSize, searchAfter: state.lastHitSortValue, + maxResponseSizeBytes: state.maxReadBatchSizeBytes, }), OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT: (state: OutdatedDocumentsSearchClosePit) => Actions.closePit({ client, pitId: state.pitId }), diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts index 20b848b261c89..06c79d46452a0 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts @@ -95,6 +95,12 @@ export interface BaseState extends ControlState { * http.max_content_length which defaults to 100mb. */ readonly maxBatchSizeBytes: number; + /** + * If a read batch exceeds this limit we half the batchSize and retry. By + * not JSON.parsing and transforming large batches we can avoid RangeErrors + * or Kibana OOMing. + */ + readonly maxReadBatchSizeBytes: number; readonly logs: MigrationLog[]; /** * If saved objects exist which have an unknown type they will cause diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts index a084f986fae7d..d0f56f73755c5 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts @@ -22,6 +22,7 @@ import { type OpenPitResponse, reindex, readWithPit, + type EsResponseTooLargeError, type ReadWithPit, searchForOutdatedDocuments, type SearchResponse, @@ -89,6 +90,7 @@ describe('migration actions', () => { { _source: { title: 'doc 3' } }, { _source: { title: 'saved object 4', type: 'another_unused_type' } }, { _source: { title: 'f-agent-event 5', type: 'f_agent_event' } }, + { _source: { title: new Array(1000).fill('a').join(), type: 'large' } }, // "large" saved object ] as unknown as SavedObjectsRawDoc[]; await bulkOverwriteTransformedDocuments({ client, @@ -733,6 +735,7 @@ describe('migration actions', () => { ).right.outdatedDocuments; expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(` Array [ + "a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a", "doc 1", "doc 2", "doc 3", @@ -773,6 +776,7 @@ describe('migration actions', () => { ).right.outdatedDocuments; expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(` Array [ + "a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a", "doc 1", "doc 2", "doc 3", @@ -805,6 +809,7 @@ describe('migration actions', () => { ).right.outdatedDocuments; expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(` Array [ + "a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a_updated", "doc 1_updated", "doc 2_updated", "doc 3_updated", @@ -859,6 +864,7 @@ describe('migration actions', () => { ).right.outdatedDocuments; expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(` Array [ + "a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a_updated", "doc 1_updated", "doc 2_updated", "doc 3_updated", @@ -918,6 +924,7 @@ describe('migration actions', () => { ).right.outdatedDocuments; expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(` Array [ + "a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a_updated", "doc 1", "doc 2", "doc 3_updated", @@ -1138,7 +1145,7 @@ describe('migration actions', () => { }); const docsResponse = (await readWithPitTask()) as Either.Right; - await expect(docsResponse.right.outdatedDocuments.length).toBe(5); + await expect(docsResponse.right.outdatedDocuments.length).toBe(6); }); it('requests the batchSize of documents from an index', async () => { @@ -1189,6 +1196,7 @@ describe('migration actions', () => { expect(docsResponse.right.outdatedDocuments.map((doc) => doc._source.title).sort()) .toMatchInlineSnapshot(` Array [ + "a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a", "doc 1", "doc 2", "doc 3", @@ -1275,6 +1283,35 @@ describe('migration actions', () => { ); }); + it('returns a left es_response_too_large error when a read batch exceeds the maxResponseSize', async () => { + const openPitTask = openPit({ client, index: 'existing_index_with_docs' }); + const pitResponse = (await openPitTask()) as Either.Right; + + let readWithPitTask = readWithPit({ + client, + pitId: pitResponse.right.pitId, + query: { match_all: {} }, + batchSize: 1, // small batch size so we don't exceed the maxResponseSize + searchAfter: undefined, + maxResponseSize: 500, // set a small size to force the error + }); + const rightResponse = (await readWithPitTask()) as Either.Right; + + await expect(Either.isRight(rightResponse)).toBe(true); + + readWithPitTask = readWithPit({ + client, + pitId: pitResponse.right.pitId, + query: { match_all: {} }, + batchSize: 10, // a bigger batch will exceed the maxResponseSize + searchAfter: undefined, + maxResponseSize: 500, // set a small size to force the error + }); + const leftResponse = (await readWithPitTask()) as Either.Left; + + await expect(leftResponse.left.type).toBe('es_response_too_large'); + }); + it('rejects if PIT does not exist', async () => { const readWithPitTask = readWithPit({ client, diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/read_batch_size.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/read_batch_size.test.ts new file mode 100644 index 0000000000000..0e0d438196525 --- /dev/null +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/read_batch_size.test.ts @@ -0,0 +1,118 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import Path from 'path'; +import fs from 'fs/promises'; +import { Root } from '@kbn/core-root-server-internal'; +import { + createRootWithCorePlugins, + createTestServers, + type TestElasticsearchUtils, +} from '@kbn/core-test-helpers-kbn-server'; +import { delay } from '../test_utils'; + +const logFilePath = Path.join(__dirname, 'read_batch_size.log'); + +describe('migration v2 - read batch size', () => { + let esServer: TestElasticsearchUtils; + let root: Root; + let logs: string; + + beforeEach(async () => { + await fs.unlink(logFilePath).catch(() => {}); + }); + + afterEach(async () => { + await root?.shutdown(); + await esServer?.stop(); + await delay(10); + }); + + it.only('reduces the read batchSize in half if a batch exceeds maxReadBatchSizeBytes', async () => { + const { startES } = createTestServers({ + adjustTimeout: (t: number) => jest.setTimeout(t), + settings: { + es: { + license: 'basic', + dataArchive: Path.join(__dirname, '..', 'archives', '8.4.0_with_sample_data_logs.zip'), + }, + }, + }); + + root = createRoot({ maxReadBatchSizeBytes: 15000 }); + esServer = await startES(); + await root.preboot(); + await root.setup(); + await root.start(); + + // Check for migration steps present in the logs + logs = await fs.readFile(logFilePath, 'utf-8'); + + expect(logs).toMatch( + 'Read a batch that exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 15' + ); + expect(logs).toMatch('[.kibana] Migration completed'); + }); + + it('does not reduce the read batchSize in half if no batches exceeded maxReadBatchSizeBytes', async () => { + const { startES } = createTestServers({ + adjustTimeout: (t: number) => jest.setTimeout(t), + settings: { + es: { + license: 'basic', + dataArchive: Path.join(__dirname, '..', 'archives', '8.4.0_with_sample_data_logs.zip'), + }, + }, + }); + + root = createRoot({ maxReadBatchSizeBytes: 50000 }); + esServer = await startES(); + await root.preboot(); + await root.setup(); + await root.start(); + + // Check for migration steps present in the logs + logs = await fs.readFile(logFilePath, 'utf-8'); + + expect(logs).not.toMatch( + 'Read a batch that exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to .' + ); + expect(logs).toMatch('[.kibana] Migration completed'); + }); +}); + +function createRoot({ maxReadBatchSizeBytes }: { maxReadBatchSizeBytes?: number }) { + return createRootWithCorePlugins( + { + migrations: { + maxReadBatchSizeBytes, + }, + logging: { + appenders: { + file: { + type: 'file', + fileName: logFilePath, + layout: { + type: 'json', + }, + }, + }, + loggers: [ + { + name: 'root', + level: 'info', + appenders: ['file'], + }, + ], + }, + }, + { + oss: false, + } + ); +} From 701e53ba05f8c9154468e76820488090b9254208 Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Tue, 16 May 2023 17:13:28 +0200 Subject: [PATCH 08/15] Increase dot_kibana_split test timeout to reduce flakiness --- .../saved_objects/migrations/group3/dot_kibana_split.test.ts | 1 + .../saved_objects/migrations/kibana_migrator_test_kit.ts | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts index 2a3a41054f87d..fed20c332dcb3 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts @@ -60,6 +60,7 @@ describe('split .kibana index into multiple system indices', () => { beforeAll(async () => { esServer = await startElasticsearch({ dataArchive: Path.join(__dirname, '..', 'archives', '7.3.0_xpack_sample_saved_objects.zip'), + timeout: 60000, }); }); diff --git a/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts b/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts index 0a0fbdaaac588..2feaa98d5f9a7 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts @@ -87,12 +87,14 @@ export interface KibanaMigratorTestKit { export const startElasticsearch = async ({ basePath, dataArchive, + timeout, }: { basePath?: string; dataArchive?: string; + timeout?: number; } = {}) => { const { startES } = createTestServers({ - adjustTimeout: (t: number) => jest.setTimeout(t), + adjustTimeout: (t: number) => jest.setTimeout(t + (timeout ?? 0)), settings: { es: { license: 'basic', From b71ab241d00b226564e2a5c65625ac6a53f57e60 Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Tue, 16 May 2023 17:13:53 +0200 Subject: [PATCH 09/15] Fix tests --- .../saved_objects/migrations/group3/actions/actions.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts index d0f56f73755c5..2a19a3beac2c3 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts @@ -1293,7 +1293,7 @@ describe('migration actions', () => { query: { match_all: {} }, batchSize: 1, // small batch size so we don't exceed the maxResponseSize searchAfter: undefined, - maxResponseSize: 500, // set a small size to force the error + maxResponseSizeBytes: 500, // set a small size to force the error }); const rightResponse = (await readWithPitTask()) as Either.Right; @@ -1305,7 +1305,7 @@ describe('migration actions', () => { query: { match_all: {} }, batchSize: 10, // a bigger batch will exceed the maxResponseSize searchAfter: undefined, - maxResponseSize: 500, // set a small size to force the error + maxResponseSizeBytes: 500, // set a small size to force the error }); const leftResponse = (await readWithPitTask()) as Either.Left; From 9c241b4f2df4edbbfb4a961a3c3a311965ba90c2 Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Tue, 16 May 2023 17:18:26 +0200 Subject: [PATCH 10/15] Delete unecessary file --- .../src/test.json | 20 ------------------- 1 file changed, 20 deletions(-) delete mode 100644 packages/core/saved-objects/core-saved-objects-migration-server-internal/src/test.json diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/test.json b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/test.json deleted file mode 100644 index a1eccb7b316a5..0000000000000 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/test.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "settings": { - "index": { - "number_of_shards": "1", - "auto_expand_replicas": "0-1" - } - }, - "mappings": { - "dynamic": false, - "properties": { - "migrationVersion": { - "dynamic": "true", - "type": "object" - }, - "type": { - "type": "keyword" - } - } - } - } \ No newline at end of file From 532a41d524dde95d16f2d50f1554f295fd8ea468 Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Wed, 17 May 2023 11:45:25 +0200 Subject: [PATCH 11/15] Retry when there's circuit breaker exceptions from Elasticsearch --- .../src/actions/catch_retryable_es_client_errors.test.ts | 2 +- .../src/actions/catch_retryable_es_client_errors.ts | 1 + .../saved_objects/migrations/group3/read_batch_size.test.ts | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/catch_retryable_es_client_errors.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/catch_retryable_es_client_errors.test.ts index 989e50af73683..ffe0189abe237 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/catch_retryable_es_client_errors.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/catch_retryable_es_client_errors.test.ts @@ -72,7 +72,7 @@ describe('catchRetryableEsClientErrors', () => { }); }); it('ResponseError with retryable status code', async () => { - const statusCodes = [503, 401, 403, 408, 410]; + const statusCodes = [503, 401, 403, 408, 410, 429]; return Promise.all( statusCodes.map(async (status) => { const error = new esErrors.ResponseError( diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/catch_retryable_es_client_errors.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/catch_retryable_es_client_errors.ts index 168e3170d30bf..74877c9386422 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/catch_retryable_es_client_errors.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/catch_retryable_es_client_errors.ts @@ -15,6 +15,7 @@ const retryResponseStatuses = [ 403, // AuthenticationException 408, // RequestTimeout 410, // Gone + 429, // TooManyRequests -> ES circuit breaker ]; export interface RetryableEsClientError { diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/read_batch_size.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/read_batch_size.test.ts index 0e0d438196525..b2afe46ebfd8a 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/read_batch_size.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/read_batch_size.test.ts @@ -33,7 +33,7 @@ describe('migration v2 - read batch size', () => { await delay(10); }); - it.only('reduces the read batchSize in half if a batch exceeds maxReadBatchSizeBytes', async () => { + it('reduces the read batchSize in half if a batch exceeds maxReadBatchSizeBytes', async () => { const { startES } = createTestServers({ adjustTimeout: (t: number) => jest.setTimeout(t), settings: { From 67562ecf5757f546973bdec72e3969bae6ebca5c Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Wed, 24 May 2023 22:27:26 +0200 Subject: [PATCH 12/15] Address reviews, better handling when batchSize: 1 still exceeds maxReadBatchSizeBytes --- .../src/saved_objects_config.ts | 6 +- ...grations_state_action_machine.test.ts.snap | 12 +-- .../src/actions/constants.ts | 5 - .../src/actions/index.ts | 2 +- .../src/actions/read_with_pit.ts | 5 + .../src/actions/reindex.test.ts | 1 + .../src/actions/reindex.ts | 5 +- .../src/initial_state.test.ts | 2 +- .../src/initial_state.ts | 2 +- .../src/model/helpers.ts | 6 +- .../src/model/model.test.ts | 95 ++++++++++++++++--- .../src/model/model.ts | 68 ++++++++----- .../src/next.ts | 1 + .../src/state.ts | 4 +- .../migrations/group3/actions/actions.test.ts | 15 ++- 15 files changed, 168 insertions(+), 61 deletions(-) diff --git a/packages/core/saved-objects/core-saved-objects-base-server-internal/src/saved_objects_config.ts b/packages/core/saved-objects/core-saved-objects-base-server-internal/src/saved_objects_config.ts index 15822c7faef44..c402031897445 100644 --- a/packages/core/saved-objects/core-saved-objects-base-server-internal/src/saved_objects_config.ts +++ b/packages/core/saved-objects/core-saved-objects-base-server-internal/src/saved_objects_config.ts @@ -9,6 +9,7 @@ import { valid } from 'semver'; import { schema, TypeOf } from '@kbn/config-schema'; import type { ServiceConfigDescriptor } from '@kbn/core-base-server-internal'; +import buffer from 'buffer'; const migrationSchema = schema.object({ algorithm: schema.oneOf([schema.literal('v2'), schema.literal('zdt')], { @@ -16,7 +17,10 @@ const migrationSchema = schema.object({ }), batchSize: schema.number({ defaultValue: 1_000 }), maxBatchSizeBytes: schema.byteSize({ defaultValue: '100mb' }), // 100mb is the default http.max_content_length Elasticsearch config value - maxReadBatchSizeBytes: schema.byteSize({ defaultValue: 536870888, max: 536870888 }), + maxReadBatchSizeBytes: schema.byteSize({ + defaultValue: buffer.constants.MAX_STRING_LENGTH, + max: buffer.constants.MAX_STRING_LENGTH, + }), discardUnknownObjects: schema.maybe( schema.string({ validate: (value: string) => diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/__snapshots__/migrations_state_action_machine.test.ts.snap b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/__snapshots__/migrations_state_action_machine.test.ts.snap index a67009f13dc17..651182fb621fa 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/__snapshots__/migrations_state_action_machine.test.ts.snap +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/__snapshots__/migrations_state_action_machine.test.ts.snap @@ -23,7 +23,6 @@ Object { ], "controlState": "LEGACY_REINDEX", "currentAlias": ".my-so-index", - "defaultBatchSize": 1000, "discardCorruptObjects": false, "discardUnknownObjects": false, "excludeFromUpgradeFilterHooks": Object {}, @@ -167,6 +166,7 @@ Object { "message": "Log from LEGACY_REINDEX control state", }, ], + "maxBatchSize": 1000, "maxBatchSizeBytes": 100000000, "maxReadBatchSizeBytes": 536870888, "migrationDocLinks": Object { @@ -247,7 +247,6 @@ Object { ], "controlState": "LEGACY_DELETE", "currentAlias": ".my-so-index", - "defaultBatchSize": 1000, "discardCorruptObjects": false, "discardUnknownObjects": false, "excludeFromUpgradeFilterHooks": Object {}, @@ -395,6 +394,7 @@ Object { "message": "Log from LEGACY_DELETE control state", }, ], + "maxBatchSize": 1000, "maxBatchSizeBytes": 100000000, "maxReadBatchSizeBytes": 536870888, "migrationDocLinks": Object { @@ -475,7 +475,6 @@ Object { ], "controlState": "LEGACY_DELETE", "currentAlias": ".my-so-index", - "defaultBatchSize": 1000, "discardCorruptObjects": false, "discardUnknownObjects": false, "excludeFromUpgradeFilterHooks": Object {}, @@ -627,6 +626,7 @@ Object { "message": "Log from LEGACY_DELETE control state", }, ], + "maxBatchSize": 1000, "maxBatchSizeBytes": 100000000, "maxReadBatchSizeBytes": 536870888, "migrationDocLinks": Object { @@ -707,7 +707,6 @@ Object { ], "controlState": "DONE", "currentAlias": ".my-so-index", - "defaultBatchSize": 1000, "discardCorruptObjects": false, "discardUnknownObjects": false, "excludeFromUpgradeFilterHooks": Object {}, @@ -863,6 +862,7 @@ Object { "message": "Log from DONE control state", }, ], + "maxBatchSize": 1000, "maxBatchSizeBytes": 100000000, "maxReadBatchSizeBytes": 536870888, "migrationDocLinks": Object { @@ -975,7 +975,6 @@ Object { ], "controlState": "LEGACY_DELETE", "currentAlias": ".my-so-index", - "defaultBatchSize": 1000, "discardCorruptObjects": false, "discardUnknownObjects": false, "excludeFromUpgradeFilterHooks": Object {}, @@ -1119,6 +1118,7 @@ Object { "message": "Log from LEGACY_DELETE control state", }, ], + "maxBatchSize": 1000, "maxBatchSizeBytes": 100000000, "maxReadBatchSizeBytes": 536870888, "migrationDocLinks": Object { @@ -1210,7 +1210,6 @@ Object { ], "controlState": "FATAL", "currentAlias": ".my-so-index", - "defaultBatchSize": 1000, "discardCorruptObjects": false, "discardUnknownObjects": false, "excludeFromUpgradeFilterHooks": Object {}, @@ -1358,6 +1357,7 @@ Object { "message": "Log from FATAL control state", }, ], + "maxBatchSize": 1000, "maxBatchSizeBytes": 100000000, "maxReadBatchSizeBytes": 536870888, "migrationDocLinks": Object { diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/constants.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/constants.ts index 536ae1d256960..8432e60bf2329 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/constants.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/constants.ts @@ -6,11 +6,6 @@ * Side Public License, v 1. */ -/** - * Batch size for updateByQuery and reindex operations. - * Uses the default value of 1000 for Elasticsearch reindex operation. - */ -export const BATCH_SIZE = 1_000; /** * When a request takes a long time to complete and hits the timeout or the * client aborts that request due to the requestTimeout, our only course of diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts index 09089cfdcfebd..dbe61920b31b3 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts @@ -12,7 +12,6 @@ import type { RetryableEsClientError } from './catch_retryable_es_client_errors' import type { DocumentsTransformFailed } from '../core/migrate_raw_docs'; export { - BATCH_SIZE, DEFAULT_TIMEOUT, INDEX_AUTO_EXPAND_REPLICAS, INDEX_NUMBER_OF_SHARDS, @@ -149,6 +148,7 @@ export interface RequestEntityTooLargeException { export interface EsResponseTooLargeError { type: 'es_response_too_large'; + contentLength: number; } /** @internal */ diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts index dc182af1737e9..f781e63c437c3 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts @@ -110,6 +110,11 @@ export const readWithPit = ) { return Either.left({ type: 'es_response_too_large' as const, + contentLength: Number.parseInt( + e.message.match(/The content length \((\d+)\) is bigger than the maximum/)?.[1] ?? + '-1', + 10 + ), }); } else { throw e; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/reindex.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/reindex.test.ts index 969150a21cfcd..d5d03bbabb405 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/reindex.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/reindex.test.ts @@ -38,6 +38,7 @@ describe('reindex', () => { reindexScript: Option.none, requireAlias: false, excludeOnUpgradeQuery: {}, + batchSize: 1000, }); try { await task(); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/reindex.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/reindex.ts index 594822f724760..b64631d3b87f6 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/reindex.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/reindex.ts @@ -15,7 +15,6 @@ import { catchRetryableEsClientErrors, type RetryableEsClientError, } from './catch_retryable_es_client_errors'; -import { BATCH_SIZE } from './constants'; /** @internal */ export interface ReindexResponse { @@ -34,6 +33,7 @@ export interface ReindexParams { * index for backup purposes, but won't be available in the upgraded index. */ excludeOnUpgradeQuery: QueryDslQueryContainer; + batchSize: number; } /** @@ -52,6 +52,7 @@ export const reindex = reindexScript, requireAlias, excludeOnUpgradeQuery, + batchSize, }: ReindexParams): TaskEither.TaskEither => () => { return client @@ -65,7 +66,7 @@ export const reindex = source: { index: sourceIndex, // Set reindex batch size - size: BATCH_SIZE, + size: batchSize, // Exclude saved object types query: excludeOnUpgradeQuery, }, diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.test.ts index 64b274ce2be66..1378dec852e2b 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.test.ts @@ -77,7 +77,6 @@ describe('createInitialState', () => { "batchSize": 1000, "controlState": "INIT", "currentAlias": ".kibana_task_manager", - "defaultBatchSize": 1000, "discardCorruptObjects": false, "discardUnknownObjects": false, "excludeFromUpgradeFilterHooks": Object {}, @@ -216,6 +215,7 @@ describe('createInitialState', () => { "knownTypes": Array [], "legacyIndex": ".kibana_task_manager", "logs": Array [], + "maxBatchSize": 1000, "maxBatchSizeBytes": 104857600, "maxReadBatchSizeBytes": 524288000, "migrationDocLinks": Object { diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.ts index 3b04579929d30..6bb6c22e3a7f2 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/initial_state.ts @@ -151,7 +151,7 @@ export const createInitialState = ({ retryDelay: 0, retryAttempts: migrationsConfig.retryAttempts, batchSize: migrationsConfig.batchSize, - defaultBatchSize: migrationsConfig.batchSize, + maxBatchSize: migrationsConfig.batchSize, maxBatchSizeBytes: migrationsConfig.maxBatchSizeBytes.getValueInBytes(), maxReadBatchSizeBytes: migrationsConfig.maxReadBatchSizeBytes.getValueInBytes(), discardUnknownObjects: migrationsConfig.discardUnknownObjects === kibanaVersion, diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts index 6d502ecfb0b10..e83601a49e172 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts @@ -287,12 +287,10 @@ export function getMigrationType({ export const getTempIndexName = (indexPrefix: string, kibanaVersion: string): string => `${indexPrefix}_${kibanaVersion}_reindex_temp`; -/** Increase batchSize by 20% until a maximum of defaultBatchSize */ +/** Increase batchSize by 20% until a maximum of maxBatchSize */ export const increaseBatchSize = ( stateP: OutdatedDocumentsSearchRead | ReindexSourceToTempRead ) => { const increasedBatchSize = Math.floor(stateP.batchSize * 1.2); - return increasedBatchSize > stateP.defaultBatchSize - ? stateP.defaultBatchSize - : increasedBatchSize; + return increasedBatchSize > stateP.maxBatchSize ? stateP.maxBatchSize : increasedBatchSize; }; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts index f61941cab8b4a..236be2a7a791c 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts @@ -86,8 +86,9 @@ describe('migrations v2 model', () => { retryDelay: 0, retryAttempts: 15, batchSize: 1000, - defaultBatchSize: 1000, + maxBatchSize: 1000, maxBatchSizeBytes: 1e8, + maxReadBatchSizeBytes: 1234, discardUnknownObjects: false, discardCorruptObjects: false, indexPrefix: '.kibana', @@ -1833,7 +1834,7 @@ describe('migrations v2 model', () => { expect(newState.lastHitSortValue).toBe(lastHitSortValue); expect(newState.progress.processed).toBe(undefined); expect(newState.progress.total).toBe(1); - expect(newState.defaultBatchSize).toBe(1000); + expect(newState.maxBatchSize).toBe(1000); expect(newState.batchSize).toBe(1000); // don't increase batchsize above default expect(newState.logs).toMatchInlineSnapshot(` Array [ @@ -1845,7 +1846,7 @@ describe('migrations v2 model', () => { `); }); - it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM increases batchSize if < defaultBatchSize', () => { + it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM increases batchSize if < maxBatchSize', () => { const outdatedDocuments = [{ _id: '1', _source: { type: 'vis' } }]; const lastHitSortValue = [123456]; const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.right({ @@ -1856,29 +1857,65 @@ describe('migrations v2 model', () => { const newState = model({ ...state, batchSize: 500 }, res) as ReindexSourceToTempTransform; expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_TRANSFORM'); expect(newState.batchSize).toBe(600); - expect(newState.defaultBatchSize).toBe(1000); + expect(newState.maxBatchSize).toBe(1000); }); it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_READ if left es_response_too_large', () => { const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.left({ type: 'es_response_too_large', + contentLength: 4567, }); const newState = model(state, res) as ReindexSourceToTempRead; expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_READ'); expect(newState.lastHitSortValue).toBe(undefined); // lastHitSortValue should not be set expect(newState.progress.processed).toBe(undefined); // don't increment progress expect(newState.batchSize).toBe(500); // halves the batch size - expect(newState.defaultBatchSize).toBe(1000); // leaves defaultBatchSize unchanged + expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged expect(newState.logs).toMatchInlineSnapshot(` Array [ Object { "level": "warning", - "message": "Read a batch that exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 500.", + "message": "Read a batch with a response content length of 4567 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 500.", }, ] `); }); + it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_READ if left es_response_too_large will not reduce batch size below 1', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.left({ + type: 'es_response_too_large', + contentLength: 2345, + }); + const newState = model({ ...state, batchSize: 1.5 }, res) as ReindexSourceToTempRead; + expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_READ'); + expect(newState.lastHitSortValue).toBe(undefined); // lastHitSortValue should not be set + expect(newState.progress.processed).toBe(undefined); // don't increment progress + expect(newState.batchSize).toBe(1); // don't halve the batch size or go below 1 + expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged + expect(newState.logs).toMatchInlineSnapshot(` + Array [ + Object { + "level": "warning", + "message": "Read a batch with a response content length of 2345 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 1.", + }, + ] + `); + }); + + it('REINDEX_SOURCE_TO_TEMP_READ -> FATAL if left es_response_too_large and batchSize already 1', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.left({ + type: 'es_response_too_large', + contentLength: 2345, + }); + const newState = model({ ...state, batchSize: 1 }, res) as FatalState; + expect(newState.controlState).toBe('FATAL'); + expect(newState.batchSize).toBe(1); // don't halve the batch size or go below 1 + expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged + expect(newState.reason).toMatchInlineSnapshot( + `"After reducing the read batch size to a single document, the Elasticsearch response content length was 2345bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again."` + ); + }); + it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT if no outdated documents to reindex', () => { const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.right({ outdatedDocuments: [], @@ -2341,7 +2378,7 @@ describe('migrations v2 model', () => { expect(newState.lastHitSortValue).toBe(lastHitSortValue); expect(newState.progress.processed).toBe(undefined); expect(newState.progress.total).toBe(10); - expect(newState.defaultBatchSize).toBe(1000); + expect(newState.maxBatchSize).toBe(1000); expect(newState.batchSize).toBe(1000); // don't increase batchsize above default expect(newState.logs).toMatchInlineSnapshot(` Array [ @@ -2384,7 +2421,7 @@ describe('migrations v2 model', () => { `); }); - it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_TRANSFORM increases batchSize if < defaultBatchSize', () => { + it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_TRANSFORM increases batchSize if < maxBatchSize', () => { const outdatedDocuments = [{ _id: '1', _source: { type: 'vis' } }]; const lastHitSortValue = [123456]; const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.right({ @@ -2395,29 +2432,65 @@ describe('migrations v2 model', () => { const newState = model({ ...state, batchSize: 500 }, res) as ReindexSourceToTempTransform; expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_TRANSFORM'); expect(newState.batchSize).toBe(600); - expect(newState.defaultBatchSize).toBe(1000); + expect(newState.maxBatchSize).toBe(1000); }); it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_SEARCH_READ if left es_response_too_large', () => { const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.left({ type: 'es_response_too_large', + contentLength: 3456, }); const newState = model(state, res) as ReindexSourceToTempRead; expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_SEARCH_READ'); expect(newState.lastHitSortValue).toBe(undefined); // lastHitSortValue should not be set expect(newState.progress.processed).toBe(undefined); // don't increment progress expect(newState.batchSize).toBe(500); // halves the batch size - expect(newState.defaultBatchSize).toBe(1000); // leaves defaultBatchSize unchanged + expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged expect(newState.logs).toMatchInlineSnapshot(` Array [ Object { "level": "warning", - "message": "Read a batch that exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 500.", + "message": "Read a batch with a response content length of 3456 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 500.", }, ] `); }); + it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_SEARCH_READ if left es_response_too_large will not reduce batch size below 1', () => { + const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.left({ + type: 'es_response_too_large', + contentLength: 2345, + }); + const newState = model({ ...state, batchSize: 1.5 }, res) as ReindexSourceToTempRead; + expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_SEARCH_READ'); + expect(newState.lastHitSortValue).toBe(undefined); // lastHitSortValue should not be set + expect(newState.progress.processed).toBe(undefined); // don't increment progress + expect(newState.batchSize).toBe(1); // don't halve the batch size or go below 1 + expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged + expect(newState.logs).toMatchInlineSnapshot(` + Array [ + Object { + "level": "warning", + "message": "Read a batch with a response content length of 2345 bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 1.", + }, + ] + `); + }); + + it('OUTDATED_DOCUMENTS_SEARCH_READ -> FATAL if left es_response_too_large and batchSize already 1', () => { + const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.left({ + type: 'es_response_too_large', + contentLength: 2345, + }); + const newState = model({ ...state, batchSize: 1 }, res) as FatalState; + expect(newState.controlState).toBe('FATAL'); + expect(newState.batchSize).toBe(1); // don't halve the batch size or go below 1 + expect(newState.maxBatchSize).toBe(1000); // leaves maxBatchSize unchanged + expect(newState.reason).toMatchInlineSnapshot( + `"After reducing the read batch size to a single document, the response content length was 2345 bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again."` + ); + }); + it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT if no outdated documents to transform', () => { const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.right({ outdatedDocuments: [], diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts index e64f43f053d81..fdeb2d346c2d6 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts @@ -880,19 +880,27 @@ export const model = (currentState: State, resW: ResponseType): } else { const left = res.left; if (isTypeof(left, 'es_response_too_large')) { - const batchSize = Math.floor(stateP.batchSize / 2); - return { - ...stateP, - batchSize, - controlState: 'REINDEX_SOURCE_TO_TEMP_READ', - logs: [ - ...stateP.logs, - { - level: 'warning', - message: `Read a batch that exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to ${batchSize}.`, - }, - ], - }; + if (stateP.batchSize === 1) { + return { + ...stateP, + controlState: 'FATAL', + reason: `After reducing the read batch size to a single document, the Elasticsearch response content length was ${left.contentLength}bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again.`, + }; + } else { + const batchSize = Math.max(Math.floor(stateP.batchSize / 2), 1); + return { + ...stateP, + batchSize, + controlState: 'REINDEX_SOURCE_TO_TEMP_READ', + logs: [ + ...stateP.logs, + { + level: 'warning', + message: `Read a batch with a response content length of ${left.contentLength} bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to ${batchSize}.`, + }, + ], + }; + } } else { throwBadResponse(stateP, left); } @@ -1203,19 +1211,27 @@ export const model = (currentState: State, resW: ResponseType): } else { const left = res.left; if (isTypeof(left, 'es_response_too_large')) { - const batchSize = Math.floor(stateP.batchSize / 2); - return { - ...stateP, - batchSize, - controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', - logs: [ - ...stateP.logs, - { - level: 'warning', - message: `Read a batch that exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to ${batchSize}.`, - }, - ], - }; + if (stateP.batchSize === 1) { + return { + ...stateP, + controlState: 'FATAL', + reason: `After reducing the read batch size to a single document, the response content length was ${left.contentLength} bytes which still exceeded migrations.maxReadBatchSizeBytes. Increase migrations.maxReadBatchSizeBytes and try again.`, + }; + } else { + const batchSize = Math.max(Math.floor(stateP.batchSize / 2), 1); + return { + ...stateP, + batchSize, + controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', + logs: [ + ...stateP.logs, + { + level: 'warning', + message: `Read a batch with a response content length of ${left.contentLength} bytes which exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to ${batchSize}.`, + }, + ], + }; + } } else { throwBadResponse(stateP, left); } diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts index af02c9996d856..1b5a9fe99fe3a 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts @@ -260,6 +260,7 @@ export const nextActionMap = ( reindexScript: state.preMigrationScript, requireAlias: false, excludeOnUpgradeQuery: state.excludeOnUpgradeQuery, + batchSize: state.batchSize, }), LEGACY_REINDEX_WAIT_FOR_TASK: (state: LegacyReindexWaitForTaskState) => Actions.waitForReindexTask({ client, taskId: state.legacyReindexTaskId, timeout: '60s' }), diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts index 06c79d46452a0..8a6be0269947e 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts @@ -82,10 +82,10 @@ export interface BaseState extends ControlState { * When writing batches, we limit the number of documents in a batch * (batchSize) as well as the size of the batch in bytes (maxBatchSizeBytes). */ - readonly defaultBatchSize: number; + readonly maxBatchSize: number; /** * The number of documents to process in each batch. Under most circumstances - * batchSize == defaultBatchSize. But if we fail to read a batch because of a + * batchSize == maxBatchSize. But if we fail to read a batch because of a * Nodejs `RangeError` we'll temporarily half `batchSize` and retry. */ readonly batchSize: number; diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts index 588904929cdd4..38d4075f9516c 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts @@ -715,6 +715,7 @@ describe('migration actions', () => { reindexScript: Option.none, requireAlias: false, excludeOnUpgradeQuery: { match_all: {} }, + batchSize: 1000, })()) as Either.Right; const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` @@ -751,6 +752,7 @@ describe('migration actions', () => { })), }, }, + batchSize: 1000, })()) as Either.Right; const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` @@ -780,6 +782,7 @@ describe('migration actions', () => { reindexScript: Option.some(`ctx._source.title = ctx._source.title + '_updated'`), requireAlias: false, excludeOnUpgradeQuery: { match_all: {} }, + batchSize: 1000, })()) as Either.Right; const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` @@ -812,6 +815,7 @@ describe('migration actions', () => { reindexScript: Option.some(`ctx._source.title = ctx._source.title + '_updated'`), requireAlias: false, excludeOnUpgradeQuery: { match_all: {} }, + batchSize: 1000, })()) as Either.Right; let task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` @@ -829,6 +833,7 @@ describe('migration actions', () => { reindexScript: Option.none, requireAlias: false, excludeOnUpgradeQuery: { match_all: {} }, + batchSize: 1000, })()) as Either.Right; task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` @@ -879,6 +884,7 @@ describe('migration actions', () => { reindexScript: Option.some(`ctx._source.title = ctx._source.title + '_updated'`), requireAlias: false, excludeOnUpgradeQuery: { match_all: {} }, + batchSize: 1000, })()) as Either.Right; const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` @@ -931,6 +937,7 @@ describe('migration actions', () => { reindexScript: Option.none, requireAlias: false, excludeOnUpgradeQuery: { match_all: {} }, + batchSize: 1000, })()) as Either.Right; const task = waitForReindexTask({ client, taskId: reindexTaskId, timeout: '10s' }); @@ -970,6 +977,7 @@ describe('migration actions', () => { reindexScript: Option.none, requireAlias: false, excludeOnUpgradeQuery: { match_all: {} }, + batchSize: 1000, })()) as Either.Right; const task = waitForReindexTask({ client, taskId: reindexTaskId, timeout: '10s' }); @@ -993,6 +1001,7 @@ describe('migration actions', () => { excludeOnUpgradeQuery: { match_all: {}, }, + batchSize: 1000, })()) as Either.Right; const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` @@ -1014,6 +1023,7 @@ describe('migration actions', () => { reindexScript: Option.none, requireAlias: false, excludeOnUpgradeQuery: { match_all: {} }, + batchSize: 1000, })()) as Either.Right; const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); @@ -1036,6 +1046,7 @@ describe('migration actions', () => { reindexScript: Option.none, requireAlias: true, excludeOnUpgradeQuery: { match_all: {} }, + batchSize: 1000, })()) as Either.Right; const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); @@ -1064,6 +1075,7 @@ describe('migration actions', () => { reindexScript: Option.none, requireAlias: false, excludeOnUpgradeQuery: { match_all: {} }, + batchSize: 1000, })()) as Either.Right; const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '0s' }); @@ -1278,7 +1290,8 @@ describe('migration actions', () => { }); const leftResponse = (await readWithPitTask()) as Either.Left; - await expect(leftResponse.left.type).toBe('es_response_too_large'); + expect(leftResponse.left.type).toBe('es_response_too_large'); + expect(leftResponse.left.contentLength).toBe(3184); }); it('rejects if PIT does not exist', async () => { From c3c272f3ddccdbb46fe80427c7f3b247c7e56e69 Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Wed, 24 May 2023 22:48:12 +0200 Subject: [PATCH 13/15] Review feedback: increase coverage of recovering up to maxBatchSize on success --- .../src/model/model.test.ts | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts index 236be2a7a791c..892ca75aea2f0 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts @@ -1853,10 +1853,17 @@ describe('migrations v2 model', () => { outdatedDocuments, lastHitSortValue, totalHits: 1, + processedDocs: 1, }); - const newState = model({ ...state, batchSize: 500 }, res) as ReindexSourceToTempTransform; - expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_TRANSFORM'); + let newState = model({ ...state, batchSize: 500 }, res) as ReindexSourceToTempTransform; expect(newState.batchSize).toBe(600); + newState = model({ ...state, batchSize: 600 }, res) as ReindexSourceToTempTransform; + expect(newState.batchSize).toBe(720); + newState = model({ ...state, batchSize: 720 }, res) as ReindexSourceToTempTransform; + expect(newState.batchSize).toBe(864); + newState = model({ ...state, batchSize: 864 }, res) as ReindexSourceToTempTransform; + expect(newState.batchSize).toBe(1000); // + 20% would have been 1036 + expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_TRANSFORM'); expect(newState.maxBatchSize).toBe(1000); }); @@ -2421,17 +2428,24 @@ describe('migrations v2 model', () => { `); }); - it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_TRANSFORM increases batchSize if < maxBatchSize', () => { + it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_TRANSFORM increases batchSize up to maxBatchSize', () => { const outdatedDocuments = [{ _id: '1', _source: { type: 'vis' } }]; const lastHitSortValue = [123456]; const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.right({ outdatedDocuments, lastHitSortValue, totalHits: 1, + processedDocs: [], }); - const newState = model({ ...state, batchSize: 500 }, res) as ReindexSourceToTempTransform; - expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_TRANSFORM'); + let newState = model({ ...state, batchSize: 500 }, res) as ReindexSourceToTempTransform; expect(newState.batchSize).toBe(600); + newState = model({ ...state, batchSize: 600 }, res) as ReindexSourceToTempTransform; + expect(newState.batchSize).toBe(720); + newState = model({ ...state, batchSize: 720 }, res) as ReindexSourceToTempTransform; + expect(newState.batchSize).toBe(864); + newState = model({ ...state, batchSize: 864 }, res) as ReindexSourceToTempTransform; + expect(newState.batchSize).toBe(1000); // + 20% would have been 1036 + expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_TRANSFORM'); expect(newState.maxBatchSize).toBe(1000); }); From 9a41d950bc5ea2c11dd25f5ba8699810119905d8 Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Thu, 25 May 2023 22:06:01 +0200 Subject: [PATCH 14/15] Review: why match when you can test --- .../src/actions/read_with_pit.ts | 2 +- .../src/zdt/test_helpers/context.ts | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts index f781e63c437c3..91652f2175ff7 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/read_with_pit.ts @@ -106,7 +106,7 @@ export const readWithPit = .catch((e) => { if ( e instanceof EsErrors.RequestAbortedError && - e.message.match(/The content length \(\d+\) is bigger than the maximum/) != null + /The content length \(\d+\) is bigger than the maximum/.test(e.message) ) { return Either.left({ type: 'es_response_too_large' as const, diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/test_helpers/context.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/test_helpers/context.ts index 89aa1a13fa928..69722cb9652e8 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/test_helpers/context.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/test_helpers/context.ts @@ -31,6 +31,7 @@ export const createMigrationConfigMock = ( algorithm: 'zdt', batchSize: 1000, maxBatchSizeBytes: new ByteSizeValue(1e8), + maxReadBatchSizeBytes: new ByteSizeValue(1e6), pollInterval: 0, scrollDuration: '0s', skip: false, From 0efb67a63c859d86636bc15a36301db6ddc69ae5 Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Tue, 30 May 2023 13:05:55 +0200 Subject: [PATCH 15/15] Fix outdated integration test --- .../migrations/group3/read_batch_size.test.ts | 33 ++++--------------- 1 file changed, 6 insertions(+), 27 deletions(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/read_batch_size.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/read_batch_size.test.ts index b2afe46ebfd8a..0fce643975c53 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/read_batch_size.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/read_batch_size.test.ts @@ -11,10 +11,10 @@ import fs from 'fs/promises'; import { Root } from '@kbn/core-root-server-internal'; import { createRootWithCorePlugins, - createTestServers, type TestElasticsearchUtils, } from '@kbn/core-test-helpers-kbn-server'; import { delay } from '../test_utils'; +import { startElasticsearch } from '../kibana_migrator_test_kit'; const logFilePath = Path.join(__dirname, 'read_batch_size.log'); @@ -24,6 +24,9 @@ describe('migration v2 - read batch size', () => { let logs: string; beforeEach(async () => { + esServer = await startElasticsearch({ + dataArchive: Path.join(__dirname, '..', 'archives', '8.4.0_with_sample_data_logs.zip'), + }); await fs.unlink(logFilePath).catch(() => {}); }); @@ -34,18 +37,7 @@ describe('migration v2 - read batch size', () => { }); it('reduces the read batchSize in half if a batch exceeds maxReadBatchSizeBytes', async () => { - const { startES } = createTestServers({ - adjustTimeout: (t: number) => jest.setTimeout(t), - settings: { - es: { - license: 'basic', - dataArchive: Path.join(__dirname, '..', 'archives', '8.4.0_with_sample_data_logs.zip'), - }, - }, - }); - root = createRoot({ maxReadBatchSizeBytes: 15000 }); - esServer = await startES(); await root.preboot(); await root.setup(); await root.start(); @@ -54,24 +46,13 @@ describe('migration v2 - read batch size', () => { logs = await fs.readFile(logFilePath, 'utf-8'); expect(logs).toMatch( - 'Read a batch that exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 15' + /Read a batch with a response content length of \d+ bytes which exceeds migrations\.maxReadBatchSizeBytes, retrying by reducing the batch size in half to 15/ ); expect(logs).toMatch('[.kibana] Migration completed'); }); it('does not reduce the read batchSize in half if no batches exceeded maxReadBatchSizeBytes', async () => { - const { startES } = createTestServers({ - adjustTimeout: (t: number) => jest.setTimeout(t), - settings: { - es: { - license: 'basic', - dataArchive: Path.join(__dirname, '..', 'archives', '8.4.0_with_sample_data_logs.zip'), - }, - }, - }); - root = createRoot({ maxReadBatchSizeBytes: 50000 }); - esServer = await startES(); await root.preboot(); await root.setup(); await root.start(); @@ -79,9 +60,7 @@ describe('migration v2 - read batch size', () => { // Check for migration steps present in the logs logs = await fs.readFile(logFilePath, 'utf-8'); - expect(logs).not.toMatch( - 'Read a batch that exceeds migrations.maxReadBatchSizeBytes, retrying by reducing the batch size in half to .' - ); + expect(logs).not.toMatch('retrying by reducing the batch size in half to'); expect(logs).toMatch('[.kibana] Migration completed'); }); });