From 157129e5533d8220b90231fb4c593ac614521c46 Mon Sep 17 00:00:00 2001
From: Mikhail Shustov
Date: Tue, 6 Apr 2021 14:52:36 +0200
Subject: [PATCH] migration v2 respects the config.batchSize value (#96207)

* migrationsv2: read batchSize from config

* update tests

* update numeric values in so config to improve reading

* fix integration tests. failed with illegal_argument_exception
---
 .../migrationsv2/actions/index.test.ts        |  30 ++++-
 .../migrationsv2/actions/index.ts             |  30 +++--
 .../integration_tests/actions.test.ts         | 123 +++++++++---------
 .../migrations_state_action_machine.test.ts   |   4 +
 .../saved_objects/migrationsv2/model.test.ts  |   3 +
 .../saved_objects/migrationsv2/model.ts       |   1 +
 .../server/saved_objects/migrationsv2/next.ts |   6 +-
 .../saved_objects/migrationsv2/types.ts       |  15 +++
 .../saved_objects/saved_objects_config.ts     |   4 +-
 9 files changed, 140 insertions(+), 76 deletions(-)

diff --git a/src/core/server/saved_objects/migrationsv2/actions/index.test.ts b/src/core/server/saved_objects/migrationsv2/actions/index.test.ts
index a21078cbe1135..14ca73e7fcca0 100644
--- a/src/core/server/saved_objects/migrationsv2/actions/index.test.ts
+++ b/src/core/server/saved_objects/migrationsv2/actions/index.test.ts
@@ -163,7 +163,12 @@ describe('actions', () => {

   describe('searchForOutdatedDocuments', () => {
     it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
-      const task = Actions.searchForOutdatedDocuments(client, 'new_index', { properties: {} });
+      const task = Actions.searchForOutdatedDocuments(client, {
+        batchSize: 1000,
+        targetIndex: 'new_index',
+        outdatedDocumentsQuery: {},
+      });
+
       try {
         await task();
       } catch (e) {
@@ -172,6 +177,29 @@ describe('actions', () => {
       }
       expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
     });
+
+    it('configures request according to given parameters', async () => {
+      const esClient = elasticsearchClientMock.createInternalClient();
+      const query = {};
+      const targetIndex = 'new_index';
+      const batchSize = 1000;
+      const task = Actions.searchForOutdatedDocuments(esClient, {
+        batchSize,
+        targetIndex,
+        outdatedDocumentsQuery: query,
+      });
+
+      await task();
+
+      expect(esClient.search).toHaveBeenCalledTimes(1);
+      expect(esClient.search).toHaveBeenCalledWith(
+        expect.objectContaining({
+          index: targetIndex,
+          size: batchSize,
+          body: expect.objectContaining({ query }),
+        })
+      );
+    });
   });

   describe('bulkOverwriteTransformedDocuments', () => {
diff --git a/src/core/server/saved_objects/migrationsv2/actions/index.ts b/src/core/server/saved_objects/migrationsv2/actions/index.ts
index 52fa99b724873..8ac683a29d657 100644
--- a/src/core/server/saved_objects/migrationsv2/actions/index.ts
+++ b/src/core/server/saved_objects/migrationsv2/actions/index.ts
@@ -9,11 +9,11 @@
 import * as Either from 'fp-ts/lib/Either';
 import * as TaskEither from 'fp-ts/lib/TaskEither';
 import * as Option from 'fp-ts/lib/Option';
-import { ElasticsearchClientError, ResponseError } from '@elastic/elasticsearch/lib/errors';
-import { pipe } from 'fp-ts/lib/pipeable';
+import type { estypes } from '@elastic/elasticsearch';
 import { errors as EsErrors } from '@elastic/elasticsearch';
+import type { ElasticsearchClientError, ResponseError } from '@elastic/elasticsearch/lib/errors';
+import { pipe } from 'fp-ts/lib/pipeable';
 import { flow } from 'fp-ts/lib/function';
-import type { estypes } from '@elastic/elasticsearch';
 import { ElasticsearchClient } from '../../../elasticsearch';
 import { IndexMapping } from '../../mappings';
 import { SavedObjectsRawDoc, SavedObjectsRawDocSource } from '../../serialization';
@@ -24,13 +24,10 @@ import {
 export type { RetryableEsClientError };

 /**
- * Batch size for updateByQuery, reindex & search operations. Smaller batches
- * reduce the memory pressure on Elasticsearch and Kibana so are less likely
- * to cause failures.
- * TODO (profile/tune): How much smaller can we make this number before it
- * starts impacting how long migrations take to perform?
+ * Batch size for updateByQuery and reindex operations.
+ * Uses the default value of 1000 for the Elasticsearch reindex operation.
  */
-const BATCH_SIZE = 1000;
+const BATCH_SIZE = 1_000;
 const DEFAULT_TIMEOUT = '60s';
 /** Allocate 1 replica if there are enough data nodes, otherwise continue with 0 */
 const INDEX_AUTO_EXPAND_REPLICAS = '0-1';
@@ -839,6 +836,12 @@ export interface SearchResponse {
   outdatedDocuments: SavedObjectsRawDoc[];
 }

+interface SearchForOutdatedDocumentsOptions {
+  batchSize: number;
+  targetIndex: string;
+  outdatedDocumentsQuery?: estypes.QueryContainer;
+}
+
 /**
  * Search for outdated saved object documents with the provided query. Will
  * return one batch of documents. Searching should be repeated until no more
@@ -846,18 +849,17 @@
  */
 export const searchForOutdatedDocuments = (
   client: ElasticsearchClient,
-  index: string,
-  query: Record<string, unknown>
+  options: SearchForOutdatedDocumentsOptions
 ): TaskEither.TaskEither<RetryableEsClientError, SearchResponse> => () => {
   return client
     .search<SavedObjectsRawDocSource>({
-      index,
+      index: options.targetIndex,
       // Return the _seq_no and _primary_term so we can use optimistic
       // concurrency control for updates
       seq_no_primary_term: true,
-      size: BATCH_SIZE,
+      size: options.batchSize,
       body: {
-        query,
+        query: options.outdatedDocumentsQuery,
         // Optimize search performance by sorting by the "natural" index order
         sort: ['_doc'],
       },
diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts
index 1824efa0ed8d4..aa9a5ea92ac11 100644
--- a/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts
+++ b/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts
@@ -59,7 +59,7 @@ describe('migration actions', () => {

     // Create test fixture data:
     await createIndex(client, 'existing_index_with_docs', {
-      dynamic: true as any,
+      dynamic: true,
       properties: {},
     })();
     const sourceDocs = ([
@@ -337,7 +337,6 @@ describe('migration actions', () => {
   // Reindex doesn't return any errors on its own, so we have to test
   // together with waitForReindexTask
   describe('reindex & waitForReindexTask', () => {
-    expect.assertions(2);
     it('resolves right when reindex succeeds without reindex script', async () => {
       const res = (await reindex(
         client,
@@ -354,11 +353,11 @@ describe('migration actions', () => {
         }
       `);
-      const results = ((await searchForOutdatedDocuments(
-        client,
-        'reindex_target',
-        undefined as any
-      )()) as Either.Right<SearchResponse>).right.outdatedDocuments;
+      const results = ((await searchForOutdatedDocuments(client, {
+        batchSize: 1000,
+        targetIndex: 'reindex_target',
+        outdatedDocumentsQuery: undefined,
+      })()) as Either.Right<SearchResponse>).right.outdatedDocuments;
       expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
         Array [
           "doc 1",
           "doc 2",
           "doc 3",
@@ -384,11 +383,11 @@ describe('migration actions', () => {
           "right": "reindex_succeeded",
         }
       `);
-      const results = ((await searchForOutdatedDocuments(
-        client,
-        'reindex_target_2',
-        undefined as any
-      )()) as Either.Right<SearchResponse>).right.outdatedDocuments;
+      const results = ((await searchForOutdatedDocuments(client, {
+        batchSize: 1000,
+        targetIndex: 'reindex_target_2',
+        outdatedDocumentsQuery: undefined,
+      })()) as Either.Right<SearchResponse>).right.outdatedDocuments;
       expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
         Array [
           "doc 1_updated",
@@ -432,12 +431,12 @@ describe('migration actions', () => {
         }
       `);
-      // Assert that documents weren't overrided by the second, unscripted reindex
-      const results = ((await searchForOutdatedDocuments(
-        client,
-        'reindex_target_3',
-        undefined as any
-      )()) as Either.Right<SearchResponse>).right.outdatedDocuments;
+      // Assert that documents weren't overridden by the second, unscripted reindex
+      const results = ((await searchForOutdatedDocuments(client, {
+        batchSize: 1000,
+        targetIndex: 'reindex_target_3',
+        outdatedDocumentsQuery: undefined,
+      })()) as Either.Right<SearchResponse>).right.outdatedDocuments;
       expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
         Array [
           "doc 1_updated",
@@ -452,11 +451,11 @@ describe('migration actions', () => {
       // Simulate a reindex that only adds some of the documents from the
       // source index into the target index
       await createIndex(client, 'reindex_target_4', { properties: {} })();
-      const sourceDocs = ((await searchForOutdatedDocuments(
-        client,
-        'existing_index_with_docs',
-        undefined as any
-      )()) as Either.Right<SearchResponse>).right.outdatedDocuments
+      const sourceDocs = ((await searchForOutdatedDocuments(client, {
+        batchSize: 1000,
+        targetIndex: 'existing_index_with_docs',
+        outdatedDocumentsQuery: undefined,
+      })()) as Either.Right<SearchResponse>).right.outdatedDocuments
         .slice(0, 2)
         .map(({ _id, _source }) => ({
           _id,
@@ -479,13 +478,13 @@ describe('migration actions', () => {
           "right": "reindex_succeeded",
         }
       `);
-      // Assert that existing documents weren't overrided, but that missing
+      // Assert that existing documents weren't overridden, but that missing
       // documents were added by the reindex
-      const results = ((await searchForOutdatedDocuments(
-        client,
-        'reindex_target_4',
-        undefined as any
-      )()) as Either.Right<SearchResponse>).right.outdatedDocuments;
+      const results = ((await searchForOutdatedDocuments(client, {
+        batchSize: 1000,
+        targetIndex: 'reindex_target_4',
+        outdatedDocumentsQuery: undefined,
+      })()) as Either.Right<SearchResponse>).right.outdatedDocuments;
       expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
         Array [
           "doc 1",
@@ -701,26 +700,30 @@ describe('migration actions', () => {
   describe('searchForOutdatedDocuments', () => {
     it('only returns documents that match the outdatedDocumentsQuery', async () => {
       expect.assertions(2);
-      const resultsWithQuery = ((await searchForOutdatedDocuments(
-        client,
-        'existing_index_with_docs',
-        {
+      const resultsWithQuery = ((await searchForOutdatedDocuments(client, {
+        batchSize: 1000,
+        targetIndex: 'existing_index_with_docs',
+        outdatedDocumentsQuery: {
           match: { title: { query: 'doc' } },
-        }
-      )()) as Either.Right<SearchResponse>).right.outdatedDocuments;
+        },
+      })()) as Either.Right<SearchResponse>).right.outdatedDocuments;
       expect(resultsWithQuery.length).toBe(3);
-      const resultsWithoutQuery = ((await searchForOutdatedDocuments(
-        client,
-        'existing_index_with_docs',
-        undefined as any
-      )()) as Either.Right<SearchResponse>).right.outdatedDocuments;
+      const resultsWithoutQuery = ((await searchForOutdatedDocuments(client, {
+        batchSize: 1000,
+        targetIndex: 'existing_index_with_docs',
+        outdatedDocumentsQuery: undefined,
+      })()) as Either.Right<SearchResponse>).right.outdatedDocuments;
       expect(resultsWithoutQuery.length).toBe(4);
     });

     it('resolves with _id, _source, _seq_no and _primary_term', async () => {
       expect.assertions(1);
-      const results = ((await searchForOutdatedDocuments(client, 'existing_index_with_docs', {
-        match: { title: { query: 'doc' } },
+      const results = ((await searchForOutdatedDocuments(client, {
+        batchSize: 1000,
+        targetIndex: 'existing_index_with_docs',
+        outdatedDocumentsQuery: {
+          match: { title: { query: 'doc' } },
+        },
       })()) as Either.Right<SearchResponse>).right.outdatedDocuments;
       expect(results).toEqual(
         expect.arrayContaining([
@@ -805,7 +808,7 @@ describe('migration actions', () => {
     it('resolves right when mappings were updated and picked up', async () => {
       // Create an index without any mappings and insert documents into it
       await createIndex(client, 'existing_index_without_mappings', {
-        dynamic: false as any,
+        dynamic: false,
         properties: {},
       })();
      const sourceDocs = ([
@@ -821,11 +824,13 @@ describe('migration actions', () => {
       )();

       // Assert that we can't search over the unmapped fields of the document
-      const originalSearchResults = ((await searchForOutdatedDocuments(
-        client,
-        'existing_index_without_mappings',
-        { match: { title: { query: 'doc' } } }
-      )()) as Either.Right<SearchResponse>).right.outdatedDocuments;
+      const originalSearchResults = ((await searchForOutdatedDocuments(client, {
+        batchSize: 1000,
+        targetIndex: 'existing_index_without_mappings',
+        outdatedDocumentsQuery: {
+          match: { title: { query: 'doc' } },
+        },
+      })()) as Either.Right<SearchResponse>).right.outdatedDocuments;
       expect(originalSearchResults.length).toBe(0);

       // Update and pickup mappings so that the title field is searchable
@@ -839,11 +844,13 @@ describe('migration actions', () => {
       await waitForPickupUpdatedMappingsTask(client, taskId, '60s')();

       // Repeat the search expecting to be able to find the existing documents
-      const pickedUpSearchResults = ((await searchForOutdatedDocuments(
-        client,
-        'existing_index_without_mappings',
-        { match: { title: { query: 'doc' } } }
-      )()) as Either.Right<SearchResponse>).right.outdatedDocuments;
+      const pickedUpSearchResults = ((await searchForOutdatedDocuments(client, {
+        batchSize: 1000,
+        targetIndex: 'existing_index_without_mappings',
+        outdatedDocumentsQuery: {
+          match: { title: { query: 'doc' } },
+        },
+      })()) as Either.Right<SearchResponse>).right.outdatedDocuments;
       expect(pickedUpSearchResults.length).toBe(4);
     });
   });
@@ -1050,11 +1057,11 @@ describe('migration actions', () => {
       `);
     });
     it('resolves right even if there were some version_conflict_engine_exception', async () => {
-      const existingDocs = ((await searchForOutdatedDocuments(
-        client,
-        'existing_index_with_docs',
-        undefined as any
-      )()) as Either.Right<SearchResponse>).right.outdatedDocuments;
+      const existingDocs = ((await searchForOutdatedDocuments(client, {
+        batchSize: 1000,
+        targetIndex: 'existing_index_with_docs',
+        outdatedDocumentsQuery: undefined,
+      })()) as Either.Right<SearchResponse>).right.outdatedDocuments;

       const task = bulkOverwriteTransformedDocuments(client, 'existing_index_with_docs', [
         ...existingDocs,
diff --git a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts
index 99c06c0a3586b..d4ce7b74baa5f 100644
--- a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts
+++ b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts
@@ -206,6 +206,7 @@ describe('migrationsStateActionMachine', () => {
         Array [
           "[.my-so-index] INIT -> LEGACY_DELETE",
           Object {
+            "batchSize": 1000,
             "controlState": "LEGACY_DELETE",
             "currentAlias": ".my-so-index",
             "indexPrefix": ".my-so-index",
@@ -262,6 +263,7 @@ describe('migrationsStateActionMachine', () => {
         Array [
           "[.my-so-index] LEGACY_DELETE -> FATAL",
           Object {
+            "batchSize": 1000,
             "controlState": "FATAL",
             "currentAlias": ".my-so-index",
             "indexPrefix": ".my-so-index",
@@ -413,6 +415,7 @@ describe('migrationsStateActionMachine', () => {
         Array [
           "[.my-so-index] INIT -> LEGACY_REINDEX",
           Object {
+            "batchSize": 1000,
             "controlState": "LEGACY_REINDEX",
             "currentAlias": ".my-so-index",
             "indexPrefix": ".my-so-index",
@@ -464,6 +467,7 @@ describe('migrationsStateActionMachine', () => {
         Array [
           "[.my-so-index] LEGACY_REINDEX -> LEGACY_DELETE",
           Object {
+            "batchSize": 1000,
             "controlState": "LEGACY_DELETE",
             "currentAlias": ".my-so-index",
             "indexPrefix": ".my-so-index",
diff --git a/src/core/server/saved_objects/migrationsv2/model.test.ts b/src/core/server/saved_objects/migrationsv2/model.test.ts
index 2813f01093e95..f9bf3418c0ab6 100644
--- a/src/core/server/saved_objects/migrationsv2/model.test.ts
+++ b/src/core/server/saved_objects/migrationsv2/model.test.ts
@@ -46,6 +46,7 @@ describe('migrations v2 model', () => {
     retryCount: 0,
     retryDelay: 0,
     retryAttempts: 15,
+    batchSize: 1000,
     indexPrefix: '.kibana',
     outdatedDocumentsQuery: {},
     targetIndexMappings: {
@@ -1182,6 +1183,7 @@ describe('migrations v2 model', () => {
   describe('createInitialState', () => {
     const migrationsConfig = ({
       retryAttempts: 15,
+      batchSize: 1000,
     } as unknown) as SavedObjectsMigrationConfigType;
     it('creates the initial state for the model based on the passed in parameters', () => {
       expect(
@@ -1197,6 +1199,7 @@ describe('migrations v2 model', () => {
         })
       ).toMatchInlineSnapshot(`
         Object {
+          "batchSize": 1000,
           "controlState": "INIT",
           "currentAlias": ".kibana_task_manager",
           "indexPrefix": ".kibana_task_manager",
diff --git a/src/core/server/saved_objects/migrationsv2/model.ts b/src/core/server/saved_objects/migrationsv2/model.ts
index 5bdba98026792..e62bd108faea0 100644
--- a/src/core/server/saved_objects/migrationsv2/model.ts
+++ b/src/core/server/saved_objects/migrationsv2/model.ts
@@ -784,6 +784,7 @@ export const createInitialState = ({
     retryCount: 0,
     retryDelay: 0,
     retryAttempts: migrationsConfig.retryAttempts,
+    batchSize: migrationsConfig.batchSize,
     logs: [],
   };
   return initialState;
diff --git a/src/core/server/saved_objects/migrationsv2/next.ts b/src/core/server/saved_objects/migrationsv2/next.ts
index 1b594cf3d8b53..5c159f4f24e22 100644
--- a/src/core/server/saved_objects/migrationsv2/next.ts
+++ b/src/core/server/saved_objects/migrationsv2/next.ts
@@ -73,7 +73,11 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: TransformRawDocs) => {
   UPDATE_TARGET_MAPPINGS_WAIT_FOR_TASK: (state: UpdateTargetMappingsWaitForTaskState) =>
     Actions.waitForPickupUpdatedMappingsTask(client, state.updateTargetMappingsTaskId, '60s'),
   OUTDATED_DOCUMENTS_SEARCH: (state: OutdatedDocumentsSearch) =>
-    Actions.searchForOutdatedDocuments(client, state.targetIndex, state.outdatedDocumentsQuery),
+    Actions.searchForOutdatedDocuments(client, {
+      batchSize: state.batchSize,
+      targetIndex: state.targetIndex,
+      outdatedDocumentsQuery: state.outdatedDocumentsQuery,
+    }),
   OUTDATED_DOCUMENTS_TRANSFORM: (state: OutdatedDocumentsTransform) =>
     pipe(
       TaskEither.tryCatch(
diff --git a/src/core/server/saved_objects/migrationsv2/types.ts b/src/core/server/saved_objects/migrationsv2/types.ts
index dbdd5774dfa62..8d6fe3f030eb3 100644
--- a/src/core/server/saved_objects/migrationsv2/types.ts
+++ b/src/core/server/saved_objects/migrationsv2/types.ts
@@ -54,6 +54,21 @@ export interface BaseState extends ControlState {
    * max_retry_time = 11.7 minutes
    */
   readonly retryAttempts: number;
+
+  /**
+   * The number of documents to fetch from Elasticsearch in each migration batch.
+   *
+   * The higher the value, the faster the migration will be performed, since it reduces
+   * the number of round trips between the Kibana and Elasticsearch servers.
+   * The higher speed comes at the price of increased memory consumption.
+   *
+   * Since batchSize defines the number of documents, not their size, it might happen that
+   * Elasticsearch fails a request with a circuit_breaking_exception when it retrieves a set of
+   * saved objects of significant size.
+   *
+   * In this case, you should set a smaller batchSize value and restart the migration process.
+   */
+  readonly batchSize: number;
   readonly logs: Array<{ level: 'error' | 'info'; message: string }>;
   /**
    * The current alias e.g. `.kibana` which always points to the latest
diff --git a/src/core/server/saved_objects/saved_objects_config.ts b/src/core/server/saved_objects/saved_objects_config.ts
index 7228cb126d286..96fac85ded076 100644
--- a/src/core/server/saved_objects/saved_objects_config.ts
+++ b/src/core/server/saved_objects/saved_objects_config.ts
@@ -29,8 +29,8 @@ export type SavedObjectsConfigType = TypeOf<typeof savedObjectsConfig.schema>;
 export const savedObjectsConfig = {
   path: 'savedObjects',
   schema: schema.object({
-    maxImportPayloadBytes: schema.byteSize({ defaultValue: 26214400 }),
-    maxImportExportSize: schema.number({ defaultValue: 10000 }),
+    maxImportPayloadBytes: schema.byteSize({ defaultValue: 26_214_400 }),
+    maxImportExportSize: schema.number({ defaultValue: 10_000 }),
   }),
 };
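
Note (not part of the patch): the sketch below illustrates the loop this change parameterizes. As the doc comment on searchForOutdatedDocuments says, each call returns at most one batch of batchSize documents, so the migration's OUTDATED_DOCUMENTS_SEARCH / OUTDATED_DOCUMENTS_TRANSFORM states repeat the search until an empty batch comes back. This is a simplified illustration, not Kibana's actual state machine: transformDocs is a hypothetical stand-in for the transform step, error handling is reduced to bailing out on a Left, and the imports are assumed to come from the modules changed above.

import * as Either from 'fp-ts/lib/Either';

async function drainOutdatedDocuments(
  client: ElasticsearchClient,
  targetIndex: string,
  outdatedDocumentsQuery: estypes.QueryContainer | undefined,
  batchSize: number,
  transformDocs: (docs: SavedObjectsRawDoc[]) => Promise<SavedObjectsRawDoc[]>
): Promise<void> {
  while (true) {
    // Each call fetches at most `batchSize` outdated documents.
    const res = await searchForOutdatedDocuments(client, {
      batchSize,
      targetIndex,
      outdatedDocumentsQuery,
    })();
    // Stop on a (retryable) error, or once no outdated documents remain.
    if (Either.isLeft(res) || res.right.outdatedDocuments.length === 0) {
      break;
    }
    const transformed = await transformDocs(res.right.outdatedDocuments);
    // Write the transformed batch back into the target index.
    await bulkOverwriteTransformedDocuments(client, targetIndex, transformed)();
  }
}

A larger batch size means fewer round trips between Kibana and Elasticsearch but more memory per request; if Elasticsearch rejects a batch with circuit_breaking_exception, lowering the configured value (the migrations.batchSize setting this change wires through to migrations v2) is the intended remedy.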