diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts index 596e17a36b98b..891aeb0a32ee2 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts @@ -38,9 +38,27 @@ describe('synchronizeMigrators', () => { migratorsWaitGroups.forEach((waitGroup) => expect(waitGroup.reject).not.toHaveBeenCalled()); expect(res).toEqual([ - { _tag: 'Right', right: 'synchronized_successfully' }, - { _tag: 'Right', right: 'synchronized_successfully' }, - { _tag: 'Right', right: 'synchronized_successfully' }, + { + _tag: 'Right', + right: { + data: [undefined, undefined, undefined], + type: 'synchronization_successful', + }, + }, + { + _tag: 'Right', + right: { + data: [undefined, undefined, undefined], + type: 'synchronization_successful', + }, + }, + { + _tag: 'Right', + right: { + data: [undefined, undefined, undefined], + type: 'synchronization_successful', + }, + }, ]); }); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts index 50090293ee1e2..96184c8cf8d50 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts @@ -17,24 +17,28 @@ export interface SynchronizationFailed { } /** @internal */ -export interface SynchronizeMigratorsParams { +export interface SynchronizationSuccessful { + type: 'synchronization_successful'; + data: T[]; +} + +/** @internal */ +export interface SynchronizeMigratorsParams { waitGroup: WaitGroup; - thenHook?: (res: any) => Either.Right; payload?: T; } -export function synchronizeMigrators({ +export function synchronizeMigrators({ waitGroup, payload, - thenHook = () => - Either.right( - 'synchronized_successfully' as const - ) as Either.Right<'synchronized_successfully'> as unknown as Either.Right, -}: SynchronizeMigratorsParams): TaskEither.TaskEither { +}: SynchronizeMigratorsParams): TaskEither.TaskEither< + SynchronizationFailed, + SynchronizationSuccessful +> { return () => { waitGroup.resolve(payload); return waitGroup.promise - .then((res) => (thenHook ? thenHook(res) : res)) + .then((data: T[]) => Either.right({ type: 'synchronization_successful' as const, data })) .catch((error) => Either.left({ type: 'synchronization_failed' as const, error })); }; } diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts index a646f6e36081c..b742b9f1124ed 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts @@ -30,10 +30,7 @@ export function waitGroup(): WaitGroup { return new Defer(); } -export function createWaitGroupMap( - keys: string[], - thenHook: (res: T[]) => U = (res) => res as unknown as U -): Record> { +export function createWaitGroupMap(keys: string[]): Record> { if (!keys?.length) { return {}; } @@ -41,7 +38,7 @@ export function createWaitGroupMap( const defers: Array> = keys.map(() => waitGroup()); // every member of the WaitGroup will wait for all members to resolve - const all = Promise.all(defers.map(({ promise }) => promise)).then(thenHook); + const all = Promise.all(defers.map(({ promise }) => promise)); return keys.reduce>>((acc, indexName, i) => { const { resolve, reject } = defers[i]; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts index b6e7508770d7d..6cb045f6e4d3b 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts @@ -1764,9 +1764,10 @@ describe('migrations v2 model', () => { describe('if the migrator source index did NOT exist', () => { test('READY_TO_REINDEX_SYNC -> DONE_REINDEXING_SYNC', () => { - const res: ResponseType<'READY_TO_REINDEX_SYNC'> = Either.right( - 'synchronized_successfully' as const - ); + const res: ResponseType<'READY_TO_REINDEX_SYNC'> = Either.right({ + type: 'synchronization_successful' as const, + data: [], + }); const newState = model(state, res); expect(newState.controlState).toEqual('DONE_REINDEXING_SYNC'); }); @@ -1774,9 +1775,10 @@ describe('migrations v2 model', () => { describe('if the migrator source index did exist', () => { test('READY_TO_REINDEX_SYNC -> REINDEX_SOURCE_TO_TEMP_OPEN_PIT', () => { - const res: ResponseType<'READY_TO_REINDEX_SYNC'> = Either.right( - 'synchronized_successfully' as const - ); + const res: ResponseType<'READY_TO_REINDEX_SYNC'> = Either.right({ + type: 'synchronization_successful' as const, + data: [], + }); const newState = model( { ...state, @@ -2044,9 +2046,10 @@ describe('migrations v2 model', () => { }; test('DONE_REINDEXING_SYNC -> SET_TEMP_WRITE_BLOCK if synchronization succeeds', () => { - const res: ResponseType<'DONE_REINDEXING_SYNC'> = Either.right( - 'synchronized_successfully' as const - ); + const res: ResponseType<'READY_TO_REINDEX_SYNC'> = Either.right({ + type: 'synchronization_successful' as const, + data: [], + }); const newState = model(state, res); expect(newState.controlState).toEqual('SET_TEMP_WRITE_BLOCK'); }); @@ -2952,6 +2955,24 @@ describe('migrations v2 model', () => { expect(newState.retryDelay).toEqual(0); }); + test('CHECK_VERSION_INDEX_READY_ACTIONS -> MARK_VERSION_INDEX_READY_SYNC if mustRelocateDocuments === true', () => { + const versionIndexReadyActions = Option.some([ + { add: { index: 'kibana-index', alias: 'my-alias' } }, + ]); + + const newState = model( + { + ...сheckVersionIndexReadyActionsState, + mustRelocateDocuments: true, + versionIndexReadyActions, + }, + res + ) as PostInitState; + expect(newState.controlState).toEqual('MARK_VERSION_INDEX_READY_SYNC'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + test('CHECK_VERSION_INDEX_READY_ACTIONS -> DONE if none versionIndexReadyActions', () => { const newState = model(сheckVersionIndexReadyActionsState, res) as PostInitState; expect(newState.controlState).toEqual('DONE'); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts index e4f38c2a32a3c..ae1098f2209f5 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts @@ -276,7 +276,6 @@ export const model = (currentState: State, resW: ResponseType): if ( // If this version's migration has already been completed we can proceed Either.isRight(aliasesRes) && - // TODO check that this behaves correctly when skipping reindexing versionMigrationCompleted(stateP.currentAlias, stateP.versionAlias, aliasesRes.right) ) { return { @@ -637,11 +636,12 @@ export const model = (currentState: State, resW: ResponseType): controlState: stateP.mustRefresh ? 'REFRESH_SOURCE' : 'OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT', }; } else if (Either.isLeft(res)) { + const left = res.left; // Note: if multiple newer Kibana versions are competing with each other to perform a migration, // it might happen that another Kibana instance has deleted this instance's version index. // NIT to handle this in properly, we'd have to add a PREPARE_COMPATIBLE_MIGRATION_CONFLICT step, // similar to MARK_VERSION_INDEX_READY_CONFLICT. - if (isTypeof(res.left, 'alias_not_found_exception')) { + if (isTypeof(left, 'alias_not_found_exception')) { // We assume that the alias was already deleted by another Kibana instance return { ...stateP, @@ -649,8 +649,19 @@ export const model = (currentState: State, resW: ResponseType): ? 'REFRESH_SOURCE' : 'OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT', }; + } else if (isTypeof(left, 'index_not_found_exception')) { + // We don't handle the following errors as the migration algorithm + // will never cause them to occur: + // - index_not_found_exception + throwBadResponse(stateP, left as never); + } else if (isTypeof(left, 'remove_index_not_a_concrete_index')) { + // We don't handle this error as the migration algorithm will never + // cause it to occur (this error is only relevant to the LEGACY_DELETE + // step). + throwBadResponse(stateP, left as never); } else { - throwBadResponse(stateP, res.left as never); + // TODO update to handle 2 more cases + throwBadResponse(stateP, left); } } else { throwBadResponse(stateP, res); @@ -724,11 +735,13 @@ export const model = (currentState: State, resW: ResponseType): ...stateP, controlState: 'CALCULATE_EXCLUDE_FILTERS', }; - } else { + } else if (isTypeof(res.left, 'index_not_found_exception')) { // We don't handle the following errors as the migration algorithm // will never cause them to occur: // - index_not_found_exception throwBadResponse(stateP, res.left as never); + } else { + throwBadResponse(stateP, res.left); } } else if (stateP.controlState === 'CALCULATE_EXCLUDE_FILTERS') { const res = resW as ExcludeRetryableEsError>; @@ -825,7 +838,7 @@ export const model = (currentState: State, resW: ResponseType): throwBadResponse(stateP, left); } } else { - throwBadResponse(stateP, res as never); + throwBadResponse(stateP, res); } } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT') { const res = resW as ExcludeRetryableEsError>; @@ -972,7 +985,7 @@ export const model = (currentState: State, resW: ResponseType): throwBadResponse(stateP, left); } } else { - throwBadResponse(stateP, res as never); + throwBadResponse(stateP, res); } } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_TRANSFORM') { // We follow a similar control flow as for @@ -1052,7 +1065,7 @@ export const model = (currentState: State, resW: ResponseType): }; } else { // should never happen - throwBadResponse(stateP, res as never); + throwBadResponse(stateP, left); } } } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK') { @@ -1327,7 +1340,8 @@ export const model = (currentState: State, resW: ResponseType): }; } } else { - if (isTypeof(res.left, 'documents_transform_failed')) { + const left = res.left; + if (isTypeof(left, 'documents_transform_failed')) { // continue to build up any more transformation errors before failing the migration. return { ...stateP, @@ -1338,7 +1352,7 @@ export const model = (currentState: State, resW: ResponseType): progress, }; } else { - throwBadResponse(stateP, res as never); + throwBadResponse(stateP, left); } } } else if (stateP.controlState === 'TRANSFORMED_DOCUMENTS_BULK_INDEX') { @@ -1359,22 +1373,23 @@ export const model = (currentState: State, resW: ResponseType): hasTransformedDocs: true, }; } else { - if (isTypeof(res.left, 'request_entity_too_large_exception')) { + const left = res.left; + if (isTypeof(left, 'request_entity_too_large_exception')) { return { ...stateP, controlState: 'FATAL', reason: FATAL_REASON_REQUEST_ENTITY_TOO_LARGE, }; } else if ( - isTypeof(res.left, 'target_index_had_write_block') || - isTypeof(res.left, 'index_not_found_exception') + isTypeof(left, 'target_index_had_write_block') || + isTypeof(left, 'index_not_found_exception') ) { // we fail on these errors since the target index will never get // deleted and should only have a write block if a newer version of // Kibana started an upgrade - throwBadResponse(stateP, res.left as never); + throwBadResponse(stateP, left as never); } else { - throwBadResponse(stateP, res.left); + throwBadResponse(stateP, left); } } } else if (stateP.controlState === 'OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT') { @@ -1546,11 +1561,13 @@ export const model = (currentState: State, resW: ResponseType): // another instance has already completed the migration and deleted // the temporary index return { ...stateP, controlState: 'MARK_VERSION_INDEX_READY_CONFLICT' }; - } else { + } else if (isTypeof(left, 'index_not_found_exception')) { // The migration algorithm will never cause a // index_not_found_exception for an index other than the temporary // index handled above. throwBadResponse(stateP, left as never); + } else { + throwBadResponse(stateP, left); } } else if (isTypeof(left, 'remove_index_not_a_concrete_index')) { // We don't handle this error as the migration algorithm will never diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts index 18c6c8409853a..5509fb70c9231 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts @@ -6,7 +6,9 @@ * Side Public License, v 1. */ +import { pipe } from 'fp-ts/lib/pipeable'; import * as Option from 'fp-ts/lib/Option'; +import * as TaskEither from 'fp-ts/lib/TaskEither'; import { omit } from 'lodash'; import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import type { WaitGroup } from './kibana_migrator_utils'; @@ -257,11 +259,18 @@ export const nextActionMap = ( MARK_VERSION_INDEX_READY: (state: MarkVersionIndexReady) => Actions.updateAliases({ client, aliasActions: state.versionIndexReadyActions.value }), MARK_VERSION_INDEX_READY_SYNC: (state: MarkVersionIndexReady) => - Actions.synchronizeMigrators({ - waitGroup: updateRelocationAliases, - payload: state.versionIndexReadyActions.value, - thenHook: (res) => res, - }), + pipe( + // First, we wait for all the migrators involved in a relocation to reach this point. + Actions.synchronizeMigrators({ + waitGroup: updateRelocationAliases, + payload: state.versionIndexReadyActions.value, + }), + // Then, all migrators will try to update all aliases (from all indices). Only the first one will succeed. + // The others will receive alias_not_found_exception and cause MARK_VERSION_INDEX_READY_CONFLICT (that's acceptable). + TaskEither.chainW(({ data }) => + Actions.updateAliases({ client, aliasActions: data.flat() }) + ) + ), MARK_VERSION_INDEX_READY_CONFLICT: (state: MarkVersionIndexReadyConflict) => Actions.fetchIndices({ client, indices: [state.currentAlias, state.versionAlias] }), LEGACY_SET_WRITE_BLOCK: (state: LegacySetWriteBlockState) => diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.test.ts index a785ff46823e7..8ebd1b3d660d4 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.test.ts @@ -144,11 +144,7 @@ describe('runV2Migration', () => { expect(mockCreateWaitGroupMap).toBeCalledTimes(3); expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith(1, ['.my_index', '.other_index']); expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith(2, ['.my_index', '.other_index']); - expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith( - 3, - ['.my_index', '.other_index'], - expect.any(Function) // we expect to receive a method to update all aliases in this hook - ); + expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith(3, ['.my_index', '.other_index']); }); it('calls runResilientMigrator for each migrator it must spawn', async () => { diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts index 49088ebc147b8..f39c74542a22d 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts @@ -30,8 +30,6 @@ import { } from './kibana_migrator_utils'; import { runResilientMigrator } from './run_resilient_migrator'; import { migrateRawDocsSafely } from './core/migrate_raw_docs'; -import type { AliasAction } from './actions/update_aliases'; -import { updateAliases } from './actions'; export interface RunV2MigrationOpts { /** The current Kibana version */ @@ -102,14 +100,7 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise>( - indicesWithRelocatingTypes, - (allAliasActions) => - updateAliases({ - client: options.elasticsearchClient, - aliasActions: allAliasActions.flat(), - })() - ); + const updateAliasesWaitGroupMap = createWaitGroupMap(indicesWithRelocatingTypes); // build a list of all migrators that must be started const migratorIndices = new Set(Object.keys(indexMap)); diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts index a7377ec2050d1..755ed052c2dc6 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts @@ -42,7 +42,8 @@ const RELOCATE_TYPES: Record = { }; const PARALLEL_MIGRATORS = 6; -export const logFilePath = Path.join(__dirname, 'dot_kibana_split.test.log'); +export const logFilePathFirstRun = Path.join(__dirname, 'dot_kibana_split_1st_run.test.log'); +export const logFilePathSecondRun = Path.join(__dirname, 'dot_kibana_split_2nd_run.test.log'); describe('split .kibana index into multiple system indices', () => { let esServer: TestElasticsearchUtils['es']; @@ -53,11 +54,12 @@ describe('split .kibana index into multiple system indices', () => { }); beforeEach(async () => { - await clearLog(logFilePath); + await clearLog(logFilePathFirstRun); + await clearLog(logFilePathSecondRun); }); describe('when migrating from a legacy version', () => { - let migratorTestKitFactory: () => Promise; + let migratorTestKitFactory: (logFilePath: string) => Promise; beforeAll(async () => { esServer = await startElasticsearch({ @@ -77,7 +79,7 @@ describe('split .kibana index into multiple system indices', () => { } ); - migratorTestKitFactory = () => + migratorTestKitFactory = (logFilePath: string) => getKibanaMigratorTestKit({ types: updatedTypeRegistry.getAllTypes(), kibanaIndex: '.kibana', @@ -85,7 +87,7 @@ describe('split .kibana index into multiple system indices', () => { defaultIndexTypesMap: DEFAULT_INDEX_TYPES_MAP, }); - const { runMigrations, client } = await migratorTestKitFactory(); + const { runMigrations, client } = await migratorTestKitFactory(logFilePathFirstRun); // count of types in the legacy index expect(await getAggregatedTypesCount(client, '.kibana_1')).toEqual({ @@ -286,7 +288,7 @@ describe('split .kibana index into multiple system indices', () => { } `); - const logs = await parseLogFile(logFilePath); + const logs = await parseLogFile(logFilePathFirstRun); expect(logs).toContainLogEntries( [ @@ -378,7 +380,7 @@ describe('split .kibana index into multiple system indices', () => { `[${index}] UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK -> UPDATE_TARGET_MAPPINGS_META.`, `[${index}] UPDATE_TARGET_MAPPINGS_META -> CHECK_VERSION_INDEX_READY_ACTIONS.`, `[${index}] CHECK_VERSION_INDEX_READY_ACTIONS -> MARK_VERSION_INDEX_READY_SYNC.`, - `[${index}] MARK_VERSION_INDEX_READY_SYNC -> DONE.`, + `[${index}] MARK_VERSION_INDEX_READY_SYNC`, // all migrators try to update all aliases, all but one will have conclicts `[${index}] Migration completed after`, ], { ordered: true } @@ -393,10 +395,9 @@ describe('split .kibana index into multiple system indices', () => { afterEach(async () => { // we run the migrator again to ensure that the next time state is loaded everything still works as expected - const { runMigrations } = await migratorTestKitFactory(); - await clearLog(logFilePath); + const { runMigrations } = await migratorTestKitFactory(logFilePathSecondRun); await runMigrations(); - const logs = await parseLogFile(logFilePath); + const logs = await parseLogFile(logFilePathSecondRun); expect(logs).not.toContainLogEntries(['REINDEX', 'CREATE', 'UPDATE_TARGET_MAPPINGS']); }); @@ -419,9 +420,11 @@ describe('split .kibana index into multiple system indices', () => { expect(breakdownBefore).toEqual({ '.kibana': { 'apm-telemetry': 1, + application_usage_transactional: 4, config: 1, dashboard: 52994, 'index-pattern': 1, + 'maps-telemetry': 1, search: 1, space: 1, 'ui-metric': 5,