Migrationsv2: limit batch sizes to migrations.batchSizeBytes (= 100mb by default) #109540

Merged · 14 commits · Sep 1, 2021
Changes from 8 commits
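
At a glance, the new option sits alongside the existing document-count limit in the saved objects migrations config. A minimal sketch, assuming the field shape from the `soMigrationsConfig` mocks in this diff and the 100mb default stated in the PR title:

```ts
import { ByteSizeValue } from '@kbn/config-schema';

// Sketch only: mirrors the soMigrationsConfig mocks below; 100mb is the
// stated default for the new batchSizeBytes option.
const soMigrationsConfig = {
  batchSize: 1000, // max number of documents per bulk request
  batchSizeBytes: ByteSizeValue.parse('100mb'), // new: max payload size per bulk request
  scrollDuration: '15m',
  pollInterval: 1500,
  skip: false,
};
```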
@@ -11,6 +11,7 @@ import { buildActiveMappings } from '../core';
const { mergeTypes } = jest.requireActual('./kibana_migrator');
import { SavedObjectsType } from '../../types';
import { BehaviorSubject } from 'rxjs';
import { ByteSizeValue } from '@kbn/config-schema';

const defaultSavedObjectTypes: SavedObjectsType[] = [
{
@@ -37,6 +38,7 @@ const createMigrator = (
kibanaVersion: '8.0.0-testing',
soMigrationsConfig: {
batchSize: 100,
batchSizeBytes: new ByteSizeValue(30_000),
scrollDuration: '15m',
pollInterval: 1500,
skip: false,
@@ -15,6 +15,7 @@ import { loggingSystemMock } from '../../../logging/logging_system.mock';
import { SavedObjectTypeRegistry } from '../../saved_objects_type_registry';
import { SavedObjectsType } from '../../types';
import { DocumentMigrator } from '../core/document_migrator';
import { ByteSizeValue } from '@kbn/config-schema';
jest.mock('../core/document_migrator', () => {
return {
// Create a mock for spying on the constructor
@@ -396,6 +397,7 @@ const mockOptions = ({ enableV2 }: { enableV2: boolean } = { enableV2: false })
} as KibanaMigratorOptions['kibanaConfig'],
soMigrationsConfig: {
batchSize: 20,
batchSizeBytes: new ByteSizeValue(20_000),
pollInterval: 20000,
scrollDuration: '10m',
skip: false,
3 changes: 3 additions & 0 deletions src/core/server/saved_objects/migrationsv2/README.md
@@ -316,7 +316,10 @@ completed this step:
- temp index has a write block
- temp index is not found
### New control state
1. If `currentBatch` is the last batch in `transformedDocBatches` → `REINDEX_SOURCE_TO_TEMP_READ`
2. If there are more batches left in `transformedDocBatches` → `REINDEX_SOURCE_TO_TEMP_INDEX_BULK`
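
A minimal sketch of this transition, with the state shape assumed for illustration:

```ts
interface BulkIndexState {
  currentBatch: number;
  transformedDocBatches: unknown[][];
}

function nextControlState({ currentBatch, transformedDocBatches }: BulkIndexState): string {
  // Once the last batch of the current page is indexed, read the next page
  // from the source index; otherwise keep bulk-indexing the remaining batches.
  return currentBatch === transformedDocBatches.length - 1
    ? 'REINDEX_SOURCE_TO_TEMP_READ'
    : 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK';
}
```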

## REINDEX_SOURCE_TO_TEMP_CLOSE_PIT
### Next action
@@ -23,6 +23,27 @@ import type {
IndexNotFound,
} from './index';

/**
* Given a document and index, creates a valid body for the Bulk API.
*/
export const createBulkOperationBody = (doc: SavedObjectsRawDoc, index: string) => {
return [
{
index: {
_index: index,
_id: doc._id,
// overwrite existing documents
op_type: 'index',
// use optimistic concurrency control to ensure that outdated
// documents are only overwritten once with the latest version
if_seq_no: doc._seq_no,
if_primary_term: doc._primary_term,
},
},
doc._source,
];
};

/** @internal */
export interface BulkOverwriteTransformedDocumentsParams {
client: ElasticsearchClient;
@@ -47,6 +68,10 @@ export const bulkOverwriteTransformedDocuments = ({
| RequestEntityTooLargeException,
'bulk_index_succeeded'
> => () => {
const body = transformedDocs.flatMap((doc) => {
return createBulkOperationBody(doc, index);
});

return client
.bulk({
// Because we only add aliases in the MARK_VERSION_INDEX_READY step we
@@ -60,23 +85,7 @@
wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
refresh,
filter_path: ['items.*.error'],
body: transformedDocs.flatMap((doc) => {
return [
{
index: {
_index: index,
_id: doc._id,
// overwrite existing documents
op_type: 'index',
// use optimistic concurrency control to ensure that outdated
// documents are only overwritten once with the latest version
if_seq_no: doc._seq_no,
if_primary_term: doc._primary_term,
},
},
doc._source,
];
}),
body,
})
.then((res) => {
// Filter out version_conflict_engine_exception since these just mean
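
Extracting `createBulkOperationBody` makes each document's bulk payload measurable before a request is sent. Below is a minimal sketch of byte-aware batching built on top of it; the helper name, import paths, and exact strategy are assumptions, not necessarily the PR's implementation:

```ts
import { SavedObjectsRawDoc } from '../../serialization'; // path assumed
import { createBulkOperationBody } from './bulk_overwrite_transformed_documents'; // path assumed

// Hypothetical helper: split docs into batches whose serialized bulk
// operations stay under batchSizeBytes (the Bulk API sends one
// newline-delimited JSON line per action and per source).
export function batchByPayloadSize(
  docs: SavedObjectsRawDoc[],
  index: string,
  batchSizeBytes: number
): SavedObjectsRawDoc[][] {
  const batches: SavedObjectsRawDoc[][] = [[]];
  let currentSize = 0;
  for (const doc of docs) {
    const payload =
      createBulkOperationBody(doc, index)
        .map((line) => JSON.stringify(line))
        .join('\n') + '\n';
    const docSize = Buffer.byteLength(payload, 'utf8');
    // NOTE: the PR fails fast when a single document equals or exceeds
    // batchSizeBytes (see the integration test below); this sketch omits that guard.
    const current = batches[batches.length - 1];
    if (current.length > 0 && currentSize + docSize > batchSizeBytes) {
      batches.push([doc]); // start a new batch
      currentSize = docSize;
    } else {
      current.push(doc);
      currentSize += docSize;
    }
  }
  return batches;
}
```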
@@ -6,6 +6,7 @@
* Side Public License, v 1.
*/

import { ByteSizeValue } from '@kbn/config-schema';
import * as Option from 'fp-ts/Option';
import { SavedObjectsMigrationConfigType } from '../saved_objects_config';
import { SavedObjectTypeRegistry } from '../saved_objects_type_registry';
@@ -21,6 +22,7 @@ describe('createInitialState', () => {
const migrationsConfig = ({
retryAttempts: 15,
batchSize: 1000,
batchSizeBytes: new ByteSizeValue(1e8),
} as unknown) as SavedObjectsMigrationConfigType;
it('creates the initial state for the model based on the passed in parameters', () => {
expect(
@@ -37,6 +39,7 @@
})
).toEqual({
batchSize: 1000,
batchSizeBytes: 1e8,
controlState: 'INIT',
currentAlias: '.kibana_task_manager',
excludeFromUpgradeFilterHooks: {},
@@ -82,6 +82,7 @@ export const createInitialState = ({
retryDelay: 0,
retryAttempts: migrationsConfig.retryAttempts,
batchSize: migrationsConfig.batchSize,
batchSizeBytes: migrationsConfig.batchSizeBytes.getValueInBytes(),
logs: [],
unusedTypesQuery: excludeUnusedTypesQuery,
knownTypes,
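
`ByteSizeValue` parses human-readable sizes and exposes the raw byte count consumed by the state machine. A brief illustration, assuming `@kbn/config-schema`'s static `parse` helper:

```ts
import { ByteSizeValue } from '@kbn/config-schema';

ByteSizeValue.parse('100mb').getValueInBytes(); // 104857600
new ByteSizeValue(1e8).getValueInBytes(); // 100000000, as in the test above
```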
Binary file not shown.
@@ -0,0 +1,139 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import Path from 'path';
import fs from 'fs/promises';
import JSON5 from 'json5';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import { Root } from '../../../root';
import { ElasticsearchClient } from '../../../elasticsearch';
import { Env } from '@kbn/config';
import { REPO_ROOT } from '@kbn/utils';
import { getEnvOptions } from '../../../config/mocks';

const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;
const targetIndex = `.kibana_${kibanaVersion}_001`;
const logFilePath = Path.join(__dirname, 'batch_size_bytes.log');

async function removeLogFile() {
// ignore errors if it doesn't exist
await fs.unlink(logFilePath).catch(() => void 0);
}

describe('migration v2', () => {
let esServer: kbnTestServer.TestElasticsearchUtils;
let root: Root;
let startES: () => Promise<kbnTestServer.TestElasticsearchUtils>;

beforeAll(async () => {
await removeLogFile();
});

beforeEach(() => {
({ startES } = kbnTestServer.createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
settings: {
es: {
license: 'basic',
dataArchive: Path.join(__dirname, 'archives', '7.14.0_xpack_sample_saved_objects.zip'),
esArgs: ['http.max_content_length=1715275b'],
},
},
}));
});

afterEach(async () => {
if (root) {
await root.shutdown();
}
if (esServer) {
await esServer.stop();
}

// give the test servers time to shut down fully before the next test starts
await new Promise((resolve) => setTimeout(resolve, 10000));
});

it('completes the migration even when a full batch would exceed ES http.max_content_length', async () => {
root = createRoot({ batchSizeBytes: 1715275 });
esServer = await startES();
await root.preboot();
await root.setup();
await expect(root.start()).resolves.toBeTruthy();

// give Kibana a moment to finish any startup writes before counting documents
await new Promise((resolve) => setTimeout(resolve, 1000));

const esClient: ElasticsearchClient = esServer.es.getClient();
const migratedIndexResponse = await esClient.count({
index: targetIndex,
});
const oldIndexResponse = await esClient.count({
index: '.kibana_7.14.0_001',
});

// Use a >= comparison since once Kibana has started it might create new
// documents like telemetry tasks
expect(migratedIndexResponse.body.count).toBeGreaterThanOrEqual(oldIndexResponse.body.count);
});

it('fails with a descriptive message when a single document exceeds batchSizeBytes', async () => {
root = createRoot({ batchSizeBytes: 1015275 });
esServer = await startES();
await root.preboot();
await root.setup();
await expect(root.start()).rejects.toMatchInlineSnapshot(
`[Error: Unable to complete saved object migrations for the [.kibana] index: The document with _id "canvas-workpad-template:workpad-template-061d7868-2b4e-4dc8-8bf7-3772b52926e5" is 1715275 bytes which equals or exceeds the configured maximum batch size of 1015275 bytes. To proceed, please increase the migrations.batchSizeBytes Kibana configuration option and ensure that the Elasticsearch http.max_content_length configuration option is set to an equal or larger value.]`
);

const logFileContent = await fs.readFile(logFilePath, 'utf-8');
const records = logFileContent
.split('\n')
.filter(Boolean)
.map((str) => JSON5.parse(str)) as any[];

expect(
records.find((rec) =>
rec.message.startsWith(
`Unable to complete saved object migrations for the [.kibana] index: The document with _id "canvas-workpad-template:workpad-template-061d7868-2b4e-4dc8-8bf7-3772b52926e5" is 1715275 bytes which equals or exceeds the configured maximum batch size of 1015275 bytes. To proceed, please increase the migrations.batchSizeBytes Kibana configuration option and ensure that the Elasticsearch http.max_content_length configuration option is set to an equal or larger value.`
)
)
).toBeDefined();
});
});

function createRoot(options: { batchSizeBytes?: number }) {
return kbnTestServer.createRootWithCorePlugins(
{
migrations: {
skip: false,
enableV2: true,
batchSize: 1000,
batchSizeBytes: options.batchSizeBytes,
},
logging: {
appenders: {
file: {
type: 'file',
fileName: logFilePath,
layout: {
type: 'json',
},
},
},
loggers: [
{
name: 'root',
appenders: ['file'],
},
],
},
},
{
oss: true,
}
);
}
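
The failure asserted in the second test above implies a per-document size check along these lines; this is a sketch under assumed names, not the PR's actual code:

```ts
// Hypothetical guard: fail fast when a single document can never fit in a batch.
function assertDocFitsInBatch(docId: string, docSizeBytes: number, batchSizeBytes: number): void {
  if (docSizeBytes >= batchSizeBytes) {
    throw new Error(
      `The document with _id "${docId}" is ${docSizeBytes} bytes which equals or exceeds ` +
        `the configured maximum batch size of ${batchSizeBytes} bytes. To proceed, please ` +
        `increase the migrations.batchSizeBytes Kibana configuration option and ensure that ` +
        `the Elasticsearch http.max_content_length configuration option is set to an equal or larger value.`
    );
  }
}
```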
@@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import Path from 'path';
import fs from 'fs/promises';
import JSON5 from 'json5';
import * as kbnTestServer from '../../../../test_helpers/kbn_server';
import { Root } from '../../../root';

const logFilePath = Path.join(__dirname, 'batch_size_bytes_exceeds_es_content_length.log');

async function removeLogFile() {
// ignore errors if it doesn't exist
await fs.unlink(logFilePath).catch(() => void 0);
}

describe('migration v2', () => {
let esServer: kbnTestServer.TestElasticsearchUtils;
let root: Root;
let startES: () => Promise<kbnTestServer.TestElasticsearchUtils>;

beforeAll(async () => {
await removeLogFile();
});
Comment on lines +28 to +30

Member: NIT: Should we also delete the logs afterAll?

mshustov (Contributor, Sep 1, 2021): When I wrote this logic for the first time, I deliberately didn't add that cleanup, so that test failures could be investigated with the logs.

Author (Contributor): We actually added the logs for debugging failed tests (and later used them for further integration testing). Since these logs won't interfere with other test runs, I think it's worth having them lie around so that you can see what happened.

beforeEach(() => {
({ startES } = kbnTestServer.createTestServers({
adjustTimeout: (t: number) => jest.setTimeout(t),
settings: {
es: {
license: 'basic',
dataArchive: Path.join(__dirname, 'archives', '7.14.0_xpack_sample_saved_objects.zip'),
esArgs: ['http.max_content_length=1mb'],
},
},
}));
});

afterEach(async () => {
if (root) {
await root.shutdown();
}
if (esServer) {
await esServer.stop();
}

// give the test servers time to shut down fully before the next test starts
await new Promise((resolve) => setTimeout(resolve, 10000));
});

it('fails with a descriptive message when batchSizeBytes exceeds ES http.max_content_length', async () => {
root = createRoot({ batchSizeBytes: 1715275 });
esServer = await startES();
await root.preboot();
await root.setup();
await expect(root.start()).rejects.toMatchInlineSnapshot(
`[Error: Unable to complete saved object migrations for the [.kibana] index: While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Ensure that the Kibana configuration option 'migrations.batchSize' is set to a value that is lower than or equal to the Elasticsearch 'http.max_content_length' configuration option.]`
);

const logFileContent = await fs.readFile(logFilePath, 'utf-8');
const records = logFileContent
.split('\n')
.filter(Boolean)
.map((str) => JSON5.parse(str)) as any[];

expect(
records.find((rec) =>
rec.message.startsWith(
`Unable to complete saved object migrations for the [.kibana] index: While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Ensure that the Kibana configuration option 'migrations.batchSize' is set to a value that is lower than or equal to the Elasticsearch 'http.max_content_length' configuration option.`
)
)
).toBeDefined();
});
});

function createRoot(options: { batchSizeBytes?: number }) {
return kbnTestServer.createRootWithCorePlugins(
{
migrations: {
skip: false,
enableV2: true,
batchSize: 1000,
batchSizeBytes: options.batchSizeBytes,
},
logging: {
appenders: {
file: {
type: 'file',
fileName: logFilePath,
layout: {
type: 'json',
},
},
},
loggers: [
{
name: 'root',
appenders: ['file'],
},
],
},
},
{
oss: true,
}
);
}
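
The 413 behaviour asserted above corresponds to the `RequestEntityTooLargeException` type added to `bulkOverwriteTransformedDocuments` earlier in the diff. A minimal sketch of such a mapping, with the shape assumed:

```ts
// Hypothetical error mapper: surface Elasticsearch's 413 as a distinct,
// non-retryable exception so the migration fails with an actionable message
// instead of retrying indefinitely.
interface RequestEntityTooLargeException {
  type: 'request_entity_too_large_exception';
}

const mapBulkError = (error: { statusCode?: number }): RequestEntityTooLargeException | undefined =>
  error.statusCode === 413 ? { type: 'request_entity_too_large_exception' } : undefined;
```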