Skip to content

Commit

Permalink
Implement the second part of the ZDT migration algorithm (#153031)
Browse files Browse the repository at this point in the history
## Summary

Part of #150309
Follow-up of #152219

Implement the second part of the zero-downtime migration algorithm: the
document conversion.

### Schema

because a schema is worth a thousand words:

<img width="650" alt="Screenshot 2023-03-22 at 08 33 44"
src="https://user-images.githubusercontent.com/1532934/226832339-d74d8349-9969-4c51-a5fe-f77558f17b67.png">


### TODO / notepad

- ~check that all types have model versions in INIT~ will do later when
we'll start have real types using MVs
- [x] Optimize to skip document migration when creating new index
- [x] documentsUpdateInit: extract remaining logic to utilities
- [x] outdatedDocumentsSearchRead: cleanup corrupted doc logic
- [x] outdatedDocumentsSearchTransform: cleanup corrupted doc logic
- [x] tests for /zdt/actions/wait_for_delay.ts ?
- ~support for coreMigrationVersion~ added as a follow-up in the parent
issue
- [x] init -> equal -> check if aliasActions is empty

---------

Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
pgayvallet and kibanamachine authored Mar 28, 2023
1 parent 23b7137 commit 3ff906d
Show file tree
Hide file tree
Showing 102 changed files with 4,385 additions and 342 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export {
type IndexMapping,
type IndexMappingMeta,
type SavedObjectsTypeMappingDefinitions,
type IndexMappingMigrationStateMeta,
} from './src/mappings';
export { SavedObjectsSerializer } from './src/serialization';
export { SavedObjectsTypeValidator } from './src/validation';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,9 @@
*/

export { getTypes, getProperty, getRootProperties, getRootPropertiesObjects } from './lib';
export type { SavedObjectsTypeMappingDefinitions, IndexMappingMeta, IndexMapping } from './types';
export type {
SavedObjectsTypeMappingDefinitions,
IndexMappingMeta,
IndexMapping,
IndexMappingMigrationStateMeta,
} from './types';
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,19 @@ export interface IndexMappingMeta {
* @remark: Only defined for indices using the zdt migration algorithm.
*/
docVersions?: { [k: string]: number };
/**
* Info about the current state of the migration.
* Should only be present if a migration is in progress or was interrupted.
*
* @remark: Only defined for indices using the zdt migration algorithm.
*/
migrationState?: IndexMappingMigrationStateMeta;
}

/** @internal */
export interface IndexMappingMigrationStateMeta {
/**
* Indicates that the algorithm is currently converting the documents.
*/
convertingDocuments: boolean;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,57 @@ import type { IndexMapping, IndexMappingMeta } from '../mappings';
import type { ModelVersionMap } from './version_map';
import { assertValidModelVersion } from './conversion';

export interface GetModelVersionsFromMappingsOpts {
mappings: IndexMapping;
source: 'mappingVersions' | 'docVersions';
/** if specified, will filter the types with the provided list */
knownTypes?: string[];
}

/**
* Build the version map from the specified source of the provided mappings.
*/
export const getModelVersionsFromMappings = ({
mappings,
source,
}: {
mappings: IndexMapping;
source: 'mappingVersions' | 'docVersions';
}): ModelVersionMap | undefined => {
knownTypes,
}: GetModelVersionsFromMappingsOpts): ModelVersionMap | undefined => {
if (!mappings._meta) {
return undefined;
}

return getModelVersionsFromMappingMeta({
meta: mappings._meta,
source,
knownTypes,
});
};

export interface GetModelVersionsFromMappingMetaOpts {
meta: IndexMappingMeta;
source: 'mappingVersions' | 'docVersions';
/** if specified, will filter the types with the provided list */
knownTypes?: string[];
}

/**
* Build the version map from the specified source of the provided mappings meta.
*/
export const getModelVersionsFromMappingMeta = ({
meta,
source,
}: {
meta: IndexMappingMeta;
source: 'mappingVersions' | 'docVersions';
}): ModelVersionMap | undefined => {
knownTypes,
}: GetModelVersionsFromMappingMetaOpts): ModelVersionMap | undefined => {
const indexVersions = source === 'mappingVersions' ? meta.mappingVersions : meta.docVersions;
if (!indexVersions) {
return undefined;
}
const typeSet = knownTypes ? new Set(knownTypes) : undefined;

return Object.entries(indexVersions).reduce<ModelVersionMap>((map, [type, rawVersion]) => {
map[type] = assertValidModelVersion(rawVersion);
if (!typeSet || typeSet.has(type)) {
map[type] = assertValidModelVersion(rawVersion);
}
return map;
}, {});
};
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const migrationSchema = schema.object({
pollInterval: schema.number({ defaultValue: 1_500 }),
skip: schema.boolean({ defaultValue: false }),
retryAttempts: schema.number({ defaultValue: 15 }),
zdt: schema.object({
metaPickupSyncDelaySec: schema.number({ min: 1, defaultValue: 120 }),
}),
});

export type SavedObjectsMigrationConfigType = TypeOf<typeof migrationSchema>;
Expand Down Expand Up @@ -60,6 +63,7 @@ export const savedObjectsConfig: ServiceConfigDescriptor<SavedObjectsConfigType>
path: 'savedObjects',
schema: soSchema,
};

export class SavedObjectConfig {
public maxImportPayloadBytes: number;
public maxImportExportSize: number;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@
*/

export const CLUSTER_SHARD_LIMIT_EXCEEDED_REASON = `[cluster_shard_limit_exceeded] Upgrading Kibana requires adding a small number of new shards. Ensure that Kibana is able to add 10 more shards by increasing the cluster.max_shards_per_node setting, or removing indices to clear up resources.`;

export const FATAL_REASON_REQUEST_ENTITY_TOO_LARGE = `While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Ensure that the Kibana configuration option 'migrations.maxBatchSizeBytes' is set to a value that is lower than or equal to the Elasticsearch 'http.max_content_length' configuration option.`;
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import type { BulkOperationContainer } from '@elastic/elasticsearch/lib/api/types';
import type { BulkOperation } from '../model/create_batches';

export const redactBulkOperationBatches = (
bulkOperationBatches: BulkOperation[][]
): BulkOperationContainer[][] => {
return bulkOperationBatches.map((batch) =>
batch.map((operation) => (Array.isArray(operation) ? operation[0] : operation))
);
};
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import type {
SavedObjectSanitizedDoc,
SavedObjectsRawDoc,
SavedObjectUnsanitizedDoc,
ISavedObjectsSerializer,
} from '@kbn/core-saved-objects-server';
import { SavedObjectsSerializer } from '@kbn/core-saved-objects-base-server-internal';
import type { MigrateAndConvertFn } from '../document_migrator/document_migrator';
import { TransformSavedObjectDocumentError } from '.';

Expand Down Expand Up @@ -65,7 +65,7 @@ export class CorruptSavedObjectError extends Error {
* @returns {SavedObjectsRawDoc[]}
*/
export async function migrateRawDocs(
serializer: SavedObjectsSerializer,
serializer: ISavedObjectsSerializer,
migrateDoc: MigrateAndConvertFn,
rawDocs: SavedObjectsRawDoc[]
): Promise<SavedObjectsRawDoc[]> {
Expand All @@ -86,7 +86,7 @@ export async function migrateRawDocs(
}

interface MigrateRawDocsSafelyDeps {
serializer: SavedObjectsSerializer;
serializer: ISavedObjectsSerializer;
migrateDoc: MigrateAndConvertFn;
rawDocs: SavedObjectsRawDoc[];
}
Expand Down Expand Up @@ -181,7 +181,7 @@ function transformNonBlocking(
async function migrateMapToRawDoc(
migrateMethod: MigrateFn,
savedObject: SavedObjectSanitizedDoc<unknown>,
serializer: SavedObjectsSerializer
serializer: ISavedObjectsSerializer
): Promise<SavedObjectsRawDoc[]> {
return [...(await migrateMethod(savedObject))].map((attrs) =>
serializer.savedObjectToRaw({
Expand All @@ -201,7 +201,7 @@ async function migrateMapToRawDoc(
function convertToRawAddMigrationVersion(
rawDoc: SavedObjectsRawDoc,
options: { namespaceTreatment: 'lax' },
serializer: SavedObjectsSerializer
serializer: ISavedObjectsSerializer
): SavedObjectSanitizedDoc<unknown> {
const savedObject = serializer.rawToSavedObject(rawDoc, options);
if (!savedObject.migrationVersion && !savedObject.typeMigrationVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ const mockOptions = () => {
scrollDuration: '10m',
skip: false,
retryAttempts: 20,
zdt: {
metaPickupSyncDelaySec: 120,
},
},
client: mockedClient,
docLinks: docLinksServiceMock.createSetupContract(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ describe('migrationsStateActionMachine', () => {
scrollDuration: '0s',
skip: false,
retryAttempts: 5,
zdt: {
metaPickupSyncDelaySec: 120,
},
},
typeRegistry,
docLinks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@ import {
getRequestDebugMeta,
} from '@kbn/core-elasticsearch-client-server-internal';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import type { BulkOperationContainer } from '@elastic/elasticsearch/lib/api/types';
import { logActionResponse, logStateTransition } from './common/utils/logs';
import { type Model, type Next, stateActionMachine } from './state_action_machine';
import { cleanup } from './migrations_state_machine_cleanup';
import type { ReindexSourceToTempTransform, ReindexSourceToTempIndexBulk, State } from './state';
import type { BulkOperation } from './model/create_batches';
import { redactBulkOperationBatches } from './common/redact_state';

/**
* A specialized migrations-specific state-action machine that:
Expand Down Expand Up @@ -159,11 +158,3 @@ export async function migrationStateActionMachine({
}
}
}

const redactBulkOperationBatches = (
bulkOperationBatches: BulkOperation[][]
): BulkOperationContainer[][] => {
return bulkOperationBatches.map((batch) =>
batch.map((operation) => (Array.isArray(operation) ? operation[0] : operation))
);
};
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type { TransformErrorObjects } from '../core';

export type BulkIndexOperationTuple = [BulkOperationContainer, SavedObjectsRawDocSource];
export type BulkOperation = BulkIndexOperationTuple | BulkOperationContainer;
export type BulkOperationBatch = BulkOperation[];

export interface CreateBatchesParams {
documents: SavedObjectsRawDoc[];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ import {
} from './helpers';
import { createBatches } from './create_batches';
import type { MigrationLog } from '../types';
import { CLUSTER_SHARD_LIMIT_EXCEEDED_REASON } from '../common/constants';

export const FATAL_REASON_REQUEST_ENTITY_TOO_LARGE = `While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Ensure that the Kibana configuration option 'migrations.maxBatchSizeBytes' is set to a value that is lower than or equal to the Elasticsearch 'http.max_content_length' configuration option.`;
import {
CLUSTER_SHARD_LIMIT_EXCEEDED_REASON,
FATAL_REASON_REQUEST_ENTITY_TOO_LARGE,
} from '../common/constants';

export const model = (currentState: State, resW: ResponseType<AllActionStates>): State => {
// The action response `resW` is weakly typed, the type includes all action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,7 @@
* Side Public License, v 1.
*/

import type {
IncompatibleClusterRoutingAllocation,
RetryableEsClientError,
WaitForTaskCompletionTimeout,
IndexNotYellowTimeout,
IndexNotGreenTimeout,
ClusterShardLimitExceeded,
IndexNotFound,
AliasNotFound,
IncompatibleMappingException,
} from '../../actions';
import type { ActionErrorTypeMap as BaseActionErrorTypeMap } from '../../actions';

export {
initAction as init,
Expand All @@ -25,25 +15,28 @@ export {
updateAliases,
updateMappings,
updateAndPickupMappings,
cleanupUnknownAndExcluded,
waitForDeleteByQueryTask,
waitForPickupUpdatedMappingsTask,
refreshIndex,
openPit,
readWithPit,
closePit,
transformDocs,
bulkOverwriteTransformedDocuments,
noop,
type InitActionParams,
type IncompatibleClusterRoutingAllocation,
type RetryableEsClientError,
type WaitForTaskCompletionTimeout,
type IndexNotFound,
} from '../../actions';

export interface ActionErrorTypeMap {
wait_for_task_completion_timeout: WaitForTaskCompletionTimeout;
incompatible_cluster_routing_allocation: IncompatibleClusterRoutingAllocation;
retryable_es_client_error: RetryableEsClientError;
index_not_found_exception: IndexNotFound;
index_not_green_timeout: IndexNotGreenTimeout;
index_not_yellow_timeout: IndexNotYellowTimeout;
cluster_shard_limit_exceeded: ClusterShardLimitExceeded;
alias_not_found_exception: AliasNotFound;
incompatible_mapping_exception: IncompatibleMappingException;
}
export { updateIndexMeta, type UpdateIndexMetaParams } from './update_index_meta';
export { waitForDelay, type WaitForDelayParams } from './wait_for_delay';

// alias in case we need to extend it with zdt specific actions/errors
export type ActionErrorTypeMap = BaseActionErrorTypeMap;

/** Type guard for narrowing the type of a left */
export function isTypeof<T extends keyof ActionErrorTypeMap>(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export const updateMappingsMock = jest.fn();

jest.doMock('../../actions/update_mappings', () => {
const actual = jest.requireActual('../../actions/update_mappings');
return {
...actual,
updateMappings: updateMappingsMock,
};
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { updateMappingsMock } from './update_index_meta.test.mocks';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import type { IndexMappingMeta } from '@kbn/core-saved-objects-base-server-internal';
import { updateIndexMeta } from './update_index_meta';

describe('updateIndexMeta', () => {
it('calls updateMappings with the correct parameters', () => {
const client = elasticsearchClientMock.createElasticsearchClient();
const index = '.kibana_1';
const meta: IndexMappingMeta = {
mappingVersions: {
foo: 1,
bar: 1,
},
};

updateIndexMeta({ client, index, meta });

expect(updateMappingsMock).toHaveBeenCalledTimes(1);
expect(updateMappingsMock).toHaveBeenCalledWith({
client,
index,
mappings: {
properties: {},
_meta: meta,
},
});
});

it('returns the response from updateMappings', () => {
const client = elasticsearchClientMock.createElasticsearchClient();
const index = '.kibana_1';
const meta: IndexMappingMeta = {};

const expected = Symbol();
updateMappingsMock.mockReturnValue(expected);

const actual = updateIndexMeta({ client, index, meta });

expect(actual).toBe(expected);
});
});
Loading

0 comments on commit 3ff906d

Please sign in to comment.