migration v2 respects the config.batchSize value (#96207)
* migrationsv2: read batchSize from config

* update tests

* update numeric values in the saved objects config to improve readability

* fix integration tests that failed with illegal_argument_exception
mshustov authored Apr 6, 2021
1 parent 8b5e2fb commit 157129e
Showing 9 changed files with 140 additions and 76 deletions.
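
Context for the change: the batch size used by migration actions is now read from the saved objects migrations configuration instead of a module-level constant. Below is a minimal sketch of what the corresponding config schema entry could look like, assuming @kbn/config-schema; the config file itself is not part of this diff, only the two keys exercised by the tests below are shown, and the kibana.yml setting it maps to is assumed to be `migrations.batchSize`.

```ts
import { schema, TypeOf } from '@kbn/config-schema';

// Hypothetical sketch of the saved objects migrations config schema; only the
// keys that appear in this commit's tests (batchSize, retryAttempts) are shown.
export const savedObjectsMigrationConfig = schema.object({
  // Documents fetched/reindexed per batch; assumed to surface as `migrations.batchSize`.
  batchSize: schema.number({ defaultValue: 1_000 }),
  // Retries for temporarily failing migration steps before the migration fails.
  retryAttempts: schema.number({ defaultValue: 15 }),
});

export type SavedObjectsMigrationConfigType = TypeOf<typeof savedObjectsMigrationConfig>;
```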
30 changes: 29 additions & 1 deletion src/core/server/saved_objects/migrationsv2/actions/index.test.ts
@@ -163,7 +163,12 @@ describe('actions', () => {

describe('searchForOutdatedDocuments', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.searchForOutdatedDocuments(client, 'new_index', { properties: {} });
const task = Actions.searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'new_index',
outdatedDocumentsQuery: {},
});

try {
await task();
} catch (e) {
@@ -172,6 +177,29 @@

expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});

it('configures request according to given parameters', async () => {
const esClient = elasticsearchClientMock.createInternalClient();
const query = {};
const targetIndex = 'new_index';
const batchSize = 1000;
const task = Actions.searchForOutdatedDocuments(esClient, {
batchSize,
targetIndex,
outdatedDocumentsQuery: query,
});

await task();

expect(esClient.search).toHaveBeenCalledTimes(1);
expect(esClient.search).toHaveBeenCalledWith(
expect.objectContaining({
index: targetIndex,
size: batchSize,
body: expect.objectContaining({ query }),
})
);
});
});

describe('bulkOverwriteTransformedDocuments', () => {
30 changes: 16 additions & 14 deletions src/core/server/saved_objects/migrationsv2/actions/index.ts
@@ -9,11 +9,11 @@
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import * as Option from 'fp-ts/lib/Option';
import { ElasticsearchClientError, ResponseError } from '@elastic/elasticsearch/lib/errors';
import { pipe } from 'fp-ts/lib/pipeable';
import type { estypes } from '@elastic/elasticsearch';
import { errors as EsErrors } from '@elastic/elasticsearch';
import type { ElasticsearchClientError, ResponseError } from '@elastic/elasticsearch/lib/errors';
import { pipe } from 'fp-ts/lib/pipeable';
import { flow } from 'fp-ts/lib/function';
import type { estypes } from '@elastic/elasticsearch';
import { ElasticsearchClient } from '../../../elasticsearch';
import { IndexMapping } from '../../mappings';
import { SavedObjectsRawDoc, SavedObjectsRawDocSource } from '../../serialization';
@@ -24,13 +24,10 @@ import {
export type { RetryableEsClientError };

/**
* Batch size for updateByQuery, reindex & search operations. Smaller batches
* reduce the memory pressure on Elasticsearch and Kibana so are less likely
* to cause failures.
* TODO (profile/tune): How much smaller can we make this number before it
* starts impacting how long migrations take to perform?
* Batch size for updateByQuery and reindex operations.
* Uses the default value of 1000 for Elasticsearch reindex operation.
*/
const BATCH_SIZE = 1000;
const BATCH_SIZE = 1_000;
const DEFAULT_TIMEOUT = '60s';
/** Allocate 1 replica if there are enough data nodes, otherwise continue with 0 */
const INDEX_AUTO_EXPAND_REPLICAS = '0-1';
@@ -839,25 +836,30 @@ export interface SearchResponse {
outdatedDocuments: SavedObjectsRawDoc[];
}

interface SearchForOutdatedDocumentsOptions {
batchSize: number;
targetIndex: string;
outdatedDocumentsQuery?: estypes.QueryContainer;
}

/**
* Search for outdated saved object documents with the provided query. Will
* return one batch of documents. Searching should be repeated until no more
* outdated documents can be found.
*/
export const searchForOutdatedDocuments = (
client: ElasticsearchClient,
index: string,
query: Record<string, unknown>
options: SearchForOutdatedDocumentsOptions
): TaskEither.TaskEither<RetryableEsClientError, SearchResponse> => () => {
return client
.search<SavedObjectsRawDocSource>({
index,
index: options.targetIndex,
// Return the _seq_no and _primary_term so we can use optimistic
// concurrency control for updates
seq_no_primary_term: true,
size: BATCH_SIZE,
size: options.batchSize,
body: {
query,
query: options.outdatedDocumentsQuery,
// Optimize search performance by sorting by the "natural" index order
sort: ['_doc'],
},
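
With the new signature, callers pass a single options object instead of positional index/query arguments. A small usage sketch follows; the import paths and the surrounding function are assumptions rather than code from this diff, and the index name and query are borrowed from the integration tests shown further down.

```ts
import * as Either from 'fp-ts/lib/Either';
// Import paths are assumptions; in the Kibana tree the action lives under
// src/core/server/saved_objects/migrationsv2/actions.
import type { ElasticsearchClient } from '../../../elasticsearch';
import { searchForOutdatedDocuments } from './actions';

// Sketch: fetch a single batch of outdated documents, with the batch size
// supplied by the caller (ultimately migrationsConfig.batchSize) instead of
// the old hard-coded BATCH_SIZE constant.
async function fetchOneBatch(client: ElasticsearchClient, batchSize: number) {
  const searchTask = searchForOutdatedDocuments(client, {
    batchSize,
    targetIndex: 'existing_index_with_docs', // index name borrowed from the integration tests
    outdatedDocumentsQuery: { match: { title: { query: 'doc' } } },
  });

  // The action is a TaskEither: invoking it runs the search lazily.
  const result = await searchTask();
  return Either.isRight(result) ? result.right.outdatedDocuments : [];
}
```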
@@ -59,7 +59,7 @@ describe('migration actions', () => {

// Create test fixture data:
await createIndex(client, 'existing_index_with_docs', {
dynamic: true as any,
dynamic: true,
properties: {},
})();
const sourceDocs = ([
@@ -337,7 +337,6 @@ describe('migration actions', () => {
// Reindex doesn't return any errors on its own, so we have to test
// together with waitForReindexTask
describe('reindex & waitForReindexTask', () => {
expect.assertions(2);
it('resolves right when reindex succeeds without reindex script', async () => {
const res = (await reindex(
client,
@@ -354,11 +353,11 @@
}
`);

const results = ((await searchForOutdatedDocuments(
client,
'reindex_target',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'reindex_target',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
Array [
"doc 1",
@@ -384,11 +383,11 @@
"right": "reindex_succeeded",
}
`);
const results = ((await searchForOutdatedDocuments(
client,
'reindex_target_2',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'reindex_target_2',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
Array [
"doc 1_updated",
@@ -432,12 +431,12 @@ describe('migration actions', () => {
}
`);

// Assert that documents weren't overrided by the second, unscripted reindex
const results = ((await searchForOutdatedDocuments(
client,
'reindex_target_3',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
// Assert that documents weren't overridden by the second, unscripted reindex
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'reindex_target_3',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
Array [
"doc 1_updated",
@@ -452,11 +451,11 @@
// Simulate a reindex that only adds some of the documents from the
// source index into the target index
await createIndex(client, 'reindex_target_4', { properties: {} })();
const sourceDocs = ((await searchForOutdatedDocuments(
client,
'existing_index_with_docs',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments
const sourceDocs = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments
.slice(0, 2)
.map(({ _id, _source }) => ({
_id,
@@ -479,13 +478,13 @@
"right": "reindex_succeeded",
}
`);
// Assert that existing documents weren't overrided, but that missing
// Assert that existing documents weren't overridden, but that missing
// documents were added by the reindex
const results = ((await searchForOutdatedDocuments(
client,
'reindex_target_4',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'reindex_target_4',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
Array [
"doc 1",
@@ -701,26 +700,30 @@ describe('migration actions', () => {
describe('searchForOutdatedDocuments', () => {
it('only returns documents that match the outdatedDocumentsQuery', async () => {
expect.assertions(2);
const resultsWithQuery = ((await searchForOutdatedDocuments(
client,
'existing_index_with_docs',
{
const resultsWithQuery = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: {
match: { title: { query: 'doc' } },
}
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
},
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(resultsWithQuery.length).toBe(3);

const resultsWithoutQuery = ((await searchForOutdatedDocuments(
client,
'existing_index_with_docs',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const resultsWithoutQuery = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(resultsWithoutQuery.length).toBe(4);
});
it('resolves with _id, _source, _seq_no and _primary_term', async () => {
expect.assertions(1);
const results = ((await searchForOutdatedDocuments(client, 'existing_index_with_docs', {
match: { title: { query: 'doc' } },
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: {
match: { title: { query: 'doc' } },
},
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results).toEqual(
expect.arrayContaining([
@@ -805,7 +808,7 @@ describe('migration actions', () => {
it('resolves right when mappings were updated and picked up', async () => {
// Create an index without any mappings and insert documents into it
await createIndex(client, 'existing_index_without_mappings', {
dynamic: false as any,
dynamic: false,
properties: {},
})();
const sourceDocs = ([
@@ -821,11 +824,13 @@
)();

// Assert that we can't search over the unmapped fields of the document
const originalSearchResults = ((await searchForOutdatedDocuments(
client,
'existing_index_without_mappings',
{ match: { title: { query: 'doc' } } }
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const originalSearchResults = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_without_mappings',
outdatedDocumentsQuery: {
match: { title: { query: 'doc' } },
},
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(originalSearchResults.length).toBe(0);

// Update and pickup mappings so that the title field is searchable
@@ -839,11 +844,13 @@
await waitForPickupUpdatedMappingsTask(client, taskId, '60s')();

// Repeat the search expecting to be able to find the existing documents
const pickedUpSearchResults = ((await searchForOutdatedDocuments(
client,
'existing_index_without_mappings',
{ match: { title: { query: 'doc' } } }
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const pickedUpSearchResults = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_without_mappings',
outdatedDocumentsQuery: {
match: { title: { query: 'doc' } },
},
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(pickedUpSearchResults.length).toBe(4);
});
});
Expand Down Expand Up @@ -1050,11 +1057,11 @@ describe('migration actions', () => {
`);
});
it('resolves right even if there were some version_conflict_engine_exception', async () => {
const existingDocs = ((await searchForOutdatedDocuments(
client,
'existing_index_with_docs',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const existingDocs = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;

const task = bulkOverwriteTransformedDocuments(client, 'existing_index_with_docs', [
...existingDocs,
@@ -206,6 +206,7 @@ describe('migrationsStateActionMachine', () => {
Array [
"[.my-so-index] INIT -> LEGACY_DELETE",
Object {
"batchSize": 1000,
"controlState": "LEGACY_DELETE",
"currentAlias": ".my-so-index",
"indexPrefix": ".my-so-index",
@@ -262,6 +263,7 @@ describe('migrationsStateActionMachine', () => {
Array [
"[.my-so-index] LEGACY_DELETE -> FATAL",
Object {
"batchSize": 1000,
"controlState": "FATAL",
"currentAlias": ".my-so-index",
"indexPrefix": ".my-so-index",
@@ -413,6 +415,7 @@ describe('migrationsStateActionMachine', () => {
Array [
"[.my-so-index] INIT -> LEGACY_REINDEX",
Object {
"batchSize": 1000,
"controlState": "LEGACY_REINDEX",
"currentAlias": ".my-so-index",
"indexPrefix": ".my-so-index",
@@ -464,6 +467,7 @@ describe('migrationsStateActionMachine', () => {
Array [
"[.my-so-index] LEGACY_REINDEX -> LEGACY_DELETE",
Object {
"batchSize": 1000,
"controlState": "LEGACY_DELETE",
"currentAlias": ".my-so-index",
"indexPrefix": ".my-so-index",
3 changes: 3 additions & 0 deletions src/core/server/saved_objects/migrationsv2/model.test.ts
@@ -46,6 +46,7 @@ describe('migrations v2 model', () => {
retryCount: 0,
retryDelay: 0,
retryAttempts: 15,
batchSize: 1000,
indexPrefix: '.kibana',
outdatedDocumentsQuery: {},
targetIndexMappings: {
@@ -1182,6 +1183,7 @@ describe('migrations v2 model', () => {
describe('createInitialState', () => {
const migrationsConfig = ({
retryAttempts: 15,
batchSize: 1000,
} as unknown) as SavedObjectsMigrationConfigType;
it('creates the initial state for the model based on the passed in paramaters', () => {
expect(
Expand All @@ -1197,6 +1199,7 @@ describe('migrations v2 model', () => {
})
).toMatchInlineSnapshot(`
Object {
"batchSize": 1000,
"controlState": "INIT",
"currentAlias": ".kibana_task_manager",
"indexPrefix": ".kibana_task_manager",
1 change: 1 addition & 0 deletions src/core/server/saved_objects/migrationsv2/model.ts
@@ -784,6 +784,7 @@ export const createInitialState = ({
retryCount: 0,
retryDelay: 0,
retryAttempts: migrationsConfig.retryAttempts,
batchSize: migrationsConfig.batchSize,
logs: [],
};
return initialState;
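
Downstream of this change, the batch size is carried on the migration state created here, so the step that searches for outdated documents can read it from state rather than from a constant. A hypothetical sketch of that wiring follows; the control-state handler itself (next.ts) is not part of this diff, and the state shape below is trimmed to the fields this step needs.

```ts
import type { estypes } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from '../../elasticsearch';
// Import path is an assumption based on this commit's file layout.
import { searchForOutdatedDocuments } from './actions';

// Hypothetical subset of the migration state relevant to this step.
interface OutdatedDocumentsSearchState {
  batchSize: number; // copied from migrationsConfig.batchSize in createInitialState
  targetIndex: string;
  outdatedDocumentsQuery?: estypes.QueryContainer;
}

// Sketch of how a handler could build the search action from state.
const searchActionFor = (client: ElasticsearchClient, state: OutdatedDocumentsSearchState) =>
  searchForOutdatedDocuments(client, {
    batchSize: state.batchSize,
    targetIndex: state.targetIndex,
    outdatedDocumentsQuery: state.outdatedDocumentsQuery,
  });
```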