Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrationsv2: limit batch sizes to migrations.batchSizeBytes (= 100mb by default) #109540

Merged
merged 14 commits into from
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/setup/settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,10 @@ override this parameter to use their own Tile Map Service. For example:
`"https://tiles.elastic.co/v2/default/{z}/{x}/{y}.png?elastic_tile_service_tos=agree&my_app_name=kibana"`

| `migrations.batchSize:`
| Defines the number of documents migrated at a time. The higher the value, the faster the Saved Objects migration process performs at the cost of higher memory consumption. If the migration fails due to a `circuit_breaking_exception`, set a smaller `batchSize` value. *Default: `1000`*
| Defines the number of documents migrated at a time. The higher the value, the faster the Saved Objects migration process performs at the cost of higher memory consumption. If upgrade migrations results in {kib} crashing with an out of memory exception or fails due to an Elasticsearch `circuit_breaking_exception`, use a smaller `batchSize` value to reduce the memory pressure. *Default: `1000`*

| `migrations.maxBatchSizeBytes:`
| Defines the maximum payload size for indexing batches of upgraded saved objects to avoid migrations failing due to a 413 Request Entity Too Large response from Elasticsearch. This value should be lower than or equal to your Elasticsearch cluster's `http.max_content_length` configuration option. *Default: `100mb`*

| `migrations.enableV2:`
| experimental[]. Enables the new Saved Objects migration algorithm. For information about the migration algorithm, refer to <<upgrade-migrations>>. When `migrations v2` is stable, the setting will be removed in an upcoming release without any further notice. Setting the value to `false` causes {kib} to use the legacy migration algorithm, which shipped in 7.11 and earlier versions. *Default: `true`*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { buildActiveMappings } from '../core';
const { mergeTypes } = jest.requireActual('./kibana_migrator');
import { SavedObjectsType } from '../../types';
import { BehaviorSubject } from 'rxjs';
import { ByteSizeValue } from '@kbn/config-schema';

const defaultSavedObjectTypes: SavedObjectsType[] = [
{
Expand All @@ -37,6 +38,7 @@ const createMigrator = (
kibanaVersion: '8.0.0-testing',
soMigrationsConfig: {
batchSize: 100,
maxBatchSizeBytes: ByteSizeValue.parse('30kb'),
scrollDuration: '15m',
pollInterval: 1500,
skip: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { loggingSystemMock } from '../../../logging/logging_system.mock';
import { SavedObjectTypeRegistry } from '../../saved_objects_type_registry';
import { SavedObjectsType } from '../../types';
import { DocumentMigrator } from '../core/document_migrator';
import { ByteSizeValue } from '@kbn/config-schema';
jest.mock('../core/document_migrator', () => {
return {
// Create a mock for spying on the constructor
Expand Down Expand Up @@ -396,6 +397,7 @@ const mockOptions = ({ enableV2 }: { enableV2: boolean } = { enableV2: false })
} as KibanaMigratorOptions['kibanaConfig'],
soMigrationsConfig: {
batchSize: 20,
maxBatchSizeBytes: ByteSizeValue.parse('20mb'),
pollInterval: 20000,
scrollDuration: '10m',
skip: false,
Expand Down
3 changes: 3 additions & 0 deletions src/core/server/saved_objects/migrationsv2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,10 @@ completed this step:
- temp index has a write block
- temp index is not found
### New control state
1. If `currentBatch` is the last batch in `transformedDocBatches`
`REINDEX_SOURCE_TO_TEMP_READ`
2. If there are more batches left in `transformedDocBatches`
`REINDEX_SOURCE_TO_TEMP_INDEX_BULK`

## REINDEX_SOURCE_TO_TEMP_CLOSE_PIT
### Next action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@ import type {
IndexNotFound,
} from './index';

/**
* Given a document and index, creates a valid body for the Bulk API.
*/
export const createBulkOperationBody = (doc: SavedObjectsRawDoc, index: string) => {
return [
{
index: {
_index: index,
_id: doc._id,
// overwrite existing documents
op_type: 'index',
// use optimistic concurrency control to ensure that outdated
// documents are only overwritten once with the latest version
if_seq_no: doc._seq_no,
if_primary_term: doc._primary_term,
},
},
doc._source,
];
};

/** @internal */
export interface BulkOverwriteTransformedDocumentsParams {
client: ElasticsearchClient;
Expand All @@ -47,6 +68,10 @@ export const bulkOverwriteTransformedDocuments = ({
| RequestEntityTooLargeException,
'bulk_index_succeeded'
> => () => {
const body = transformedDocs.flatMap((doc) => {
return createBulkOperationBody(doc, index);
});

return client
.bulk({
// Because we only add aliases in the MARK_VERSION_INDEX_READY step we
Expand All @@ -60,23 +85,7 @@ export const bulkOverwriteTransformedDocuments = ({
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
refresh,
filter_path: ['items.*.error'],
body: transformedDocs.flatMap((doc) => {
return [
{
index: {
_index: index,
_id: doc._id,
// overwrite existing documents
op_type: 'index',
// use optimistic concurrency control to ensure that outdated
// documents are only overwritten once with the latest version
if_seq_no: doc._seq_no,
if_primary_term: doc._primary_term,
},
},
doc._source,
];
}),
body,
})
.then((res) => {
// Filter out version_conflict_engine_exception since these just mean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Side Public License, v 1.
*/

import { ByteSizeValue } from '@kbn/config-schema';
import * as Option from 'fp-ts/Option';
import { SavedObjectsMigrationConfigType } from '../saved_objects_config';
import { SavedObjectTypeRegistry } from '../saved_objects_type_registry';
Expand All @@ -21,6 +22,7 @@ describe('createInitialState', () => {
const migrationsConfig = ({
retryAttempts: 15,
batchSize: 1000,
maxBatchSizeBytes: ByteSizeValue.parse('100mb'),
} as unknown) as SavedObjectsMigrationConfigType;
it('creates the initial state for the model based on the passed in parameters', () => {
expect(
Expand All @@ -37,6 +39,7 @@ describe('createInitialState', () => {
})
).toEqual({
batchSize: 1000,
maxBatchSizeBytes: ByteSizeValue.parse('100mb').getValueInBytes(),
controlState: 'INIT',
currentAlias: '.kibana_task_manager',
excludeFromUpgradeFilterHooks: {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export const createInitialState = ({
retryDelay: 0,
retryAttempts: migrationsConfig.retryAttempts,
batchSize: migrationsConfig.batchSize,
maxBatchSizeBytes: migrationsConfig.maxBatchSizeBytes.getValueInBytes(),
logs: [],
unusedTypesQuery: excludeUnusedTypesQuery,
knownTypes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { InternalCoreStart } from '../../../internal_types';
import { Root } from '../../../root';

const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;
const logFilePath = path.join(__dirname, 'migration_test_kibana.log');
const logFilePath = path.join(__dirname, '7.7.2_xpack_100k.log');

async function removeLogFile() {
// ignore errors if it doesn't exist
Expand Down Expand Up @@ -61,9 +61,12 @@ describe('migration from 7.7.2-xpack with 100k objects', () => {
},
},
},
root: {
appenders: ['default', 'file'],
},
loggers: [
{
name: 'root',
appenders: ['file'],
},
],
},
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import { Root } from '../../../root';
import { ElasticsearchClient } from '../../../elasticsearch';

const logFilePath = Path.join(__dirname, '7_13_failed_action_tasks_test.log');
const logFilePath = Path.join(__dirname, '7_13_failed_action_tasks.log');

async function removeLogFile() {
// ignore errors if it doesn't exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import Util from 'util';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import { Root } from '../../../root';

const logFilePath = Path.join(__dirname, '7_13_corrupt_transform_failures_test.log');
const logFilePath = Path.join(__dirname, '7_13_corrupt_transform_failures.log');

const asyncUnlink = Util.promisify(Fs.unlink);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import { ElasticsearchClient } from '../../../elasticsearch';
import { Env } from '@kbn/config';
import { REPO_ROOT } from '@kbn/utils';
import { getEnvOptions } from '../../../config/mocks';
import { retryAsync } from '../test_helpers/retry_async';
import { LogRecord } from '@kbn/logging';

const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;
const targetIndex = `.kibana_${kibanaVersion}_001`;
const logFilePath = Path.join(__dirname, '7_13_unknown_types_test.log');
const logFilePath = Path.join(__dirname, '7_13_unknown_types.log');

async function removeLogFile() {
// ignore errors if it doesn't exist
Expand Down Expand Up @@ -68,23 +70,30 @@ describe('migration v2', () => {
await root.setup();
await root.start();

const logFileContent = await fs.readFile(logFilePath, 'utf-8');
const records = logFileContent
.split('\n')
.filter(Boolean)
.map((str) => JSON5.parse(str));
let unknownDocsWarningLog: LogRecord;

const unknownDocsWarningLog = records.find((rec) =>
rec.message.startsWith(`[.kibana] CHECK_UNKNOWN_DOCUMENTS`)
);
await retryAsync(
async () => {
const logFileContent = await fs.readFile(logFilePath, 'utf-8');
const records = logFileContent
.split('\n')
.filter(Boolean)
.map((str) => JSON5.parse(str));

unknownDocsWarningLog = records.find((rec) =>
rec.message.startsWith(`[.kibana] CHECK_UNKNOWN_DOCUMENTS`)
);

expect(
unknownDocsWarningLog.message.startsWith(
'[.kibana] CHECK_UNKNOWN_DOCUMENTS Upgrades will fail for 8.0+ because documents were found for unknown saved ' +
'object types. To ensure that upgrades will succeed in the future, either re-enable plugins or delete ' +
`these documents from the "${targetIndex}" index after the current upgrade completes.`
)
).toBeTruthy();
expect(
unknownDocsWarningLog.message.startsWith(
'[.kibana] CHECK_UNKNOWN_DOCUMENTS Upgrades will fail for 8.0+ because documents were found for unknown saved ' +
'object types. To ensure that upgrades will succeed in the future, either re-enable plugins or delete ' +
`these documents from the "${targetIndex}" index after the current upgrade completes.`
)
).toBeTruthy();
},
{ retryAttempts: 10, retryDelayMs: 200 }
);

const unknownDocs = [
{ type: 'space', id: 'space:default' },
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import Path from 'path';
import fs from 'fs/promises';
import JSON5 from 'json5';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import { Root } from '../../../root';
import { ElasticsearchClient } from '../../../elasticsearch';
import { Env } from '@kbn/config';
import { REPO_ROOT } from '@kbn/utils';
import { getEnvOptions } from '../../../config/mocks';
import { LogRecord } from '@kbn/logging';
import { retryAsync } from '../test_helpers/retry_async';

const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;
const targetIndex = `.kibana_${kibanaVersion}_001`;
const logFilePath = Path.join(__dirname, 'batch_size_bytes.log');

async function removeLogFile() {
// ignore errors if it doesn't exist
await fs.unlink(logFilePath).catch(() => void 0);
}

describe('migration v2', () => {
let esServer: kbnTestServer.TestElasticsearchUtils;
let root: Root;
let startES: () => Promise<kbnTestServer.TestElasticsearchUtils>;

beforeAll(async () => {
await removeLogFile();
});

beforeEach(() => {
({ startES } = kbnTestServer.createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
settings: {
es: {
license: 'basic',
dataArchive: Path.join(__dirname, 'archives', '7.14.0_xpack_sample_saved_objects.zip'),
esArgs: ['http.max_content_length=1715275b'],
},
},
}));
});

afterEach(async () => {
if (root) {
await root.shutdown();
}
if (esServer) {
await esServer.stop();
}

await new Promise((resolve) => setTimeout(resolve, 10000));
});

it('completes the migration even when a full batch would exceed ES http.max_content_length', async () => {
root = createRoot({ maxBatchSizeBytes: 1715275 });
esServer = await startES();
await root.preboot();
await root.setup();
await expect(root.start()).resolves.toBeTruthy();

await new Promise((resolve) => setTimeout(resolve, 1000));

const esClient: ElasticsearchClient = esServer.es.getClient();
const migratedIndexResponse = await esClient.count({
index: targetIndex,
});
const oldIndexResponse = await esClient.count({
index: '.kibana_7.14.0_001',
});

// Use a >= comparison since once Kibana has started it might create new
// documents like telemetry tasks
expect(migratedIndexResponse.body.count).toBeGreaterThanOrEqual(oldIndexResponse.body.count);
});

it('fails with a descriptive message when a single document exceeds maxBatchSizeBytes', async () => {
root = createRoot({ maxBatchSizeBytes: 1015275 });
esServer = await startES();
await root.preboot();
await root.setup();
await expect(root.start()).rejects.toMatchInlineSnapshot(
`[Error: Unable to complete saved object migrations for the [.kibana] index: The document with _id "canvas-workpad-template:workpad-template-061d7868-2b4e-4dc8-8bf7-3772b52926e5" is 1715275 bytes which exceeds the configured maximum batch size of 1015275 bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.]`
);

await retryAsync(
async () => {
const logFileContent = await fs.readFile(logFilePath, 'utf-8');
const records = logFileContent
.split('\n')
.filter(Boolean)
.map((str) => JSON5.parse(str)) as LogRecord[];
expect(
records.find((rec) =>
rec.message.startsWith(
`Unable to complete saved object migrations for the [.kibana] index: The document with _id "canvas-workpad-template:workpad-template-061d7868-2b4e-4dc8-8bf7-3772b52926e5" is 1715275 bytes which exceeds the configured maximum batch size of 1015275 bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.`
)
)
).toBeDefined();
},
{ retryAttempts: 10, retryDelayMs: 200 }
);
});
});

function createRoot(options: { maxBatchSizeBytes?: number }) {
return kbnTestServer.createRootWithCorePlugins(
{
migrations: {
skip: false,
enableV2: true,
batchSize: 1000,
maxBatchSizeBytes: options.maxBatchSizeBytes,
},
logging: {
appenders: {
file: {
type: 'file',
fileName: logFilePath,
layout: {
type: 'json',
},
},
},
loggers: [
{
name: 'root',
appenders: ['file'],
},
],
},
},
{
oss: true,
}
);
}
Loading