Skip to content

Commit

Permalink
Address reviews, better handling when batchSize: 1 still exceeds maxReadBatchSizeBytes
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolf committed May 24, 2023
1 parent 532a41d commit 67562ec
Show file tree
Hide file tree
Showing 15 changed files with 168 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@
import { valid } from 'semver';
import { schema, TypeOf } from '@kbn/config-schema';
import type { ServiceConfigDescriptor } from '@kbn/core-base-server-internal';
import buffer from 'buffer';

const migrationSchema = schema.object({
algorithm: schema.oneOf([schema.literal('v2'), schema.literal('zdt')], {
defaultValue: 'v2',
}),
batchSize: schema.number({ defaultValue: 1_000 }),
maxBatchSizeBytes: schema.byteSize({ defaultValue: '100mb' }), // 100mb is the default http.max_content_length Elasticsearch config value
maxReadBatchSizeBytes: schema.byteSize({ defaultValue: 536870888, max: 536870888 }),
maxReadBatchSizeBytes: schema.byteSize({
defaultValue: buffer.constants.MAX_STRING_LENGTH,
max: buffer.constants.MAX_STRING_LENGTH,
}),
discardUnknownObjects: schema.maybe(
schema.string({
validate: (value: string) =>
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@
* Side Public License, v 1.
*/

/**
 * Batch size for updateByQuery and reindex operations.
 * Uses the default value of 1000 for Elasticsearch reindex operation.
 * NOTE(review): removed in this commit in favor of the configurable
 * `batchSize` threaded through `ReindexParams` — confirm no remaining callers.
 */
export const BATCH_SIZE = 1_000;
/**
* When a request takes a long time to complete and hits the timeout or the
* client aborts that request due to the requestTimeout, our only course of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import type { RetryableEsClientError } from './catch_retryable_es_client_errors'
import type { DocumentsTransformFailed } from '../core/migrate_raw_docs';

export {
BATCH_SIZE,
DEFAULT_TIMEOUT,
INDEX_AUTO_EXPAND_REPLICAS,
INDEX_NUMBER_OF_SHARDS,
Expand Down Expand Up @@ -149,6 +148,7 @@ export interface RequestEntityTooLargeException {

/**
 * Error returned when an Elasticsearch response body exceeds the configured
 * maximum read size (`maxReadBatchSizeBytes`).
 */
export interface EsResponseTooLargeError {
  type: 'es_response_too_large';
  // Content length in bytes, parsed out of the ES client error message;
  // presumably -1 when the message does not include a length — verify against
  // the parsing site in read_with_pit.
  contentLength: number;
}

/** @internal */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ export const readWithPit =
) {
return Either.left({
type: 'es_response_too_large' as const,
contentLength: Number.parseInt(
e.message.match(/The content length \((\d+)\) is bigger than the maximum/)?.[1] ??
'-1',
10
),
});
} else {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ describe('reindex', () => {
reindexScript: Option.none,
requireAlias: false,
excludeOnUpgradeQuery: {},
batchSize: 1000,
});
try {
await task();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
catchRetryableEsClientErrors,
type RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import { BATCH_SIZE } from './constants';

/** @internal */
export interface ReindexResponse {
Expand All @@ -34,6 +33,7 @@ export interface ReindexParams {
* index for backup purposes, but won't be available in the upgraded index.
*/
excludeOnUpgradeQuery: QueryDslQueryContainer;
batchSize: number;
}

/**
Expand All @@ -52,6 +52,7 @@ export const reindex =
reindexScript,
requireAlias,
excludeOnUpgradeQuery,
batchSize,
}: ReindexParams): TaskEither.TaskEither<RetryableEsClientError, ReindexResponse> =>
() => {
return client
Expand All @@ -65,7 +66,7 @@ export const reindex =
source: {
index: sourceIndex,
// Set reindex batch size
size: BATCH_SIZE,
size: batchSize,
// Exclude saved object types
query: excludeOnUpgradeQuery,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ describe('createInitialState', () => {
"batchSize": 1000,
"controlState": "INIT",
"currentAlias": ".kibana_task_manager",
"defaultBatchSize": 1000,
"discardCorruptObjects": false,
"discardUnknownObjects": false,
"excludeFromUpgradeFilterHooks": Object {},
Expand Down Expand Up @@ -216,6 +215,7 @@ describe('createInitialState', () => {
"knownTypes": Array [],
"legacyIndex": ".kibana_task_manager",
"logs": Array [],
"maxBatchSize": 1000,
"maxBatchSizeBytes": 104857600,
"maxReadBatchSizeBytes": 524288000,
"migrationDocLinks": Object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ export const createInitialState = ({
retryDelay: 0,
retryAttempts: migrationsConfig.retryAttempts,
batchSize: migrationsConfig.batchSize,
defaultBatchSize: migrationsConfig.batchSize,
maxBatchSize: migrationsConfig.batchSize,
maxBatchSizeBytes: migrationsConfig.maxBatchSizeBytes.getValueInBytes(),
maxReadBatchSizeBytes: migrationsConfig.maxReadBatchSizeBytes.getValueInBytes(),
discardUnknownObjects: migrationsConfig.discardUnknownObjects === kibanaVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,10 @@ export function getMigrationType({
export const getTempIndexName = (indexPrefix: string, kibanaVersion: string): string =>
`${indexPrefix}_${kibanaVersion}_reindex_temp`;

/**
 * Increase `batchSize` by 20% (rounded down), capped at `maxBatchSize`.
 *
 * Used to ramp the read batch size back up after it was reduced in response
 * to an `es_response_too_large` error.
 *
 * Note: the stripped diff left both the old (`defaultBatchSize`) and new
 * (`maxBatchSize`) return statements in place; this keeps only the committed
 * `maxBatchSize` version, matching the renamed state field.
 *
 * @param stateP current migration state holding `batchSize` and `maxBatchSize`
 * @returns the increased batch size, never exceeding `stateP.maxBatchSize`
 */
export const increaseBatchSize = (
  stateP: OutdatedDocumentsSearchRead | ReindexSourceToTempRead
) => {
  const increasedBatchSize = Math.floor(stateP.batchSize * 1.2);
  return increasedBatchSize > stateP.maxBatchSize ? stateP.maxBatchSize : increasedBatchSize;
};
Loading

0 comments on commit 67562ec

Please sign in to comment.