Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bootstrap ZDT migration algorithm #151282

Merged
merged 12 commits into from
Feb 27, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import { schema, TypeOf } from '@kbn/config-schema';
import type { ServiceConfigDescriptor } from '@kbn/core-base-server-internal';

const migrationSchema = schema.object({
algorithm: schema.oneOf([schema.literal('v2'), schema.literal('zdt')], {
defaultValue: 'v2',
}),
Comment on lines +14 to +16
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure how to name it, so I went with zdt. If anyone prefers managed, or anything else, please feel free to tell me. We could even change the algorithm name to something else.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with a more descriptive algorithm name, managed is a little too obscure.

Copy link
Contributor

@jloleysens jloleysens Feb 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From someone using this API's perspective the values v2 and zdt seem to not have any relation. This is OK, but we need a good doc comment about what each means and where it came from.

IMO zero-downtime or operator (bc I'm guessing this algo is intended to work with the "operator pattern") are also good candidates.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my 2cents: I'd use a name that makes it obvious that there is an external agent required (operator seems like a good candidate).

At the same time, I tend to prefer options that resemble the functionality they enable: Zero-downtime...

Mixing both thoughts... I wonder if we should be more explicit about when we plan to use it (or anyone willing to change the defaults is expected to use this): with the k8s operator? Something like k8s-zdt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, naming is hard.... we could also go with serverless and default/standard. OTOH at some point we may want to allow this algo to be officially supported on prem (with 'manual' orchestration of the workflow)... So I really don't know.

batchSize: schema.number({ defaultValue: 1_000 }),
maxBatchSizeBytes: schema.byteSize({ defaultValue: '100mb' }), // 100mb is the default http.max_content_length Elasticsearch config value
discardUnknownObjects: schema.maybe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

export { DocumentMigrator, KibanaMigrator, buildActiveMappings, mergeTypes } from './src';
export { DocumentMigrator, KibanaMigrator, buildActiveMappings, buildTypesMappings } from './src';
export type { KibanaMigratorOptions } from './src';
export { getAggregatedTypesDocuments } from './src/actions/check_for_unknown_docs';
export { addExcludedTypesToBoolQuery } from './src/model/helpers';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export { logActionResponse, logStateTransition, type LogAwareState } from './logs';
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import type { Logger, LogMeta } from '@kbn/logging';
import { MigrationLog } from '../../types';

export interface LogAwareState {
controlState: string;
logs: MigrationLog[];
}

interface StateTransitionLogMeta extends LogMeta {
kibana: {
migrations: {
state: LogAwareState;
duration: number;
};
};
}

export const logStateTransition = (
logger: Logger,
logMessagePrefix: string,
prevState: LogAwareState,
Comment on lines +26 to +29
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was extracted from being inlined in a state machine file of the v2 algorithm to be re-used for zdt.

Please note the src/common folder of the package. This is where I'm planning to move stuff shared/common between the two algo.

Basically the ideal end structure for me would be

- src
  - common
    - actions 
    - other_stuff 
  - v2
    - model
    - other_v2_specific_stuff
  - zdt
    - folders_this_pr_introduced

Does that seems fine to you, or should we go with another folder structure?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that seems fine to you, or should we go with another folder structure?

Seems good to me. It's not worth getting too tied up in where the code lives, as long as it's somewhat categorized.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm all good with src/common... just wondering about the differences between src/common and src/core... Should we pick one or are we good with having both?

If we are at risk of having circular references, I'm OK with having 2 common dirs 😇

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

src/core is the remain of old history tbh 😅 . Ideally we would be moving src/core to subfolders of src/common soon. I just wanted to avoid doing it in the initial PR.

currState: LogAwareState,
tookMs: number
) => {
if (currState.logs.length > prevState.logs.length) {
currState.logs.slice(prevState.logs.length).forEach(({ message, level }) => {
switch (level) {
case 'error':
return logger.error(logMessagePrefix + message);
case 'warning':
return logger.warn(logMessagePrefix + message);
case 'info':
return logger.info(logMessagePrefix + message);
default:
throw new Error(`unexpected log level ${level}`);
}
});
}

logger.info(
logMessagePrefix + `${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't see it here but do we add a migration log identifier? I mean, does the logMessagePrefix include some indication that the logs come from the zdt migration algorithm?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, atm we're using the same prefix than the v2 algo:

We should probably allow to distinguish between the two, but maybe we should use a dedicated logger child/context instead of this prefix, to make sure all our logs are properly identifiable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dedicated logger child/context instead of this prefix, to make sure all our logs are properly identifiable

++

);
logger.debug<StateTransitionLogMeta>(
logMessagePrefix + `${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`,
{
kibana: {
migrations: {
state: currState,
duration: tookMs,
},
},
}
);
};

export const logActionResponse = (
logger: Logger,
logMessagePrefix: string,
state: LogAwareState,
res: unknown
) => {
logger.debug(logMessagePrefix + `${state.controlState} RESPONSE`, res as LogMeta);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import type { SavedObjectsType } from '@kbn/core-saved-objects-server';
import type { SavedObjectsTypeMappingDefinitions } from '@kbn/core-saved-objects-base-server-internal';

/**
* Merge mappings from all registered saved object types.
*/
export const buildTypesMappings = (
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extracted from v2. Moved to src/core because the other mapping-related helpers are there already. src/core may eventually move to src/common/core or something later (if we feel like it's worth the effort)

types: SavedObjectsType[]
): SavedObjectsTypeMappingDefinitions => {
return types.reduce((acc, { name: type, mappings }) => {
const duplicate = acc.hasOwnProperty(type);
if (duplicate) {
throw new Error(`Type ${type} is already defined.`);
}
return {
...acc,
[type]: mappings,
};
}, {});
};
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export type { LogFn } from './migration_logger';
export { excludeUnusedTypesQuery, REMOVED_TYPES } from './unused_types';
export { TransformSavedObjectDocumentError } from './transform_saved_object_document_error';
export { deterministicallyRegenerateObjectId } from './regenerate_object_id';
export { buildTypesMappings } from './build_types_mappings';
export { createIndexMap, type IndexMap, type CreateIndexMapOptions } from './build_index_map';
export type {
DocumentsTransformFailed,
DocumentsTransformSuccess,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

export { KibanaMigrator, mergeTypes } from './kibana_migrator';
export { KibanaMigrator } from './kibana_migrator';
export type { KibanaMigratorOptions } from './kibana_migrator';
export { buildActiveMappings } from './core';
export { buildActiveMappings, buildTypesMappings } from './core';
export { DocumentMigrator } from './document_migrator';
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ const mockOptions = () => {
]),
kibanaIndex: '.my-index',
soMigrationsConfig: {
algorithm: 'v2',
batchSize: 20,
maxBatchSizeBytes: ByteSizeValue.parse('20mb'),
pollInterval: 20000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import Semver from 'semver';
import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SavedObjectsType } from '@kbn/core-saved-objects-server';
import type {
SavedObjectUnsanitizedDoc,
SavedObjectsRawDoc,
Expand All @@ -31,11 +30,12 @@ import {
type KibanaMigratorStatus,
type MigrationResult,
} from '@kbn/core-saved-objects-base-server-internal';
import { buildActiveMappings } from './core';
import { buildActiveMappings, buildTypesMappings } from './core';
import { DocumentMigrator, type VersionedTransformer } from './document_migrator';
import { createIndexMap } from './core/build_index_map';
import { runResilientMigrator } from './run_resilient_migrator';
import { migrateRawDocsSafely } from './core/migrate_raw_docs';
import { runZeroDowntimeMigration } from './zdt';

// ensure plugins don't try to convert SO namespaceTypes after 8.0.0
// see https://github.com/elastic/kibana/issues/147344
Expand Down Expand Up @@ -91,7 +91,7 @@ export class KibanaMigrator implements IKibanaMigrator {
this.soMigrationsConfig = soMigrationsConfig;
this.typeRegistry = typeRegistry;
this.serializer = new SavedObjectsSerializer(this.typeRegistry);
this.mappingProperties = mergeTypes(this.typeRegistry.getAllTypes());
this.mappingProperties = buildTypesMappings(this.typeRegistry.getAllTypes());
this.log = logger;
this.kibanaVersion = kibanaVersion;
this.documentMigrator = new DocumentMigrator({
Expand Down Expand Up @@ -135,6 +135,28 @@ export class KibanaMigrator implements IKibanaMigrator {
}

private runMigrationsInternal(): Promise<MigrationResult[]> {
const migrationAlgorithm = this.soMigrationsConfig.algorithm;
if (migrationAlgorithm === 'zdt') {
return this.runMigrationZdt();
} else {
return this.runMigrationV2();
}
Comment on lines +139 to +143
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The branching

}

private runMigrationZdt(): Promise<MigrationResult[]> {
return runZeroDowntimeMigration({
kibanaIndexPrefix: this.kibanaIndex,
typeRegistry: this.typeRegistry,
logger: this.log,
documentMigrator: this.documentMigrator,
migrationConfig: this.soMigrationsConfig,
docLinks: this.docLinks,
serializer: this.serializer,
elasticsearchClient: this.client,
});
}

private runMigrationV2(): Promise<MigrationResult[]> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asside: The prep needed for runResilientMigrator looks clunky compared to runZeroDowntimeMigration.
Would it make sense to move that to a 'prepareResilientMigration' setup function? The cleanup's not needed now but possibly some time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree. Ideally most of the 'architectural'/'code' improvements we do in this second algo should be ported to the existing one. Not sure our current priorities will allow us to take such chore tasks in the following months.

const indexMap = createIndexMap({
kibanaIndexName: this.kibanaIndex,
indexMap: this.mappingProperties,
Expand Down Expand Up @@ -187,20 +209,3 @@ export class KibanaMigrator implements IKibanaMigrator {
return this.documentMigrator.migrate(doc);
}
}

/**
* Merges savedObjectMappings properties into a single object, verifying that
* no mappings are redefined.
*/
export function mergeTypes(types: SavedObjectsType[]): SavedObjectsTypeMappingDefinitions {
return types.reduce((acc, { name: type, mappings }) => {
const duplicate = acc.hasOwnProperty(type);
if (duplicate) {
throw new Error(`Type ${type} is already defined.`);
}
return {
...acc,
[type]: mappings,
};
}, {});
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ describe('migrationsStateActionMachine', () => {
migrationVersionPerType: {},
indexPrefix: '.my-so-index',
migrationsConfig: {
algorithm: 'v2',
batchSize: 1000,
maxBatchSizeBytes: new ByteSizeValue(1e8),
pollInterval: 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,73 +8,18 @@

import { errors as EsErrors } from '@elastic/elasticsearch';
import * as Option from 'fp-ts/lib/Option';
import type { Logger, LogMeta } from '@kbn/logging';
import type { Logger } from '@kbn/logging';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import {
getErrorMessage,
getRequestDebugMeta,
} from '@kbn/core-elasticsearch-client-server-internal';
import type { SavedObjectsRawDoc } from '@kbn/core-saved-objects-server';
import { logActionResponse, logStateTransition } from './common/utils/logs';
import { type Model, type Next, stateActionMachine } from './state_action_machine';
import { cleanup } from './migrations_state_machine_cleanup';
import type { ReindexSourceToTempTransform, ReindexSourceToTempIndexBulk, State } from './state';

interface StateTransitionLogMeta extends LogMeta {
kibana: {
migrations: {
state: State;
duration: number;
};
};
}

const logStateTransition = (
logger: Logger,
logMessagePrefix: string,
prevState: State,
currState: State,
tookMs: number
) => {
if (currState.logs.length > prevState.logs.length) {
currState.logs.slice(prevState.logs.length).forEach(({ message, level }) => {
switch (level) {
case 'error':
return logger.error(logMessagePrefix + message);
case 'warning':
return logger.warn(logMessagePrefix + message);
case 'info':
return logger.info(logMessagePrefix + message);
default:
throw new Error(`unexpected log level ${level}`);
}
});
}

logger.info(
logMessagePrefix + `${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`
);
logger.debug<StateTransitionLogMeta>(
logMessagePrefix + `${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`,
{
kibana: {
migrations: {
state: currState,
duration: tookMs,
},
},
}
);
};

const logActionResponse = (
logger: Logger,
logMessagePrefix: string,
state: State,
res: unknown
) => {
logger.debug(logMessagePrefix + `${state.controlState} RESPONSE`, res as LogMeta);
};

/**
* A specialized migrations-specific state-action machine that:
* - logs messages in state.logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import * as Actions from './actions';
import type { State } from './state';

export async function cleanup(client: ElasticsearchClient, state?: State) {
if (!state) return;
type CleanableState = { sourceIndexPitId: string } | {};
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adapted the type to make it reusable with different State types (as the 2 algos have different shapes of states)


export async function cleanup(client: ElasticsearchClient, state?: CleanableState) {
if (!state) {
return;
}
if ('sourceIndexPitId' in state) {
await Actions.closePit({ client, pitId: state.sourceIndexPitId })();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import type {
} from '@elastic/elasticsearch/lib/api/types';
import * as Either from 'fp-ts/lib/Either';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type { State } from '../state';
import type { AliasAction, FetchIndexResponse } from '../actions';

/**
Expand All @@ -27,8 +26,8 @@ export function throwBadControlState(controlState: any) {
/**
* A helper function/type for ensuring that all response types are handled.
*/
export function throwBadResponse(state: State, p: never): never;
export function throwBadResponse(state: State, res: any): never {
export function throwBadResponse(state: { controlState: string }, p: never): never;
export function throwBadResponse(state: { controlState: string }, res: any): never {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, using a specialized type compatible with both kind of State

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: does res: unknown work here?

Copy link
Contributor Author

@pgayvallet pgayvallet Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it does! Changed in 8c7acbf

throw new Error(
`${state.controlState} received unexpected action response: ` + JSON.stringify(res)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,16 @@
* Side Public License, v 1.
*/

import { State } from '../state';
import type { MigrationLog } from '../types';

export const delayRetryState = <S extends State>(
export interface RetryableState {
controlState: string;
retryCount: number;
retryDelay: number;
logs: MigrationLog[];
}

export const delayRetryState = <S extends RetryableState>(
state: S,
errorMessage: string,
/** How many times to retry a step that fails */
Expand Down Expand Up @@ -39,7 +46,7 @@ export const delayRetryState = <S extends State>(
};
}
};
export const resetRetryState = <S extends State>(state: S): S => {
export const resetRetryState = <S extends RetryableState>(state: S): S => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here too.

return {
...state,
retryCount: 0,
Expand Down
Loading