diff --git a/src/core/server/saved_objects/migrations/core/document_migrator.test.ts b/src/core/server/saved_objects/migrations/core/document_migrator.test.ts index 1cf408ea96a56..45286f158edb1 100644 --- a/src/core/server/saved_objects/migrations/core/document_migrator.test.ts +++ b/src/core/server/saved_objects/migrations/core/document_migrator.test.ts @@ -11,6 +11,7 @@ import { set } from '@elastic/safer-lodash-set'; import _ from 'lodash'; import { SavedObjectUnsanitizedDoc } from '../../serialization'; import { DocumentMigrator } from './document_migrator'; +import { TransformSavedObjectDocumentError } from './transform_saved_object_document_error'; import { loggingSystemMock } from '../../../logging/logging_system.mock'; import { SavedObjectsType } from '../../types'; import { SavedObjectTypeRegistry } from '../../saved_objects_type_registry'; @@ -724,6 +725,12 @@ describe('DocumentMigrator', () => { it('logs the original error and throws a transform error if a document transform fails', () => { const log = mockLogger; + const failedDoc = { + id: 'smelly', + type: 'dog', + attributes: {}, + migrationVersion: {}, + }; const migrator = new DocumentMigrator({ ...testOpts(), typeRegistry: createRegistry({ @@ -737,12 +744,6 @@ describe('DocumentMigrator', () => { log, }); migrator.prepareMigrations(); - const failedDoc = { - id: 'smelly', - type: 'dog', - attributes: {}, - migrationVersion: {}, - }; try { migrator.migrate(_.cloneDeep(failedDoc)); expect('Did not throw').toEqual('But it should have!'); @@ -751,6 +752,7 @@ describe('DocumentMigrator', () => { "Failed to transform document smelly. Transform: dog:1.2.3 Doc: {\\"id\\":\\"smelly\\",\\"type\\":\\"dog\\",\\"attributes\\":{},\\"migrationVersion\\":{}}" `); + expect(error).toBeInstanceOf(TransformSavedObjectDocumentError); expect(loggingSystemMock.collect(mockLoggerFactory).error[0][0]).toMatchInlineSnapshot( `[Error: Dang diggity!]` ); diff --git a/src/core/server/saved_objects/migrations/core/document_migrator.ts b/src/core/server/saved_objects/migrations/core/document_migrator.ts index 1dd4a8fbf6388..4f58397866cfb 100644 --- a/src/core/server/saved_objects/migrations/core/document_migrator.ts +++ b/src/core/server/saved_objects/migrations/core/document_migrator.ts @@ -62,6 +62,7 @@ import { SavedObjectsType, } from '../../types'; import { MigrationLogger } from './migration_logger'; +import { TransformSavedObjectDocumentError } from '.'; import { ISavedObjectTypeRegistry } from '../../saved_objects_type_registry'; import { SavedObjectMigrationFn, SavedObjectMigrationMap } from '../types'; import { DEFAULT_NAMESPACE_STRING } from '../../service/lib/utils'; @@ -679,9 +680,15 @@ function wrapWithTry( const failedTransform = `${type.name}:${version}`; const failedDoc = JSON.stringify(doc); log.error(error); - - throw new Error( - `Failed to transform document ${doc?.id}. 
Transform: ${failedTransform}\nDoc: ${failedDoc}` + // To make debugging failed migrations easier, we add items needed to convert the + // saved object id to the full raw id (the id only contains the uuid part) and the full error itself + throw new TransformSavedObjectDocumentError( + doc.id, + doc.type, + doc.namespace, + failedTransform, + failedDoc, + error ); } }; diff --git a/src/core/server/saved_objects/migrations/core/index.ts b/src/core/server/saved_objects/migrations/core/index.ts index 1e51983a0ffbd..ca54d6876ad75 100644 --- a/src/core/server/saved_objects/migrations/core/index.ts +++ b/src/core/server/saved_objects/migrations/core/index.ts @@ -15,3 +15,9 @@ export type { MigrationResult, MigrationStatus } from './migration_coordinator'; export { createMigrationEsClient } from './migration_es_client'; export type { MigrationEsClient } from './migration_es_client'; export { excludeUnusedTypesQuery } from './elastic_index'; +export { TransformSavedObjectDocumentError } from './transform_saved_object_document_error'; +export type { + DocumentsTransformFailed, + DocumentsTransformSuccess, + TransformErrorObjects, +} from './migrate_raw_docs'; diff --git a/src/core/server/saved_objects/migrations/core/migrate_raw_docs.test.ts b/src/core/server/saved_objects/migrations/core/migrate_raw_docs.test.ts index 45e73f7dfae30..1d43e2f54a726 100644 --- a/src/core/server/saved_objects/migrations/core/migrate_raw_docs.test.ts +++ b/src/core/server/saved_objects/migrations/core/migrate_raw_docs.test.ts @@ -7,10 +7,17 @@ */ import { set } from '@elastic/safer-lodash-set'; +import * as Either from 'fp-ts/lib/Either'; import _ from 'lodash'; import { SavedObjectTypeRegistry } from '../../saved_objects_type_registry'; import { SavedObjectsSerializer } from '../../serialization'; -import { migrateRawDocs } from './migrate_raw_docs'; +import { + DocumentsTransformFailed, + DocumentsTransformSuccess, + migrateRawDocs, + migrateRawDocsSafely, +} from './migrate_raw_docs'; +import { TransformSavedObjectDocumentError } from './transform_saved_object_document_error'; describe('migrateRawDocs', () => { test('converts raw docs to saved objects', async () => { @@ -120,3 +127,156 @@ describe('migrateRawDocs', () => { ).rejects.toThrowErrorMatchingInlineSnapshot(`"error during transform"`); }); }); + +describe('migrateRawDocsSafely', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + test('converts raw docs to saved objects', async () => { + const transform = jest.fn((doc: any) => [ + set(_.cloneDeep(doc), 'attributes.name', 'HOI!'), + ]); + const task = migrateRawDocsSafely( + new SavedObjectsSerializer(new SavedObjectTypeRegistry()), + transform, + [ + { _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }, + { _id: 'c:d', _source: { type: 'c', c: { name: 'DDD' } } }, + ] + ); + const result = (await task()) as Either.Right; + expect(result._tag).toEqual('Right'); + expect(result.right.processedDocs).toEqual([ + { + _id: 'a:b', + _source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] }, + }, + { + _id: 'c:d', + _source: { type: 'c', c: { name: 'HOI!' 
}, migrationVersion: {}, references: [] }, + }, + ]); + + const obj1 = { + id: 'b', + type: 'a', + attributes: { name: 'AAA' }, + migrationVersion: {}, + references: [], + }; + const obj2 = { + id: 'd', + type: 'c', + attributes: { name: 'DDD' }, + migrationVersion: {}, + references: [], + }; + expect(transform).toHaveBeenCalledTimes(2); + expect(transform).toHaveBeenNthCalledWith(1, obj1); + expect(transform).toHaveBeenNthCalledWith(2, obj2); + }); + + test('returns a `left` tag when encountering a corrupt saved object document', async () => { + const transform = jest.fn((doc: any) => [ + set(_.cloneDeep(doc), 'attributes.name', 'TADA'), + ]); + const task = migrateRawDocsSafely( + new SavedObjectsSerializer(new SavedObjectTypeRegistry()), + transform, + [ + { _id: 'foo:b', _source: { type: 'a', a: { name: 'AAA' } } }, + { _id: 'c:d', _source: { type: 'c', c: { name: 'DDD' } } }, + ] + ); + const result = (await task()) as Either.Left; + expect(transform).toHaveBeenCalledTimes(1); + expect(result._tag).toEqual('Left'); + expect(Object.keys(result.left)).toEqual(['type', 'corruptDocumentIds', 'transformErrors']); + expect(result.left.corruptDocumentIds.length).toEqual(1); + expect(result.left.transformErrors.length).toEqual(0); + }); + + test('handles when one document is transformed into multiple documents', async () => { + const transform = jest.fn((doc: any) => [ + set(_.cloneDeep(doc), 'attributes.name', 'HOI!'), + { id: 'bar', type: 'foo', attributes: { name: 'baz' } }, + ]); + const task = migrateRawDocsSafely( + new SavedObjectsSerializer(new SavedObjectTypeRegistry()), + transform, + [{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }] + ); + const result = (await task()) as Either.Right; + expect(result._tag).toEqual('Right'); + expect(result.right.processedDocs).toEqual([ + { + _id: 'a:b', + _source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] }, + }, + { + _id: 'foo:bar', + _source: { type: 'foo', foo: { name: 'baz' }, references: [] }, + }, + ]); + + const obj = { + id: 'b', + type: 'a', + attributes: { name: 'AAA' }, + migrationVersion: {}, + references: [], + }; + expect(transform).toHaveBeenCalledTimes(1); + expect(transform).toHaveBeenCalledWith(obj); + }); + + test('instance of Either.left containing transform errors when the transform function throws a TransformSavedObjectDocument error', async () => { + const transform = jest.fn((doc: any) => { + throw new TransformSavedObjectDocumentError( + `${doc.id}`, + `${doc.type}`, + `${doc.namespace}`, + `${doc.type}1.2.3`, + JSON.stringify(doc), + new Error('error during transform') + ); + }); + const task = migrateRawDocsSafely( + new SavedObjectsSerializer(new SavedObjectTypeRegistry()), + transform, + [{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }] // this is the raw doc + ); + const result = (await task()) as Either.Left; + expect(transform).toHaveBeenCalledTimes(1); + expect(result._tag).toEqual('Left'); + expect(result.left.corruptDocumentIds.length).toEqual(0); + expect(result.left.transformErrors.length).toEqual(1); + expect(result.left.transformErrors[0].err.message).toMatchInlineSnapshot(` + "Failed to transform document b. 
Transform: a1.2.3
+      Doc: {\\"type\\":\\"a\\",\\"id\\":\\"b\\",\\"attributes\\":{\\"name\\":\\"AAA\\"},\\"references\\":[],\\"migrationVersion\\":{}}"
+    `);
+  });
+
+  test("instance of Either.left containing errors when the transform function throws an error that isn't a TransformSavedObjectDocument error", async () => {
+    const transform = jest.fn((doc: any) => {
+      throw new Error('error during transform');
+    });
+    const task = migrateRawDocsSafely(
+      new SavedObjectsSerializer(new SavedObjectTypeRegistry()),
+      transform,
+      [{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }] // this is the raw doc
+    );
+    const result = (await task()) as Either.Left<DocumentsTransformFailed>;
+    expect(transform).toHaveBeenCalledTimes(1);
+    expect(result._tag).toEqual('Left');
+    expect(result.left.corruptDocumentIds.length).toEqual(0);
+    expect(result.left.transformErrors.length).toEqual(1);
+    expect(result.left.transformErrors[0]).toMatchInlineSnapshot(`
+      Object {
+        "err": [Error: error during transform],
+        "rawId": "a:b",
+      }
+    `);
+  });
+});
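The tests above drive the new `migrateRawDocsSafely` API implemented in the next file. A minimal sketch of how a caller can consume its `TaskEither` result, assuming `fp-ts` and the `DocumentsTransformFailed`/`DocumentsTransformSuccess` types introduced in this patch; the helper name `handleTransformResult` is illustrative, not part of the change:

```ts
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import type {
  DocumentsTransformFailed,
  DocumentsTransformSuccess,
} from './migrate_raw_docs';

// Illustrative only: run the task and branch on the result instead of try/catch.
async function handleTransformResult(
  task: TaskEither.TaskEither<DocumentsTransformFailed, DocumentsTransformSuccess>
): Promise<number> {
  const result = await task(); // a TaskEither is just () => Promise<Either<L, R>>
  if (Either.isLeft(result)) {
    // Corrupt ids and transform errors are collected by the task, not thrown.
    throw new Error(
      `corrupt: ${result.left.corruptDocumentIds.join(',')}; ` +
        `failed transforms: ${result.left.transformErrors.map((e) => e.rawId).join(',')}`
    );
  }
  return result.right.processedDocs.length;
}
```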
diff --git a/src/core/server/saved_objects/migrations/core/migrate_raw_docs.ts b/src/core/server/saved_objects/migrations/core/migrate_raw_docs.ts
index 102ec81646a92..461ae1df6bc3d 100644
--- a/src/core/server/saved_objects/migrations/core/migrate_raw_docs.ts
+++ b/src/core/server/saved_objects/migrations/core/migrate_raw_docs.ts
@@ -9,13 +9,32 @@
 /*
  * This file provides logic for migrating raw documents.
  */
-
+import * as TaskEither from 'fp-ts/lib/TaskEither';
+import * as Either from 'fp-ts/lib/Either';
 import {
+  SavedObjectSanitizedDoc,
   SavedObjectsRawDoc,
   SavedObjectsSerializer,
   SavedObjectUnsanitizedDoc,
 } from '../../serialization';
 import { MigrateAndConvertFn } from './document_migrator';
+import { TransformSavedObjectDocumentError } from '.';
+
+export interface DocumentsTransformFailed {
+  readonly type: string;
+  readonly corruptDocumentIds: string[];
+  readonly transformErrors: TransformErrorObjects[];
+}
+export interface DocumentsTransformSuccess {
+  readonly processedDocs: SavedObjectsRawDoc[];
+}
+export interface TransformErrorObjects {
+  readonly rawId: string;
+  readonly err: TransformSavedObjectDocumentError | Error;
+}
+type MigrateFn = (
+  doc: SavedObjectUnsanitizedDoc<unknown>
+) => Promise<Array<SavedObjectUnsanitizedDoc<unknown>>>;
 
 /**
  * Error thrown when saved object migrations encounter a corrupt saved object.
@@ -37,7 +56,6 @@ export class CorruptSavedObjectError extends Error {
 /**
  * Applies the specified migration function to every saved object document in the list
  * of raw docs. Any raw docs that are not valid saved objects will simply be passed through.
- *
  * @param {TransformFn} migrateDoc
  * @param {SavedObjectsRawDoc[]} rawDocs
  * @returns {SavedObjectsRawDoc[]}
@@ -52,15 +70,9 @@ export async function migrateRawDocs(
   for (const raw of rawDocs) {
     const options = { namespaceTreatment: 'lax' as const };
     if (serializer.isRawSavedObject(raw, options)) {
-      const savedObject = serializer.rawToSavedObject(raw, options);
-      savedObject.migrationVersion = savedObject.migrationVersion || {};
+      const savedObject = convertToRawAddMigrationVersion(raw, options, serializer);
       processedDocs.push(
-        ...(await migrateDocWithoutBlocking(savedObject)).map((attrs) =>
-          serializer.savedObjectToRaw({
-            references: [],
-            ...attrs,
-          })
-        )
+        ...(await migrateMapToRawDoc(migrateDocWithoutBlocking, savedObject, serializer))
       );
     } else {
       throw new CorruptSavedObjectError(raw._id);
@@ -69,6 +81,58 @@
   return processedDocs;
 }
 
+/**
+ * Applies the specified migration function to every saved object document provided
+ * and converts the saved object to a raw document.
+ * Captures the ids and errors from any documents that are not valid saved objects or
+ * for which the transformation function failed.
+ * @returns {TaskEither.TaskEither<DocumentsTransformFailed, DocumentsTransformSuccess>}
+ */
+export function migrateRawDocsSafely(
+  serializer: SavedObjectsSerializer,
+  migrateDoc: MigrateAndConvertFn,
+  rawDocs: SavedObjectsRawDoc[]
+): TaskEither.TaskEither<DocumentsTransformFailed, DocumentsTransformSuccess> {
+  return async () => {
+    const migrateDocNonBlocking = transformNonBlocking(migrateDoc);
+    const processedDocs: SavedObjectsRawDoc[] = [];
+    const transformErrors: TransformErrorObjects[] = [];
+    const corruptSavedObjectIds: string[] = [];
+    const options = { namespaceTreatment: 'lax' as const };
+    for (const raw of rawDocs) {
+      if (serializer.isRawSavedObject(raw, options)) {
+        try {
+          const savedObject = convertToRawAddMigrationVersion(raw, options, serializer);
+          processedDocs.push(
+            ...(await migrateMapToRawDoc(migrateDocNonBlocking, savedObject, serializer))
+          );
+        } catch (err) {
+          if (err instanceof TransformSavedObjectDocumentError) {
+            // the doc id we get from the error is only the uuid part
+            // we use the original raw document _id instead
+            transformErrors.push({
+              rawId: raw._id,
+              err,
+            });
+          } else {
+            transformErrors.push({ rawId: raw._id, err }); // cases we haven't accounted for yet
+          }
+        }
+      } else {
+        corruptSavedObjectIds.push(raw._id);
+      }
+    }
+    if (corruptSavedObjectIds.length > 0 || transformErrors.length > 0) {
+      return Either.left({
+        type: 'documents_transform_failed',
+        corruptDocumentIds: [...corruptSavedObjectIds],
+        transformErrors,
+      });
+    }
+    return Either.right({ processedDocs });
+  };
+}
+
 /**
  * Migration transform functions are potentially CPU heavy e.g. doing decryption/encryption
  * or (de)/serializing large JSON payloads.
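The next hunk only shows the tail of `transformNonBlocking`. The idea behind that helper, sketched under the assumption that it defers each transform with `setImmediate` so CPU-heavy transforms yield to the event loop between documents (`deferTransform` is an illustrative stand-in, not the patch's code):

```ts
// Sketch of the pattern (assumption: the real helper defers work with setImmediate).
type TransformFn<T, R> = (doc: T) => R;

function deferTransform<T, R>(transform: TransformFn<T, R>): (doc: T) => Promise<R> {
  return (doc) =>
    new Promise((resolve, reject) => {
      // Promises alone don't yield to the event loop; setImmediate does.
      setImmediate(() => {
        try {
          resolve(transform(doc));
        } catch (e) {
          reject(e);
        }
      });
    });
}
```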
@@ -92,3 +156,40 @@ function transformNonBlocking(
     });
   });
 }
+
+/**
+ * Applies the specified migration function to every saved object document provided
+ * and converts the saved object to a raw document
+ * @param {MigrateFn} migrateMethod
+ * @param {SavedObjectSanitizedDoc} savedObject
+ * @returns {Promise<SavedObjectsRawDoc[]>}
+ */
+async function migrateMapToRawDoc(
+  migrateMethod: MigrateFn,
+  savedObject: SavedObjectSanitizedDoc,
+  serializer: SavedObjectsSerializer
+): Promise<SavedObjectsRawDoc[]> {
+  return [...(await migrateMethod(savedObject))].map((attrs) =>
+    serializer.savedObjectToRaw({
+      references: [],
+      ...attrs,
+    })
+  );
+}
+
+/**
+ * Sanitizes the raw saved object document and defaults its migrationVersion
+ * @param {SavedObjectsRawDoc} rawDoc
+ * @param options
+ * @param {SavedObjectsSerializer} serializer
+ * @returns {SavedObjectSanitizedDoc}
+ */
+function convertToRawAddMigrationVersion(
+  rawDoc: SavedObjectsRawDoc,
+  options: { namespaceTreatment: 'lax' },
+  serializer: SavedObjectsSerializer
+): SavedObjectSanitizedDoc {
+  const savedObject = serializer.rawToSavedObject(rawDoc, options);
+  savedObject.migrationVersion = savedObject.migrationVersion || {};
+  return savedObject;
+}
diff --git a/src/core/server/saved_objects/migrations/core/transform_saved_object_document_error.test.ts b/src/core/server/saved_objects/migrations/core/transform_saved_object_document_error.test.ts
new file mode 100644
index 0000000000000..80c670edd39ba
--- /dev/null
+++ b/src/core/server/saved_objects/migrations/core/transform_saved_object_document_error.test.ts
@@ -0,0 +1,60 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+import { TransformSavedObjectDocumentError } from './transform_saved_object_document_error';
+
+describe('TransformSavedObjectDocumentError', () => {
+  it('is a special error', () => {
+    const originalError = new Error('Dang diggity!');
+    const err = new TransformSavedObjectDocumentError(
+      'id',
+      'type',
+      'namespace',
+      'failedTransform',
+      'failedDoc',
+      originalError
+    );
+    expect(err).toBeInstanceOf(TransformSavedObjectDocumentError);
+    expect(err.id).toEqual('id');
+    expect(err.namespace).toEqual('namespace');
+    expect(err.stack).not.toBeNull();
+  });
+  it('constructs a special error message', () => {
+    const originalError = new Error('Dang diggity!');
+    const err = new TransformSavedObjectDocumentError(
+      'id',
+      'type',
+      'namespace',
+      'failedTransform',
+      'failedDoc',
+      originalError
+    );
+    expect(err.message).toMatchInlineSnapshot(`
+      "Failed to transform document id. Transform: failedTransform
+      Doc: failedDoc"
+    `);
+  });
+  it('handles undefined namespace', () => {
+    const originalError = new Error('Dang diggity!');
+    const err = new TransformSavedObjectDocumentError(
+      'id',
+      'type',
+      undefined,
+      'failedTransform',
+      'failedDoc',
+      originalError
+    );
+    expect(err.message).toMatchInlineSnapshot(`
+      "Failed to transform document id. Transform: failedTransform
+      Doc: failedDoc"
+    `);
+  });
+});
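These tests exercise the error class introduced in the next file. Its extra fields exist so that callers further upstream can rebuild the full raw Elasticsearch `_id` from the uuid-only `doc.id`; a rough sketch of that reconstruction, assuming the conventional `namespace:type:id` layout produced by `serializer.generateRawId` for single-namespace types (`rawIdFrom` is illustrative only):

```ts
import { TransformSavedObjectDocumentError } from './transform_saved_object_document_error';

// Illustrative stand-in for SavedObjectsSerializer.generateRawId: single-namespace
// documents serialize to `namespace:type:id`, everything else to `type:id`.
function rawIdFrom(err: TransformSavedObjectDocumentError): string {
  return err.namespace ? `${err.namespace}:${err.type}:${err.id}` : `${err.type}:${err.id}`;
}

// e.g. for new TransformSavedObjectDocumentError('smelly', 'dog', undefined, ...),
// rawIdFrom(err) === 'dog:smelly'
```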
diff --git a/src/core/server/saved_objects/migrations/core/transform_saved_object_document_error.ts b/src/core/server/saved_objects/migrations/core/transform_saved_object_document_error.ts
new file mode 100644
index 0000000000000..6a6f87ea1eeb2
--- /dev/null
+++ b/src/core/server/saved_objects/migrations/core/transform_saved_object_document_error.ts
@@ -0,0 +1,32 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+/**
+ * Error thrown when saved object migrations encounter a transformation error.
+ * Transformation errors happen when a transform function throws an error for an unsanitized saved object.
+ * The id (doc.id) reported in this error class is only the uuid part and doesn't tell users what the full Elasticsearch id is.
+ * In order to convert the id to the serialized version further upstream using serializer.generateRawId, we need to provide the following items:
+ * - namespace: doc.namespace,
+ * - type: doc.type,
+ * - id: doc.id,
+ * The new error class helps with v2 migrations.
+ * For backward compatibility with v1 migrations, the error message is the same as what was previously thrown as a plain error.
+ */
+export class TransformSavedObjectDocumentError extends Error {
+  constructor(
+    public readonly id: string,
+    public readonly type: string,
+    public readonly namespace: string | undefined,
+    public readonly failedTransform: string, // created by document_migrator wrapWithTry as `${type.name}:${version}`
+    public readonly failedDoc: string,
+    public readonly originalError: Error
+  ) {
+    super(`Failed to transform document ${id}. Transform: ${failedTransform}\nDoc: ${failedDoc}`);
+  }
+}
diff --git a/src/core/server/saved_objects/migrations/kibana/kibana_migrator.ts b/src/core/server/saved_objects/migrations/kibana/kibana_migrator.ts
index e09284b49c86e..f74fe7e7a6e1c 100644
--- a/src/core/server/saved_objects/migrations/kibana/kibana_migrator.ts
+++ b/src/core/server/saved_objects/migrations/kibana/kibana_migrator.ts
@@ -35,7 +35,7 @@ import { SavedObjectsMigrationConfigType } from '../../saved_objects_config';
 import { ISavedObjectTypeRegistry } from '../../saved_objects_type_registry';
 import { SavedObjectsType } from '../../types';
 import { runResilientMigrator } from '../../migrationsv2';
-import { migrateRawDocs } from '../core/migrate_raw_docs';
+import { migrateRawDocsSafely } from '../core/migrate_raw_docs';
 
 export interface KibanaMigratorOptions {
   client: ElasticsearchClient;
@@ -135,7 +135,6 @@
     if (!rerun) {
       this.status$.next({ status: 'running' });
     }
-
     this.migrationResult = this.runMigrationsInternal().then((result) => {
       // Similar to above, don't publish status updates when rerunning in CI.
if (!rerun) { @@ -185,7 +184,11 @@ export class KibanaMigrator { logger: this.log, preMigrationScript: indexMap[index].script, transformRawDocs: (rawDocs: SavedObjectsRawDoc[]) => - migrateRawDocs(this.serializer, this.documentMigrator.migrateAndConvert, rawDocs), + migrateRawDocsSafely( + this.serializer, + this.documentMigrator.migrateAndConvert, + rawDocs + ), migrationVersionPerType: this.documentMigrator.migrationVersion, indexPrefix: index, migrationsConfig: this.soMigrationsConfig, diff --git a/src/core/server/saved_objects/migrationsv2/actions/index.test.ts b/src/core/server/saved_objects/migrationsv2/actions/index.test.ts index ba6aafbb2f651..df74a4e1282e4 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/index.test.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/index.test.ts @@ -129,18 +129,6 @@ describe('actions', () => { }); }); - describe('transformDocs', () => { - it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.transformDocs(client, () => Promise.resolve([]), [], 'my_index', false); - try { - await task(); - } catch (e) { - /** ignore */ - } - expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); - }); - }); - describe('reindex', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { const task = Actions.reindex( diff --git a/src/core/server/saved_objects/migrationsv2/actions/index.ts b/src/core/server/saved_objects/migrationsv2/actions/index.ts index 79261aecf675c..d0623de51e4c3 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/index.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/index.ts @@ -22,6 +22,10 @@ import { catchRetryableEsClientErrors, RetryableEsClientError, } from './catch_retryable_es_client_errors'; +import { + DocumentsTransformFailed, + DocumentsTransformSuccess, +} from '../../migrations/core/migrate_raw_docs'; export type { RetryableEsClientError }; /** @@ -46,6 +50,7 @@ export interface ActionErrorTypeMap { incompatible_mapping_exception: IncompatibleMappingException; alias_not_found_exception: AliasNotFound; remove_index_not_a_concrete_index: RemoveIndexNotAConcreteIndex; + documents_transform_failed: DocumentsTransformFailed; } /** @@ -523,28 +528,13 @@ export const closePit = ( }; /* - * Transform outdated docs and write them to the index. 
+ * Transform outdated docs * */ export const transformDocs = ( - client: ElasticsearchClient, transformRawDocs: TransformRawDocs, - outdatedDocuments: SavedObjectsRawDoc[], - index: string, - // used for testing purposes only - refresh: estypes.Refresh -): TaskEither.TaskEither< - RetryableEsClientError | IndexNotFound | TargetIndexHadWriteBlock, - 'bulk_index_succeeded' -> => - pipe( - TaskEither.tryCatch( - () => transformRawDocs(outdatedDocuments), - (e) => { - throw e; - } - ), - TaskEither.chain((docs) => bulkOverwriteTransformedDocuments(client, index, docs, refresh)) - ); + outdatedDocuments: SavedObjectsRawDoc[] +): TaskEither.TaskEither => + transformRawDocs(outdatedDocuments); /** @internal */ export interface ReindexResponse { @@ -747,8 +737,6 @@ export const waitForPickupUpdatedMappingsTask = flow( } ) ); - -/** @internal */ export interface AliasNotFound { type: 'alias_not_found_exception'; } diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts index 832d322037465..d0158a4c68f24 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts @@ -41,6 +41,8 @@ import { import * as Either from 'fp-ts/lib/Either'; import * as Option from 'fp-ts/lib/Option'; import { ResponseError } from '@elastic/elasticsearch/lib/errors'; +import { DocumentsTransformFailed, DocumentsTransformSuccess } from '../../migrations/core'; +import { TaskEither } from 'fp-ts/lib/TaskEither'; const { startES } = kbnTestServer.createTestServers({ adjustTimeout: (t: number) => jest.setTimeout(t), @@ -1014,41 +1016,30 @@ describe('migration actions', () => { }); describe('transformDocs', () => { - it('applies "transformRawDocs" and writes result into an index', async () => { - const index = 'transform_docs_index'; + it('applies "transformRawDocs" and returns the transformed documents', async () => { const originalDocs = [ { _id: 'foo:1', _source: { type: 'dashboard', value: 1 } }, { _id: 'foo:2', _source: { type: 'dashboard', value: 2 } }, ]; - const createIndexTask = createIndex(client, index, { - dynamic: true, - properties: {}, - }); - await createIndexTask(); - - async function tranformRawDocs(docs: SavedObjectsRawDoc[]): Promise { - for (const doc of docs) { - doc._source.value += 1; - } - return docs; + function innerTransformRawDocs( + docs: SavedObjectsRawDoc[] + ): TaskEither { + return async () => { + const processedDocs: SavedObjectsRawDoc[] = []; + for (const doc of docs) { + doc._source.value += 1; + processedDocs.push(doc); + } + return Either.right({ processedDocs }); + }; } + const transformTask = transformDocs(innerTransformRawDocs, originalDocs); - const transformTask = transformDocs(client, tranformRawDocs, originalDocs, index, 'wait_for'); - - const result = (await transformTask()) as Either.Right<'bulk_index_succeeded'>; - - expect(result.right).toBe('bulk_index_succeeded'); - - const { body } = await client.search<{ value: number }>({ - index, - }); - const hits = body.hits.hits; - - const foo1 = hits.find((h) => h._id === 'foo:1'); - expect(foo1?._source?.value).toBe(2); - - const foo2 = hits.find((h) => h._id === 'foo:2'); + const resultsWithProcessDocs = ((await transformTask()) as Either.Right) + .right.processedDocs; + expect(resultsWithProcessDocs.length).toEqual(2); + const foo2 = resultsWithProcessDocs.find((h) => h._id === 'foo:2'); 
expect(foo2?._source?.value).toBe(3);
    });
  });
diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/archives/8.0.0_migrated_with_corrupt_outdated_docs.zip b/src/core/server/saved_objects/migrationsv2/integration_tests/archives/8.0.0_migrated_with_corrupt_outdated_docs.zip
new file mode 100644
index 0000000000000..726df7782cda3
Binary files /dev/null and b/src/core/server/saved_objects/migrationsv2/integration_tests/archives/8.0.0_migrated_with_corrupt_outdated_docs.zip differ
diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/cleanup.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/cleanup.test.ts
index 48bb282da18f6..1e494d4b55861 100644
--- a/src/core/server/saved_objects/migrationsv2/integration_tests/cleanup.test.ts
+++ b/src/core/server/saved_objects/migrationsv2/integration_tests/cleanup.test.ts
@@ -100,7 +100,7 @@ describe('migration v2', () => {
     await root.setup();
     await expect(root.start()).rejects.toThrow(
-      /Unable to migrate the corrupt saved object document with _id: 'index-pattern:test_index\*'/
+      'Unable to complete saved object migrations for the [.kibana] index: Migrations failed. Reason: Corrupt saved object documents: index-pattern:test_index*. To allow migrations to proceed, please delete these documents.'
     );
     const logFileContent = await asyncReadFile(logFilePath, 'utf-8');
diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/corrupt_outdated_docs.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/corrupt_outdated_docs.test.ts
new file mode 100644
index 0000000000000..a114263f5b985
--- /dev/null
+++ b/src/core/server/saved_objects/migrationsv2/integration_tests/corrupt_outdated_docs.test.ts
@@ -0,0 +1,155 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */ + +import Path from 'path'; +import Fs from 'fs'; +import Util from 'util'; +import * as kbnTestServer from '../../../../test_helpers/kbn_server'; +import { Root } from '../../../root'; + +const logFilePath = Path.join(__dirname, 'migration_test_corrupt_docs_kibana.log'); + +const asyncUnlink = Util.promisify(Fs.unlink); +async function removeLogFile() { + // ignore errors if it doesn't exist + await asyncUnlink(logFilePath).catch(() => void 0); +} + +// relies on archive with SO from v8 +describe.skip('migration v2 with corrupt saved object documents', () => { + let esServer: kbnTestServer.TestElasticsearchUtils; + let root: Root; + + beforeAll(async () => { + await removeLogFile(); + }); + + afterAll(async () => { + if (root) { + await root.shutdown(); + } + if (esServer) { + await esServer.stop(); + } + + await new Promise((resolve) => setTimeout(resolve, 10000)); + }); + + it('collects corrupt saved object documents accross batches', async () => { + const { startES } = kbnTestServer.createTestServers({ + adjustTimeout: (t: number) => jest.setTimeout(t), + settings: { + es: { + license: 'basic', + // original uncorrupt SO: + // { + // type: 'foo', // 'bar', 'baz' + // foo: {}, // bar: {}, baz: {} + // migrationVersion: { + // foo: '7.13.0', + // }, + // }, + // original corrupt SO example: + // { + // id: 'bar:123' + // type: 'foo', + // foo: {}, + // migrationVersion: { + // foo: '7.13.0', + // }, + // }, + // contains migrated index with 8.0 aliases to skip migration, but run outdated doc search + dataArchive: Path.join( + __dirname, + 'archives', + '8.0.0_migrated_with_corrupt_outdated_docs.zip' + ), + }, + }, + }); + + root = createRoot(); + + esServer = await startES(); + const coreSetup = await root.setup(); + + coreSetup.savedObjects.registerType({ + name: 'foo', + hidden: false, + mappings: { properties: {} }, + namespaceType: 'agnostic', + migrations: { + '7.14.0': (doc) => doc, + }, + }); + coreSetup.savedObjects.registerType({ + name: 'bar', + hidden: false, + mappings: { properties: {} }, + namespaceType: 'agnostic', + migrations: { + '7.14.0': (doc) => doc, + }, + }); + coreSetup.savedObjects.registerType({ + name: 'baz', + hidden: false, + mappings: { properties: {} }, + namespaceType: 'agnostic', + migrations: { + '7.14.0': (doc) => doc, + }, + }); + try { + await root.start(); + } catch (err) { + const corruptFooSOs = /foo:/g; + const corruptBarSOs = /bar:/g; + const corruptBazSOs = /baz:/g; + expect( + [ + ...err.message.matchAll(corruptFooSOs), + ...err.message.matchAll(corruptBarSOs), + ...err.message.matchAll(corruptBazSOs), + ].length + ).toEqual(16); + } + }); +}); + +function createRoot() { + return kbnTestServer.createRootWithCorePlugins( + { + migrations: { + skip: false, + enableV2: true, + batchSize: 5, + }, + logging: { + appenders: { + file: { + type: 'file', + fileName: logFilePath, + layout: { + type: 'json', + }, + }, + }, + loggers: [ + { + name: 'root', + appenders: ['file'], + }, + ], + }, + }, + { + oss: true, + } + ); +} diff --git a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts index 85cc86fe0a468..8443f837a7f1d 100644 --- a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts +++ b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts @@ -10,7 +10,6 @@ import { errors as EsErrors } from '@elastic/elasticsearch'; import * as Option from 'fp-ts/lib/Option'; import { Logger, LogMeta } 
from '../../logging'; import type { ElasticsearchClient } from '../../elasticsearch'; -import { CorruptSavedObjectError } from '../migrations/core/migrate_raw_docs'; import { Model, Next, stateActionMachine } from './state_action_machine'; import { cleanup } from './migrations_state_machine_cleanup'; import { State } from './types'; @@ -74,7 +73,6 @@ const logActionResponse = ( ) => { logger.debug(logMessagePrefix + `${state.controlState} RESPONSE`, res as LogMeta); }; - const dumpExecutionLog = (logger: Logger, logMessagePrefix: string, executionLog: ExecutionLog) => { logger.error(logMessagePrefix + 'migration failed, dumping execution log:'); executionLog.forEach((log) => { @@ -211,11 +209,6 @@ export async function migrationStateActionMachine({ logger.error(e); dumpExecutionLog(logger, logMessagePrefix, executionLog); - if (e instanceof CorruptSavedObjectError) { - throw new Error( - `${e.message} To allow migrations to proceed, please delete this document from the [${initialState.indexPrefix}_${initialState.kibanaVersion}_001] index.` - ); - } const newError = new Error( `Unable to complete saved object migrations for the [${initialState.indexPrefix}] index. ${e}` diff --git a/src/core/server/saved_objects/migrationsv2/model.test.ts b/src/core/server/saved_objects/migrationsv2/model.test.ts index 213e8b43c0ea0..bdaedba9c9ea3 100644 --- a/src/core/server/saved_objects/migrationsv2/model.test.ts +++ b/src/core/server/saved_objects/migrationsv2/model.test.ts @@ -36,12 +36,15 @@ import type { CloneTempToSource, SetTempWriteBlock, WaitForYellowSourceState, + TransformedDocumentsBulkIndex, + ReindexSourceToTempIndexBulk, } from './types'; import { SavedObjectsRawDoc } from '..'; import { AliasAction, RetryableEsClientError } from './actions'; import { createInitialState, model } from './model'; import { ResponseType } from './next'; import { SavedObjectsMigrationConfigType } from '../saved_objects_config'; +import { TransformErrorObjects, TransformSavedObjectDocumentError } from '../migrations/core'; describe('migrations v2 model', () => { const baseState: BaseState = { @@ -778,6 +781,8 @@ describe('migrations v2 model', () => { targetIndex: '.kibana_7.11.0_001', tempIndexMappings: { properties: {} }, lastHitSortValue: undefined, + corruptDocumentIds: [], + transformErrors: [], }; it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_INDEX if the index has outdated documents to reindex', () => { @@ -802,6 +807,23 @@ describe('migrations v2 model', () => { expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_CLOSE_PIT'); expect(newState.sourceIndexPitId).toBe('pit_id'); }); + + it('REINDEX_SOURCE_TO_TEMP_READ -> FATAL if no outdated documents to reindex and transform failures seen with previous outdated documents', () => { + const testState: ReindexSourceToTempRead = { + ...state, + corruptDocumentIds: ['a:b'], + transformErrors: [], + }; + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.right({ + outdatedDocuments: [], + lastHitSortValue: undefined, + }); + const newState = model(testState, res) as FatalState; + expect(newState.controlState).toBe('FATAL'); + expect(newState.reason).toMatchInlineSnapshot( + `"Migrations failed. Reason: Corrupt saved object documents: a:b. 
To allow migrations to proceed, please delete these documents."` + ); + }); }); describe('REINDEX_SOURCE_TO_TEMP_CLOSE_PIT', () => { @@ -833,38 +855,89 @@ describe('migrations v2 model', () => { sourceIndexPitId: 'pit_id', targetIndex: '.kibana_7.11.0_001', lastHitSortValue: undefined, + corruptDocumentIds: [], + transformErrors: [], }; + const processedDocs = [ + { + _id: 'a:b', + _source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] }, + }, + ] as SavedObjectsRawDoc[]; - it('REINDEX_SOURCE_TO_TEMP_INDEX -> REINDEX_SOURCE_TO_TEMP_READ if action succeeded', () => { - const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX'> = Either.right( - 'bulk_index_succeeded' - ); + it('REINDEX_SOURCE_TO_TEMP_INDEX -> REINDEX_SOURCE_TO_TEMP_INDEX_BULK if action succeeded', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX'> = Either.right({ + processedDocs, + }); const newState = model(state, res); + expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_INDEX_BULK'); + }); + + it('REINDEX_SOURCE_TO_TEMP_INDEX -> REINDEX_SOURCE_TO_TEMP_READ if action succeeded but we have carried through previous failures', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX'> = Either.right({ + processedDocs, + }); + const testState = { + ...state, + corruptDocumentIds: ['a:b'], + transformErrors: [], + }; + const newState = model(testState, res) as ReindexSourceToTempIndex; expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_READ'); - expect(newState.retryCount).toEqual(0); - expect(newState.retryDelay).toEqual(0); + expect(newState.corruptDocumentIds.length).toEqual(1); + expect(newState.transformErrors.length).toEqual(0); }); - it('REINDEX_SOURCE_TO_TEMP_INDEX -> REINDEX_SOURCE_TO_TEMP_READ when response is left target_index_had_write_block', () => { + it('REINDEX_SOURCE_TO_TEMP_INDEX -> REINDEX_SOURCE_TO_TEMP_READ when response is left documents_transform_failed', () => { const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX'> = Either.left({ - type: 'target_index_had_write_block', + type: 'documents_transform_failed', + corruptDocumentIds: ['a:b'], + transformErrors: [], }); const newState = model(state, res) as ReindexSourceToTempRead; expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_READ'); + expect(newState.corruptDocumentIds.length).toEqual(1); + expect(newState.transformErrors.length).toEqual(0); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); - - it('REINDEX_SOURCE_TO_TEMP_INDEX -> REINDEX_SOURCE_TO_TEMP_READ when response is left index_not_found_exception for temp index', () => { - const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX'> = Either.left({ - type: 'index_not_found_exception', - index: state.tempIndex, - }); - const newState = model(state, res) as ReindexSourceToTempRead; + }); + describe('REINDEX_SOURCE_TO_TEMP_INDEX_BULK', () => { + const transformedDocs = [ + { + _id: 'a:b', + _source: { type: 'a', a: { name: 'HOI!' 
}, migrationVersion: {}, references: [] }, + }, + ] as SavedObjectsRawDoc[]; + const reindexSourceToTempIndexBulkState: ReindexSourceToTempIndexBulk = { + ...baseState, + controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK', + transformedDocs, + versionIndexReadyActions: Option.none, + sourceIndex: Option.some('.kibana') as Option.Some, + sourceIndexPitId: 'pit_id', + targetIndex: '.kibana_7.11.0_001', + lastHitSortValue: undefined, + }; + test('REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ if action succeeded', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'> = Either.right( + 'bulk_index_succeeded' + ); + const newState = model(reindexSourceToTempIndexBulkState, res); expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_READ'); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); + test('REINDEX_SOURCE_TO_TEMP_INDEX_BULK should throw a throwBadResponse error if action failed', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'> = Either.left({ + type: 'retryable_es_client_error', + message: 'random documents bulk index error', + }); + const newState = model(reindexSourceToTempIndexBulkState, res); + expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_INDEX_BULK'); + expect(newState.retryCount).toEqual(1); + expect(newState.retryDelay).toEqual(2000); + }); }); describe('SET_TEMP_WRITE_BLOCK', () => { @@ -943,6 +1016,8 @@ describe('migrations v2 model', () => { targetIndex: '.kibana_7.11.0_001', lastHitSortValue: undefined, hasTransformedDocs: false, + corruptDocumentIds: [], + transformErrors: [], }; it('OUTDATED_DOCUMENTS_SEARCH_READ -> OUTDATED_DOCUMENTS_TRANSFORM if found documents to transform', () => { @@ -967,6 +1042,37 @@ describe('migrations v2 model', () => { expect(newState.controlState).toBe('OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT'); expect(newState.pitId).toBe('pit_id'); }); + + it('OUTDATED_DOCUMENTS_SEARCH_READ -> FATAL if no outdated documents to transform and we have failed document migrations', () => { + const corruptDocumentIdsCarriedOver = ['a:somethingelse']; + const originalTransformError = new Error('something went wrong'); + const transFormErr = new TransformSavedObjectDocumentError( + '123', + 'vis', + undefined, + 'randomvis: 7.12.0', + 'failedDoc', + originalTransformError + ); + const transformationErrors = [ + { rawId: 'bob:tail', err: transFormErr }, + ] as TransformErrorObjects[]; + const res: ResponseType<'OUTDATED_DOCUMENTS_SEARCH_READ'> = Either.right({ + outdatedDocuments: [], + lastHitSortValue: undefined, + }); + const transformErrorsState: OutdatedDocumentsSearchRead = { + ...state, + corruptDocumentIds: [...corruptDocumentIdsCarriedOver], + transformErrors: [...transformationErrors], + }; + const newState = model(transformErrorsState, res) as FatalState; + expect(newState.controlState).toBe('FATAL'); + expect(newState.reason.includes('Migrations failed. 
Reason:')).toBe(true); + expect(newState.reason.includes('Corrupt saved object documents: ')).toBe(true); + expect(newState.reason.includes('Transformation errors: ')).toBe(true); + expect(newState.reason.includes('randomvis: 7.12.0')).toBe(true); + }); }); describe('OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT', () => { @@ -1006,9 +1112,20 @@ describe('migrations v2 model', () => { }); describe('OUTDATED_DOCUMENTS_TRANSFORM', () => { - const outdatedDocuments = ([ - Symbol('raw saved object doc'), - ] as unknown) as SavedObjectsRawDoc[]; + const outdatedDocuments = [{ _id: '1', _source: { type: 'vis' } }]; + const corruptDocumentIds = ['a:somethingelse']; + const originalTransformError = new Error('Dang diggity!'); + const transFormErr = new TransformSavedObjectDocumentError( + 'id', + 'type', + 'namespace', + 'failedTransform', + 'failedDoc', + originalTransformError + ); + const transformationErrors = [ + { rawId: 'bob:tail', err: transFormErr }, + ] as TransformErrorObjects[]; const outdatedDocumentsTransformState: OutdatedDocumentsTransform = { ...baseState, controlState: 'OUTDATED_DOCUMENTS_TRANSFORM', @@ -1016,18 +1133,132 @@ describe('migrations v2 model', () => { sourceIndex: Option.some('.kibana') as Option.Some, targetIndex: '.kibana_7.11.0_001', outdatedDocuments, + corruptDocumentIds: [], + transformErrors: [], pitId: 'pit_id', lastHitSortValue: [3, 4], hasTransformedDocs: false, }; - test('OUTDATED_DOCUMENTS_TRANSFORM -> OUTDATED_DOCUMENTS_SEARCH_READ if action succeeds', () => { - const res: ResponseType<'OUTDATED_DOCUMENTS_TRANSFORM'> = Either.right( - 'bulk_index_succeeded' - ); - const newState = model(outdatedDocumentsTransformState, res); - expect(newState.controlState).toEqual('OUTDATED_DOCUMENTS_SEARCH_READ'); - expect(newState.retryCount).toEqual(0); - expect(newState.retryDelay).toEqual(0); + describe('OUTDATED_DOCUMENTS_TRANSFORM if action succeeds', () => { + const processedDocs = [ + { + _id: 'a:b', + _source: { type: 'a', a: { name: 'HOI!' 
}, migrationVersion: {}, references: [] }, + }, + ] as SavedObjectsRawDoc[]; + test('OUTDATED_DOCUMENTS_TRANSFORM -> TRANSFORMED_DOCUMENTS_BULK_INDEX if action succeeds', () => { + const res: ResponseType<'OUTDATED_DOCUMENTS_TRANSFORM'> = Either.right({ processedDocs }); + const newState = model( + outdatedDocumentsTransformState, + res + ) as TransformedDocumentsBulkIndex; + expect(newState.controlState).toEqual('TRANSFORMED_DOCUMENTS_BULK_INDEX'); + expect(newState.transformedDocs).toEqual(processedDocs); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + test('OUTDATED_DOCUMENTS_TRANSFORM -> OUTDATED_DOCUMENTS_SEARCH_READ if there are are existing documents that failed transformation', () => { + const outdatedDocumentsTransformStateWithFailedDocuments: OutdatedDocumentsTransform = { + ...outdatedDocumentsTransformState, + corruptDocumentIds: [...corruptDocumentIds], + transformErrors: [], + }; + const res: ResponseType<'OUTDATED_DOCUMENTS_TRANSFORM'> = Either.right({ processedDocs }); + const newState = model( + outdatedDocumentsTransformStateWithFailedDocuments, + res + ) as OutdatedDocumentsSearchRead; + expect(newState.controlState).toEqual('OUTDATED_DOCUMENTS_SEARCH_READ'); + expect(newState.corruptDocumentIds).toEqual(corruptDocumentIds); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + test('OUTDATED_DOCUMENTS_TRANSFORM -> OUTDATED_DOCUMENTS_SEARCH_READ if there are are existing documents that failed transformation because of transform errors', () => { + const outdatedDocumentsTransformStateWithFailedDocuments: OutdatedDocumentsTransform = { + ...outdatedDocumentsTransformState, + corruptDocumentIds: [], + transformErrors: [...transformationErrors], + }; + const res: ResponseType<'OUTDATED_DOCUMENTS_TRANSFORM'> = Either.right({ processedDocs }); + const newState = model( + outdatedDocumentsTransformStateWithFailedDocuments, + res + ) as OutdatedDocumentsSearchRead; + expect(newState.controlState).toEqual('OUTDATED_DOCUMENTS_SEARCH_READ'); + expect(newState.corruptDocumentIds.length).toEqual(0); + expect(newState.transformErrors.length).toEqual(1); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + }); + describe('OUTDATED_DOCUMENTS_TRANSFORM if action fails', () => { + test('OUTDATED_DOCUMENTS_TRANSFORM -> OUTDATED_DOCUMENTS_SEARCH_READ adding newly failed documents to state if documents failed the transform', () => { + const res: ResponseType<'OUTDATED_DOCUMENTS_TRANSFORM'> = Either.left({ + type: 'documents_transform_failed', + corruptDocumentIds, + transformErrors: [], + }); + const newState = model( + outdatedDocumentsTransformState, + res + ) as OutdatedDocumentsSearchRead; + expect(newState.controlState).toEqual('OUTDATED_DOCUMENTS_SEARCH_READ'); + expect(newState.corruptDocumentIds).toEqual(corruptDocumentIds); + }); + test('OUTDATED_DOCUMENTS_TRANSFORM -> OUTDATED_DOCUMENTS_SEARCH_READ combines newly failed documents with those already on state if documents failed the transform', () => { + const newFailedTransformDocumentIds = ['b:other', 'c:__']; + const outdatedDocumentsTransformStateWithFailedDocuments: OutdatedDocumentsTransform = { + ...outdatedDocumentsTransformState, + corruptDocumentIds: [...corruptDocumentIds], + transformErrors: [...transformationErrors], + }; + const res: ResponseType<'OUTDATED_DOCUMENTS_TRANSFORM'> = Either.left({ + type: 'documents_transform_failed', + corruptDocumentIds: newFailedTransformDocumentIds, + transformErrors: 
transformationErrors, + }); + const newState = model( + outdatedDocumentsTransformStateWithFailedDocuments, + res + ) as OutdatedDocumentsSearchRead; + expect(newState.controlState).toEqual('OUTDATED_DOCUMENTS_SEARCH_READ'); + expect(newState.corruptDocumentIds).toEqual([ + ...corruptDocumentIds, + ...newFailedTransformDocumentIds, + ]); + }); + }); + }); + describe('TRANSFORMED_DOCUMENTS_BULK_INDEX', () => { + const transformedDocs = [ + { + _id: 'a:b', + _source: { type: 'a', a: { name: 'HOI!' }, migrationVersion: {}, references: [] }, + }, + ] as SavedObjectsRawDoc[]; + const transformedDocumentsBulkIndexState: TransformedDocumentsBulkIndex = { + ...baseState, + controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX', + transformedDocs, + versionIndexReadyActions: Option.none, + sourceIndex: Option.some('.kibana') as Option.Some, + targetIndex: '.kibana_7.11.0_001', + pitId: 'pit_id', + lastHitSortValue: [3, 4], + hasTransformedDocs: false, + }; + test('TRANSFORMED_DOCUMENTS_BULK_INDEX should throw a throwBadResponse error if action failed', () => { + const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.left({ + type: 'retryable_es_client_error', + message: 'random documents bulk index error', + }); + const newState = model( + transformedDocumentsBulkIndexState, + res + ) as TransformedDocumentsBulkIndex; + expect(newState.controlState).toEqual('TRANSFORMED_DOCUMENTS_BULK_INDEX'); + expect(newState.retryCount).toEqual(1); + expect(newState.retryDelay).toEqual(2000); }); }); diff --git a/src/core/server/saved_objects/migrationsv2/model.ts b/src/core/server/saved_objects/migrationsv2/model.ts index 318eff19d5e24..cf9d6aec6b5b0 100644 --- a/src/core/server/saved_objects/migrationsv2/model.ts +++ b/src/core/server/saved_objects/migrationsv2/model.ts @@ -16,7 +16,7 @@ import { IndexMapping } from '../mappings'; import { ResponseType } from './next'; import { SavedObjectsMigrationVersion } from '../types'; import { disableUnknownTypeMappingFields } from '../migrations/core/migration_context'; -import { excludeUnusedTypesQuery } from '../migrations/core'; +import { excludeUnusedTypesQuery, TransformErrorObjects } from '../migrations/core'; import { SavedObjectsMigrationConfigType } from '../saved_objects_config'; /** @@ -97,6 +97,31 @@ function getAliases(indices: FetchIndexResponse) { }, {} as Record); } +/** + * Constructs migration failure message strings from corrupt document ids and document transformation errors + */ +function extractTransformFailuresReason( + corruptDocumentIds: string[], + transformErrors: TransformErrorObjects[] +): { corruptDocsReason: string; transformErrsReason: string } { + const corruptDocumentIdReason = + corruptDocumentIds.length > 0 + ? ` Corrupt saved object documents: ${corruptDocumentIds.join(',')}` + : ''; + // we have both the saved object Id and the stack trace in each `transformErrors` item. + const transformErrorsReason = + transformErrors.length > 0 + ? ' Transformation errors: ' + + transformErrors + .map((errObj) => `${errObj.rawId}: ${errObj.err.message}\n ${errObj.err.stack ?? 
''}`) + .join('/n') + : ''; + return { + corruptDocsReason: corruptDocumentIdReason, + transformErrsReason: transformErrorsReason, + }; +} + const delayRetryState = ( state: S, errorMessage: string, @@ -481,11 +506,15 @@ export const model = (currentState: State, resW: ResponseType): controlState: 'REINDEX_SOURCE_TO_TEMP_READ', sourceIndexPitId: res.right.pitId, lastHitSortValue: undefined, + // placeholders to collect document transform problems + corruptDocumentIds: [], + transformErrors: [], }; } else { throwBadResponse(stateP, res); } } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_READ') { + // we carry through any failures we've seen with transforming documents on state const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { if (res.right.outdatedDocuments.length > 0) { @@ -495,11 +524,27 @@ export const model = (currentState: State, resW: ResponseType): outdatedDocuments: res.right.outdatedDocuments, lastHitSortValue: res.right.lastHitSortValue, }; + } else { + // we don't have any more outdated documents and need to either fail or move on to updating the target mappings. + if (stateP.corruptDocumentIds.length > 0 || stateP.transformErrors.length > 0) { + const { corruptDocsReason, transformErrsReason } = extractTransformFailuresReason( + stateP.corruptDocumentIds, + stateP.transformErrors + ); + return { + ...stateP, + controlState: 'FATAL', + reason: `Migrations failed. Reason:${corruptDocsReason}${transformErrsReason}. To allow migrations to proceed, please delete these documents.`, + }; + } else { + // we don't have any more outdated documents and we haven't encountered any document transformation issues. + // Close the PIT search and carry on with the happy path. + return { + ...stateP, + controlState: 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT', + }; + } } - return { - ...stateP, - controlState: 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT', - }; } else { throwBadResponse(stateP, res); } @@ -516,34 +561,55 @@ export const model = (currentState: State, resW: ResponseType): throwBadResponse(stateP, res); } } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_INDEX') { + // We follow a similar control flow as for + // outdated document search -> outdated document transform -> transform documents bulk index + // collecting issues along the way rather than failing + // REINDEX_SOURCE_TO_TEMP_INDEX handles the document transforms const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { - return { - ...stateP, - controlState: 'REINDEX_SOURCE_TO_TEMP_READ', - }; + if (stateP.corruptDocumentIds.length === 0 && stateP.transformErrors.length === 0) { + return { + ...stateP, + controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK', // handles the actual bulk indexing into temp index + transformedDocs: [...res.right.processedDocs], + }; + } else { + // we don't have any transform issues with the current batch of outdated docs but + // we have carried through previous transformation issues. 
+ // The migration will ultimately fail but before we do that, continue to + // search through remaining docs for more issues and pass the previous failures along on state + return { + ...stateP, + controlState: 'REINDEX_SOURCE_TO_TEMP_READ', + }; + } } else { + // we have failures from the current batch of documents and add them to the lists const left = res.left; - if ( - isLeftTypeof(left, 'target_index_had_write_block') || - (isLeftTypeof(left, 'index_not_found_exception') && left.index === stateP.tempIndex) - ) { - // index_not_found_exception: - // another instance completed the MARK_VERSION_INDEX_READY and - // removed the temp index. - // target_index_had_write_block - // another instance completed the SET_TEMP_WRITE_BLOCK step adding a - // write block to the temp index. - // - // For simplicity we continue linearly through the next steps even if - // we know another instance already completed these. + if (isLeftTypeof(left, 'documents_transform_failed')) { return { ...stateP, controlState: 'REINDEX_SOURCE_TO_TEMP_READ', + corruptDocumentIds: [...stateP.corruptDocumentIds, ...left.corruptDocumentIds], + transformErrors: [...stateP.transformErrors, ...left.transformErrors], }; + } else { + // should never happen + throwBadResponse(stateP, res as never); } - // should never happen - throwBadResponse(stateP, res as never); + } + } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK') { + const res = resW as ExcludeRetryableEsError>; + if (Either.isRight(res)) { + return { + ...stateP, + controlState: 'REINDEX_SOURCE_TO_TEMP_READ', + // we're still on the happy path with no transformation failures seen. + corruptDocumentIds: [], + transformErrors: [], + }; + } else { + throwBadResponse(stateP, res); } } else if (stateP.controlState === 'SET_TEMP_WRITE_BLOCK') { const res = resW as ExcludeRetryableEsError>; @@ -611,6 +677,8 @@ export const model = (currentState: State, resW: ResponseType): pitId: res.right.pitId, lastHitSortValue: undefined, hasTransformedDocs: false, + corruptDocumentIds: [], + transformErrors: [], }; } else { throwBadResponse(stateP, res); @@ -626,59 +694,111 @@ export const model = (currentState: State, resW: ResponseType): lastHitSortValue: res.right.lastHitSortValue, }; } else { + // we don't have any more outdated documents and need to either fail or move on to updating the target mappings. + if (stateP.corruptDocumentIds.length > 0 || stateP.transformErrors.length > 0) { + const { corruptDocsReason, transformErrsReason } = extractTransformFailuresReason( + stateP.corruptDocumentIds, + stateP.transformErrors + ); + return { + ...stateP, + controlState: 'FATAL', + reason: `Migrations failed. Reason:${corruptDocsReason}${transformErrsReason}. 
To allow migrations to proceed, please delete these documents.`, + }; + } else { + // If there are no more results we have transformed all outdated + // documents and we didn't encounter any corrupt documents or transformation errors + // and can proceed to the next step + return { + ...stateP, + controlState: 'OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT', + }; + } + } + } else { + throwBadResponse(stateP, res); + } + } else if (stateP.controlState === 'OUTDATED_DOCUMENTS_TRANSFORM') { + const res = resW as ExcludeRetryableEsError>; + if (Either.isRight(res)) { + // we haven't seen corrupt documents or any transformation errors thus far in the migration + // index the migrated docs + if (stateP.corruptDocumentIds.length === 0 && stateP.transformErrors.length === 0) { + return { + ...stateP, + controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX', + transformedDocs: [...res.right.processedDocs], + hasTransformedDocs: true, + }; + } else { + // We have seen corrupt documents and/or transformation errors + // skip indexing and go straight to reading and transforming more docs return { ...stateP, - controlState: 'OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT', + controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', }; } } else { - throwBadResponse(stateP, res); + if (isLeftTypeof(res.left, 'documents_transform_failed')) { + // continue to build up any more transformation errors before failing the migration. + return { + ...stateP, + controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', + corruptDocumentIds: [...stateP.corruptDocumentIds, ...res.left.corruptDocumentIds], + transformErrors: [...stateP.transformErrors, ...res.left.transformErrors], + hasTransformedDocs: false, + }; + } else { + throwBadResponse(stateP, res as never); + } } - } else if (stateP.controlState === 'OUTDATED_DOCUMENTS_REFRESH') { + } else if (stateP.controlState === 'TRANSFORMED_DOCUMENTS_BULK_INDEX') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { return { ...stateP, - controlState: 'UPDATE_TARGET_MAPPINGS', + controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', + corruptDocumentIds: [], + transformErrors: [], + hasTransformedDocs: true, }; } else { throwBadResponse(stateP, res); } - } else if (stateP.controlState === 'OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT') { + } else if (stateP.controlState === 'UPDATE_TARGET_MAPPINGS') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { - const { pitId, hasTransformedDocs, ...state } = stateP; - if (hasTransformedDocs) { - return { - ...state, - controlState: 'OUTDATED_DOCUMENTS_REFRESH', - }; - } return { - ...state, - controlState: 'UPDATE_TARGET_MAPPINGS', + ...stateP, + controlState: 'UPDATE_TARGET_MAPPINGS_WAIT_FOR_TASK', + updateTargetMappingsTaskId: res.right.taskId, }; } else { - throwBadResponse(stateP, res); + throwBadResponse(stateP, res as never); } - } else if (stateP.controlState === 'OUTDATED_DOCUMENTS_TRANSFORM') { + } else if (stateP.controlState === 'OUTDATED_DOCUMENTS_REFRESH') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { return { ...stateP, - controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', - hasTransformedDocs: true, + controlState: 'UPDATE_TARGET_MAPPINGS', }; } else { - throwBadResponse(stateP, res as never); + throwBadResponse(stateP, res); } - } else if (stateP.controlState === 'UPDATE_TARGET_MAPPINGS') { + } else if (stateP.controlState === 'OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { + const { pitId, hasTransformedDocs, ...state } = stateP; + if 
diff --git a/src/core/server/saved_objects/migrationsv2/next.ts b/src/core/server/saved_objects/migrationsv2/next.ts
index 536c07d6a071d..07ebf80271d48 100644
--- a/src/core/server/saved_objects/migrationsv2/next.ts
+++ b/src/core/server/saved_objects/migrationsv2/next.ts
@@ -32,6 +32,8 @@ import type {
   SetTempWriteBlock,
   WaitForYellowSourceState,
   TransformRawDocs,
+  TransformedDocumentsBulkIndex,
+  ReindexSourceToTempIndexBulk,
   OutdatedDocumentsSearchOpenPit,
   OutdatedDocumentsSearchRead,
   OutdatedDocumentsSearchClosePit,
@@ -82,11 +84,12 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra
     REINDEX_SOURCE_TO_TEMP_CLOSE_PIT: (state: ReindexSourceToTempClosePit) =>
       Actions.closePit(client, state.sourceIndexPitId),
     REINDEX_SOURCE_TO_TEMP_INDEX: (state: ReindexSourceToTempIndex) =>
-      Actions.transformDocs(
+      Actions.transformDocs(transformRawDocs, state.outdatedDocuments),
+    REINDEX_SOURCE_TO_TEMP_INDEX_BULK: (state: ReindexSourceToTempIndexBulk) =>
+      Actions.bulkOverwriteTransformedDocuments(
         client,
-        transformRawDocs,
-        state.outdatedDocuments,
         state.tempIndex,
+        state.transformedDocs,
         /**
          * Since we don't run a search against the target index, we disable "refresh" to speed up
          * the migration process.
@@ -121,11 +124,12 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra
     OUTDATED_DOCUMENTS_REFRESH: (state: OutdatedDocumentsRefresh) =>
       Actions.refreshIndex(client, state.targetIndex),
     OUTDATED_DOCUMENTS_TRANSFORM: (state: OutdatedDocumentsTransform) =>
-      Actions.transformDocs(
+      Actions.transformDocs(transformRawDocs, state.outdatedDocuments),
+    TRANSFORMED_DOCUMENTS_BULK_INDEX: (state: TransformedDocumentsBulkIndex) =>
+      Actions.bulkOverwriteTransformedDocuments(
         client,
-        transformRawDocs,
-        state.outdatedDocuments,
         state.targetIndex,
+        state.transformedDocs,
         /**
          * Since we don't run a search against the target index, we disable "refresh" to speed up
          * the migration process.
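The two hunks above split the old transform-and-index action in two: `Actions.transformDocs` now only runs the migration transforms and no longer needs an Elasticsearch client, while `Actions.bulkOverwriteTransformedDocuments` performs the writes from the new `*_BULK_INDEX` states. A minimal sketch of the reshaped `transformDocs` under that reading (import paths are assumptions, not taken from this diff):

```ts
import * as TaskEither from 'fp-ts/lib/TaskEither';
import type { SavedObjectsRawDoc } from '..';
import type {
  DocumentsTransformFailed,
  DocumentsTransformSuccess,
} from '../migrations/core';
import type { TransformRawDocs } from './types';

/*
 * With indexing moved into REINDEX_SOURCE_TO_TEMP_INDEX_BULK and
 * TRANSFORMED_DOCUMENTS_BULK_INDEX, the transform action can simply defer
 * to the injected TransformRawDocs and return its TaskEither untouched.
 */
export const transformDocs = (
  transformRawDocs: TransformRawDocs,
  outdatedDocuments: SavedObjectsRawDoc[]
): TaskEither.TaskEither<DocumentsTransformFailed, DocumentsTransformSuccess> =>
  transformRawDocs(outdatedDocuments);
```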
diff --git a/src/core/server/saved_objects/migrationsv2/types.ts b/src/core/server/saved_objects/migrationsv2/types.ts
index ac807e9d61776..f5800a3cd9570 100644
--- a/src/core/server/saved_objects/migrationsv2/types.ts
+++ b/src/core/server/saved_objects/migrationsv2/types.ts
@@ -6,12 +6,18 @@
  * Side Public License, v 1.
  */
 
+import * as TaskEither from 'fp-ts/lib/TaskEither';
 import * as Option from 'fp-ts/lib/Option';
 import { estypes } from '@elastic/elasticsearch';
 import { ControlState } from './state_action_machine';
 import { AliasAction } from './actions';
 import { IndexMapping } from '../mappings';
 import { SavedObjectsRawDoc } from '..';
+import { TransformErrorObjects } from '../migrations/core';
+import {
+  DocumentsTransformFailed,
+  DocumentsTransformSuccess,
+} from '../migrations/core/migrate_raw_docs';
 
 export type MigrationLogLevel = 'error' | 'info';
 
@@ -175,6 +181,8 @@ export interface ReindexSourceToTempRead extends PostInitState {
   readonly controlState: 'REINDEX_SOURCE_TO_TEMP_READ';
   readonly sourceIndexPitId: string;
   readonly lastHitSortValue: number[] | undefined;
+  readonly corruptDocumentIds: string[];
+  readonly transformErrors: TransformErrorObjects[];
 }
 
 export interface ReindexSourceToTempClosePit extends PostInitState {
@@ -187,6 +195,15 @@ export interface ReindexSourceToTempIndex extends PostInitState {
   readonly outdatedDocuments: SavedObjectsRawDoc[];
   readonly sourceIndexPitId: string;
   readonly lastHitSortValue: number[] | undefined;
+  readonly corruptDocumentIds: string[];
+  readonly transformErrors: TransformErrorObjects[];
+}
+
+export interface ReindexSourceToTempIndexBulk extends PostInitState {
+  readonly controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK';
+  readonly transformedDocs: SavedObjectsRawDoc[];
+  readonly sourceIndexPitId: string;
+  readonly lastHitSortValue: number[] | undefined;
 }
 
 export type SetTempWriteBlock = PostInitState & {
@@ -233,6 +250,8 @@ export interface OutdatedDocumentsSearchRead extends PostInitState {
   readonly pitId: string;
   readonly lastHitSortValue: number[] | undefined;
   readonly hasTransformedDocs: boolean;
+  readonly corruptDocumentIds: string[];
+  readonly transformErrors: TransformErrorObjects[];
 }
 
 export interface OutdatedDocumentsSearchClosePit extends PostInitState {
@@ -249,12 +268,24 @@ export interface OutdatedDocumentsRefresh extends PostInitState {
 }
 
 export interface OutdatedDocumentsTransform extends PostInitState {
-  /** Transform a batch of outdated documents to their latest version and write them to the target index */
+  /** Transform a batch of outdated documents to their latest version */
   readonly controlState: 'OUTDATED_DOCUMENTS_TRANSFORM';
   readonly pitId: string;
   readonly outdatedDocuments: SavedObjectsRawDoc[];
   readonly lastHitSortValue: number[] | undefined;
   readonly hasTransformedDocs: boolean;
+  readonly corruptDocumentIds: string[];
+  readonly transformErrors: TransformErrorObjects[];
+}
+
+export interface TransformedDocumentsBulkIndex extends PostInitState {
+  /**
+   * Write the up-to-date transformed documents to the target index
+   */
+  readonly controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX';
+  readonly transformedDocs: SavedObjectsRawDoc[];
+  readonly lastHitSortValue: number[] | undefined;
+  readonly hasTransformedDocs: boolean;
+  readonly pitId: string;
 }
 
 export interface MarkVersionIndexReady extends PostInitState {
@@ -351,6 +382,7 @@ export type State =
   | ReindexSourceToTempRead
   | ReindexSourceToTempClosePit
   | ReindexSourceToTempIndex
+  | ReindexSourceToTempIndexBulk
   | SetTempWriteBlock
   | CloneTempToSource
   | UpdateTargetMappingsState
@@ -363,6 +395,7 @@ export type State =
   | OutdatedDocumentsRefresh
   | MarkVersionIndexReady
   | MarkVersionIndexReadyConflict
+  | TransformedDocumentsBulkIndex
   | LegacyCreateReindexTargetState
   | LegacySetWriteBlockState
   | LegacyReindexState
@@ -376,4 +409,6 @@ export type AllControlStates = State['controlState'];
  */
 export type AllActionStates = Exclude<AllControlStates, 'FATAL' | 'DONE'>;
 
-export type TransformRawDocs = (rawDocs: SavedObjectsRawDoc[]) => Promise<SavedObjectsRawDoc[]>;
+export type TransformRawDocs = (
+  rawDocs: SavedObjectsRawDoc[]
+) => TaskEither.TaskEither<DocumentsTransformFailed, DocumentsTransformSuccess>;
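The signature change at the end is the crux: a `TaskEither` is a lazy `Promise` of an `Either`, so callers run the returned task and branch on the result instead of wrapping the call in try/catch. A usage sketch, where `runTransform` and the declared values are illustrative only:

```ts
import * as Either from 'fp-ts/lib/Either';
import type { SavedObjectsRawDoc } from '..';
import type { TransformRawDocs } from './types';

declare const transformRawDocs: TransformRawDocs;
declare const outdatedDocuments: SavedObjectsRawDoc[];

async function runTransform() {
  // invoking the TaskEither runs it and yields an Either instead of throwing
  const res = await transformRawDocs(outdatedDocuments)();
  if (Either.isRight(res)) {
    // happy path: transformed docs ready for the *_BULK_INDEX control states
    return res.right.processedDocs;
  }
  // left is the 'documents_transform_failed' payload that the model
  // accumulates on state before eventually transitioning to FATAL
  return { corrupt: res.left.corruptDocumentIds, errors: res.left.transformErrors };
}
```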