diff --git a/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/apis/bulk_update.test.ts b/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/apis/bulk_update.test.ts index d24c11f190696..60deaa64e3e63 100644 --- a/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/apis/bulk_update.test.ts +++ b/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/apis/bulk_update.test.ts @@ -75,6 +75,17 @@ describe('SavedObjectsRepository', () => { return expect.toBeDocumentWithoutError(type, id); }; + const expectMigrationArgs = (args: unknown, contains = true, n = 1) => { + const obj = contains ? expect.objectContaining(args) : expect.not.objectContaining(args); + expect(migrator.migrateDocument).toHaveBeenNthCalledWith( + n, + obj, + expect.objectContaining({ + allowDowngrade: expect.any(Boolean), + }) + ); + }; + beforeEach(() => { pointInTimeFinderMock.mockClear(); client = elasticsearchClientMock.createElasticsearchClient(); @@ -121,7 +132,7 @@ describe('SavedObjectsRepository', () => { const originId = 'some-origin-id'; const namespace = 'foo-namespace'; - // bulk create calls have two objects for each source -- the action, and the source + // bulk index calls have two objects for each source -- the action, and the source const expectClientCallArgsAction = ( objects: TypeIdTuple[], { @@ -153,14 +164,26 @@ describe('SavedObjectsRepository', () => { ); }; - const expectObjArgs = ({ type, attributes }: { type: string; attributes: unknown }) => [ - expect.any(Object), + const expectObjArgs = ( { - doc: expect.objectContaining({ - [type]: attributes, - ...mockTimestampFields, - }), + type, + attributes, + references, + }: { + type: string; + attributes: unknown; + references?: SavedObjectReference[]; }, + overrides: Record = {} + ) => [ + expect.any(Object), + expect.objectContaining({ + [type]: attributes, + references, + type, + ...overrides, + ...mockTimestampFields, + }), ]; describe('client calls', () => { @@ -169,13 +192,14 @@ describe('SavedObjectsRepository', () => { expect(client.bulk).toHaveBeenCalled(); }); - it(`should use the ES mget action before bulk action for any types that are multi-namespace`, async () => { + it(`should use the ES mget action before bulk action for any types that are valid`, async () => { const objects = [obj1, { ...obj2, type: MULTI_NAMESPACE_ISOLATED_TYPE }]; await bulkUpdateSuccess(client, repository, registry, objects); expect(client.bulk).toHaveBeenCalled(); expect(client.mget).toHaveBeenCalled(); const docs = [ + expect.objectContaining({ _id: `${obj1.type}:${obj1.id}` }), expect.objectContaining({ _id: `${MULTI_NAMESPACE_ISOLATED_TYPE}:${obj2.id}` }), ]; expect(client.mget).toHaveBeenCalledWith( @@ -186,21 +210,14 @@ describe('SavedObjectsRepository', () => { it(`formats the ES request`, async () => { await bulkUpdateSuccess(client, repository, registry, [obj1, obj2]); - const body = [...expectObjArgs(obj1), ...expectObjArgs(obj2)]; - expect(client.bulk).toHaveBeenCalledWith( - expect.objectContaining({ body }), - expect.anything() - ); + // expect client.bulk call args should include the whole doc + expectClientCallArgsAction([obj1, obj2], { method: 'index' }); }); it(`formats the ES request for any types that are multi-namespace`, async () => { const _obj2 = { ...obj2, type: MULTI_NAMESPACE_ISOLATED_TYPE }; await bulkUpdateSuccess(client, repository, registry, [obj1, _obj2]); - const body = [...expectObjArgs(obj1), ...expectObjArgs(_obj2)]; - expect(client.bulk).toHaveBeenCalledWith( - expect.objectContaining({ body }), - expect.anything() - ); + expectClientCallArgsAction([obj1, _obj2], { method: 'index' }); }); it(`doesnt call Elasticsearch if there are no valid objects to update`, async () => { @@ -211,8 +228,10 @@ describe('SavedObjectsRepository', () => { it(`defaults to no references`, async () => { await bulkUpdateSuccess(client, repository, registry, [obj1, obj2]); - const expected = { doc: expect.not.objectContaining({ references: expect.anything() }) }; - const body = [expect.any(Object), expected, expect.any(Object), expected]; + const body = [ + ...expectObjArgs({ ...obj1, references: [] }), + ...expectObjArgs({ ...obj2, references: [] }), + ]; expect(client.bulk).toHaveBeenCalledWith( expect.objectContaining({ body }), expect.anything() @@ -223,13 +242,16 @@ describe('SavedObjectsRepository', () => { const test = async (references: SavedObjectReference[]) => { const objects = [obj1, obj2].map((obj) => ({ ...obj, references })); await bulkUpdateSuccess(client, repository, registry, objects); - const expected = { doc: expect.objectContaining({ references }) }; - const body = [expect.any(Object), expected, expect.any(Object), expected]; + const body = [ + ...expectObjArgs({ ...obj1, references }), + ...expectObjArgs({ ...obj2, references }), + ]; expect(client.bulk).toHaveBeenCalledWith( expect.objectContaining({ body }), expect.anything() ); client.bulk.mockClear(); + client.mget.mockClear(); }; await test(references); await test([{ type: 'type', id: 'id', name: 'some ref' }]); @@ -238,15 +260,18 @@ describe('SavedObjectsRepository', () => { it(`doesn't accept custom references if not an array`, async () => { const test = async (references: unknown) => { - const objects = [obj1, obj2]; // .map((obj) => ({ ...obj })); + const objects = [obj1, obj2]; await bulkUpdateSuccess(client, repository, registry, objects); - const expected = { doc: expect.not.objectContaining({ references: expect.anything() }) }; - const body = [expect.any(Object), expected, expect.any(Object), expected]; + const body = [ + ...expectObjArgs({ ...obj1, references: expect.not.arrayContaining([references]) }), + ...expectObjArgs({ ...obj2, references: expect.not.arrayContaining([references]) }), + ]; expect(client.bulk).toHaveBeenCalledWith( expect.objectContaining({ body }), expect.anything() ); client.bulk.mockClear(); + client.mget.mockClear(); }; await test('string'); await test(123); @@ -265,7 +290,7 @@ describe('SavedObjectsRepository', () => { it(`defaults to no version for types that are not multi-namespace`, async () => { const objects = [obj1, { ...obj2, type: NAMESPACE_AGNOSTIC_TYPE }]; await bulkUpdateSuccess(client, repository, registry, objects); - expectClientCallArgsAction(objects, { method: 'update' }); + expectClientCallArgsAction(objects, { method: 'index' }); }); it(`accepts version`, async () => { @@ -277,13 +302,13 @@ describe('SavedObjectsRepository', () => { ]; await bulkUpdateSuccess(client, repository, registry, objects); const overrides = { if_seq_no: 100, if_primary_term: 200 }; - expectClientCallArgsAction(objects, { method: 'update', overrides }); + expectClientCallArgsAction(objects, { method: 'index', overrides }); }); it(`prepends namespace to the id when providing namespace for single-namespace type`, async () => { const getId = (type: string, id: string) => `${namespace}:${type}:${id}`; // test that the raw document ID equals this (e.g., has a namespace prefix) await bulkUpdateSuccess(client, repository, registry, [obj1, obj2], { namespace }); - expectClientCallArgsAction([obj1, obj2], { method: 'update', getId }); + expectClientCallArgsAction([obj1, obj2], { method: 'index', getId }); jest.clearAllMocks(); // test again with object namespace string that supersedes the operation's namespace ID @@ -291,13 +316,13 @@ describe('SavedObjectsRepository', () => { { ...obj1, namespace }, { ...obj2, namespace }, ]); - expectClientCallArgsAction([obj1, obj2], { method: 'update', getId }); + expectClientCallArgsAction([obj1, obj2], { method: 'index', getId }); }); it(`doesn't prepend namespace to the id when providing no namespace for single-namespace type`, async () => { const getId = (type: string, id: string) => `${type}:${id}`; // test that the raw document ID equals this (e.g., does not have a namespace prefix) await bulkUpdateSuccess(client, repository, registry, [obj1, obj2]); - expectClientCallArgsAction([obj1, obj2], { method: 'update', getId }); + expectClientCallArgsAction([obj1, obj2], { method: 'index', getId }); jest.clearAllMocks(); // test again with object namespace string that supersedes the operation's namespace ID @@ -311,7 +336,7 @@ describe('SavedObjectsRepository', () => { ], { namespace } ); - expectClientCallArgsAction([obj1, obj2], { method: 'update', getId }); + expectClientCallArgsAction([obj1, obj2], { method: 'index', getId }); }); it(`normalizes options.namespace from 'default' to undefined`, async () => { @@ -319,7 +344,7 @@ describe('SavedObjectsRepository', () => { await bulkUpdateSuccess(client, repository, registry, [obj1, obj2], { namespace: 'default', }); - expectClientCallArgsAction([obj1, obj2], { method: 'update', getId }); + expectClientCallArgsAction([obj1, obj2], { method: 'index', getId }); }); it(`doesn't prepend namespace to the id when not using single-namespace type`, async () => { @@ -328,18 +353,20 @@ describe('SavedObjectsRepository', () => { const _obj2 = { ...obj2, type: MULTI_NAMESPACE_ISOLATED_TYPE }; await bulkUpdateSuccess(client, repository, registry, [_obj1], { namespace }); - expectClientCallArgsAction([_obj1], { method: 'update', getId }); + expectClientCallArgsAction([_obj1], { method: 'index', getId }); client.bulk.mockClear(); + client.mget.mockClear(); await bulkUpdateSuccess(client, repository, registry, [_obj2], { namespace }); - expectClientCallArgsAction([_obj2], { method: 'update', getId }); + expectClientCallArgsAction([_obj2], { method: 'index', getId }); jest.clearAllMocks(); // test again with object namespace string that supersedes the operation's namespace ID await bulkUpdateSuccess(client, repository, registry, [{ ..._obj1, namespace }]); - expectClientCallArgsAction([_obj1], { method: 'update', getId }); + expectClientCallArgsAction([_obj1], { method: 'index', getId }); client.bulk.mockClear(); + client.mget.mockClear(); await bulkUpdateSuccess(client, repository, registry, [{ ..._obj2, namespace }]); - expectClientCallArgsAction([_obj2], { method: 'update', getId }); + expectClientCallArgsAction([_obj2], { method: 'index', getId }); }); }); @@ -359,51 +386,71 @@ describe('SavedObjectsRepository', () => { isBulkError: boolean, expectedErrorResult: ExpectedErrorResult ) => { - const objects = [obj1, obj, obj2]; - const mockResponse = getMockBulkUpdateResponse(registry, objects); + const objects = [obj1, obj2, obj]; + + const mockedMgetResponse = getMockMgetResponse(registry, [obj1, obj2, obj]); + client.bulk.mockClear(); + client.mget.mockClear(); + client.mget.mockResponseOnce(mockedMgetResponse); + + const mockBulkIndexResponse = getMockBulkUpdateResponse(registry, objects); if (isBulkError) { - // mock the bulk error for only the second object + // mock the bulk error for only the third object + mockGetBulkOperationError.mockReturnValueOnce(undefined); mockGetBulkOperationError.mockReturnValueOnce(undefined); mockGetBulkOperationError.mockReturnValueOnce(expectedErrorResult.error as Payload); } - client.bulk.mockResponseOnce(mockResponse); + client.bulk.mockResponseOnce(mockBulkIndexResponse); const result = await repository.bulkUpdate(objects); + + expect(client.mget).toHaveBeenCalled(); expect(client.bulk).toHaveBeenCalled(); - const objCall = isBulkError ? expectObjArgs(obj) : []; - const body = [...expectObjArgs(obj1), ...objCall, ...expectObjArgs(obj2)]; - expect(client.bulk).toHaveBeenCalledWith( - expect.objectContaining({ body }), - expect.anything() - ); + + const expectClientCallObjects = isBulkError ? [obj1, obj2, obj] : [obj1, obj2]; + expectClientCallArgsAction(expectClientCallObjects, { method: 'index' }); + expect(result).toEqual({ - saved_objects: [expectSuccess(obj1), expectedErrorResult, expectSuccess(obj2)], + saved_objects: [expectSuccess(obj1), expectSuccess(obj2), expectedErrorResult], }); }; const bulkUpdateMultiError = async ( - [obj1, _obj, obj2]: SavedObjectsBulkUpdateObject[], + [obj1, obj2, _obj]: SavedObjectsBulkUpdateObject[], options: SavedObjectsBulkUpdateOptions | undefined, mgetResponse: estypes.MgetResponse, mgetOptions?: { statusCode?: number } ) => { + client.bulk.mockClear(); + client.mget.mockClear(); + // we only need to mock the response once. A 404 status code will apply to the response for all client.mget.mockResponseOnce(mgetResponse, { statusCode: mgetOptions?.statusCode }); - const bulkResponse = getMockBulkUpdateResponse(registry, [obj1, obj2], { namespace }); - client.bulk.mockResponseOnce(bulkResponse); + const mockBulkIndexResponse = getMockBulkUpdateResponse(registry, [obj1, obj2], { + namespace, + }); + client.bulk.mockResponseOnce(mockBulkIndexResponse); + + const result = await repository.bulkUpdate([obj1, obj2, _obj], options); - const result = await repository.bulkUpdate([obj1, _obj, obj2], options); - expect(client.bulk).toHaveBeenCalled(); expect(client.mget).toHaveBeenCalled(); - const body = [...expectObjArgs(obj1), ...expectObjArgs(obj2)]; - expect(client.bulk).toHaveBeenCalledWith( - expect.objectContaining({ body }), - expect.anything() - ); - - expect(result).toEqual({ - saved_objects: [expectSuccess(obj1), expectErrorNotFound(_obj), expectSuccess(obj2)], - }); + if (mgetOptions?.statusCode === 404) { + expect(client.bulk).not.toHaveBeenCalled(); + expect(result).toEqual({ + saved_objects: [ + expectErrorNotFound(obj1), + expectErrorNotFound(obj2), + expectErrorNotFound(_obj), + ], + }); + } else { + expect(client.bulk).toHaveBeenCalled(); + expectClientCallArgsAction([obj1, obj2], { method: 'index' }); + + expect(result).toEqual({ + saved_objects: [expectSuccess(obj1), expectSuccess(obj2), expectErrorNotFound(_obj)], + }); + } }; it(`throws when options.namespace is '*'`, async () => { @@ -433,22 +480,22 @@ describe('SavedObjectsRepository', () => { it(`returns error when ES is unable to find the document (mget)`, async () => { const _obj = { ...obj, type: MULTI_NAMESPACE_ISOLATED_TYPE, found: false }; - const mgetResponse = getMockMgetResponse(registry, [_obj]); - await bulkUpdateMultiError([obj1, _obj, obj2], undefined, mgetResponse); + const mgetResponse = getMockMgetResponse(registry, [obj1, obj2, _obj]); + await bulkUpdateMultiError([obj1, obj2, _obj], undefined, mgetResponse); }); it(`returns error when ES is unable to find the index (mget)`, async () => { const _obj = { ...obj, type: MULTI_NAMESPACE_ISOLATED_TYPE }; - const mgetResponse = getMockMgetResponse(registry, [_obj]); - await bulkUpdateMultiError([obj1, _obj, obj2], { namespace }, mgetResponse, { + const mgetResponse = getMockMgetResponse(registry, [obj1, obj2, _obj]); + await bulkUpdateMultiError([obj1, obj2, _obj], { namespace }, mgetResponse, { statusCode: 404, }); }); it(`returns error when there is a conflict with an existing multi-namespace saved object (mget)`, async () => { const _obj = { ...obj, type: MULTI_NAMESPACE_ISOLATED_TYPE }; - const mgetResponse = getMockMgetResponse(registry, [_obj], 'bar-namespace'); - await bulkUpdateMultiError([obj1, _obj, obj2], { namespace }, mgetResponse); + const mgetResponse = getMockMgetResponse(registry, [obj1, obj2, _obj], 'bar-namespace'); + await bulkUpdateMultiError([obj1, obj2, _obj], { namespace }, mgetResponse); }); it(`returns bulk error`, async () => { @@ -460,6 +507,52 @@ describe('SavedObjectsRepository', () => { await bulkUpdateError(obj, true, expectedErrorResult); }); }); + describe('migration', () => { + it('migrates the fetched documents from Mget', async () => { + const modifiedObj2 = { ...obj2, coreMigrationVersion: '8.0.0' }; + const objects = [modifiedObj2]; + migrator.migrateDocument.mockImplementationOnce((doc) => ({ ...doc, migrated: true })); + + await bulkUpdateSuccess(client, repository, registry, objects); + expect(migrator.migrateDocument).toHaveBeenCalledTimes(2); + expectMigrationArgs({ + id: modifiedObj2.id, + type: modifiedObj2.type, + }); + }); + + it('migrates namespace agnostic and multinamespace object documents', async () => { + const modifiedObj2 = { + ...obj2, + coreMigrationVersion: '8.0.0', + type: MULTI_NAMESPACE_ISOLATED_TYPE, + namespace: 'default', + }; + const modifiedObj1 = { ...obj1, type: NAMESPACE_AGNOSTIC_TYPE }; + const objects = [modifiedObj2, modifiedObj1]; + migrator.migrateDocument.mockImplementationOnce((doc) => ({ ...doc, migrated: true })); + + await bulkUpdateSuccess(client, repository, registry, objects, { namespace }); + + expect(migrator.migrateDocument).toHaveBeenCalledTimes(4); + expectMigrationArgs( + { + id: modifiedObj2.id, + type: modifiedObj2.type, + }, + true, + 1 + ); + expectMigrationArgs( + { + id: modifiedObj1.id, + type: modifiedObj1.type, + }, + true, + 2 + ); + }); + }); describe('returns', () => { it(`formats the ES response`, async () => { @@ -483,14 +576,24 @@ describe('SavedObjectsRepository', () => { id: 'three', attributes: {}, }; - const objects = [obj1, obj, obj2]; - const mockResponse = getMockBulkUpdateResponse(registry, objects); - client.bulk.mockResponseOnce(mockResponse); + const objects = [obj1, obj2, obj]; + const mockedMgetResponse = getMockMgetResponse(registry, [obj1, obj2, obj]); + client.bulk.mockClear(); + client.mget.mockClear(); + client.mget.mockResponseOnce(mockedMgetResponse); + const mockBulkIndexResponse = getMockBulkUpdateResponse(registry, objects); + client.bulk.mockResponseOnce(mockBulkIndexResponse); const result = await repository.bulkUpdate(objects); - expect(client.bulk).toHaveBeenCalledTimes(1); + + expect(client.mget).toHaveBeenCalled(); + expect(client.bulk).toHaveBeenCalled(); + + const expectClientCallObjects = [obj1, obj2]; + expectClientCallArgsAction(expectClientCallObjects, { method: 'index' }); + expect(result).toEqual({ - saved_objects: [expectUpdateResult(obj1), expectError(obj), expectUpdateResult(obj2)], + saved_objects: [expectUpdateResult(obj1), expectUpdateResult(obj2), expectError(obj)], }); }); @@ -515,14 +618,25 @@ describe('SavedObjectsRepository', () => { id: 'three', attributes: {}, }; - const result = await bulkUpdateSuccess( - client, - repository, - registry, - [obj1, obj], - {}, - originId - ); + client.bulk.mockClear(); + client.mget.mockClear(); + const objects = [ + { ...obj1, originId }, + { ...obj, originId }, + ]; + const mockedMgetResponse = getMockMgetResponse(registry, objects); + + client.mget.mockResponseOnce(mockedMgetResponse); + + const mockBulkIndexResponse = getMockBulkUpdateResponse(registry, objects, {}, originId); + client.bulk.mockResponseOnce(mockBulkIndexResponse); + const result = await repository.bulkUpdate(objects); + + expect(client.mget).toHaveBeenCalled(); + expect(client.bulk).toHaveBeenCalled(); + + const expectClientCallObjects = objects; + expectClientCallArgsAction(expectClientCallObjects, { method: 'index' }); expect(result).toEqual({ saved_objects: [ expect.objectContaining({ originId }), diff --git a/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/apis/bulk_update.ts b/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/apis/bulk_update.ts index b9c0f10a9021f..9c119ff86e7dd 100644 --- a/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/apis/bulk_update.ts +++ b/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/apis/bulk_update.ts @@ -14,6 +14,8 @@ import { DecoratedError, AuthorizeUpdateObject, SavedObjectsRawDoc, + SavedObjectsRawDocSource, + SavedObjectSanitizedDoc, } from '@kbn/core-saved-objects-server'; import { ALL_NAMESPACES_STRING, SavedObjectsUtils } from '@kbn/core-saved-objects-utils-server'; import { encodeVersion } from '@kbn/core-saved-objects-base-server-internal'; @@ -35,6 +37,8 @@ import { isLeft, isRight, rawDocExistsInNamespace, + getSavedObjectFromSource, + mergeForUpdate, } from './utils'; import { ApiExecutionContext } from './types'; @@ -43,32 +47,51 @@ export interface PerformUpdateParams { options: SavedObjectsBulkUpdateOptions; } +type DocumentToSave = Record; +type ExpectedBulkGetResult = Either< + { type: string; id: string; error: Payload }, + { + type: string; + id: string; + version?: string; + documentToSave: DocumentToSave; + objectNamespace?: string; + esRequestIndex: number; + migrationVersionCompatibility?: 'raw' | 'compatible'; + } +>; + +type ExpectedBulkUpdateResult = Either< + { type: string; id: string; error: Payload }, + { + type: string; + id: string; + namespaces?: string[]; + documentToSave: DocumentToSave; + esRequestIndex: number; + rawMigratedUpdatedDoc: SavedObjectsRawDoc; + } +>; + export const performBulkUpdate = async ( { objects, options }: PerformUpdateParams, { registry, helpers, allowedTypes, client, serializer, extensions = {} }: ApiExecutionContext ): Promise> => { - const { common: commonHelper, encryption: encryptionHelper } = helpers; + const { + common: commonHelper, + encryption: encryptionHelper, + migration: migrationHelper, + } = helpers; const { securityExtension } = extensions; - + const { migrationVersionCompatibility } = options; const namespace = commonHelper.getCurrentNamespace(options.namespace); const time = getCurrentTime(); let bulkGetRequestIndexCounter = 0; - type DocumentToSave = Record; - type ExpectedBulkGetResult = Either< - { type: string; id: string; error: Payload }, - { - type: string; - id: string; - version?: string; - documentToSave: DocumentToSave; - objectNamespace?: string; - esRequestIndex?: number; - } - >; const expectedBulkGetResults = objects.map((object) => { const { type, id, attributes, references, version, namespace: objectNamespace } = object; let error: DecoratedError | undefined; + if (!allowedTypes.includes(type)) { error = SavedObjectsErrorHelpers.createGenericNotFoundError(type, id); } else { @@ -91,21 +114,19 @@ export const performBulkUpdate = async ( ...(Array.isArray(references) && { references }), }; - const requiresNamespacesCheck = registry.isMultiNamespace(object.type); - return right({ type, id, version, documentToSave, objectNamespace, - ...(requiresNamespacesCheck && { esRequestIndex: bulkGetRequestIndexCounter++ }), + esRequestIndex: bulkGetRequestIndexCounter++, + migrationVersionCompatibility, }); }); const validObjects = expectedBulkGetResults.filter(isRight); if (validObjects.length === 0) { - // We only have error results; return early to avoid potentially trying authZ checks for 0 types which would result in an exception. return { // Technically the returned array should only contain SavedObject results, but for errors this is not true (we cast to 'any' below) saved_objects: expectedBulkGetResults.map>( @@ -117,20 +138,25 @@ export const performBulkUpdate = async ( // `objectNamespace` is a namespace string, while `namespace` is a namespace ID. // The object namespace string, if defined, will supersede the operation's namespace ID. const namespaceString = SavedObjectsUtils.namespaceIdToString(namespace); + const getNamespaceId = (objectNamespace?: string) => objectNamespace !== undefined ? SavedObjectsUtils.namespaceStringToId(objectNamespace) : namespace; + const getNamespaceString = (objectNamespace?: string) => objectNamespace ?? namespaceString; - const bulkGetDocs = validObjects - .filter(({ value }) => value.esRequestIndex !== undefined) - .map(({ value: { type, id, objectNamespace } }) => ({ - _id: serializer.generateRawId(getNamespaceId(objectNamespace), type, id), - _index: commonHelper.getIndexForType(type), - _source: ['type', 'namespaces'], - })); + + const bulkGetDocs = validObjects.map(({ value: { type, id, objectNamespace } }) => ({ + _id: serializer.generateRawId(getNamespaceId(objectNamespace), type, id), + _index: commonHelper.getIndexForType(type), + _source: true, + })); + const bulkGetResponse = bulkGetDocs.length - ? await client.mget({ body: { docs: bulkGetDocs } }, { ignore: [404], meta: true }) + ? await client.mget( + { body: { docs: bulkGetDocs } }, + { ignore: [404], meta: true } + ) : undefined; // fail fast if we can't verify a 404 response is from Elasticsearch if ( @@ -145,14 +171,24 @@ export const performBulkUpdate = async ( const authObjects: AuthorizeUpdateObject[] = validObjects.map((element) => { const { type, id, objectNamespace, esRequestIndex: index } = element.value; - const preflightResult = index !== undefined ? bulkGetResponse?.body.docs[index] : undefined; - return { - type, - id, - objectNamespace, - // @ts-expect-error MultiGetHit._source is optional - existingNamespaces: preflightResult?._source?.namespaces ?? [], - }; + const preflightResult = bulkGetResponse!.body.docs[index]; + + if (registry.isMultiNamespace(type)) { + return { + type, + id, + objectNamespace, + // @ts-expect-error MultiGetHit._source is optional + existingNamespaces: preflightResult._source?.namespaces ?? [], + }; + } else { + return { + type, + id, + objectNamespace, + existingNamespaces: [], + }; + } }); const authorizationResult = await securityExtension?.authorizeBulkUpdate({ @@ -162,16 +198,7 @@ export const performBulkUpdate = async ( let bulkUpdateRequestIndexCounter = 0; const bulkUpdateParams: object[] = []; - type ExpectedBulkUpdateResult = Either< - { type: string; id: string; error: Payload }, - { - type: string; - id: string; - namespaces: string[]; - documentToSave: DocumentToSave; - esRequestIndex: number; - } - >; + const expectedBulkUpdateResults = await Promise.all( expectedBulkGetResults.map>(async (expectedBulkGetResult) => { if (isLeft(expectedBulkGetResult)) { @@ -181,67 +208,105 @@ export const performBulkUpdate = async ( const { esRequestIndex, id, type, version, documentToSave, objectNamespace } = expectedBulkGetResult.value; - let namespaces; - let versionProperties; - if (esRequestIndex !== undefined) { - const indexFound = bulkGetResponse?.statusCode !== 404; - const actualResult = indexFound ? bulkGetResponse?.body.docs[esRequestIndex] : undefined; - const docFound = indexFound && isMgetDoc(actualResult) && actualResult.found; - if ( - !docFound || + let namespaces: string[] | undefined; + const versionProperties = getExpectedVersionProperties(version); + const indexFound = bulkGetResponse?.statusCode !== 404; + const actualResult = indexFound ? bulkGetResponse?.body.docs[esRequestIndex] : undefined; + const docFound = indexFound && isMgetDoc(actualResult) && actualResult.found; + const isMultiNS = registry.isMultiNamespace(type); + + if ( + !docFound || + (isMultiNS && !rawDocExistsInNamespace( registry, actualResult as SavedObjectsRawDoc, getNamespaceId(objectNamespace) - ) - ) { - return left({ - id, - type, - error: errorContent(SavedObjectsErrorHelpers.createGenericNotFoundError(type, id)), - }); - } + )) + ) { + return left({ + id, + type, + error: errorContent(SavedObjectsErrorHelpers.createGenericNotFoundError(type, id)), + }); + } + + if (isMultiNS) { // @ts-expect-error MultiGetHit is incorrectly missing _id, _source namespaces = actualResult!._source.namespaces ?? [ // @ts-expect-error MultiGetHit is incorrectly missing _id, _source SavedObjectsUtils.namespaceIdToString(actualResult!._source.namespace), ]; - versionProperties = getExpectedVersionProperties(version); - } else { - if (registry.isSingleNamespace(type)) { - // if `objectNamespace` is undefined, fall back to `options.namespace` - namespaces = [getNamespaceString(objectNamespace)]; - } - versionProperties = getExpectedVersionProperties(version); + } else if (registry.isSingleNamespace(type)) { + // if `objectNamespace` is undefined, fall back to `options.namespace` + namespaces = [getNamespaceString(objectNamespace)]; + } + + const document = getSavedObjectFromSource( + registry, + type, + id, + actualResult as SavedObjectsRawDoc, + { migrationVersionCompatibility } + ); + + let migrated: SavedObject; + try { + migrated = migrationHelper.migrateStorageDocument(document) as SavedObject; + } catch (migrateStorageDocError) { + throw SavedObjectsErrorHelpers.decorateGeneralError( + migrateStorageDocError, + 'Failed to migrate document to the latest version.' + ); } + const typeDefinition = registry.getType(type)!; + const updatedAttributes = mergeForUpdate({ + targetAttributes: { + ...migrated!.attributes, + }, + updatedAttributes: await encryptionHelper.optionallyEncryptAttributes( + type, + id, + objectNamespace || namespace, + documentToSave[type] + ), + typeMappings: typeDefinition.mappings, + }); + + const migratedUpdatedSavedObjectDoc = migrationHelper.migrateInputDocument({ + ...migrated!, + id, + type, + namespace, + namespaces, + attributes: updatedAttributes, + updated_at: time, + ...(Array.isArray(documentToSave.references) && { references: documentToSave.references }), + }); + const updatedMigratedDocumentToSave = serializer.savedObjectToRaw( + migratedUpdatedSavedObjectDoc as SavedObjectSanitizedDoc + ); + const expectedResult = { type, id, namespaces, esRequestIndex: bulkUpdateRequestIndexCounter++, documentToSave: expectedBulkGetResult.value.documentToSave, + rawMigratedUpdatedDoc: updatedMigratedDocumentToSave, + migrationVersionCompatibility, }; bulkUpdateParams.push( { - update: { + index: { _id: serializer.generateRawId(getNamespaceId(objectNamespace), type, id), _index: commonHelper.getIndexForType(type), ...versionProperties, }, }, - { - doc: { - ...documentToSave, - [type]: await encryptionHelper.optionallyEncryptAttributes( - type, - id, - objectNamespace || namespace, - documentToSave[type] - ), - }, - } + updatedMigratedDocumentToSave._source ); return right(expectedResult); @@ -264,7 +329,8 @@ export const performBulkUpdate = async ( return expectedResult.value as any; } - const { type, id, namespaces, documentToSave, esRequestIndex } = expectedResult.value; + const { type, id, namespaces, documentToSave, esRequestIndex, rawMigratedUpdatedDoc } = + expectedResult.value; const response = bulkUpdateResponse?.items[esRequestIndex] ?? {}; const rawResponse = Object.values(response)[0] as any; @@ -273,14 +339,12 @@ export const performBulkUpdate = async ( return { type, id, error }; } - // When a bulk update operation is completed, any fields specified in `_sourceIncludes` will be found in the "get" value of the - // returned object. We need to retrieve the `originId` if it exists so we can return it to the consumer. - const { _seq_no: seqNo, _primary_term: primaryTerm, get } = rawResponse; + const { _seq_no: seqNo, _primary_term: primaryTerm } = rawResponse; // eslint-disable-next-line @typescript-eslint/naming-convention const { [type]: attributes, references, updated_at } = documentToSave; - const { originId } = get._source; + const { originId } = rawMigratedUpdatedDoc._source; return { id, type, diff --git a/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/apis/update.ts b/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/apis/update.ts index fd9c587502d7b..61f9cb4cfdb27 100644 --- a/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/apis/update.ts +++ b/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/apis/update.ts @@ -106,7 +106,7 @@ export const executeUpdate = async ( preflightDocResult, }); - const existingNamespaces = preflightDocNSResult?.savedObjectNamespaces ?? []; + const existingNamespaces = preflightDocNSResult.savedObjectNamespaces ?? []; const authorizationResult = await securityExtension?.authorizeUpdate({ namespace, object: { type, id, existingNamespaces }, diff --git a/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/repository.spaces_extension.test.ts b/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/repository.spaces_extension.test.ts index 29983177adc99..82a7d2930f8d5 100644 --- a/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/repository.spaces_extension.test.ts +++ b/packages/core/saved-objects/core-saved-objects-api-server-internal/src/lib/repository.spaces_extension.test.ts @@ -696,26 +696,23 @@ describe('SavedObjectsRepository Spaces Extension', () => { expect.objectContaining({ body: expect.arrayContaining([ expect.objectContaining({ - update: expect.objectContaining({ + index: expect.objectContaining({ _id: `${ currentSpace.expectedNamespace ? `${currentSpace.expectedNamespace}:` : '' }${obj1.type}:${obj1.id}`, }), }), expect.objectContaining({ - doc: expect.objectContaining({ - config: obj1.attributes, - }), + config: obj1.attributes, }), + expect.objectContaining({ - update: expect.objectContaining({ + index: expect.objectContaining({ _id: `${obj2.type}:${obj2.id}`, }), }), expect.objectContaining({ - doc: expect.objectContaining({ - multiNamespaceType: obj2.attributes, - }), + multiNamespaceType: obj2.attributes, }), ]), }), diff --git a/packages/core/saved-objects/core-saved-objects-api-server-internal/src/test_helpers/repository.test.common.ts b/packages/core/saved-objects/core-saved-objects-api-server-internal/src/test_helpers/repository.test.common.ts index 29c00e9d41ac1..d3a31a905de5c 100644 --- a/packages/core/saved-objects/core-saved-objects-api-server-internal/src/test_helpers/repository.test.common.ts +++ b/packages/core/saved-objects/core-saved-objects-api-server-internal/src/test_helpers/repository.test.common.ts @@ -469,7 +469,9 @@ export const getMockGetResponse = ( export const getMockMgetResponse = ( registry: SavedObjectTypeRegistry, - objects: Array, + objects: Array< + TypeIdTuple & { found?: boolean; initialNamespaces?: string[]; originId?: string } + >, namespace?: string ) => ({ @@ -649,10 +651,10 @@ export const getMockBulkUpdateResponse = ( objects: TypeIdTuple[], options?: SavedObjectsBulkUpdateOptions, originId?: string -) => - ({ +) => { + return { items: objects.map(({ type, id }) => ({ - update: { + index: { _id: `${ registry.isSingleNamespace(type) && options?.namespace ? `${options?.namespace}:` : '' }${type}:${id}`, @@ -667,7 +669,8 @@ export const getMockBulkUpdateResponse = ( result: 'updated', }, })), - } as estypes.BulkResponse); + } as estypes.BulkResponse; +}; export const bulkUpdateSuccess = async ( client: ElasticsearchClientMock, @@ -678,19 +681,26 @@ export const bulkUpdateSuccess = async ( originId?: string, multiNamespaceSpace?: string // the space for multi namespace objects returned by mock mget (this is only needed for space ext testing) ) => { - const multiNamespaceObjects = objects.filter(({ type }) => registry.isMultiNamespace(type)); - if (multiNamespaceObjects?.length) { - const response = getMockMgetResponse( - registry, - multiNamespaceObjects, - multiNamespaceSpace ?? options?.namespace - ); - client.mget.mockResponseOnce(response); + let mockedMgetResponse; + const validObjects = objects.filter(({ type }) => registry.getType(type) !== undefined); + const multiNamespaceObjects = validObjects.filter(({ type }) => registry.isMultiNamespace(type)); + + if (validObjects?.length) { + if (multiNamespaceObjects.length > 0) { + mockedMgetResponse = getMockMgetResponse( + registry, + validObjects, + multiNamespaceSpace ?? options?.namespace + ); + } else { + mockedMgetResponse = getMockMgetResponse(registry, validObjects); + } + client.mget.mockResponseOnce(mockedMgetResponse); } const response = getMockBulkUpdateResponse(registry, objects, options, originId); client.bulk.mockResponseOnce(response); const result = await repository.bulkUpdate(objects, options); - expect(client.mget).toHaveBeenCalledTimes(multiNamespaceObjects?.length ? 1 : 0); + expect(client.mget).toHaveBeenCalledTimes(validObjects?.length ? 1 : 0); return result; }; diff --git a/packages/core/saved-objects/core-saved-objects-api-server/src/apis/bulk_update.ts b/packages/core/saved-objects/core-saved-objects-api-server/src/apis/bulk_update.ts index 6d10aee397b2f..49bc8d769a1d6 100644 --- a/packages/core/saved-objects/core-saved-objects-api-server/src/apis/bulk_update.ts +++ b/packages/core/saved-objects/core-saved-objects-api-server/src/apis/bulk_update.ts @@ -39,6 +39,8 @@ export interface SavedObjectsBulkUpdateObject export interface SavedObjectsBulkUpdateOptions extends SavedObjectsBaseOptions { /** The Elasticsearch Refresh setting for this operation */ refresh?: MutatingOperationRefreshSetting; + /** {@link SavedObjectsRawDocParseOptions.migrationVersionCompatibility} */ + migrationVersionCompatibility?: 'compatible' | 'raw'; } /** diff --git a/src/core/server/integration_tests/saved_objects/service/lib/bulk_update.test.ts b/src/core/server/integration_tests/saved_objects/service/lib/bulk_update.test.ts new file mode 100644 index 0000000000000..dc620f87ea55b --- /dev/null +++ b/src/core/server/integration_tests/saved_objects/service/lib/bulk_update.test.ts @@ -0,0 +1,234 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import Path from 'path'; +import fs from 'fs/promises'; +import { pick } from 'lodash'; +import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; +import type { SavedObjectsType, SavedObjectsModelVersionMap } from '@kbn/core-saved-objects-server'; +import { type TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server'; +import '../../migrations/jest_matchers'; +import { + getKibanaMigratorTestKit, + startElasticsearch, +} from '../../migrations/kibana_migrator_test_kit'; +import { delay } from '../../migrations/test_utils'; +import { getBaseMigratorParams } from '../../migrations/fixtures/zdt_base.fixtures'; + +export const logFilePath = Path.join(__dirname, 'bulk_update.test.log'); + +describe('SOR - bulk_update API', () => { + let esServer: TestElasticsearchUtils['es']; + + beforeAll(async () => { + await fs.unlink(logFilePath).catch(() => {}); + esServer = await startElasticsearch(); + }); + + const getType = (version: 'v1' | 'v2'): SavedObjectsType => { + const versionMap: SavedObjectsModelVersionMap = { + 1: { + changes: [], + schemas: { + forwardCompatibility: (attributes) => { + return pick(attributes, 'count'); + }, + }, + }, + }; + + if (version === 'v2') { + versionMap[2] = { + changes: [ + { + type: 'data_backfill', + backfillFn: (document) => { + return { attributes: { even: document.attributes.count % 2 === 0 } }; + }, + }, + ], + }; + } + + return { + name: 'my-test-type', + hidden: false, + namespaceType: 'agnostic', + mappings: { + dynamic: false, + properties: { + count: { type: 'integer' }, + ...(version === 'v2' ? { even: { type: 'boolean' } } : {}), + }, + }, + management: { + importableAndExportable: true, + }, + switchToModelVersionAt: '8.10.0', + modelVersions: versionMap, + }; + }; + + const getOtherType = (version: 'v1' | 'v2'): SavedObjectsType => { + const versionOtherMap: SavedObjectsModelVersionMap = { + 1: { + changes: [], + schemas: { + forwardCompatibility: (attributes) => { + return pick(attributes, 'sum'); + }, + }, + }, + }; + + if (version === 'v2') { + versionOtherMap[2] = { + changes: [ + { + type: 'data_backfill', + backfillFn: (document) => { + return { attributes: { isodd: document.attributes.sum % 2 !== 0 } }; + }, + }, + ], + }; + } + + return { + name: 'my-other-test-type', + hidden: false, + namespaceType: 'agnostic', + mappings: { + dynamic: false, + properties: { + sum: { type: 'integer' }, + ...(version === 'v2' ? { isodd: { type: 'boolean' } } : {}), + }, + }, + management: { + importableAndExportable: true, + }, + switchToModelVersionAt: '8.10.0', + modelVersions: versionOtherMap, + }; + }; + afterAll(async () => { + await esServer?.stop(); + await delay(10); + }); + + const setup = async () => { + const { runMigrations: runMigrationV1, savedObjectsRepository: repositoryV1 } = + await getKibanaMigratorTestKit({ + ...getBaseMigratorParams(), + types: [getType('v1'), getOtherType('v1')], + }); + await runMigrationV1(); + + const { + runMigrations: runMigrationV2, + savedObjectsRepository: repositoryV2, + client: esClient, + } = await getKibanaMigratorTestKit({ + ...getBaseMigratorParams(), + types: [getType('v2'), getOtherType('v2')], + }); + await runMigrationV2(); + + return { repositoryV1, repositoryV2, esClient }; + }; + + it('supports updates between older and newer versions', async () => { + const { repositoryV1, repositoryV2, esClient } = await setup(); + + await repositoryV1.create('my-test-type', { count: 12 }, { id: 'my-id' }); + await repositoryV1.create('my-other-test-type', { sum: 24 }, { id: 'my-other-id' }); + + let repoV2Docs = await repositoryV2.bulkGet([ + { type: 'my-test-type', id: 'my-id' }, + { type: 'my-other-test-type', id: 'my-other-id' }, + ]); + const [doc, otherDoc] = repoV2Docs.saved_objects; + + expect(doc.attributes).toEqual({ + count: 12, + even: true, + }); + expect(otherDoc.attributes).toEqual({ + sum: 24, + isodd: false, + }); + + await repositoryV2.bulkUpdate([ + { type: 'my-test-type', id: doc.id, attributes: { count: 11, even: false } }, + // @ts-expect-error cannot assign to partial + { type: 'my-other-test-type', id: otherDoc.id, attributes: { sum: 23, isodd: true } }, + ]); + + const repoV1Docs = await repositoryV1.bulkGet([ + { type: 'my-test-type', id: 'my-id' }, + { type: 'my-other-test-type', id: 'my-other-id' }, + ]); + const [doc1, otherDoc1] = repoV1Docs.saved_objects; + + expect(doc1.attributes).toEqual({ + count: 11, + }); + expect(otherDoc1.attributes).toEqual({ + sum: 23, + }); + + await repositoryV1.bulkUpdate([ + { type: 'my-test-type', id: doc1.id, attributes: { count: 14 } }, + // @ts-expect-error cannot assign to partial + { type: 'my-other-test-type', id: otherDoc1.id, attributes: { sum: 24 } }, + ]); + + repoV2Docs = await repositoryV2.bulkGet([ + { type: 'my-test-type', id: 'my-id' }, + { type: 'my-other-test-type', id: 'my-other-id' }, + ]); + const [doc2, otherDoc2] = repoV2Docs.saved_objects; + + expect(doc2.attributes).toEqual({ + count: 14, + even: true, + }); + expect(otherDoc2.attributes).toEqual({ + sum: 24, + isodd: false, + }); + + const rawDoc = await fetchDoc(esClient, 'my-test-type', 'my-id'); + expect(rawDoc._source).toEqual( + expect.objectContaining({ + typeMigrationVersion: '10.1.0', + 'my-test-type': { + count: 14, + }, + }) + ); + + const otherRawDoc = await fetchDoc(esClient, 'my-other-test-type', 'my-other-id'); + expect(otherRawDoc._source).toEqual( + expect.objectContaining({ + typeMigrationVersion: '10.1.0', + 'my-other-test-type': { + sum: 24, + }, + }) + ); + }); + + const fetchDoc = async (client: ElasticsearchClient, type: string, id: string) => { + return await client.get({ + index: '.kibana', + id: `${type}:${id}`, + }); + }; +});