Migrations: dynamically adjust batchSize when reading (#157494)
## Summary

Migrations read 1000 documents per batch by default, which works well for most
deployments. But if any batch happens to be larger than ~512MB we hit Node.js'
maximum string length and cannot process that batch. Today this forces users to
configure a much smaller batch size, which can severely slow down migrations.
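
For reference, the limit comes from V8's maximum string length, which Node.js
exposes as `buffer.constants.MAX_STRING_LENGTH` and which this PR also uses as
the ceiling for the new `maxReadBatchSizeBytes` setting:

```ts
import { constants } from 'buffer';

// On 64-bit builds of recent Node.js versions this prints 536870888
// (~512MB); the exact value depends on the V8 version bundled with Node.
console.log(constants.MAX_STRING_LENGTH);
```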

This PR reduces the impact of large batches by catching elasticsearch-js'
`RequestAbortedError` and halving the batch size. When subsequent batches
succeed, the batch size grows again by 20% per read. This means we'll have a
sequence like:

1. Read 1000 docs ✅ (small batch)
2. Read 1000 docs 🔴 (too large batch)
3. Read 500 docs ✅ 
4. Read 600 docs ✅ 
5. Read 720 docs ✅
6. Read 864 docs ✅
7. Read 1000 docs ✅ (small batch)

This assumes that most clusters have only a few large batches exceeding the
limit. If every batch exceeded it, we'd have one failed read for every four
successful reads and pay roughly a 20% throughput penalty; in such a case it
would be better to configure a lower `migrations.batchSize`. The policy is
sketched below.
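
For illustration, a minimal sketch of the adjustment policy in isolation
(hypothetical helper names; the real logic lives in the migration state
machine's model and the `increaseBatchSize` helper in the diff below):

```ts
interface BatchState {
  batchSize: number;
  maxBatchSize: number; // the configured migrations.batchSize acts as the ceiling
}

// A read exceeded Node.js' maximum string length: halve the batch size.
const onResponseTooLarge = (state: BatchState): BatchState => ({
  ...state,
  batchSize: Math.max(1, Math.floor(state.batchSize / 2)),
});

// A read succeeded: grow by 20% again, capped at the original batch size.
const onReadSuccess = (state: BatchState): BatchState => ({
  ...state,
  batchSize: Math.min(state.maxBatchSize, Math.floor(state.batchSize * 1.2)),
});
```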

Tested this manually:
1. Start ES with more heap than the default, otherwise reading large batches
   will cause it to run out of memory:
   `ES_JAVA_OPTS=' -Xms6g -Xmx6g' yarn es snapshot --data-archive=/Users/rudolf/dev/kibana/src/core/server/integration_tests/saved_objects/migrations/archives/8.4.0_with_sample_data_logs.zip`
2. Ingest lots of large documents of ~5MB each:
   ```
   curl -XPUT "elastic:changeme@localhost:9200/_security/role/grant_kibana_system_indices" -H "kbn-xsrf: reporting" -H "Content-Type: application/json" -d'
   {
     "indices": [
       {
         "names": [".kibana*"],
         "privileges": ["all"],
         "allow_restricted_indices": true
       }
     ]
   }'

   curl -XPOST "elastic:changeme@localhost:9200/_security/user/superuser" -H "kbn-xsrf: reporting" -H "Content-Type: application/json" -d'
   {
     "password" : "changeme",
     "roles" : [ "superuser", "grant_kibana_system_indices" ]
   }'

   curl -XPUT "superuser:changeme@localhost:9200/.kibana_8.4.0_001/_mappings" -H "kbn-xsrf: reporting" -H "Content-Type: application/json" -d'
   {
     "dynamic": false,
     "properties": {}
   }'

   set -B                  # enable brace expansion
   for i in {1..400}; do
     curl -k --data-binary "@/Users/rudolf/dev/kibana/src/core/server/integration_tests/saved_objects/migrations/group3/body.json" -X PUT "http://superuser:changeme@localhost:9200/.kibana_8.4.0_001/_doc/cases-comments:${i}?&pretty=true" -H "Content-Type: application/json"
   done
   ```
3. Start Kibana with a modest batchSize, otherwise we could OOM ES:
   `node scripts/kibana --dev --migrations.batchSize=120`



<details><summary>Example logs. Note that "Processed x documents" is only
logged once the next batch is successfully read, so the ordering can look
wrong. To improve it we'd need to log progress after a batch is successfully
written instead 🤷</summary>

```
[.kibana] Processed 120 documents out of 542.
[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3667ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1740ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1376ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1402ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1311ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1388ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 900ms.
[.kibana] Read a batch that exceeded the NodeJS maximum string length, retrying by reducing the batch size in half to 60.
[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_READ. took: 1538ms.
[.kibana] Processed 240 documents out of 542.
[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 2054ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1042ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1310ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1388ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 1130ms.
[.kibana] Processed 300 documents out of 542.
[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 2610ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1262ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1299ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1363ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1341ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 572ms.
[.kibana] Processed 372 documents out of 542.
[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3330ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1488ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1349ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1312ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1380ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1310ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 139ms.
[.kibana] Processed 458 documents out of 542.
[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_TRANSFORM. took: 3278ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_TRANSFORM -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1460ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1370ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1303ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK. took: 1384ms.
[.kibana] REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ. took: 1298ms.
[.kibana] Processed 542 documents out of 542.
[.kibana] REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT. took: 4ms.
```
</details>

### Checklist

Delete any items that are not applicable to this PR.

- [ ] Any text added follows [EUI's writing
guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses
sentence case text and includes [i18n
support](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)
- [ ]
[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)
was added for features that require explanation or tutorials
- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] Any UI touched in this PR is usable by keyboard only (learn more
about [keyboard accessibility](https://webaim.org/techniques/keyboard/))
- [ ] Any UI touched in this PR does not create any new axe failures
(run axe in browser:
[FF](https://addons.mozilla.org/en-US/firefox/addon/axe-devtools/),
[Chrome](https://chrome.google.com/webstore/detail/axe-web-accessibility-tes/lhdoppojpmngadmnindnejefpokejbdd?hl=en-US))
- [ ] If a plugin configuration key changed, check if it needs to be
allowlisted in the cloud and added to the [docker
list](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker)
- [ ] This renders correctly on smaller devices using a responsive
layout. (You can test this [in your
browser](https://www.browserstack.com/guide/responsive-testing-on-local-server))
- [ ] This was checked for [cross-browser
compatibility](https://www.elastic.co/support/matrix#matrix_browsers)


### Risks


### For maintainers

- [ ] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

---------

Co-authored-by: Kibana Machine <[email protected]>
Co-authored-by: Gerard Soldevila <[email protected]>
3 people authored May 30, 2023
1 parent c06b5ef commit 094b62a
Showing 22 changed files with 511 additions and 46 deletions.
@@ -9,13 +9,18 @@
import { valid } from 'semver';
import { schema, TypeOf } from '@kbn/config-schema';
import type { ServiceConfigDescriptor } from '@kbn/core-base-server-internal';
import buffer from 'buffer';

const migrationSchema = schema.object({
algorithm: schema.oneOf([schema.literal('v2'), schema.literal('zdt')], {
defaultValue: 'v2',
}),
batchSize: schema.number({ defaultValue: 1_000 }),
maxBatchSizeBytes: schema.byteSize({ defaultValue: '100mb' }), // 100mb is the default http.max_content_length Elasticsearch config value
maxReadBatchSizeBytes: schema.byteSize({
defaultValue: buffer.constants.MAX_STRING_LENGTH,
max: buffer.constants.MAX_STRING_LENGTH,
}),
discardUnknownObjects: schema.maybe(
schema.string({
validate: (value: string) =>
@@ -47,6 +47,7 @@ export type {
ReindexResponse,
UpdateByQueryResponse,
UpdateAndPickupMappingsResponse,
EsResponseTooLargeError,
} from './src/actions';
export {
isClusterShardLimitExceeded,

@@ -72,7 +72,7 @@ describe('catchRetryableEsClientErrors', () => {
});
});
it('ResponseError with retryable status code', async () => {
const statusCodes = [503, 401, 403, 408, 410];
const statusCodes = [503, 401, 403, 408, 410, 429];
return Promise.all(
statusCodes.map(async (status) => {
const error = new esErrors.ResponseError(

@@ -15,6 +15,7 @@ const retryResponseStatuses = [
403, // AuthenticationException
408, // RequestTimeout
410, // Gone
429, // TooManyRequests -> ES circuit breaker
];

export interface RetryableEsClientError {

@@ -146,6 +146,11 @@ export interface RequestEntityTooLargeException {
type: 'request_entity_too_large_exception';
}

export interface EsResponseTooLargeError {
type: 'es_response_too_large';
contentLength: number;
}

/** @internal */
export interface AcknowledgeResponse {
acknowledged: boolean;
@@ -168,6 +173,7 @@ export interface ActionErrorTypeMap {
index_not_green_timeout: IndexNotGreenTimeout;
index_not_yellow_timeout: IndexNotYellowTimeout;
cluster_shard_limit_exceeded: ClusterShardLimitExceeded;
es_response_too_large: EsResponseTooLargeError;
}

/**

@@ -32,23 +32,54 @@ describe('readWithPit', () => {
pitId: 'pitId',
query: { match_all: {} },
batchSize: 10_000,
maxResponseSizeBytes: 100_000,
})();

expect(client.search).toHaveBeenCalledTimes(1);
expect(client.search).toHaveBeenCalledWith(
  {
    allow_partial_search_results: false,
    pit: {
      id: 'pitId',
      keep_alive: '10m',
    },
    query: {
      match_all: {},
    },
    search_after: undefined,
    seq_no_primary_term: undefined,
    size: 10000,
    sort: '_shard_doc:asc',
    track_total_hits: true,
  },
  { maxResponseSize: 100_000 }
);
});

it('returns left es_response_too_large when client throws RequestAbortedError', async () => {
// Create a mock client that rejects all methods with a RequestAbortedError
// response.
const retryableError = new EsErrors.RequestAbortedError(
'The content length (536870889) is bigger than the maximum allow string (536870888)'
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);

const task = readWithPit({
client,
pitId: 'pitId',
query: { match_all: {} },
batchSize: 10_000,
});
try {
await task();
} catch (e) {
/** ignore */
}
await expect(task()).resolves.toEqual({
_tag: 'Left',
left: { contentLength: 536870889, type: 'es_response_too_large' },
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { errors as EsErrors } from '@elastic/elasticsearch';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import {
catchRetryableEsClientErrors,
type RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import { DEFAULT_PIT_KEEP_ALIVE } from './open_pit';
import { EsResponseTooLargeError } from '.';

/** @internal */
export interface ReadWithPit {
Expand All @@ -32,6 +34,7 @@ export interface ReadWithPitParams {
batchSize: number;
searchAfter?: number[];
seqNoPrimaryTerm?: boolean;
maxResponseSizeBytes?: number;
}

@@ -45,32 +48,39 @@ export const readWithPit =
batchSize,
searchAfter,
seqNoPrimaryTerm,
}: ReadWithPitParams): TaskEither.TaskEither<RetryableEsClientError, ReadWithPit> =>
maxResponseSizeBytes,
}: ReadWithPitParams): TaskEither.TaskEither<
RetryableEsClientError | EsResponseTooLargeError,
ReadWithPit
> =>
() => {
return client
  .search<SavedObjectsRawDoc>(
    {
      seq_no_primary_term: seqNoPrimaryTerm,
      // Fail if the index being searched doesn't exist or is closed
      // allow_no_indices: false,
      // By default ES returns a 200 with partial results if there are shard
      // request timeouts or shard failures which can lead to data loss for
      // migrations
      allow_partial_search_results: false,
      // Sort fields are required to use searchAfter so we sort by the
      // natural order of the index which is the most efficient option
      // as order is not important for the migration
      sort: '_shard_doc:asc',
      pit: { id: pitId, keep_alive: DEFAULT_PIT_KEEP_ALIVE },
      size: batchSize,
      search_after: searchAfter,
      /**
       * We want to know how many documents we need to process so we can log the progress.
       * But we also want to increase the performance of these requests,
       * so we ask ES to report the total count only on the first request (when searchAfter does not exist)
       */
      track_total_hits: typeof searchAfter === 'undefined',
      query,
    },
    { maxResponseSize: maxResponseSizeBytes }
  )
.then((body) => {
const totalHits =
typeof body.hits.total === 'number'
Expand All @@ -93,5 +103,22 @@ export const readWithPit =
totalHits,
});
})
.catch((e) => {
if (
e instanceof EsErrors.RequestAbortedError &&
/The content length \(\d+\) is bigger than the maximum/.test(e.message)
) {
return Either.left({
type: 'es_response_too_large' as const,
contentLength: Number.parseInt(
e.message.match(/The content length \((\d+)\) is bigger than the maximum/)?.[1] ??
'-1',
10
),
});
} else {
throw e;
}
})
.catch(catchRetryableEsClientErrors);
};
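
For context, a standalone check (hypothetical, reusing the error message from
the test above) of what the regex in that catch block extracts from
elasticsearch-js' abort message:

```ts
const message =
  'The content length (536870889) is bigger than the maximum allow string (536870888)';

// Same pattern as the catch handler: pull the offending content length out
// of the RequestAbortedError message, falling back to -1.
const contentLength = Number.parseInt(
  message.match(/The content length \((\d+)\) is bigger than the maximum/)?.[1] ?? '-1',
  10
);
console.log(contentLength); // 536870889
```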

@@ -27,6 +27,7 @@ const migrationsConfig = {
retryAttempts: 15,
batchSize: 1000,
maxBatchSizeBytes: ByteSizeValue.parse('100mb'),
maxReadBatchSizeBytes: ByteSizeValue.parse('500mb'),
} as unknown as SavedObjectsMigrationConfigType;

const createInitialStateCommonParams = {
@@ -217,7 +218,9 @@ describe('createInitialState', () => {
"knownTypes": Array [],
"legacyIndex": ".kibana_task_manager",
"logs": Array [],
"maxBatchSize": 1000,
"maxBatchSizeBytes": 104857600,
"maxReadBatchSizeBytes": 524288000,
"migrationDocLinks": Object {
"clusterShardLimitExceeded": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#cluster-shard-limit-exceeded",
"repeatedTimeoutRequests": "https://www.elastic.co/guide/en/kibana/test-branch/resolve-migrations-failures.html#_repeated_time_out_requests_that_eventually_fail",

@@ -126,7 +126,9 @@ export const createInitialState = ({
retryDelay: 0,
retryAttempts: migrationsConfig.retryAttempts,
batchSize: migrationsConfig.batchSize,
maxBatchSize: migrationsConfig.batchSize,
maxBatchSizeBytes: migrationsConfig.maxBatchSizeBytes.getValueInBytes(),
maxReadBatchSizeBytes: migrationsConfig.maxReadBatchSizeBytes.getValueInBytes(),
discardUnknownObjects: migrationsConfig.discardUnknownObjects === kibanaVersion,
discardCorruptObjects: migrationsConfig.discardCorruptObjects === kibanaVersion,
logs: [],

@@ -243,9 +243,9 @@ describe('KibanaMigrator', () => {
const migrator = new KibanaMigrator(options);
migrator.prepareMigrations();
await expect(migrator.runMigrations()).rejects.toMatchInlineSnapshot(`
  [Error: Unable to complete saved object migrations for the [.my-index] index. Error: Reindex failed with the following error:
  {"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}]
`);
expect(loggingSystemMock.collect(options.logger).error[0][0]).toMatchInlineSnapshot(`
[Error: Reindex failed with the following error:
{"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}]
@@ -533,6 +533,7 @@ const mockOptions = () => {
algorithm: 'v2',
batchSize: 20,
maxBatchSizeBytes: ByteSizeValue.parse('20mb'),
maxReadBatchSizeBytes: new ByteSizeValue(536870888),
pollInterval: 20000,
scrollDuration: '10m',
skip: false,

@@ -51,6 +51,7 @@ describe('migrationsStateActionMachine', () => {
algorithm: 'v2',
batchSize: 1000,
maxBatchSizeBytes: new ByteSizeValue(1e8),
maxReadBatchSizeBytes: new ByteSizeValue(536870888),
pollInterval: 0,
scrollDuration: '0s',
skip: false,

@@ -17,6 +17,7 @@ import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type { AliasAction, FetchIndexResponse } from '../actions';
import type { BulkIndexOperationTuple } from './create_batches';
import { OutdatedDocumentsSearchRead, ReindexSourceToTempRead } from '../state';

/** @internal */
export type Aliases = Partial<Record<string, string>>;
@@ -285,3 +286,11 @@ export function getMigrationType({
*/
export const getTempIndexName = (indexPrefix: string, kibanaVersion: string): string =>
`${indexPrefix}_${kibanaVersion}_reindex_temp`;

/** Increase batchSize by 20% until a maximum of maxBatchSize */
export const increaseBatchSize = (
stateP: OutdatedDocumentsSearchRead | ReindexSourceToTempRead
) => {
const increasedBatchSize = Math.floor(stateP.batchSize * 1.2);
return increasedBatchSize > stateP.maxBatchSize ? stateP.maxBatchSize : increasedBatchSize;
};
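
A quick sanity check (hypothetical harness, not a test from this PR) that this
helper reproduces the growth sequence from the summary, assuming a
`maxBatchSize` of 1000 and starting from a halved batch of 500:

```ts
// Only the two fields increaseBatchSize reads are stubbed here, so the
// state is cast; the real states carry the full migration model.
let state = { batchSize: 500, maxBatchSize: 1000 };
const sequence = [state.batchSize];
for (let i = 0; i < 4; i++) {
  state = { ...state, batchSize: increaseBatchSize(state as any) };
  sequence.push(state.batchSize);
}
console.log(sequence); // [500, 600, 720, 864, 1000]
```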