Skip to content

Commit

Permalink
[Core] [SOR] BWC bulkUpdate (elastic#171245)
Browse files Browse the repository at this point in the history
Co-authored-by: Kibana Machine <[email protected]>
Co-authored-by: pgayvallet <[email protected]>
  • Loading branch information
3 people authored Nov 28, 2023
1 parent d81c367 commit 2ca730d
Show file tree
Hide file tree
Showing 7 changed files with 608 additions and 187 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import {
DecoratedError,
AuthorizeUpdateObject,
SavedObjectsRawDoc,
SavedObjectsRawDocSource,
SavedObjectSanitizedDoc,
} from '@kbn/core-saved-objects-server';
import { ALL_NAMESPACES_STRING, SavedObjectsUtils } from '@kbn/core-saved-objects-utils-server';
import { encodeVersion } from '@kbn/core-saved-objects-base-server-internal';
Expand All @@ -35,6 +37,8 @@ import {
isLeft,
isRight,
rawDocExistsInNamespace,
getSavedObjectFromSource,
mergeForUpdate,
} from './utils';
import { ApiExecutionContext } from './types';

Expand All @@ -43,32 +47,51 @@ export interface PerformUpdateParams<T = unknown> {
options: SavedObjectsBulkUpdateOptions;
}

type DocumentToSave = Record<string, unknown>;
type ExpectedBulkGetResult = Either<
{ type: string; id: string; error: Payload },
{
type: string;
id: string;
version?: string;
documentToSave: DocumentToSave;
objectNamespace?: string;
esRequestIndex: number;
migrationVersionCompatibility?: 'raw' | 'compatible';
}
>;

type ExpectedBulkUpdateResult = Either<
{ type: string; id: string; error: Payload },
{
type: string;
id: string;
namespaces?: string[];
documentToSave: DocumentToSave;
esRequestIndex: number;
rawMigratedUpdatedDoc: SavedObjectsRawDoc;
}
>;

export const performBulkUpdate = async <T>(
{ objects, options }: PerformUpdateParams<T>,
{ registry, helpers, allowedTypes, client, serializer, extensions = {} }: ApiExecutionContext
): Promise<SavedObjectsBulkUpdateResponse<T>> => {
const { common: commonHelper, encryption: encryptionHelper } = helpers;
const {
common: commonHelper,
encryption: encryptionHelper,
migration: migrationHelper,
} = helpers;
const { securityExtension } = extensions;

const { migrationVersionCompatibility } = options;
const namespace = commonHelper.getCurrentNamespace(options.namespace);
const time = getCurrentTime();

let bulkGetRequestIndexCounter = 0;
type DocumentToSave = Record<string, unknown>;
type ExpectedBulkGetResult = Either<
{ type: string; id: string; error: Payload },
{
type: string;
id: string;
version?: string;
documentToSave: DocumentToSave;
objectNamespace?: string;
esRequestIndex?: number;
}
>;
const expectedBulkGetResults = objects.map<ExpectedBulkGetResult>((object) => {
const { type, id, attributes, references, version, namespace: objectNamespace } = object;
let error: DecoratedError | undefined;

if (!allowedTypes.includes(type)) {
error = SavedObjectsErrorHelpers.createGenericNotFoundError(type, id);
} else {
Expand All @@ -91,21 +114,19 @@ export const performBulkUpdate = async <T>(
...(Array.isArray(references) && { references }),
};

const requiresNamespacesCheck = registry.isMultiNamespace(object.type);

return right({
type,
id,
version,
documentToSave,
objectNamespace,
...(requiresNamespacesCheck && { esRequestIndex: bulkGetRequestIndexCounter++ }),
esRequestIndex: bulkGetRequestIndexCounter++,
migrationVersionCompatibility,
});
});

const validObjects = expectedBulkGetResults.filter(isRight);
if (validObjects.length === 0) {
// We only have error results; return early to avoid potentially trying authZ checks for 0 types which would result in an exception.
return {
// Technically the returned array should only contain SavedObject results, but for errors this is not true (we cast to 'any' below)
saved_objects: expectedBulkGetResults.map<SavedObject<T>>(
Expand All @@ -117,20 +138,25 @@ export const performBulkUpdate = async <T>(
// `objectNamespace` is a namespace string, while `namespace` is a namespace ID.
// The object namespace string, if defined, will supersede the operation's namespace ID.
const namespaceString = SavedObjectsUtils.namespaceIdToString(namespace);

const getNamespaceId = (objectNamespace?: string) =>
objectNamespace !== undefined
? SavedObjectsUtils.namespaceStringToId(objectNamespace)
: namespace;

const getNamespaceString = (objectNamespace?: string) => objectNamespace ?? namespaceString;
const bulkGetDocs = validObjects
.filter(({ value }) => value.esRequestIndex !== undefined)
.map(({ value: { type, id, objectNamespace } }) => ({
_id: serializer.generateRawId(getNamespaceId(objectNamespace), type, id),
_index: commonHelper.getIndexForType(type),
_source: ['type', 'namespaces'],
}));

const bulkGetDocs = validObjects.map(({ value: { type, id, objectNamespace } }) => ({
_id: serializer.generateRawId(getNamespaceId(objectNamespace), type, id),
_index: commonHelper.getIndexForType(type),
_source: true,
}));

const bulkGetResponse = bulkGetDocs.length
? await client.mget({ body: { docs: bulkGetDocs } }, { ignore: [404], meta: true })
? await client.mget<SavedObjectsRawDocSource>(
{ body: { docs: bulkGetDocs } },
{ ignore: [404], meta: true }
)
: undefined;
// fail fast if we can't verify a 404 response is from Elasticsearch
if (
Expand All @@ -145,14 +171,24 @@ export const performBulkUpdate = async <T>(

const authObjects: AuthorizeUpdateObject[] = validObjects.map((element) => {
const { type, id, objectNamespace, esRequestIndex: index } = element.value;
const preflightResult = index !== undefined ? bulkGetResponse?.body.docs[index] : undefined;
return {
type,
id,
objectNamespace,
// @ts-expect-error MultiGetHit._source is optional
existingNamespaces: preflightResult?._source?.namespaces ?? [],
};
const preflightResult = bulkGetResponse!.body.docs[index];

if (registry.isMultiNamespace(type)) {
return {
type,
id,
objectNamespace,
// @ts-expect-error MultiGetHit._source is optional
existingNamespaces: preflightResult._source?.namespaces ?? [],
};
} else {
return {
type,
id,
objectNamespace,
existingNamespaces: [],
};
}
});

const authorizationResult = await securityExtension?.authorizeBulkUpdate({
Expand All @@ -162,16 +198,7 @@ export const performBulkUpdate = async <T>(

let bulkUpdateRequestIndexCounter = 0;
const bulkUpdateParams: object[] = [];
type ExpectedBulkUpdateResult = Either<
{ type: string; id: string; error: Payload },
{
type: string;
id: string;
namespaces: string[];
documentToSave: DocumentToSave;
esRequestIndex: number;
}
>;

const expectedBulkUpdateResults = await Promise.all(
expectedBulkGetResults.map<Promise<ExpectedBulkUpdateResult>>(async (expectedBulkGetResult) => {
if (isLeft(expectedBulkGetResult)) {
Expand All @@ -181,67 +208,105 @@ export const performBulkUpdate = async <T>(
const { esRequestIndex, id, type, version, documentToSave, objectNamespace } =
expectedBulkGetResult.value;

let namespaces;
let versionProperties;
if (esRequestIndex !== undefined) {
const indexFound = bulkGetResponse?.statusCode !== 404;
const actualResult = indexFound ? bulkGetResponse?.body.docs[esRequestIndex] : undefined;
const docFound = indexFound && isMgetDoc(actualResult) && actualResult.found;
if (
!docFound ||
let namespaces: string[] | undefined;
const versionProperties = getExpectedVersionProperties(version);
const indexFound = bulkGetResponse?.statusCode !== 404;
const actualResult = indexFound ? bulkGetResponse?.body.docs[esRequestIndex] : undefined;
const docFound = indexFound && isMgetDoc(actualResult) && actualResult.found;
const isMultiNS = registry.isMultiNamespace(type);

if (
!docFound ||
(isMultiNS &&
!rawDocExistsInNamespace(
registry,
actualResult as SavedObjectsRawDoc,
getNamespaceId(objectNamespace)
)
) {
return left({
id,
type,
error: errorContent(SavedObjectsErrorHelpers.createGenericNotFoundError(type, id)),
});
}
))
) {
return left({
id,
type,
error: errorContent(SavedObjectsErrorHelpers.createGenericNotFoundError(type, id)),
});
}

if (isMultiNS) {
// @ts-expect-error MultiGetHit is incorrectly missing _id, _source
namespaces = actualResult!._source.namespaces ?? [
// @ts-expect-error MultiGetHit is incorrectly missing _id, _source
SavedObjectsUtils.namespaceIdToString(actualResult!._source.namespace),
];
versionProperties = getExpectedVersionProperties(version);
} else {
if (registry.isSingleNamespace(type)) {
// if `objectNamespace` is undefined, fall back to `options.namespace`
namespaces = [getNamespaceString(objectNamespace)];
}
versionProperties = getExpectedVersionProperties(version);
} else if (registry.isSingleNamespace(type)) {
// if `objectNamespace` is undefined, fall back to `options.namespace`
namespaces = [getNamespaceString(objectNamespace)];
}

const document = getSavedObjectFromSource<T>(
registry,
type,
id,
actualResult as SavedObjectsRawDoc,
{ migrationVersionCompatibility }
);

let migrated: SavedObject<T>;
try {
migrated = migrationHelper.migrateStorageDocument(document) as SavedObject<T>;
} catch (migrateStorageDocError) {
throw SavedObjectsErrorHelpers.decorateGeneralError(
migrateStorageDocError,
'Failed to migrate document to the latest version.'
);
}

const typeDefinition = registry.getType(type)!;
const updatedAttributes = mergeForUpdate({
targetAttributes: {
...migrated!.attributes,
},
updatedAttributes: await encryptionHelper.optionallyEncryptAttributes(
type,
id,
objectNamespace || namespace,
documentToSave[type]
),
typeMappings: typeDefinition.mappings,
});

const migratedUpdatedSavedObjectDoc = migrationHelper.migrateInputDocument({
...migrated!,
id,
type,
namespace,
namespaces,
attributes: updatedAttributes,
updated_at: time,
...(Array.isArray(documentToSave.references) && { references: documentToSave.references }),
});
const updatedMigratedDocumentToSave = serializer.savedObjectToRaw(
migratedUpdatedSavedObjectDoc as SavedObjectSanitizedDoc
);

const expectedResult = {
type,
id,
namespaces,
esRequestIndex: bulkUpdateRequestIndexCounter++,
documentToSave: expectedBulkGetResult.value.documentToSave,
rawMigratedUpdatedDoc: updatedMigratedDocumentToSave,
migrationVersionCompatibility,
};

bulkUpdateParams.push(
{
update: {
index: {
_id: serializer.generateRawId(getNamespaceId(objectNamespace), type, id),
_index: commonHelper.getIndexForType(type),
...versionProperties,
},
},
{
doc: {
...documentToSave,
[type]: await encryptionHelper.optionallyEncryptAttributes(
type,
id,
objectNamespace || namespace,
documentToSave[type]
),
},
}
updatedMigratedDocumentToSave._source
);

return right(expectedResult);
Expand All @@ -264,7 +329,8 @@ export const performBulkUpdate = async <T>(
return expectedResult.value as any;
}

const { type, id, namespaces, documentToSave, esRequestIndex } = expectedResult.value;
const { type, id, namespaces, documentToSave, esRequestIndex, rawMigratedUpdatedDoc } =
expectedResult.value;
const response = bulkUpdateResponse?.items[esRequestIndex] ?? {};
const rawResponse = Object.values(response)[0] as any;

Expand All @@ -273,14 +339,12 @@ export const performBulkUpdate = async <T>(
return { type, id, error };
}

// When a bulk update operation is completed, any fields specified in `_sourceIncludes` will be found in the "get" value of the
// returned object. We need to retrieve the `originId` if it exists so we can return it to the consumer.
const { _seq_no: seqNo, _primary_term: primaryTerm, get } = rawResponse;
const { _seq_no: seqNo, _primary_term: primaryTerm } = rawResponse;

// eslint-disable-next-line @typescript-eslint/naming-convention
const { [type]: attributes, references, updated_at } = documentToSave;

const { originId } = get._source;
const { originId } = rawMigratedUpdatedDoc._source;
return {
id,
type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export const executeUpdate = async <T>(
preflightDocResult,
});

const existingNamespaces = preflightDocNSResult?.savedObjectNamespaces ?? [];
const existingNamespaces = preflightDocNSResult.savedObjectNamespaces ?? [];
const authorizationResult = await securityExtension?.authorizeUpdate({
namespace,
object: { type, id, existingNamespaces },
Expand Down
Loading

0 comments on commit 2ca730d

Please sign in to comment.