Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migration v2 respects the config.batchSize value #96207

Merged
merged 7 commits into from
Apr 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion src/core/server/saved_objects/migrationsv2/actions/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,12 @@ describe('actions', () => {

describe('searchForOutdatedDocuments', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.searchForOutdatedDocuments(client, 'new_index', { properties: {} });
mshustov marked this conversation as resolved.
Show resolved Hide resolved
const task = Actions.searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'new_index',
outdatedDocumentsQuery: {},
});

try {
await task();
} catch (e) {
Expand All @@ -172,6 +177,29 @@ describe('actions', () => {

expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});

it('configures request according to given parameters', async () => {
const esClient = elasticsearchClientMock.createInternalClient();
const query = {};
const targetIndex = 'new_index';
const batchSize = 1000;
const task = Actions.searchForOutdatedDocuments(esClient, {
batchSize,
targetIndex,
outdatedDocumentsQuery: query,
});

await task();

expect(esClient.search).toHaveBeenCalledTimes(1);
expect(esClient.search).toHaveBeenCalledWith(
expect.objectContaining({
index: targetIndex,
size: batchSize,
body: expect.objectContaining({ query }),
})
);
});
});

describe('bulkOverwriteTransformedDocuments', () => {
Expand Down
30 changes: 16 additions & 14 deletions src/core/server/saved_objects/migrationsv2/actions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import * as Option from 'fp-ts/lib/Option';
import { ElasticsearchClientError, ResponseError } from '@elastic/elasticsearch/lib/errors';
import { pipe } from 'fp-ts/lib/pipeable';
import type { estypes } from '@elastic/elasticsearch';
import { errors as EsErrors } from '@elastic/elasticsearch';
import type { ElasticsearchClientError, ResponseError } from '@elastic/elasticsearch/lib/errors';
import { pipe } from 'fp-ts/lib/pipeable';
import { flow } from 'fp-ts/lib/function';
import type { estypes } from '@elastic/elasticsearch';
import { ElasticsearchClient } from '../../../elasticsearch';
import { IndexMapping } from '../../mappings';
import { SavedObjectsRawDoc, SavedObjectsRawDocSource } from '../../serialization';
Expand All @@ -24,13 +24,10 @@ import {
export type { RetryableEsClientError };

/**
* Batch size for updateByQuery, reindex & search operations. Smaller batches
* reduce the memory pressure on Elasticsearch and Kibana so are less likely
* to cause failures.
* TODO (profile/tune): How much smaller can we make this number before it
* starts impacting how long migrations take to perform?
* Batch size for updateByQuery and reindex operations.
* Uses the default value of 1000 for Elasticsearch reindex operation.
*/
const BATCH_SIZE = 1000;
const BATCH_SIZE = 1_000;
const DEFAULT_TIMEOUT = '60s';
/** Allocate 1 replica if there are enough data nodes, otherwise continue with 0 */
const INDEX_AUTO_EXPAND_REPLICAS = '0-1';
Expand Down Expand Up @@ -839,25 +836,30 @@ export interface SearchResponse {
outdatedDocuments: SavedObjectsRawDoc[];
}

interface SearchForOutdatedDocumentsOptions {
batchSize: number;
targetIndex: string;
outdatedDocumentsQuery?: estypes.QueryContainer;
}

/**
* Search for outdated saved object documents with the provided query. Will
* return one batch of documents. Searching should be repeated until no more
* outdated documents can be found.
*/
export const searchForOutdatedDocuments = (
client: ElasticsearchClient,
index: string,
query: Record<string, unknown>
options: SearchForOutdatedDocumentsOptions
): TaskEither.TaskEither<RetryableEsClientError, SearchResponse> => () => {
return client
.search<SavedObjectsRawDocSource>({
index,
index: options.targetIndex,
// Return the _seq_no and _primary_term so we can use optimistic
// concurrency control for updates
seq_no_primary_term: true,
size: BATCH_SIZE,
size: options.batchSize,
body: {
query,
query: options.outdatedDocumentsQuery,
// Optimize search performance by sorting by the "natural" index order
sort: ['_doc'],
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe('migration actions', () => {

// Create test fixture data:
await createIndex(client, 'existing_index_with_docs', {
dynamic: true as any,
dynamic: true,
properties: {},
})();
const sourceDocs = ([
Expand Down Expand Up @@ -337,7 +337,6 @@ describe('migration actions', () => {
// Reindex doesn't return any errors on its own, so we have to test
// together with waitForReindexTask
describe('reindex & waitForReindexTask', () => {
expect.assertions(2);
it('resolves right when reindex succeeds without reindex script', async () => {
const res = (await reindex(
client,
Expand All @@ -354,11 +353,11 @@ describe('migration actions', () => {
}
`);

const results = ((await searchForOutdatedDocuments(
client,
'reindex_target',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'reindex_target',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
Array [
"doc 1",
Expand All @@ -384,11 +383,11 @@ describe('migration actions', () => {
"right": "reindex_succeeded",
}
`);
const results = ((await searchForOutdatedDocuments(
client,
'reindex_target_2',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'reindex_target_2',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
Array [
"doc 1_updated",
Expand Down Expand Up @@ -432,12 +431,12 @@ describe('migration actions', () => {
}
`);

// Assert that documents weren't overrided by the second, unscripted reindex
const results = ((await searchForOutdatedDocuments(
client,
'reindex_target_3',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
// Assert that documents weren't overridden by the second, unscripted reindex
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'reindex_target_3',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
Array [
"doc 1_updated",
Expand All @@ -452,11 +451,11 @@ describe('migration actions', () => {
// Simulate a reindex that only adds some of the documents from the
// source index into the target index
await createIndex(client, 'reindex_target_4', { properties: {} })();
const sourceDocs = ((await searchForOutdatedDocuments(
client,
'existing_index_with_docs',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments
const sourceDocs = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments
.slice(0, 2)
.map(({ _id, _source }) => ({
_id,
Expand All @@ -479,13 +478,13 @@ describe('migration actions', () => {
"right": "reindex_succeeded",
}
`);
// Assert that existing documents weren't overridden, but that missing
// Assert that existing documents weren't overridden, but that missing
// documents were added by the reindex
const results = ((await searchForOutdatedDocuments(
client,
'reindex_target_4',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'reindex_target_4',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
Array [
"doc 1",
Expand Down Expand Up @@ -701,26 +700,30 @@ describe('migration actions', () => {
describe('searchForOutdatedDocuments', () => {
it('only returns documents that match the outdatedDocumentsQuery', async () => {
expect.assertions(2);
const resultsWithQuery = ((await searchForOutdatedDocuments(
client,
'existing_index_with_docs',
{
const resultsWithQuery = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: {
match: { title: { query: 'doc' } },
}
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
},
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(resultsWithQuery.length).toBe(3);

const resultsWithoutQuery = ((await searchForOutdatedDocuments(
client,
'existing_index_with_docs',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const resultsWithoutQuery = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(resultsWithoutQuery.length).toBe(4);
});
it('resolves with _id, _source, _seq_no and _primary_term', async () => {
expect.assertions(1);
const results = ((await searchForOutdatedDocuments(client, 'existing_index_with_docs', {
match: { title: { query: 'doc' } },
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: {
match: { title: { query: 'doc' } },
},
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results).toEqual(
expect.arrayContaining([
Expand Down Expand Up @@ -805,7 +808,7 @@ describe('migration actions', () => {
it('resolves right when mappings were updated and picked up', async () => {
// Create an index without any mappings and insert documents into it
await createIndex(client, 'existing_index_without_mappings', {
dynamic: false as any,
dynamic: false,
properties: {},
})();
const sourceDocs = ([
Expand All @@ -821,11 +824,13 @@ describe('migration actions', () => {
)();

// Assert that we can't search over the unmapped fields of the document
const originalSearchResults = ((await searchForOutdatedDocuments(
client,
'existing_index_without_mappings',
{ match: { title: { query: 'doc' } } }
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const originalSearchResults = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_without_mappings',
outdatedDocumentsQuery: {
match: { title: { query: 'doc' } },
},
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(originalSearchResults.length).toBe(0);

// Update and pickup mappings so that the title field is searchable
Expand All @@ -839,11 +844,13 @@ describe('migration actions', () => {
await waitForPickupUpdatedMappingsTask(client, taskId, '60s')();

// Repeat the search expecting to be able to find the existing documents
const pickedUpSearchResults = ((await searchForOutdatedDocuments(
client,
'existing_index_without_mappings',
{ match: { title: { query: 'doc' } } }
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const pickedUpSearchResults = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_without_mappings',
outdatedDocumentsQuery: {
match: { title: { query: 'doc' } },
},
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(pickedUpSearchResults.length).toBe(4);
});
});
Expand Down Expand Up @@ -1050,11 +1057,11 @@ describe('migration actions', () => {
`);
});
it('resolves right even if there were some version_conflict_engine_exception', async () => {
const existingDocs = ((await searchForOutdatedDocuments(
client,
'existing_index_with_docs',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const existingDocs = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;

const task = bulkOverwriteTransformedDocuments(client, 'existing_index_with_docs', [
...existingDocs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ describe('migrationsStateActionMachine', () => {
Array [
"[.my-so-index] INIT -> LEGACY_DELETE",
Object {
"batchSize": 1000,
"controlState": "LEGACY_DELETE",
"currentAlias": ".my-so-index",
"indexPrefix": ".my-so-index",
Expand Down Expand Up @@ -262,6 +263,7 @@ describe('migrationsStateActionMachine', () => {
Array [
"[.my-so-index] LEGACY_DELETE -> FATAL",
Object {
"batchSize": 1000,
"controlState": "FATAL",
"currentAlias": ".my-so-index",
"indexPrefix": ".my-so-index",
Expand Down Expand Up @@ -413,6 +415,7 @@ describe('migrationsStateActionMachine', () => {
Array [
"[.my-so-index] INIT -> LEGACY_REINDEX",
Object {
"batchSize": 1000,
"controlState": "LEGACY_REINDEX",
"currentAlias": ".my-so-index",
"indexPrefix": ".my-so-index",
Expand Down Expand Up @@ -464,6 +467,7 @@ describe('migrationsStateActionMachine', () => {
Array [
"[.my-so-index] LEGACY_REINDEX -> LEGACY_DELETE",
Object {
"batchSize": 1000,
"controlState": "LEGACY_DELETE",
"currentAlias": ".my-so-index",
"indexPrefix": ".my-so-index",
Expand Down
3 changes: 3 additions & 0 deletions src/core/server/saved_objects/migrationsv2/model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ describe('migrations v2 model', () => {
retryCount: 0,
retryDelay: 0,
retryAttempts: 15,
batchSize: 1000,
indexPrefix: '.kibana',
outdatedDocumentsQuery: {},
targetIndexMappings: {
Expand Down Expand Up @@ -1182,6 +1183,7 @@ describe('migrations v2 model', () => {
describe('createInitialState', () => {
const migrationsConfig = ({
retryAttempts: 15,
batchSize: 1000,
} as unknown) as SavedObjectsMigrationConfigType;
it('creates the initial state for the model based on the passed in parameters', () => {
expect(
Expand All @@ -1197,6 +1199,7 @@ describe('migrations v2 model', () => {
})
).toMatchInlineSnapshot(`
Object {
"batchSize": 1000,
"controlState": "INIT",
"currentAlias": ".kibana_task_manager",
"indexPrefix": ".kibana_task_manager",
Expand Down
1 change: 1 addition & 0 deletions src/core/server/saved_objects/migrationsv2/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@ export const createInitialState = ({
retryCount: 0,
retryDelay: 0,
retryAttempts: migrationsConfig.retryAttempts,
batchSize: migrationsConfig.batchSize,
logs: [],
};
return initialState;
Expand Down
Loading