Migrations: dynamically adjust batchSize when reading #157494

Merged: 23 commits, May 30, 2023
Changes from 15 commits
Commits
019eb09
Use batchSize config for update_by_query in updateAndPickupMappings
rudolf Mar 13, 2023
87bc176
Merge branch 'main' into updateAndPickupMappings-batch-size
rudolf Mar 15, 2023
ae9cdc9
Add batchSize to ZDT
rudolf Mar 15, 2023
3e5b0e5
Merge branch 'main' into updateAndPickupMappings-batch-size
rudolf Mar 27, 2023
e7d328e
Migrations: dynamically adjust batchSize when reading
rudolf May 12, 2023
a570e3a
Fixes and improve logging
rudolf May 12, 2023
7e761ac
model.test.ts unit tests
rudolf May 12, 2023
61cdf9c
Unit tests
rudolf May 12, 2023
d972600
Merge branch 'main' into updateAndPickupMappings-batch-size
rudolf May 12, 2023
156619f
Merge branch 'updateAndPickupMappings-batch-size' into migrations-dyn…
rudolf May 16, 2023
e3f11bc
E2E & integration tests
rudolf May 16, 2023
701e53b
Increase dot_kibana_split test timeout to reduce flakiness
rudolf May 16, 2023
b71ab24
Fix tests
rudolf May 16, 2023
9c241b4
Delete unnecessary file
rudolf May 16, 2023
5202b28
Merge branch 'main' into migrations-dynamic-read-batchsize
kibanamachine May 16, 2023
532a41d
Retry when there's circuit breaker exceptions from Elasticsearch
rudolf May 17, 2023
67562ec
Address reviews, better handling when batchSize: 1 still exceeds maxR…
rudolf May 24, 2023
98826d4
Merge branch 'main' into migrations-dynamic-read-batchsize
rudolf May 24, 2023
c3c272f
Review feedback: increase coverage of recovering up to maxBatchSize o…
rudolf May 24, 2023
9a41d95
Review: why match when you can test
rudolf May 25, 2023
b81deb1
Merge branch 'main' into migrations-dynamic-read-batchsize
rudolf May 25, 2023
0efb67a
Fix outdated integration test
gsoldevila May 30, 2023
eb35b1b
Merge branch 'main' into migrations-dynamic-read-batchsize
gsoldevila May 30, 2023

@@ -16,6 +16,7 @@ const migrationSchema = schema.object({
   }),
   batchSize: schema.number({ defaultValue: 1_000 }),
   maxBatchSizeBytes: schema.byteSize({ defaultValue: '100mb' }), // 100mb is the default http.max_content_length Elasticsearch config value
+  maxReadBatchSizeBytes: schema.byteSize({ defaultValue: 536870888, max: 536870888 }),
   discardUnknownObjects: schema.maybe(
     schema.string({
       validate: (value: string) =>
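
A note on the 536870888 default and max for maxReadBatchSizeBytes: the value equals 2^29 - 24, which, as far as I know, is V8's maximum string length on 64-bit builds, and it is the same limit quoted in the RequestAbortedError message used by the readWithPit test in this PR. The sketch below only checks that arithmetic. `exceedsMaxReadBatchSize` is a hypothetical helper for illustration, not something this PR adds.

```typescript
// Assumption: 536870888 is 2^29 - 24, i.e. V8's max string length on 64-bit builds.
const MAX_READ_BATCH_SIZE_BYTES = 536_870_888;

// Hypothetical helper (not part of the PR): would a response body of this
// size be larger than the configured read limit?
function exceedsMaxReadBatchSize(
  contentLengthBytes: number,
  limitBytes: number = MAX_READ_BATCH_SIZE_BYTES
): boolean {
  return contentLengthBytes > limitBytes;
}

console.log(MAX_READ_BATCH_SIZE_BYTES === 2 ** 29 - 24); // true
console.log(exceedsMaxReadBatchSize(536_870_889)); // true
console.log(exceedsMaxReadBatchSize(536_870_888)); // false
```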

@@ -47,6 +47,7 @@ export type {
   ReindexResponse,
   UpdateByQueryResponse,
   UpdateAndPickupMappingsResponse,
+  EsResponseTooLargeError,
 } from './src/actions';
 export {
   isClusterShardLimitExceeded,

@@ -147,6 +147,10 @@ export interface RequestEntityTooLargeException {
   type: 'request_entity_too_large_exception';
 }

+export interface EsResponseTooLargeError {
+  type: 'es_response_too_large';
+}
+
 /** @internal */
 export interface AcknowledgeResponse {
   acknowledged: boolean;
@@ -169,6 +173,7 @@ export interface ActionErrorTypeMap {
   index_not_green_timeout: IndexNotGreenTimeout;
   index_not_yellow_timeout: IndexNotYellowTimeout;
   cluster_shard_limit_exceeded: ClusterShardLimitExceeded;
+  es_response_too_large: EsResponseTooLargeError;
 }

 /**

@@ -29,7 +29,7 @@ describe('pickupUpdatedMappings', () => {
     elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
   );
   it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
-    const task = pickupUpdatedMappings(client, 'my_index');
+    const task = pickupUpdatedMappings(client, 'my_index', 1000);
     try {
       await task();
     } catch (e) {

@@ -13,7 +13,6 @@ import {
   catchRetryableEsClientErrors,
   type RetryableEsClientError,
 } from './catch_retryable_es_client_errors';
-import { BATCH_SIZE } from './constants';

 export interface UpdateByQueryResponse {
   taskId: string;
@@ -35,7 +34,8 @@ export interface UpdateByQueryResponse {
 export const pickupUpdatedMappings =
   (
     client: ElasticsearchClient,
-    index: string
+    index: string,
+    batchSize: number
   ): TaskEither.TaskEither<RetryableEsClientError, UpdateByQueryResponse> =>
   () => {
     return client
@@ -46,7 +46,7 @@ export const pickupUpdatedMappings =
         allow_no_indices: false,
         index,
         // How many documents to update per batch
-        scroll_size: BATCH_SIZE,
+        scroll_size: batchSize,
         // force a refresh so that we can query the updated index immediately
         // after the operation completes
         refresh: true,

@@ -32,23 +32,54 @@ describe('readWithPit', () => {
       pitId: 'pitId',
       query: { match_all: {} },
       batchSize: 10_000,
+      maxResponseSizeBytes: 100_000,
     })();

     expect(client.search).toHaveBeenCalledTimes(1);
-    expect(client.search).toHaveBeenCalledWith({
-      allow_partial_search_results: false,
-      pit: {
-        id: 'pitId',
-        keep_alive: '10m',
-      },
-      query: {
-        match_all: {},
+    expect(client.search).toHaveBeenCalledWith(
+      {
+        allow_partial_search_results: false,
+        pit: {
+          id: 'pitId',
+          keep_alive: '10m',
+        },
+        query: {
+          match_all: {},
+        },
+        search_after: undefined,
+        seq_no_primary_term: undefined,
+        size: 10000,
+        sort: '_shard_doc:asc',
+        track_total_hits: true,
       },
-      search_after: undefined,
-      seq_no_primary_term: undefined,
-      size: 10000,
-      sort: '_shard_doc:asc',
-      track_total_hits: true,
+      { maxResponseSize: 100_000 }
+    );
+  });
+
+  it('returns left es_response_too_large when client throws RequestAbortedError', async () => {
+    // Create a mock client that rejects all methods with a RequestAbortedError
+    // response.
+    const retryableError = new EsErrors.RequestAbortedError(
+      'The content length (536870889) is bigger than the maximum allow string (536870888)'
+    );
+    const client = elasticsearchClientMock.createInternalClient(
+      elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
+    );
+
+    const task = readWithPit({
+      client,
+      pitId: 'pitId',
+      query: { match_all: {} },
+      batchSize: 10_000,
+    });
+    try {
+      await task();
+    } catch (e) {
+      /** ignore */
+    }
+    await expect(task()).resolves.toEqual({
+      _tag: 'Left',
+      left: { type: 'es_response_too_large' },
     });
   });
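
The Left value asserted in the test above is what lets a caller shrink the read batch and try again instead of failing the migration. Below is a minimal sketch of one way a caller might react, assuming a policy of halving on es_response_too_large and growing by 20% after a successful read, capped at the configured batchSize. The function name, the type names, and the growth factor are illustrative assumptions and are not taken from this diff.

```typescript
// Structural stand-in for the fp-ts Either that readWithPit resolves to.
type ReadOutcome =
  | { _tag: 'Left'; left: { type: 'es_response_too_large' } }
  | { _tag: 'Right'; right: unknown };

// Hypothetical decision type: read again with a new size, or give up.
type NextStep =
  | { action: 'read'; batchSize: number }
  | { action: 'fail'; reason: string };

function nextReadStep(outcome: ReadOutcome, batchSize: number, maxBatchSize: number): NextStep {
  if (outcome._tag === 'Left') {
    // A batch of one document cannot be made any smaller.
    if (batchSize <= 1) {
      return { action: 'fail', reason: 'a single document exceeds the response size limit' };
    }
    return { action: 'read', batchSize: Math.floor(batchSize / 2) };
  }
  // Success: grow by 20% (integer math), by at least 1, never past the maximum.
  const grown = Math.max(batchSize + 1, Math.floor((batchSize * 6) / 5));
  return { action: 'read', batchSize: Math.min(maxBatchSize, grown) };
}

const tooLarge: ReadOutcome = { _tag: 'Left', left: { type: 'es_response_too_large' } };
const ok: ReadOutcome = { _tag: 'Right', right: {} };

console.log(nextReadStep(tooLarge, 1000, 1000)); // { action: 'read', batchSize: 500 }
console.log(nextReadStep(ok, 500, 1000)); // { action: 'read', batchSize: 600 }
console.log(nextReadStep(tooLarge, 1, 1000).action); // fail
```

Keeping the decision in a pure function like this makes the shrink and recover paths easy to unit test without a mock Elasticsearch client.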

@@ -9,13 +9,15 @@
 import * as Either from 'fp-ts/lib/Either';
 import * as TaskEither from 'fp-ts/lib/TaskEither';
 import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
+import { errors as EsErrors } from '@elastic/elasticsearch';
 import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
 import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
 import {
   catchRetryableEsClientErrors,
   type RetryableEsClientError,
 } from './catch_retryable_es_client_errors';
 import { DEFAULT_PIT_KEEP_ALIVE } from './open_pit';
+import { EsResponseTooLargeError } from '.';

 /** @internal */
 export interface ReadWithPit {
@@ -32,6 +34,7 @@ export interface ReadWithPitParams {
   batchSize: number;
   searchAfter?: number[];
   seqNoPrimaryTerm?: boolean;
+  maxResponseSizeBytes?: number;
 }

 /*
@@ -45,32 +48,39 @@ export const readWithPit =
     batchSize,
     searchAfter,
     seqNoPrimaryTerm,
-  }: ReadWithPitParams): TaskEither.TaskEither<RetryableEsClientError, ReadWithPit> =>
+    maxResponseSizeBytes,
+  }: ReadWithPitParams): TaskEither.TaskEither<
+    RetryableEsClientError | EsResponseTooLargeError,
+    ReadWithPit
+  > =>
   () => {
     return client
-      .search<SavedObjectsRawDoc>({
-        seq_no_primary_term: seqNoPrimaryTerm,
-        // Fail if the index being searched doesn't exist or is closed
-        // allow_no_indices: false,
-        // By default ES returns a 200 with partial results if there are shard
-        // request timeouts or shard failures which can lead to data loss for
-        // migrations
-        allow_partial_search_results: false,
-        // Sort fields are required to use searchAfter so we sort by the
-        // natural order of the index which is the most efficient option
-        // as order is not important for the migration
-        sort: '_shard_doc:asc',
-        pit: { id: pitId, keep_alive: DEFAULT_PIT_KEEP_ALIVE },
-        size: batchSize,
-        search_after: searchAfter,
-        /**
-         * We want to know how many documents we need to process so we can log the progress.
-         * But we also want to increase the performance of these requests,
-         * so we ask ES to report the total count only on the first request (when searchAfter does not exist)
-         */
-        track_total_hits: typeof searchAfter === 'undefined',
-        query,
-      })
+      .search<SavedObjectsRawDoc>(
+        {
+          seq_no_primary_term: seqNoPrimaryTerm,
+          // Fail if the index being searched doesn't exist or is closed
+          // allow_no_indices: false,
+          // By default ES returns a 200 with partial results if there are shard
+          // request timeouts or shard failures which can lead to data loss for
+          // migrations
+          allow_partial_search_results: false,
+          // Sort fields are required to use searchAfter so we sort by the
+          // natural order of the index which is the most efficient option
+          // as order is not important for the migration
+          sort: '_shard_doc:asc',
+          pit: { id: pitId, keep_alive: DEFAULT_PIT_KEEP_ALIVE },
+          size: batchSize,
+          search_after: searchAfter,
+          /**
+           * We want to know how many documents we need to process so we can log the progress.
+           * But we also want to increase the performance of these requests,
+           * so we ask ES to report the total count only on the first request (when searchAfter does not exist)
+           */
+          track_total_hits: typeof searchAfter === 'undefined',
+          query,
+        },
+        { maxResponseSize: maxResponseSizeBytes }
+      )
       .then((body) => {
         const totalHits =
           typeof body.hits.total === 'number'
@@ -93,5 +103,17 @@ export const readWithPit =
           totalHits,
         });
       })
+      .catch((e) => {
+        if (
+          e instanceof EsErrors.RequestAbortedError &&
+          e.message.match(/The content length \(\d+\) is bigger than the maximum/) != null

Review comment (Member):
nit: .test might be more performant because it doesn't need to extract the groups. I don't have numbers that back this assumption though.

Suggested change
-          e.message.match(/The content length \(\d+\) is bigger than the maximum/) != null
+          /The content length \(\d+\) is bigger than the maximum/.test(e.message)

+        ) {
+          return Either.left({
+            type: 'es_response_too_large' as const,
+          });
+        } else {
+          throw e;
+        }
+      })
       .catch(catchRetryableEsClientErrors);
   };
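
On the .match versus .test nit in the catch handler above: for a regex without the g or y flag, both checks yield the same boolean, so the suggested swap should not change behavior. A small sketch to illustrate this; the helper names are hypothetical and exist only for the comparison.

```typescript
// Same pattern as in the catch handler. It has no `g` or `y` flag, so `.test`
// keeps no state between calls (lastIndex only advances for global/sticky regexes).
const RESPONSE_TOO_LARGE = /The content length \(\d+\) is bigger than the maximum/;

// Hypothetical helpers for illustration only.
const viaMatch = (message: string): boolean => message.match(RESPONSE_TOO_LARGE) != null;
const viaTest = (message: string): boolean => RESPONSE_TOO_LARGE.test(message);

const samples: string[] = [
  'The content length (536870889) is bigger than the maximum allow string (536870888)',
  'socket hang up',
];

for (const message of samples) {
  // First column: do the two forms agree? Second column: did the pattern match?
  console.log(viaMatch(message) === viaTest(message), viaTest(message));
}
```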

@@ -36,6 +36,7 @@ describe('updateAndPickupMappings', () => {
       client,
       index: 'new_index',
       mappings: { properties: {} },
+      batchSize: 1000,
     });
     try {
       await task();
@@ -65,6 +66,7 @@ describe('updateAndPickupMappings', () => {
           },
         },
       },
+      batchSize: 1000,
     });
     try {
       await task();

@@ -28,6 +28,7 @@ export interface UpdateAndPickupMappingsParams {
   client: ElasticsearchClient;
   index: string;
   mappings: IndexMapping;
+  batchSize: number;
 }
 /**
  * Updates an index's mappings and runs an pickupUpdatedMappings task so that the mapping
@@ -37,6 +38,7 @@ export const updateAndPickupMappings = ({
   client,
   index,
   mappings,
+  batchSize,
 }: UpdateAndPickupMappingsParams): TaskEither.TaskEither<
   RetryableEsClientError,
   UpdateAndPickupMappingsResponse
@@ -74,7 +76,7 @@ export const updateAndPickupMappings = ({
   return pipe(
     putMappingTask,
     TaskEither.chain((res) => {
-      return pickupUpdatedMappings(client, index);
+      return pickupUpdatedMappings(client, index, batchSize);
     })
   );
 };