Skip to content

Commit

Permalink
[7.15] Migrationsv2: limit batch sizes to migrations.batchSizeBytes (…
Browse files Browse the repository at this point in the history
…= 100mb by default) (#109540) (#110968)

* Migrationsv2: limit batch sizes to migrations.batchSizeBytes (= 100mb by default) (#109540)

* Fix logging for existing integration test

* First stab at limiting batches to batchSizeBytes

* Fix tests

* Fix batch size calculation, NDJSON needs to be terminated by an empty line

* Integration tests

* Fix type failures

* rename migration integration tests and log files to be consistent & more descriptive

* Review feedback

* Remove duplication of fatal error reasons

* migrations.maxBatchSizeBytes to docker environment vars

* docs for migrations.maxBatchSizeBytes
# Conflicts:
#	src/core/server/saved_objects/migrationsv2/integration_tests/7_13_0_unknown_types.test.ts
#	src/core/server/saved_objects/migrationsv2/integration_tests/migration_from_v1.test.ts

* Fix tests on 7.x being off by one byte

Co-authored-by: Kibana Machine <[email protected]>
  • Loading branch information
rudolf and kibanamachine authored Sep 6, 2021
1 parent 74e7afb commit f3421a0
Show file tree
Hide file tree
Showing 32 changed files with 765 additions and 104 deletions.
5 changes: 4 additions & 1 deletion docs/setup/settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,10 @@ override this parameter to use their own Tile Map Service. For example:
`"https://tiles.elastic.co/v2/default/{z}/{x}/{y}.png?elastic_tile_service_tos=agree&my_app_name=kibana"`

| `migrations.batchSize:`
| Defines the number of documents migrated at a time. The higher the value, the faster the Saved Objects migration process performs at the cost of higher memory consumption. If the migration fails due to a `circuit_breaking_exception`, set a smaller `batchSize` value. *Default: `1000`*
| Defines the number of documents migrated at a time. The higher the value, the faster the Saved Objects migration process performs at the cost of higher memory consumption. If upgrade migrations results in {kib} crashing with an out of memory exception or fails due to an Elasticsearch `circuit_breaking_exception`, use a smaller `batchSize` value to reduce the memory pressure. *Default: `1000`*

| `migrations.maxBatchSizeBytes:`
| Defines the maximum payload size for indexing batches of upgraded saved objects to avoid migrations failing due to a 413 Request Entity Too Large response from Elasticsearch. This value should be lower than or equal to your Elasticsearch cluster's `http.max_content_length` configuration option. *Default: `100mb`*

| `migrations.enableV2:`
| experimental[]. Enables the new Saved Objects migration algorithm. For information about the migration algorithm, refer to <<upgrade-migrations>>. When `migrations v2` is stable, the setting will be removed in an upcoming release without any further notice. Setting the value to `false` causes {kib} to use the legacy migration algorithm, which shipped in 7.11 and earlier versions. *Default: `true`*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { buildActiveMappings } from '../core';
const { mergeTypes } = jest.requireActual('./kibana_migrator');
import { SavedObjectsType } from '../../types';
import { BehaviorSubject } from 'rxjs';
import { ByteSizeValue } from '@kbn/config-schema';

const defaultSavedObjectTypes: SavedObjectsType[] = [
{
Expand All @@ -37,6 +38,7 @@ const createMigrator = (
kibanaVersion: '8.0.0-testing',
soMigrationsConfig: {
batchSize: 100,
maxBatchSizeBytes: ByteSizeValue.parse('30kb'),
scrollDuration: '15m',
pollInterval: 1500,
skip: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { loggingSystemMock } from '../../../logging/logging_system.mock';
import { SavedObjectTypeRegistry } from '../../saved_objects_type_registry';
import { SavedObjectsType } from '../../types';
import { DocumentMigrator } from '../core/document_migrator';
import { ByteSizeValue } from '@kbn/config-schema';
jest.mock('../core/document_migrator', () => {
return {
// Create a mock for spying on the constructor
Expand Down Expand Up @@ -396,6 +397,7 @@ const mockOptions = ({ enableV2 }: { enableV2: boolean } = { enableV2: false })
} as KibanaMigratorOptions['kibanaConfig'],
soMigrationsConfig: {
batchSize: 20,
maxBatchSizeBytes: ByteSizeValue.parse('20mb'),
pollInterval: 20000,
scrollDuration: '10m',
skip: false,
Expand Down
3 changes: 3 additions & 0 deletions src/core/server/saved_objects/migrationsv2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,10 @@ completed this step:
- temp index has a write block
- temp index is not found
### New control state
1. If `currentBatch` is the last batch in `transformedDocBatches`
`REINDEX_SOURCE_TO_TEMP_READ`
2. If there are more batches left in `transformedDocBatches`
`REINDEX_SOURCE_TO_TEMP_INDEX_BULK`

## REINDEX_SOURCE_TO_TEMP_CLOSE_PIT
### Next action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@ import type {
IndexNotFound,
} from './index';

/**
* Given a document and index, creates a valid body for the Bulk API.
*/
export const createBulkOperationBody = (doc: SavedObjectsRawDoc, index: string) => {
return [
{
index: {
_index: index,
_id: doc._id,
// overwrite existing documents
op_type: 'index',
// use optimistic concurrency control to ensure that outdated
// documents are only overwritten once with the latest version
if_seq_no: doc._seq_no,
if_primary_term: doc._primary_term,
},
},
doc._source,
];
};

/** @internal */
export interface BulkOverwriteTransformedDocumentsParams {
client: ElasticsearchClient;
Expand All @@ -47,6 +68,10 @@ export const bulkOverwriteTransformedDocuments = ({
| RequestEntityTooLargeException,
'bulk_index_succeeded'
> => () => {
const body = transformedDocs.flatMap((doc) => {
return createBulkOperationBody(doc, index);
});

return client
.bulk({
// Because we only add aliases in the MARK_VERSION_INDEX_READY step we
Expand All @@ -60,23 +85,7 @@ export const bulkOverwriteTransformedDocuments = ({
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
refresh,
filter_path: ['items.*.error'],
body: transformedDocs.flatMap((doc) => {
return [
{
index: {
_index: index,
_id: doc._id,
// overwrite existing documents
op_type: 'index',
// use optimistic concurrency control to ensure that outdated
// documents are only overwritten once with the latest version
if_seq_no: doc._seq_no,
if_primary_term: doc._primary_term,
},
},
doc._source,
];
}),
body,
})
.then((res) => {
// Filter out version_conflict_engine_exception since these just mean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Side Public License, v 1.
*/

import { ByteSizeValue } from '@kbn/config-schema';
import * as Option from 'fp-ts/Option';
import { SavedObjectsMigrationConfigType } from '../saved_objects_config';
import { SavedObjectTypeRegistry } from '../saved_objects_type_registry';
Expand All @@ -21,6 +22,7 @@ describe('createInitialState', () => {
const migrationsConfig = ({
retryAttempts: 15,
batchSize: 1000,
maxBatchSizeBytes: ByteSizeValue.parse('100mb'),
} as unknown) as SavedObjectsMigrationConfigType;
it('creates the initial state for the model based on the passed in parameters', () => {
expect(
Expand All @@ -37,6 +39,7 @@ describe('createInitialState', () => {
})
).toEqual({
batchSize: 1000,
maxBatchSizeBytes: ByteSizeValue.parse('100mb').getValueInBytes(),
controlState: 'INIT',
currentAlias: '.kibana_task_manager',
excludeFromUpgradeFilterHooks: {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export const createInitialState = ({
retryDelay: 0,
retryAttempts: migrationsConfig.retryAttempts,
batchSize: migrationsConfig.batchSize,
maxBatchSizeBytes: migrationsConfig.maxBatchSizeBytes.getValueInBytes(),
logs: [],
unusedTypesQuery: excludeUnusedTypesQuery,
knownTypes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { InternalCoreStart } from '../../../internal_types';
import { Root } from '../../../root';

const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;
const logFilePath = path.join(__dirname, 'migration_test_kibana.log');
const logFilePath = path.join(__dirname, '7.7.2_xpack_100k.log');

async function removeLogFile() {
// ignore errors if it doesn't exist
Expand Down Expand Up @@ -61,9 +61,12 @@ describe('migration from 7.7.2-xpack with 100k objects', () => {
},
},
},
root: {
appenders: ['default', 'file'],
},
loggers: [
{
name: 'root',
appenders: ['file'],
},
],
},
// reporting loads headless browser, that prevents nodejs process from exiting.
xpack: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import { Root } from '../../../root';
import { ElasticsearchClient } from '../../../elasticsearch';

const logFilePath = Path.join(__dirname, '7_13_failed_action_tasks_test.log');
const logFilePath = Path.join(__dirname, '7_13_failed_action_tasks.log');

async function removeLogFile() {
// ignore errors if it doesn't exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import Util from 'util';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import { Root } from '../../../root';

const logFilePath = Path.join(__dirname, '7_13_corrupt_transform_failures_test.log');
const logFilePath = Path.join(__dirname, '7_13_corrupt_transform_failures.log');

const asyncUnlink = Util.promisify(Fs.unlink);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ import { ElasticsearchClient } from '../../../elasticsearch';
import { Env } from '@kbn/config';
import { REPO_ROOT } from '@kbn/utils';
import { getEnvOptions } from '../../../config/mocks';
import { retryAsync } from '../test_helpers/retry_async';
import { LogRecord } from '@kbn/logging';

const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;
const logFilePath = Path.join(__dirname, '7_13_unknown_types_test.log');
const targetIndex = `.kibana_${kibanaVersion}_001`;
const logFilePath = Path.join(__dirname, '7_13_unknown_types.log');

async function removeLogFile() {
// ignore errors if it doesn't exist
Expand Down Expand Up @@ -67,23 +70,30 @@ describe('migration v2', () => {
await root.setup();
await root.start();

const logFileContent = await fs.readFile(logFilePath, 'utf-8');
const records = logFileContent
.split('\n')
.filter(Boolean)
.map((str) => JSON5.parse(str));
let unknownDocsWarningLog: LogRecord;

const unknownDocsWarningLog = records.find((rec) =>
rec.message.startsWith(`[.kibana] CHECK_UNKNOWN_DOCUMENTS`)
);
await retryAsync(
async () => {
const logFileContent = await fs.readFile(logFilePath, 'utf-8');
const records = logFileContent
.split('\n')
.filter(Boolean)
.map((str) => JSON5.parse(str));

unknownDocsWarningLog = records.find((rec) =>
rec.message.startsWith(`[.kibana] CHECK_UNKNOWN_DOCUMENTS`)
);

expect(
unknownDocsWarningLog.message.startsWith(
'[.kibana] CHECK_UNKNOWN_DOCUMENTS Upgrades will fail for 8.0+ because documents were found for unknown saved ' +
'object types. To ensure that upgrades will succeed in the future, either re-enable plugins or delete ' +
`these documents from the ".kibana_${kibanaVersion}_001" index after the current upgrade completes.`
)
).toBeTruthy();
expect(
unknownDocsWarningLog.message.startsWith(
'[.kibana] CHECK_UNKNOWN_DOCUMENTS Upgrades will fail for 8.0+ because documents were found for unknown saved ' +
'object types. To ensure that upgrades will succeed in the future, either re-enable plugins or delete ' +
`these documents from the "${targetIndex}" index after the current upgrade completes.`
)
).toBeTruthy();
},
{ retryAttempts: 10, retryDelayMs: 200 }
);

const unknownDocs = [
{ type: 'space', id: 'space:default' },
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import Path from 'path';
import fs from 'fs/promises';
import JSON5 from 'json5';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import { Root } from '../../../root';
import { ElasticsearchClient } from '../../../elasticsearch';
import { Env } from '@kbn/config';
import { REPO_ROOT } from '@kbn/utils';
import { getEnvOptions } from '../../../config/mocks';
import { LogRecord } from '@kbn/logging';
import { retryAsync } from '../test_helpers/retry_async';

const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;
const targetIndex = `.kibana_${kibanaVersion}_001`;
const logFilePath = Path.join(__dirname, 'batch_size_bytes.log');

async function removeLogFile() {
// ignore errors if it doesn't exist
await fs.unlink(logFilePath).catch(() => void 0);
}

describe('migration v2', () => {
let esServer: kbnTestServer.TestElasticsearchUtils;
let root: Root;
let startES: () => Promise<kbnTestServer.TestElasticsearchUtils>;

beforeAll(async () => {
await removeLogFile();
});

beforeEach(() => {
({ startES } = kbnTestServer.createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
settings: {
es: {
license: 'basic',
dataArchive: Path.join(__dirname, 'archives', '7.14.0_xpack_sample_saved_objects.zip'),
esArgs: ['http.max_content_length=1715276b'],
},
},
}));
});

afterEach(async () => {
if (root) {
await root.shutdown();
}
if (esServer) {
await esServer.stop();
}

await new Promise((resolve) => setTimeout(resolve, 10000));
});

it('completes the migration even when a full batch would exceed ES http.max_content_length', async () => {
root = createRoot({ maxBatchSizeBytes: 1715276 });
esServer = await startES();
await root.preboot();
await root.setup();
await expect(root.start()).resolves.toBeTruthy();

await new Promise((resolve) => setTimeout(resolve, 5000));

const esClient: ElasticsearchClient = esServer.es.getClient();
const migratedIndexResponse = await esClient.count({
index: targetIndex,
});
const oldIndexResponse = await esClient.count({
index: '.kibana_7.14.0_001',
});

// Use a >= comparison since once Kibana has started it might create new
// documents like telemetry tasks
expect(migratedIndexResponse.body.count).toBeGreaterThanOrEqual(oldIndexResponse.body.count);
});

it('fails with a descriptive message when a single document exceeds maxBatchSizeBytes', async () => {
root = createRoot({ maxBatchSizeBytes: 1015275 });
esServer = await startES();
await root.preboot();
await root.setup();
await expect(root.start()).rejects.toMatchInlineSnapshot(
`[Error: Unable to complete saved object migrations for the [.kibana] index: The document with _id "canvas-workpad-template:workpad-template-061d7868-2b4e-4dc8-8bf7-3772b52926e5" is 1715276 bytes which exceeds the configured maximum batch size of 1015275 bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.]`
);

await retryAsync(
async () => {
const logFileContent = await fs.readFile(logFilePath, 'utf-8');
const records = logFileContent
.split('\n')
.filter(Boolean)
.map((str) => JSON5.parse(str)) as LogRecord[];
expect(
records.find((rec) =>
rec.message.startsWith(
`Unable to complete saved object migrations for the [.kibana] index: The document with _id "canvas-workpad-template:workpad-template-061d7868-2b4e-4dc8-8bf7-3772b52926e5" is 1715276 bytes which exceeds the configured maximum batch size of 1015275 bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.`
)
)
).toBeDefined();
},
{ retryAttempts: 10, retryDelayMs: 200 }
);
});
});

function createRoot(options: { maxBatchSizeBytes?: number }) {
return kbnTestServer.createRootWithCorePlugins(
{
migrations: {
skip: false,
enableV2: true,
batchSize: 1000,
maxBatchSizeBytes: options.maxBatchSizeBytes,
},
logging: {
appenders: {
file: {
type: 'file',
fileName: logFilePath,
layout: {
type: 'json',
},
},
},
loggers: [
{
name: 'root',
appenders: ['file'],
},
],
},
},
{
oss: true,
}
);
}
Loading

0 comments on commit f3421a0

Please sign in to comment.