diff --git a/docs/setup/settings.asciidoc b/docs/setup/settings.asciidoc index f2aa7fe863b83..e8d0b4b593c3f 100644 --- a/docs/setup/settings.asciidoc +++ b/docs/setup/settings.asciidoc @@ -488,7 +488,10 @@ override this parameter to use their own Tile Map Service. For example: `"https://tiles.elastic.co/v2/default/{z}/{x}/{y}.png?elastic_tile_service_tos=agree&my_app_name=kibana"` | `migrations.batchSize:` - | Defines the number of documents migrated at a time. The higher the value, the faster the Saved Objects migration process performs at the cost of higher memory consumption. If the migration fails due to a `circuit_breaking_exception`, set a smaller `batchSize` value. *Default: `1000`* + | Defines the number of documents migrated at a time. The higher the value, the faster the Saved Objects migration process performs at the cost of higher memory consumption. If upgrade migrations results in {kib} crashing with an out of memory exception or fails due to an Elasticsearch `circuit_breaking_exception`, use a smaller `batchSize` value to reduce the memory pressure. *Default: `1000`* + + | `migrations.maxBatchSizeBytes:` + | Defines the maximum payload size for indexing batches of upgraded saved objects to avoid migrations failing due to a 413 Request Entity Too Large response from Elasticsearch. This value should be lower than or equal to your Elasticsearch cluster's `http.max_content_length` configuration option. *Default: `100mb`* | `migrations.enableV2:` | experimental[]. Enables the new Saved Objects migration algorithm. For information about the migration algorithm, refer to <>. When `migrations v2` is stable, the setting will be removed in an upcoming release without any further notice. Setting the value to `false` causes {kib} to use the legacy migration algorithm, which shipped in 7.11 and earlier versions. *Default: `true`* diff --git a/src/core/server/saved_objects/migrations/kibana/kibana_migrator.mock.ts b/src/core/server/saved_objects/migrations/kibana/kibana_migrator.mock.ts index 530203e659086..9471bbc1b87a6 100644 --- a/src/core/server/saved_objects/migrations/kibana/kibana_migrator.mock.ts +++ b/src/core/server/saved_objects/migrations/kibana/kibana_migrator.mock.ts @@ -11,6 +11,7 @@ import { buildActiveMappings } from '../core'; const { mergeTypes } = jest.requireActual('./kibana_migrator'); import { SavedObjectsType } from '../../types'; import { BehaviorSubject } from 'rxjs'; +import { ByteSizeValue } from '@kbn/config-schema'; const defaultSavedObjectTypes: SavedObjectsType[] = [ { @@ -37,6 +38,7 @@ const createMigrator = ( kibanaVersion: '8.0.0-testing', soMigrationsConfig: { batchSize: 100, + maxBatchSizeBytes: ByteSizeValue.parse('30kb'), scrollDuration: '15m', pollInterval: 1500, skip: false, diff --git a/src/core/server/saved_objects/migrations/kibana/kibana_migrator.test.ts b/src/core/server/saved_objects/migrations/kibana/kibana_migrator.test.ts index d0cc52f2dd9bd..6e10349f4b57c 100644 --- a/src/core/server/saved_objects/migrations/kibana/kibana_migrator.test.ts +++ b/src/core/server/saved_objects/migrations/kibana/kibana_migrator.test.ts @@ -15,6 +15,7 @@ import { loggingSystemMock } from '../../../logging/logging_system.mock'; import { SavedObjectTypeRegistry } from '../../saved_objects_type_registry'; import { SavedObjectsType } from '../../types'; import { DocumentMigrator } from '../core/document_migrator'; +import { ByteSizeValue } from '@kbn/config-schema'; jest.mock('../core/document_migrator', () => { return { // Create a mock for spying on the constructor @@ -396,6 +397,7 @@ const mockOptions = ({ enableV2 }: { enableV2: boolean } = { enableV2: false }) } as KibanaMigratorOptions['kibanaConfig'], soMigrationsConfig: { batchSize: 20, + maxBatchSizeBytes: ByteSizeValue.parse('20mb'), pollInterval: 20000, scrollDuration: '10m', skip: false, diff --git a/src/core/server/saved_objects/migrationsv2/README.md b/src/core/server/saved_objects/migrationsv2/README.md index 5bdc548987842..5121e66052f40 100644 --- a/src/core/server/saved_objects/migrationsv2/README.md +++ b/src/core/server/saved_objects/migrationsv2/README.md @@ -316,7 +316,10 @@ completed this step: - temp index has a write block - temp index is not found ### New control state +1. If `currentBatch` is the last batch in `transformedDocBatches` → `REINDEX_SOURCE_TO_TEMP_READ` +2. If there are more batches left in `transformedDocBatches` + → `REINDEX_SOURCE_TO_TEMP_INDEX_BULK` ## REINDEX_SOURCE_TO_TEMP_CLOSE_PIT ### Next action diff --git a/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts b/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts index 4217ca599297a..82f642b928058 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts @@ -23,6 +23,27 @@ import type { IndexNotFound, } from './index'; +/** + * Given a document and index, creates a valid body for the Bulk API. + */ +export const createBulkOperationBody = (doc: SavedObjectsRawDoc, index: string) => { + return [ + { + index: { + _index: index, + _id: doc._id, + // overwrite existing documents + op_type: 'index', + // use optimistic concurrency control to ensure that outdated + // documents are only overwritten once with the latest version + if_seq_no: doc._seq_no, + if_primary_term: doc._primary_term, + }, + }, + doc._source, + ]; +}; + /** @internal */ export interface BulkOverwriteTransformedDocumentsParams { client: ElasticsearchClient; @@ -47,6 +68,10 @@ export const bulkOverwriteTransformedDocuments = ({ | RequestEntityTooLargeException, 'bulk_index_succeeded' > => () => { + const body = transformedDocs.flatMap((doc) => { + return createBulkOperationBody(doc, index); + }); + return client .bulk({ // Because we only add aliases in the MARK_VERSION_INDEX_READY step we @@ -60,23 +85,7 @@ export const bulkOverwriteTransformedDocuments = ({ wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE, refresh, filter_path: ['items.*.error'], - body: transformedDocs.flatMap((doc) => { - return [ - { - index: { - _index: index, - _id: doc._id, - // overwrite existing documents - op_type: 'index', - // use optimistic concurrency control to ensure that outdated - // documents are only overwritten once with the latest version - if_seq_no: doc._seq_no, - if_primary_term: doc._primary_term, - }, - }, - doc._source, - ]; - }), + body, }) .then((res) => { // Filter out version_conflict_engine_exception since these just mean diff --git a/src/core/server/saved_objects/migrationsv2/initial_state.test.ts b/src/core/server/saved_objects/migrationsv2/initial_state.test.ts index 4066efeb65de0..26ba129cbeab4 100644 --- a/src/core/server/saved_objects/migrationsv2/initial_state.test.ts +++ b/src/core/server/saved_objects/migrationsv2/initial_state.test.ts @@ -6,6 +6,7 @@ * Side Public License, v 1. */ +import { ByteSizeValue } from '@kbn/config-schema'; import * as Option from 'fp-ts/Option'; import { SavedObjectsMigrationConfigType } from '../saved_objects_config'; import { SavedObjectTypeRegistry } from '../saved_objects_type_registry'; @@ -21,6 +22,7 @@ describe('createInitialState', () => { const migrationsConfig = ({ retryAttempts: 15, batchSize: 1000, + maxBatchSizeBytes: ByteSizeValue.parse('100mb'), } as unknown) as SavedObjectsMigrationConfigType; it('creates the initial state for the model based on the passed in parameters', () => { expect( @@ -37,6 +39,7 @@ describe('createInitialState', () => { }) ).toEqual({ batchSize: 1000, + maxBatchSizeBytes: ByteSizeValue.parse('100mb').getValueInBytes(), controlState: 'INIT', currentAlias: '.kibana_task_manager', excludeFromUpgradeFilterHooks: {}, diff --git a/src/core/server/saved_objects/migrationsv2/initial_state.ts b/src/core/server/saved_objects/migrationsv2/initial_state.ts index dce37b384a4f7..a61967be9242c 100644 --- a/src/core/server/saved_objects/migrationsv2/initial_state.ts +++ b/src/core/server/saved_objects/migrationsv2/initial_state.ts @@ -82,6 +82,7 @@ export const createInitialState = ({ retryDelay: 0, retryAttempts: migrationsConfig.retryAttempts, batchSize: migrationsConfig.batchSize, + maxBatchSizeBytes: migrationsConfig.maxBatchSizeBytes.getValueInBytes(), logs: [], unusedTypesQuery: excludeUnusedTypesQuery, knownTypes, diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7.7.2_xpack_100k.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/7.7.2_xpack_100k.test.ts similarity index 94% rename from src/core/server/saved_objects/migrationsv2/integration_tests/migration_7.7.2_xpack_100k.test.ts rename to src/core/server/saved_objects/migrationsv2/integration_tests/7.7.2_xpack_100k.test.ts index f2e7c2e3ae971..d072ccc5c976e 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7.7.2_xpack_100k.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/7.7.2_xpack_100k.test.ts @@ -17,7 +17,7 @@ import { InternalCoreStart } from '../../../internal_types'; import { Root } from '../../../root'; const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version; -const logFilePath = path.join(__dirname, 'migration_test_kibana.log'); +const logFilePath = path.join(__dirname, '7.7.2_xpack_100k.log'); async function removeLogFile() { // ignore errors if it doesn't exist @@ -61,9 +61,12 @@ describe('migration from 7.7.2-xpack with 100k objects', () => { }, }, }, - root: { - appenders: ['default', 'file'], - }, + loggers: [ + { + name: 'root', + appenders: ['file'], + }, + ], }, // reporting loads headless browser, that prevents nodejs process from exiting. xpack: { diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7_13_0_failed_action_tasks.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/7_13_0_failed_action_tasks.test.ts similarity index 99% rename from src/core/server/saved_objects/migrationsv2/integration_tests/migration_7_13_0_failed_action_tasks.test.ts rename to src/core/server/saved_objects/migrationsv2/integration_tests/7_13_0_failed_action_tasks.test.ts index 0788a7ecdf0b1..d70e034703158 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7_13_0_failed_action_tasks.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/7_13_0_failed_action_tasks.test.ts @@ -12,7 +12,7 @@ import * as kbnTestServer from '../../../../test_helpers/kbn_server'; import { Root } from '../../../root'; import { ElasticsearchClient } from '../../../elasticsearch'; -const logFilePath = Path.join(__dirname, '7_13_failed_action_tasks_test.log'); +const logFilePath = Path.join(__dirname, '7_13_failed_action_tasks.log'); async function removeLogFile() { // ignore errors if it doesn't exist diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7_13_0_transform_failures.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/7_13_0_transform_failures.test.ts similarity index 99% rename from src/core/server/saved_objects/migrationsv2/integration_tests/migration_7_13_0_transform_failures.test.ts rename to src/core/server/saved_objects/migrationsv2/integration_tests/7_13_0_transform_failures.test.ts index 3258732c6fdd2..fb40bda81cba5 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7_13_0_transform_failures.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/7_13_0_transform_failures.test.ts @@ -12,7 +12,7 @@ import Util from 'util'; import * as kbnTestServer from '../../../../test_helpers/kbn_server'; import { Root } from '../../../root'; -const logFilePath = Path.join(__dirname, '7_13_corrupt_transform_failures_test.log'); +const logFilePath = Path.join(__dirname, '7_13_corrupt_transform_failures.log'); const asyncUnlink = Util.promisify(Fs.unlink); diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7_13_0_unknown_types.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/7_13_0_unknown_types.test.ts similarity index 86% rename from src/core/server/saved_objects/migrationsv2/integration_tests/migration_7_13_0_unknown_types.test.ts rename to src/core/server/saved_objects/migrationsv2/integration_tests/7_13_0_unknown_types.test.ts index dbf285021118d..f15e0c5d684d0 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7_13_0_unknown_types.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/7_13_0_unknown_types.test.ts @@ -16,9 +16,12 @@ import { ElasticsearchClient } from '../../../elasticsearch'; import { Env } from '@kbn/config'; import { REPO_ROOT } from '@kbn/utils'; import { getEnvOptions } from '../../../config/mocks'; +import { retryAsync } from '../test_helpers/retry_async'; +import { LogRecord } from '@kbn/logging'; const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version; -const logFilePath = Path.join(__dirname, '7_13_unknown_types_test.log'); +const targetIndex = `.kibana_${kibanaVersion}_001`; +const logFilePath = Path.join(__dirname, '7_13_unknown_types.log'); async function removeLogFile() { // ignore errors if it doesn't exist @@ -67,23 +70,30 @@ describe('migration v2', () => { await root.setup(); await root.start(); - const logFileContent = await fs.readFile(logFilePath, 'utf-8'); - const records = logFileContent - .split('\n') - .filter(Boolean) - .map((str) => JSON5.parse(str)); + let unknownDocsWarningLog: LogRecord; - const unknownDocsWarningLog = records.find((rec) => - rec.message.startsWith(`[.kibana] CHECK_UNKNOWN_DOCUMENTS`) - ); + await retryAsync( + async () => { + const logFileContent = await fs.readFile(logFilePath, 'utf-8'); + const records = logFileContent + .split('\n') + .filter(Boolean) + .map((str) => JSON5.parse(str)); + + unknownDocsWarningLog = records.find((rec) => + rec.message.startsWith(`[.kibana] CHECK_UNKNOWN_DOCUMENTS`) + ); - expect( - unknownDocsWarningLog.message.startsWith( - '[.kibana] CHECK_UNKNOWN_DOCUMENTS Upgrades will fail for 8.0+ because documents were found for unknown saved ' + - 'object types. To ensure that upgrades will succeed in the future, either re-enable plugins or delete ' + - `these documents from the ".kibana_${kibanaVersion}_001" index after the current upgrade completes.` - ) - ).toBeTruthy(); + expect( + unknownDocsWarningLog.message.startsWith( + '[.kibana] CHECK_UNKNOWN_DOCUMENTS Upgrades will fail for 8.0+ because documents were found for unknown saved ' + + 'object types. To ensure that upgrades will succeed in the future, either re-enable plugins or delete ' + + `these documents from the "${targetIndex}" index after the current upgrade completes.` + ) + ).toBeTruthy(); + }, + { retryAttempts: 10, retryDelayMs: 200 } + ); const unknownDocs = [ { type: 'space', id: 'space:default' }, diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.14.0_xpack_sample_saved_objects.zip b/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.14.0_xpack_sample_saved_objects.zip new file mode 100644 index 0000000000000..70d68587e3603 Binary files /dev/null and b/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.14.0_xpack_sample_saved_objects.zip differ diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/batch_size_bytes.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/batch_size_bytes.test.ts new file mode 100644 index 0000000000000..e96aeb6a93b65 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/batch_size_bytes.test.ts @@ -0,0 +1,145 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import Path from 'path'; +import fs from 'fs/promises'; +import JSON5 from 'json5'; +import * as kbnTestServer from '../../../../test_helpers/kbn_server'; +import { Root } from '../../../root'; +import { ElasticsearchClient } from '../../../elasticsearch'; +import { Env } from '@kbn/config'; +import { REPO_ROOT } from '@kbn/utils'; +import { getEnvOptions } from '../../../config/mocks'; +import { LogRecord } from '@kbn/logging'; +import { retryAsync } from '../test_helpers/retry_async'; + +const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version; +const targetIndex = `.kibana_${kibanaVersion}_001`; +const logFilePath = Path.join(__dirname, 'batch_size_bytes.log'); + +async function removeLogFile() { + // ignore errors if it doesn't exist + await fs.unlink(logFilePath).catch(() => void 0); +} + +describe('migration v2', () => { + let esServer: kbnTestServer.TestElasticsearchUtils; + let root: Root; + let startES: () => Promise; + + beforeAll(async () => { + await removeLogFile(); + }); + + beforeEach(() => { + ({ startES } = kbnTestServer.createTestServers({ + adjustTimeout: (t: number) => jest.setTimeout(t), + settings: { + es: { + license: 'basic', + dataArchive: Path.join(__dirname, 'archives', '7.14.0_xpack_sample_saved_objects.zip'), + esArgs: ['http.max_content_length=1715275b'], + }, + }, + })); + }); + + afterEach(async () => { + if (root) { + await root.shutdown(); + } + if (esServer) { + await esServer.stop(); + } + + await new Promise((resolve) => setTimeout(resolve, 10000)); + }); + + it('completes the migration even when a full batch would exceed ES http.max_content_length', async () => { + root = createRoot({ maxBatchSizeBytes: 1715275 }); + esServer = await startES(); + await root.preboot(); + await root.setup(); + await expect(root.start()).resolves.toBeTruthy(); + + await new Promise((resolve) => setTimeout(resolve, 1000)); + + const esClient: ElasticsearchClient = esServer.es.getClient(); + const migratedIndexResponse = await esClient.count({ + index: targetIndex, + }); + const oldIndexResponse = await esClient.count({ + index: '.kibana_7.14.0_001', + }); + + // Use a >= comparison since once Kibana has started it might create new + // documents like telemetry tasks + expect(migratedIndexResponse.body.count).toBeGreaterThanOrEqual(oldIndexResponse.body.count); + }); + + it('fails with a descriptive message when a single document exceeds maxBatchSizeBytes', async () => { + root = createRoot({ maxBatchSizeBytes: 1015275 }); + esServer = await startES(); + await root.preboot(); + await root.setup(); + await expect(root.start()).rejects.toMatchInlineSnapshot( + `[Error: Unable to complete saved object migrations for the [.kibana] index: The document with _id "canvas-workpad-template:workpad-template-061d7868-2b4e-4dc8-8bf7-3772b52926e5" is 1715275 bytes which exceeds the configured maximum batch size of 1015275 bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.]` + ); + + await retryAsync( + async () => { + const logFileContent = await fs.readFile(logFilePath, 'utf-8'); + const records = logFileContent + .split('\n') + .filter(Boolean) + .map((str) => JSON5.parse(str)) as LogRecord[]; + expect( + records.find((rec) => + rec.message.startsWith( + `Unable to complete saved object migrations for the [.kibana] index: The document with _id "canvas-workpad-template:workpad-template-061d7868-2b4e-4dc8-8bf7-3772b52926e5" is 1715275 bytes which exceeds the configured maximum batch size of 1015275 bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.` + ) + ) + ).toBeDefined(); + }, + { retryAttempts: 10, retryDelayMs: 200 } + ); + }); +}); + +function createRoot(options: { maxBatchSizeBytes?: number }) { + return kbnTestServer.createRootWithCorePlugins( + { + migrations: { + skip: false, + enableV2: true, + batchSize: 1000, + maxBatchSizeBytes: options.maxBatchSizeBytes, + }, + logging: { + appenders: { + file: { + type: 'file', + fileName: logFilePath, + layout: { + type: 'json', + }, + }, + }, + loggers: [ + { + name: 'root', + appenders: ['file'], + }, + ], + }, + }, + { + oss: true, + } + ); +} diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/batch_size_bytes_exceeds_es_content_length.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/batch_size_bytes_exceeds_es_content_length.test.ts new file mode 100644 index 0000000000000..192321227d4ae --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/batch_size_bytes_exceeds_es_content_length.test.ts @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import Path from 'path'; +import fs from 'fs/promises'; +import JSON5 from 'json5'; +import * as kbnTestServer from '../../../../test_helpers/kbn_server'; +import { Root } from '../../../root'; +import { retryAsync } from '../test_helpers/retry_async'; + +const logFilePath = Path.join(__dirname, 'batch_size_bytes_exceeds_es_content_length.log'); + +async function removeLogFile() { + // ignore errors if it doesn't exist + await fs.unlink(logFilePath).catch(() => void 0); +} + +describe('migration v2', () => { + let esServer: kbnTestServer.TestElasticsearchUtils; + let root: Root; + let startES: () => Promise; + + beforeAll(async () => { + await removeLogFile(); + }); + + beforeEach(() => { + ({ startES } = kbnTestServer.createTestServers({ + adjustTimeout: (t: number) => jest.setTimeout(t), + settings: { + es: { + license: 'basic', + dataArchive: Path.join(__dirname, 'archives', '7.14.0_xpack_sample_saved_objects.zip'), + esArgs: ['http.max_content_length=1mb'], + }, + }, + })); + }); + + afterEach(async () => { + if (root) { + await root.shutdown(); + } + if (esServer) { + await esServer.stop(); + } + + await new Promise((resolve) => setTimeout(resolve, 10000)); + }); + + it('fails with a descriptive message when maxBatchSizeBytes exceeds ES http.max_content_length', async () => { + root = createRoot({ maxBatchSizeBytes: 1715275 }); + esServer = await startES(); + await root.preboot(); + await root.setup(); + await expect(root.start()).rejects.toMatchInlineSnapshot( + `[Error: Unable to complete saved object migrations for the [.kibana] index: While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Ensure that the Kibana configuration option 'migrations.maxBatchSizeBytes' is set to a value that is lower than or equal to the Elasticsearch 'http.max_content_length' configuration option.]` + ); + + await retryAsync( + async () => { + const logFileContent = await fs.readFile(logFilePath, 'utf-8'); + const records = logFileContent + .split('\n') + .filter(Boolean) + .map((str) => JSON5.parse(str)) as any[]; + + expect( + records.find((rec) => + rec.message.startsWith( + `Unable to complete saved object migrations for the [.kibana] index: While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Ensure that the Kibana configuration option 'migrations.maxBatchSizeBytes' is set to a value that is lower than or equal to the Elasticsearch 'http.max_content_length' configuration option.` + ) + ) + ).toBeDefined(); + }, + { retryAttempts: 10, retryDelayMs: 200 } + ); + }); +}); + +function createRoot(options: { maxBatchSizeBytes?: number }) { + return kbnTestServer.createRootWithCorePlugins( + { + migrations: { + skip: false, + enableV2: true, + batchSize: 1000, + maxBatchSizeBytes: options.maxBatchSizeBytes, + }, + logging: { + appenders: { + file: { + type: 'file', + fileName: logFilePath, + layout: { + type: 'json', + }, + }, + }, + loggers: [ + { + name: 'root', + appenders: ['file'], + }, + ], + }, + }, + { + oss: true, + } + ); +} diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/cleanup.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/cleanup.test.ts index 684b75056bf44..bb408d14df6d7 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/cleanup.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/cleanup.test.ts @@ -13,7 +13,7 @@ import JSON5 from 'json5'; import * as kbnTestServer from '../../../../test_helpers/kbn_server'; import type { Root } from '../../../root'; -const logFilePath = Path.join(__dirname, 'cleanup_test.log'); +const logFilePath = Path.join(__dirname, 'cleanup.log'); const asyncUnlink = Util.promisify(Fs.unlink); const asyncReadFile = Util.promisify(Fs.readFile); diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/type_migration_failure.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/collects_corrupt_docs.test.ts similarity index 98% rename from src/core/server/saved_objects/migrationsv2/integration_tests/type_migration_failure.test.ts rename to src/core/server/saved_objects/migrationsv2/integration_tests/collects_corrupt_docs.test.ts index 662f6e3122d7a..9738650b0db88 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/type_migration_failure.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/collects_corrupt_docs.test.ts @@ -12,7 +12,7 @@ import Util from 'util'; import * as kbnTestServer from '../../../../test_helpers/kbn_server'; import { Root } from '../../../root'; -const logFilePath = Path.join(__dirname, 'migration_test_corrupt_docs_kibana.log'); +const logFilePath = Path.join(__dirname, 'collects_corrupt_docs.log'); const asyncUnlink = Util.promisify(Fs.unlink); diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/corrupt_outdated_docs.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/corrupt_outdated_docs.test.ts index d0d1c044ed35e..d19ae51315745 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/corrupt_outdated_docs.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/corrupt_outdated_docs.test.ts @@ -12,7 +12,7 @@ import Util from 'util'; import * as kbnTestServer from '../../../../test_helpers/kbn_server'; import { Root } from '../../../root'; -const logFilePath = Path.join(__dirname, 'migration_test_corrupt_docs_kibana.log'); +const logFilePath = Path.join(__dirname, 'corrupt_outdated_docs.log'); const asyncUnlink = Util.promisify(Fs.unlink); diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/migration_from_v1.test.ts similarity index 98% rename from src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts rename to src/core/server/saved_objects/migrationsv2/integration_tests/migration_from_v1.test.ts index 0fa5803c30c41..0f336d7fba43a 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/migration_from_v1.test.ts @@ -20,8 +20,7 @@ import { InternalCoreStart } from '../../../internal_types'; import { Root } from '../../../root'; const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version; - -const logFilePath = Path.join(__dirname, 'migration_test_kibana.log'); +const logFilePath = Path.join(__dirname, 'migration_from_v1.log'); const asyncUnlink = Util.promisify(Fs.unlink); async function removeLogFile() { diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/outdated_docs.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/outdated_docs.test.ts index dae40b4667b97..19d77637902f8 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/outdated_docs.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/outdated_docs.test.ts @@ -14,7 +14,7 @@ import * as kbnTestServer from '../../../../test_helpers/kbn_server'; import type { ElasticsearchClient } from '../../../elasticsearch'; import { Root } from '../../../root'; -const logFilePath = Path.join(__dirname, 'migration_test_kibana.log'); +const logFilePath = Path.join(__dirname, 'outdated_docs.log'); const asyncUnlink = Util.promisify(Fs.unlink); async function removeLogFile() { diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/rewriting_id.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/rewriting_id.test.ts index fb4f6df1890e9..78eec5fe94ef3 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/rewriting_id.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/rewriting_id.test.ts @@ -15,7 +15,7 @@ import type { ElasticsearchClient } from '../../../elasticsearch'; import { Root } from '../../../root'; import { deterministicallyRegenerateObjectId } from '../../migrations/core/document_migrator'; -const logFilePath = Path.join(__dirname, 'migration_test_kibana.log'); +const logFilePath = Path.join(__dirname, 'rewriting_id.log'); const asyncUnlink = Util.promisify(Fs.unlink); async function removeLogFile() { diff --git a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts index 773a0af469bd4..a312ac6be0c3d 100644 --- a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts +++ b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts @@ -17,6 +17,7 @@ import { elasticsearchClientMock } from '../../elasticsearch/client/mocks'; import { LoggerAdapter } from '../../logging/logger_adapter'; import { AllControlStates, State } from './types'; import { createInitialState } from './initial_state'; +import { ByteSizeValue } from '@kbn/config-schema'; const esClient = elasticsearchServiceMock.createElasticsearchClient(); @@ -40,6 +41,7 @@ describe('migrationsStateActionMachine', () => { indexPrefix: '.my-so-index', migrationsConfig: { batchSize: 1000, + maxBatchSizeBytes: new ByteSizeValue(1e8), pollInterval: 0, scrollDuration: '0s', skip: false, @@ -235,6 +237,7 @@ describe('migrationsStateActionMachine', () => { ...initialState, reason: 'the fatal reason', outdatedDocuments: [{ _id: '1234', password: 'sensitive password' }], + transformedDocBatches: [[{ _id: '1234', password: 'sensitive transformed password' }]], } as State, logger: mockLogger.get(), model: transitionModel(['LEGACY_DELETE', 'FATAL']), @@ -257,6 +260,7 @@ describe('migrationsStateActionMachine', () => { kibana: { migrationState: { batchSize: 1000, + maxBatchSizeBytes: 1e8, controlState: 'LEGACY_DELETE', currentAlias: '.my-so-index', excludeFromUpgradeFilterHooks: {}, @@ -270,7 +274,7 @@ describe('migrationsStateActionMachine', () => { message: 'Log from LEGACY_DELETE control state', }, ], - outdatedDocuments: ['1234'], + outdatedDocuments: [{ _id: '1234' }], outdatedDocumentsQuery: expect.any(Object), preMigrationScript: { _tag: 'None', @@ -284,6 +288,7 @@ describe('migrationsStateActionMachine', () => { }, tempIndex: '.my-so-index_7.11.0_reindex_temp', tempIndexMappings: expect.any(Object), + transformedDocBatches: [[{ _id: '1234' }]], unusedTypesQuery: expect.any(Object), versionAlias: '.my-so-index_7.11.0', versionIndex: '.my-so-index_7.11.0_001', @@ -304,6 +309,7 @@ describe('migrationsStateActionMachine', () => { kibana: { migrationState: { batchSize: 1000, + maxBatchSizeBytes: 1e8, controlState: 'FATAL', currentAlias: '.my-so-index', excludeFromUpgradeFilterHooks: {}, @@ -321,7 +327,7 @@ describe('migrationsStateActionMachine', () => { message: 'Log from FATAL control state', }, ], - outdatedDocuments: ['1234'], + outdatedDocuments: [{ _id: '1234' }], outdatedDocumentsQuery: expect.any(Object), preMigrationScript: { _tag: 'None', @@ -335,6 +341,7 @@ describe('migrationsStateActionMachine', () => { }, tempIndex: '.my-so-index_7.11.0_reindex_temp', tempIndexMappings: expect.any(Object), + transformedDocBatches: [[{ _id: '1234' }]], unusedTypesQuery: expect.any(Object), versionAlias: '.my-so-index_7.11.0', versionIndex: '.my-so-index_7.11.0_001', @@ -447,6 +454,7 @@ describe('migrationsStateActionMachine', () => { kibana: { migrationState: { batchSize: 1000, + maxBatchSizeBytes: 1e8, controlState: 'LEGACY_REINDEX', currentAlias: '.my-so-index', excludeFromUpgradeFilterHooks: {}, @@ -474,6 +482,7 @@ describe('migrationsStateActionMachine', () => { }, tempIndex: '.my-so-index_7.11.0_reindex_temp', tempIndexMappings: expect.any(Object), + transformedDocBatches: [], unusedTypesQuery: expect.any(Object), versionAlias: '.my-so-index_7.11.0', versionIndex: '.my-so-index_7.11.0_001', @@ -488,6 +497,7 @@ describe('migrationsStateActionMachine', () => { kibana: { migrationState: { batchSize: 1000, + maxBatchSizeBytes: 1e8, controlState: 'LEGACY_DELETE', currentAlias: '.my-so-index', excludeFromUpgradeFilterHooks: {}, @@ -519,6 +529,7 @@ describe('migrationsStateActionMachine', () => { }, tempIndex: '.my-so-index_7.11.0_reindex_temp', tempIndexMappings: expect.any(Object), + transformedDocBatches: [], unusedTypesQuery: expect.any(Object), versionAlias: '.my-so-index_7.11.0', versionIndex: '.my-so-index_7.11.0_001', diff --git a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts index 8e3b8ee4ab556..58c299b77fc60 100644 --- a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts +++ b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts @@ -13,7 +13,8 @@ import type { ElasticsearchClient } from '../../elasticsearch'; import { getErrorMessage, getRequestDebugMeta } from '../../elasticsearch'; import { Model, Next, stateActionMachine } from './state_action_machine'; import { cleanup } from './migrations_state_machine_cleanup'; -import { State } from './types'; +import { ReindexSourceToTempIndex, ReindexSourceToTempIndexBulk, State } from './types'; +import { SavedObjectsRawDoc } from '../serialization'; interface StateLogMeta extends LogMeta { kibana: { @@ -140,11 +141,22 @@ export async function migrationStateActionMachine({ const newState = model(state, res); // Redact the state to reduce the memory consumption and so that we // don't log sensitive information inside documents by only keeping - // the _id's of outdatedDocuments + // the _id's of documents const redactedNewState = { ...newState, - // @ts-expect-error outdatedDocuments don't exist in all states - ...{ outdatedDocuments: (newState.outdatedDocuments ?? []).map((doc) => doc._id) }, + ...{ + outdatedDocuments: ((newState as ReindexSourceToTempIndex).outdatedDocuments ?? []).map( + (doc) => + ({ + _id: doc._id, + } as SavedObjectsRawDoc) + ), + }, + ...{ + transformedDocBatches: ( + (newState as ReindexSourceToTempIndexBulk).transformedDocBatches ?? [] + ).map((batches) => batches.map((doc) => ({ _id: doc._id }))) as [SavedObjectsRawDoc[]], + }, }; executionLog.push({ type: 'transition', diff --git a/src/core/server/saved_objects/migrationsv2/model/create_batches.test.ts b/src/core/server/saved_objects/migrationsv2/model/create_batches.test.ts new file mode 100644 index 0000000000000..552c4c237675f --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/model/create_batches.test.ts @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +import * as Either from 'fp-ts/lib/Either'; +import { SavedObjectsRawDoc } from '../../serialization'; +import { createBatches } from './create_batches'; + +describe('createBatches', () => { + const DOCUMENT_SIZE_BYTES = 128; + const INDEX = '.kibana_version_index'; + it('returns right one batch if all documents fit in maxBatchSizeBytes', () => { + const documents = [ + { _id: '', _source: { type: 'dashboard', title: 'my saved object title ¹' } }, + { _id: '', _source: { type: 'dashboard', title: 'my saved object title ²' } }, + { _id: '', _source: { type: 'dashboard', title: 'my saved object title ®' } }, + ]; + + expect(createBatches(documents, INDEX, DOCUMENT_SIZE_BYTES * 3)).toEqual( + Either.right([documents]) + ); + }); + it('creates multiple batches with each batch limited to maxBatchSizeBytes', () => { + const documents = [ + { _id: '', _source: { type: 'dashboard', title: 'my saved object title ¹' } }, + { _id: '', _source: { type: 'dashboard', title: 'my saved object title ²' } }, + { _id: '', _source: { type: 'dashboard', title: 'my saved object title ®' } }, + { _id: '', _source: { type: 'dashboard', title: 'my saved object title 44' } }, + { _id: '', _source: { type: 'dashboard', title: 'my saved object title 55' } }, + ]; + expect(createBatches(documents, INDEX, DOCUMENT_SIZE_BYTES * 2)).toEqual( + Either.right([[documents[0], documents[1]], [documents[2], documents[3]], [documents[4]]]) + ); + }); + it('creates a single empty batch if there are no documents', () => { + const documents = [] as SavedObjectsRawDoc[]; + expect(createBatches(documents, INDEX, 100)).toEqual(Either.right([[]])); + }); + it('throws if any one document exceeds the maxBatchSizeBytes', () => { + const documents = [ + { _id: '', _source: { type: 'dashboard', title: 'my saved object title ¹' } }, + { + _id: '', + _source: { + type: 'dashboard', + title: 'my saved object title ² with a very long title that exceeds max size bytes', + }, + }, + { _id: '', _source: { type: 'dashboard', title: 'my saved object title ®' } }, + ]; + expect(createBatches(documents, INDEX, 178)).toEqual( + Either.left({ + maxBatchSizeBytes: 178, + docSizeBytes: 179, + type: 'document_exceeds_batch_size_bytes', + document: documents[1], + }) + ); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/model/create_batches.ts b/src/core/server/saved_objects/migrationsv2/model/create_batches.ts new file mode 100644 index 0000000000000..c80003fef09fb --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/model/create_batches.ts @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import * as Either from 'fp-ts/lib/Either'; +import { SavedObjectsRawDoc } from '../..'; +import { createBulkOperationBody } from '../actions/bulk_overwrite_transformed_documents'; + +/** + * Creates batches of documents to be used by the bulk API. Each batch will + * have a request body content length that's <= maxBatchSizeBytes + */ +export function createBatches( + docs: SavedObjectsRawDoc[], + index: string, + maxBatchSizeBytes: number +) { + /* To build up the NDJSON request body we construct an array of objects like: + * [ + * {"index": ...} + * {"title": "my saved object"} + * ... + * ] + * However, when we call JSON.stringify on this array the resulting string + * will be surrounded by `[]` which won't be present in the NDJSON so these + * two characters need to be removed from the size calculation. + */ + const BRACKETS_BYTES = 2; + /* Each document in the NDJSON (including the last one) needs to be + * terminated by a newline, so we need to account for an extra newline + * character + */ + const NDJSON_NEW_LINE_BYTES = 1; + + const batches = [[]] as [SavedObjectsRawDoc[]]; + let currBatch = 0; + let currBatchSizeBytes = 0; + for (const doc of docs) { + const bulkOperationBody = createBulkOperationBody(doc, index); + const docSizeBytes = + Buffer.byteLength(JSON.stringify(bulkOperationBody), 'utf8') - + BRACKETS_BYTES + + NDJSON_NEW_LINE_BYTES; + if (docSizeBytes > maxBatchSizeBytes) { + return Either.left({ + type: 'document_exceeds_batch_size_bytes', + docSizeBytes, + maxBatchSizeBytes, + document: doc, + }); + } else if (currBatchSizeBytes + docSizeBytes <= maxBatchSizeBytes) { + batches[currBatch].push(doc); + currBatchSizeBytes = currBatchSizeBytes + docSizeBytes; + } else { + currBatch++; + batches[currBatch] = [doc]; + currBatchSizeBytes = docSizeBytes; + } + } + + return Either.right(batches); +} diff --git a/src/core/server/saved_objects/migrationsv2/model/model.test.ts b/src/core/server/saved_objects/migrationsv2/model/model.test.ts index f24d175f416a7..1d017116bf3fd 100644 --- a/src/core/server/saved_objects/migrationsv2/model/model.test.ts +++ b/src/core/server/saved_objects/migrationsv2/model/model.test.ts @@ -58,6 +58,7 @@ describe('migrations v2 model', () => { retryDelay: 0, retryAttempts: 15, batchSize: 1000, + maxBatchSizeBytes: 1e8, indexPrefix: '.kibana', outdatedDocumentsQuery: {}, targetIndexMappings: { @@ -1065,6 +1066,8 @@ describe('migrations v2 model', () => { }); const newState = model(state, res) as ReindexSourceToTempIndexBulk; expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_INDEX_BULK'); + expect(newState.currentBatch).toEqual(0); + expect(newState.transformedDocBatches).toEqual([processedDocs]); expect(newState.progress.processed).toBe(0); // Result of `(undefined ?? 0) + corruptDocumentsId.length` }); @@ -1119,16 +1122,19 @@ describe('migrations v2 model', () => { }); describe('REINDEX_SOURCE_TO_TEMP_INDEX_BULK', () => { - const transformedDocs = [ - { - _id: 'a:b', - _source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] }, - }, - ] as SavedObjectsRawDoc[]; + const transformedDocBatches = [ + [ + { + _id: 'a:b', + _source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] }, + }, + ], + ] as [SavedObjectsRawDoc[]]; const reindexSourceToTempIndexBulkState: ReindexSourceToTempIndexBulk = { ...baseState, controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK', - transformedDocs, + transformedDocBatches, + currentBatch: 0, versionIndexReadyActions: Option.none, sourceIndex: Option.some('.kibana') as Option.Some, sourceIndexPitId: 'pit_id', @@ -1171,7 +1177,7 @@ describe('migrations v2 model', () => { const newState = model(reindexSourceToTempIndexBulkState, res) as FatalState; expect(newState.controlState).toEqual('FATAL'); expect(newState.reason).toMatchInlineSnapshot( - `"While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Try to use smaller batches by changing the Kibana 'migrations.batchSize' configuration option and restarting Kibana."` + `"While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Ensure that the Kibana configuration option 'migrations.maxBatchSizeBytes' is set to a value that is lower than or equal to the Elasticsearch 'http.max_content_length' configuration option."` ); }); test('REINDEX_SOURCE_TO_TEMP_INDEX_BULK should throw a throwBadResponse error if action failed', () => { @@ -1438,7 +1444,8 @@ describe('migrations v2 model', () => { res ) as TransformedDocumentsBulkIndex; expect(newState.controlState).toEqual('TRANSFORMED_DOCUMENTS_BULK_INDEX'); - expect(newState.transformedDocs).toEqual(processedDocs); + expect(newState.transformedDocBatches).toEqual([processedDocs]); + expect(newState.currentBatch).toEqual(0); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); expect(newState.progress.processed).toBe(outdatedDocuments.length); @@ -1521,16 +1528,31 @@ describe('migrations v2 model', () => { }); describe('TRANSFORMED_DOCUMENTS_BULK_INDEX', () => { - const transformedDocs = [ - { - _id: 'a:b', - _source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] }, - }, - ] as SavedObjectsRawDoc[]; + const transformedDocBatches = [ + [ + // batch 0 + { + _id: 'a:b', + _source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] }, + }, + { + _id: 'a:c', + _source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] }, + }, + ], + [ + // batch 1 + { + _id: 'a:d', + _source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] }, + }, + ], + ] as SavedObjectsRawDoc[][]; const transformedDocumentsBulkIndexState: TransformedDocumentsBulkIndex = { ...baseState, controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX', - transformedDocs, + transformedDocBatches, + currentBatch: 0, versionIndexReadyActions: Option.none, sourceIndex: Option.some('.kibana') as Option.Some, targetIndex: '.kibana_7.11.0_001', @@ -1540,6 +1562,29 @@ describe('migrations v2 model', () => { progress: createInitialProgress(), }; + test('TRANSFORMED_DOCUMENTS_BULK_INDEX -> TRANSFORMED_DOCUMENTS_BULK_INDEX and increments currentBatch if more batches are left', () => { + const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.right( + 'bulk_index_succeeded' + ); + const newState = model( + transformedDocumentsBulkIndexState, + res + ) as TransformedDocumentsBulkIndex; + expect(newState.controlState).toEqual('TRANSFORMED_DOCUMENTS_BULK_INDEX'); + expect(newState.currentBatch).toEqual(1); + }); + + test('TRANSFORMED_DOCUMENTS_BULK_INDEX -> OUTDATED_DOCUMENTS_SEARCH_READ if all batches were written', () => { + const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.right( + 'bulk_index_succeeded' + ); + const newState = model( + { ...transformedDocumentsBulkIndexState, ...{ currentBatch: 1 } }, + res + ); + expect(newState.controlState).toEqual('OUTDATED_DOCUMENTS_SEARCH_READ'); + }); + test('TRANSFORMED_DOCUMENTS_BULK_INDEX throws if action returns left index_not_found_exception', () => { const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.left({ type: 'index_not_found_exception', @@ -1570,7 +1615,7 @@ describe('migrations v2 model', () => { const newState = model(transformedDocumentsBulkIndexState, res) as FatalState; expect(newState.controlState).toEqual('FATAL'); expect(newState.reason).toMatchInlineSnapshot( - `"While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Try to use smaller batches by changing the Kibana 'migrations.batchSize' configuration option and restarting Kibana."` + `"While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Ensure that the Kibana configuration option 'migrations.maxBatchSizeBytes' is set to a value that is lower than or equal to the Elasticsearch 'http.max_content_length' configuration option."` ); }); }); diff --git a/src/core/server/saved_objects/migrationsv2/model/model.ts b/src/core/server/saved_objects/migrationsv2/model/model.ts index 50be4a524f5c5..8aa3d7b83b295 100644 --- a/src/core/server/saved_objects/migrationsv2/model/model.ts +++ b/src/core/server/saved_objects/migrationsv2/model/model.ts @@ -31,6 +31,19 @@ import { throwBadControlState, throwBadResponse, } from './helpers'; +import { createBatches } from './create_batches'; + +const FATAL_REASON_REQUEST_ENTITY_TOO_LARGE = `While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Ensure that the Kibana configuration option 'migrations.maxBatchSizeBytes' is set to a value that is lower than or equal to the Elasticsearch 'http.max_content_length' configuration option.`; +const fatalReasonDocumentExceedsMaxBatchSizeBytes = ({ + _id, + docSizeBytes, + maxBatchSizeBytes, +}: { + _id: string; + docSizeBytes: number; + maxBatchSizeBytes: number; +}) => + `The document with _id "${_id}" is ${docSizeBytes} bytes which exceeds the configured maximum batch size of ${maxBatchSizeBytes} bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.`; export const model = (currentState: State, resW: ResponseType): State => { // The action response `resW` is weakly typed, the type includes all action @@ -489,12 +502,30 @@ export const model = (currentState: State, resW: ResponseType): if (Either.isRight(res)) { if (stateP.corruptDocumentIds.length === 0 && stateP.transformErrors.length === 0) { - return { - ...stateP, - controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK', // handles the actual bulk indexing into temp index - transformedDocs: [...res.right.processedDocs], - progress, - }; + const batches = createBatches( + res.right.processedDocs, + stateP.tempIndex, + stateP.maxBatchSizeBytes + ); + if (Either.isRight(batches)) { + return { + ...stateP, + controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK', // handles the actual bulk indexing into temp index + transformedDocBatches: batches.right, + currentBatch: 0, + progress, + }; + } else { + return { + ...stateP, + controlState: 'FATAL', + reason: fatalReasonDocumentExceedsMaxBatchSizeBytes({ + _id: batches.left.document._id, + docSizeBytes: batches.left.docSizeBytes, + maxBatchSizeBytes: batches.left.maxBatchSizeBytes, + }), + }; + } } else { // we don't have any transform issues with the current batch of outdated docs but // we have carried through previous transformation issues. @@ -525,13 +556,21 @@ export const model = (currentState: State, resW: ResponseType): } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { - return { - ...stateP, - controlState: 'REINDEX_SOURCE_TO_TEMP_READ', - // we're still on the happy path with no transformation failures seen. - corruptDocumentIds: [], - transformErrors: [], - }; + if (stateP.currentBatch + 1 < stateP.transformedDocBatches.length) { + return { + ...stateP, + controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK', + currentBatch: stateP.currentBatch + 1, + }; + } else { + return { + ...stateP, + controlState: 'REINDEX_SOURCE_TO_TEMP_READ', + // we're still on the happy path with no transformation failures seen. + corruptDocumentIds: [], + transformErrors: [], + }; + } } else { if ( isLeftTypeof(res.left, 'target_index_had_write_block') || @@ -548,7 +587,7 @@ export const model = (currentState: State, resW: ResponseType): return { ...stateP, controlState: 'FATAL', - reason: `While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Try to use smaller batches by changing the Kibana 'migrations.batchSize' configuration option and restarting Kibana.`, + reason: FATAL_REASON_REQUEST_ENTITY_TOO_LARGE, }; } throwBadResponse(stateP, res.left); @@ -677,13 +716,31 @@ export const model = (currentState: State, resW: ResponseType): // we haven't seen corrupt documents or any transformation errors thus far in the migration // index the migrated docs if (stateP.corruptDocumentIds.length === 0 && stateP.transformErrors.length === 0) { - return { - ...stateP, - controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX', - transformedDocs: [...res.right.processedDocs], - hasTransformedDocs: true, - progress, - }; + const batches = createBatches( + res.right.processedDocs, + stateP.targetIndex, + stateP.maxBatchSizeBytes + ); + if (Either.isRight(batches)) { + return { + ...stateP, + controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX', + transformedDocBatches: batches.right, + currentBatch: 0, + hasTransformedDocs: true, + progress, + }; + } else { + return { + ...stateP, + controlState: 'FATAL', + reason: fatalReasonDocumentExceedsMaxBatchSizeBytes({ + _id: batches.left.document._id, + docSizeBytes: batches.left.docSizeBytes, + maxBatchSizeBytes: batches.left.maxBatchSizeBytes, + }), + }; + } } else { // We have seen corrupt documents and/or transformation errors // skip indexing and go straight to reading and transforming more docs @@ -711,6 +768,13 @@ export const model = (currentState: State, resW: ResponseType): } else if (stateP.controlState === 'TRANSFORMED_DOCUMENTS_BULK_INDEX') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { + if (stateP.currentBatch + 1 < stateP.transformedDocBatches.length) { + return { + ...stateP, + controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX', + currentBatch: stateP.currentBatch + 1, + }; + } return { ...stateP, controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', @@ -723,7 +787,7 @@ export const model = (currentState: State, resW: ResponseType): return { ...stateP, controlState: 'FATAL', - reason: `While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Try to use smaller batches by changing the Kibana 'migrations.batchSize' configuration option and restarting Kibana.`, + reason: FATAL_REASON_REQUEST_ENTITY_TOO_LARGE, }; } else if ( isLeftTypeof(res.left, 'target_index_had_write_block') || diff --git a/src/core/server/saved_objects/migrationsv2/next.ts b/src/core/server/saved_objects/migrationsv2/next.ts index 9b091b6fc8509..3f3714552725b 100644 --- a/src/core/server/saved_objects/migrationsv2/next.ts +++ b/src/core/server/saved_objects/migrationsv2/next.ts @@ -111,7 +111,7 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra Actions.bulkOverwriteTransformedDocuments({ client, index: state.tempIndex, - transformedDocs: state.transformedDocs, + transformedDocs: state.transformedDocBatches[state.currentBatch], /** * Since we don't run a search against the target index, we disable "refresh" to speed up * the migration process. @@ -160,7 +160,7 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra Actions.bulkOverwriteTransformedDocuments({ client, index: state.targetIndex, - transformedDocs: state.transformedDocs, + transformedDocs: state.transformedDocBatches[state.currentBatch], /** * Since we don't run a search against the target index, we disable "refresh" to speed up * the migration process. diff --git a/src/core/server/saved_objects/migrationsv2/test_helpers/retry.test.ts b/src/core/server/saved_objects/migrationsv2/test_helpers/retry.test.ts new file mode 100644 index 0000000000000..246f61c71ae4d --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/test_helpers/retry.test.ts @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { retryAsync } from './retry_async'; + +describe('retry', () => { + it('retries throwing functions until they succeed', async () => { + let i = 0; + await expect( + retryAsync( + () => { + if (i++ < 2) { + return Promise.reject(new Error('boom')); + } else { + return Promise.resolve('done'); + } + }, + { retryAttempts: 3, retryDelayMs: 1 } + ) + ).resolves.toEqual('done'); + }); + + it('throws if all attempts are exhausted before success', async () => { + let attempts = 0; + await expect(() => + retryAsync( + () => { + attempts++; + return Promise.reject(new Error('boom')); + }, + { retryAttempts: 3, retryDelayMs: 1 } + ) + ).rejects.toMatchInlineSnapshot(`[Error: boom]`); + expect(attempts).toEqual(3); + }); + + it('waits retryDelayMs between each attempt ', async () => { + const now = Date.now(); + let i = 0; + await retryAsync( + () => { + if (i++ < 2) { + return Promise.reject(new Error('boom')); + } else { + return Promise.resolve('done'); + } + }, + { retryAttempts: 3, retryDelayMs: 100 } + ); + expect(Date.now() - now).toBeGreaterThanOrEqual(200); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/test_helpers/retry_async.ts b/src/core/server/saved_objects/migrationsv2/test_helpers/retry_async.ts new file mode 100644 index 0000000000000..f5dffede67a16 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/test_helpers/retry_async.ts @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +function delay(delayInMs: number) { + return new Promise((resolve) => setTimeout(resolve, delayInMs)); +} + +export async function retryAsync( + fn: () => Promise, + options: { retryAttempts: number; retryDelayMs: number } +): Promise { + try { + return await fn(); + } catch (e) { + if (options.retryAttempts > 1) { + await delay(options.retryDelayMs); + return retryAsync(fn, { + retryAttempts: options.retryAttempts - 1, + retryDelayMs: options.retryDelayMs, + }); + } else { + throw e; + } + } +} diff --git a/src/core/server/saved_objects/migrationsv2/types.ts b/src/core/server/saved_objects/migrationsv2/types.ts index ea03b64e03dc8..49ce12c53aa1a 100644 --- a/src/core/server/saved_objects/migrationsv2/types.ts +++ b/src/core/server/saved_objects/migrationsv2/types.ts @@ -76,19 +76,31 @@ export interface BaseState extends ControlState { readonly retryAttempts: number; /** - * The number of documents to fetch from Elasticsearch server to run migration over. + * The number of documents to process in each batch. This determines the + * maximum number of documents that will be read and written in a single + * request. * - * The higher the value, the faster the migration process will be performed since it reduces - * the number of round trips between Kibana and Elasticsearch servers. - * For the migration speed, we have to pay the price of increased memory consumption. + * The higher the value, the faster the migration process will be performed + * since it reduces the number of round trips between Kibana and + * Elasticsearch servers. For the migration speed, we have to pay the price + * of increased memory consumption and HTTP payload size. * - * Since batchSize defines the number of documents, not their size, it might happen that - * Elasticsearch fails a request with circuit_breaking_exception when it retrieves a set of - * saved objects of significant size. + * Since we cannot control the size in bytes of a batch when reading, + * Elasticsearch might fail with a circuit_breaking_exception when it + * retrieves a set of saved objects of significant size. In this case, you + * should set a smaller batchSize value and restart the migration process + * again. * - * In this case, you should set a smaller batchSize value and restart the migration process again. + * When writing batches, we limit the number of documents in a batch + * (batchSize) as well as the size of the batch in bytes (maxBatchSizeBytes). */ readonly batchSize: number; + /** + * When writing batches, limits the batch size in bytes to ensure that we + * don't construct HTTP requests which would exceed Elasticsearch's + * http.max_content_length which defaults to 100mb. + */ + readonly maxBatchSizeBytes: number; readonly logs: MigrationLog[]; /** * The current alias e.g. `.kibana` which always points to the latest @@ -233,7 +245,8 @@ export interface ReindexSourceToTempIndex extends PostInitState { export interface ReindexSourceToTempIndexBulk extends PostInitState { readonly controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'; - readonly transformedDocs: SavedObjectsRawDoc[]; + readonly transformedDocBatches: [SavedObjectsRawDoc[]]; + readonly currentBatch: number; readonly sourceIndexPitId: string; readonly lastHitSortValue: number[] | undefined; readonly progress: Progress; @@ -318,7 +331,8 @@ export interface TransformedDocumentsBulkIndex extends PostInitState { * Write the up-to-date transformed documents to the target index */ readonly controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX'; - readonly transformedDocs: SavedObjectsRawDoc[]; + readonly transformedDocBatches: SavedObjectsRawDoc[][]; + readonly currentBatch: number; readonly lastHitSortValue: number[] | undefined; readonly hasTransformedDocs: boolean; readonly pitId: string; diff --git a/src/core/server/saved_objects/saved_objects_config.ts b/src/core/server/saved_objects/saved_objects_config.ts index c62d322f0bf8d..e7bbd706762f5 100644 --- a/src/core/server/saved_objects/saved_objects_config.ts +++ b/src/core/server/saved_objects/saved_objects_config.ts @@ -12,6 +12,7 @@ import type { ConfigDeprecationProvider } from '../config'; const migrationSchema = schema.object({ batchSize: schema.number({ defaultValue: 1_000 }), + maxBatchSizeBytes: schema.byteSize({ defaultValue: '100mb' }), // 100mb is the default http.max_content_length Elasticsearch config value scrollDuration: schema.string({ defaultValue: '15m' }), pollInterval: schema.number({ defaultValue: 1_500 }), skip: schema.boolean({ defaultValue: false }), diff --git a/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker b/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker index 3295de5a336d3..c7c1d9661fc81 100755 --- a/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker +++ b/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker @@ -108,6 +108,7 @@ kibana_vars=( map.tilemap.options.subdomains map.tilemap.url migrations.batchSize + migrations.maxBatchSizeBytes migrations.enableV2 migrations.pollInterval migrations.retryAttempts