From eaf2bed5ca7919b420c46d75b59405623733b241 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 16 Jul 2024 16:56:51 -0500 Subject: [PATCH] feat(mcp-mutator): new mcp mutator plugin (#10904) --- .../linkedin/metadata/aspect/ReadItem.java | 6 +- .../metadata/aspect/batch/AspectsBatch.java | 7 + .../metadata/aspect/plugins/PluginSpec.java | 20 +- .../aspect/plugins/hooks/MCLSideEffect.java | 2 +- .../aspect/plugins/hooks/MCPSideEffect.java | 4 +- .../aspect/plugins/hooks/MutationHook.java | 26 +- .../validation/AspectPayloadValidator.java | 4 +- metadata-io/build.gradle | 1 + metadata-io/metadata-io-api/build.gradle | 7 + .../entity/ebean/batch/AspectsBatchImpl.java | 26 +- .../entity/ebean/batch/ProposedItem.java | 80 +++++ .../ebean/batch/AspectsBatchImplTest.java | 320 ++++++++++++++++++ .../test/resources/AspectsBatchImplTest.yaml | 19 ++ .../aspect/hooks/IgnoreUnknownMutator.java | 80 +++++ .../hooks/IgnoreUnknownMutatorTest.java | 143 ++++++++ .../kafka/MaeConsumerApplication.java | 1 + .../MCLSpringCommonTestConfiguration.java | 3 + .../kafka/MceConsumerApplication.java | 3 +- .../src/main/resources/entity-registry.yml | 6 + .../metadata/context/RequestContext.java | 1 + .../src/main/resources/application.yaml | 2 + .../ConfigEntityRegistryFactory.java | 5 +- .../SpringStandardPluginConfiguration.java | 33 ++ .../metadata/aspect/SpringPluginFactory.java | 12 +- .../linkedin/gms/CommonApplicationConfig.java | 1 + 25 files changed, 786 insertions(+), 26 deletions(-) create mode 100644 metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java create mode 100644 metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java create mode 100644 metadata-io/metadata-io-api/src/test/resources/AspectsBatchImplTest.yaml create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java 
create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutatorTest.java create mode 100644 metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/ReadItem.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/ReadItem.java index 342b5376d8a755..106596bf80ccf0 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/ReadItem.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/ReadItem.java @@ -5,6 +5,7 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.mxe.GenericAspect; import com.linkedin.mxe.SystemMetadata; import java.lang.reflect.InvocationTargetException; import javax.annotation.Nonnull; @@ -26,6 +27,9 @@ public interface ReadItem { */ @Nonnull default String getAspectName() { + if (getAspectSpec() == null) { + return GenericAspect.dataSchema().getName(); + } return getAspectSpec().getName(); } @@ -72,6 +76,6 @@ static T getAspect(Class clazz, @Nullable RecordTemplate recordTemplate) * * @return aspect's specification */ - @Nonnull + @Nullable AspectSpec getAspectSpec(); } diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java index a302632e1936fd..77820948b00cbc 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java @@ -84,6 +84,13 @@ static void applyWriteMutationHooks( } } + default Stream applyProposalMutationHooks( + Collection proposedItems, @Nonnull RetrieverContext retrieverContext) { + return 
retrieverContext.getAspectRetriever().getEntityRegistry().getAllMutationHooks().stream() + .flatMap( + mutationHook -> mutationHook.applyProposalMutation(proposedItems, retrieverContext)); + } + default ValidationExceptionCollection validateProposed( Collection mcpItems) { return validateProposed(mcpItems, getRetrieverContext()); diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/PluginSpec.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/PluginSpec.java index 1adb1be81ecc1d..f99dd18d3c9c1f 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/PluginSpec.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/PluginSpec.java @@ -3,7 +3,6 @@ import com.linkedin.common.urn.Urn; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; -import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -25,20 +24,13 @@ public boolean enabled() { } public boolean shouldApply( - @Nullable ChangeType changeType, @Nonnull Urn entityUrn, @Nonnull AspectSpec aspectSpec) { - return shouldApply(changeType, entityUrn.getEntityType(), aspectSpec); + @Nullable ChangeType changeType, @Nonnull Urn entityUrn, @Nonnull String aspectName) { + return shouldApply(changeType, entityUrn.getEntityType(), aspectName); } public boolean shouldApply( - @Nullable ChangeType changeType, - @Nonnull EntitySpec entitySpec, - @Nonnull AspectSpec aspectSpec) { - return shouldApply(changeType, entitySpec.getName(), aspectSpec.getName()); - } - - public boolean shouldApply( - @Nullable ChangeType changeType, @Nonnull String entityName, @Nonnull AspectSpec aspectSpec) { - return shouldApply(changeType, entityName, aspectSpec.getName()); + @Nullable ChangeType changeType, @Nonnull EntitySpec entitySpec, @Nonnull String aspectName) { + return 
shouldApply(changeType, entitySpec.getName(), aspectName); } public boolean shouldApply( @@ -49,8 +41,8 @@ && isChangeTypeSupported(changeType) } protected boolean isEntityAspectSupported( - @Nonnull EntitySpec entitySpec, @Nonnull AspectSpec aspectSpec) { - return isEntityAspectSupported(entitySpec.getName(), aspectSpec.getName()); + @Nonnull EntitySpec entitySpec, @Nonnull String aspectName) { + return isEntityAspectSupported(entitySpec.getName(), aspectName); } protected boolean isEntityAspectSupported( diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MCLSideEffect.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MCLSideEffect.java index 57016404648d50..853c2ef5f796c2 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MCLSideEffect.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MCLSideEffect.java @@ -24,7 +24,7 @@ public final Stream apply( @Nonnull Collection batchItems, @Nonnull RetrieverContext retrieverContext) { return applyMCLSideEffect( batchItems.stream() - .filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectSpec())) + .filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectName())) .collect(Collectors.toList()), retrieverContext); } diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MCPSideEffect.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MCPSideEffect.java index 52920d8c6f3966..ce49dd057bc3ed 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MCPSideEffect.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MCPSideEffect.java @@ -25,7 +25,7 @@ public final Stream apply( Collection changeMCPS, @Nonnull RetrieverContext retrieverContext) { return applyMCPSideEffect( changeMCPS.stream() - .filter(item -> 
shouldApply(item.getChangeType(), item.getUrn(), item.getAspectSpec())) + .filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectName())) .collect(Collectors.toList()), retrieverContext); } @@ -41,7 +41,7 @@ public final Stream postApply( Collection mclItems, @Nonnull RetrieverContext retrieverContext) { return postMCPSideEffect( mclItems.stream() - .filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectSpec())) + .filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectName())) .collect(Collectors.toList()), retrieverContext); } diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MutationHook.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MutationHook.java index c067954912a032..b2fd997d49444d 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MutationHook.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/MutationHook.java @@ -3,6 +3,7 @@ import com.linkedin.metadata.aspect.ReadItem; import com.linkedin.metadata.aspect.RetrieverContext; import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.batch.MCPItem; import com.linkedin.metadata.aspect.plugins.PluginSpec; import com.linkedin.util.Pair; import java.util.Collection; @@ -24,7 +25,7 @@ public final Stream> applyWriteMutation( @Nonnull Collection changeMCPS, @Nonnull RetrieverContext retrieverContext) { return writeMutation( changeMCPS.stream() - .filter(i -> shouldApply(i.getChangeType(), i.getEntitySpec(), i.getAspectSpec())) + .filter(i -> shouldApply(i.getChangeType(), i.getEntitySpec(), i.getAspectName())) .collect(Collectors.toList()), retrieverContext); } @@ -34,7 +35,23 @@ public final Stream> applyReadMutation( @Nonnull Collection items, @Nonnull RetrieverContext retrieverContext) { return readMutation( items.stream() - .filter(i -> 
isEntityAspectSupported(i.getEntitySpec(), i.getAspectSpec())) + .filter(i -> isEntityAspectSupported(i.getEntitySpec(), i.getAspectName())) + .collect(Collectors.toList()), + retrieverContext); + } + + /** + * Apply Proposal mutations prior to validation + * + * @param mcpItems wrapper for MCP + * @param retrieverContext retriever context + * @return stream of mutated Proposal items + */ + public final Stream applyProposalMutation( + @Nonnull Collection mcpItems, @Nonnull RetrieverContext retrieverContext) { + return proposalMutation( + mcpItems.stream() + .filter(i -> shouldApply(i.getChangeType(), i.getEntitySpec(), i.getAspectName())) .collect(Collectors.toList()), retrieverContext); } @@ -48,4 +65,9 @@ protected Stream> writeMutation( @Nonnull Collection changeMCPS, @Nonnull RetrieverContext retrieverContext) { return changeMCPS.stream().map(i -> Pair.of(i, false)); } + + protected Stream proposalMutation( + @Nonnull Collection mcpItems, @Nonnull RetrieverContext retrieverContext) { + return Stream.empty(); + } } diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/validation/AspectPayloadValidator.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/validation/AspectPayloadValidator.java index b39c38c2768a7f..4083329899fee0 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/validation/AspectPayloadValidator.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/plugins/validation/AspectPayloadValidator.java @@ -22,7 +22,7 @@ public final Stream validateProposed( @Nonnull RetrieverContext retrieverContext) { return validateProposedAspects( mcpItems.stream() - .filter(i -> shouldApply(i.getChangeType(), i.getUrn(), i.getAspectSpec())) + .filter(i -> shouldApply(i.getChangeType(), i.getUrn(), i.getAspectName())) .collect(Collectors.toList()), retrieverContext); } @@ -37,7 +37,7 @@ public final Stream validatePreCommit( @Nonnull Collection changeMCPs, @Nonnull 
RetrieverContext retrieverContext) { return validatePreCommitAspects( changeMCPs.stream() - .filter(i -> shouldApply(i.getChangeType(), i.getUrn(), i.getAspectSpec())) + .filter(i -> shouldApply(i.getChangeType(), i.getUrn(), i.getAspectName())) .collect(Collectors.toList()), retrieverContext); } diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle index 6666e335446884..ff29cb5fff47d2 100644 --- a/metadata-io/build.gradle +++ b/metadata-io/build.gradle @@ -21,6 +21,7 @@ dependencies { api project(':metadata-service:services') api project(':metadata-operation-context') + implementation spec.product.pegasus.restliServer implementation spec.product.pegasus.data implementation spec.product.pegasus.generator diff --git a/metadata-io/metadata-io-api/build.gradle b/metadata-io/metadata-io-api/build.gradle index bd79e8cb3ddefb..b8028fad07bb65 100644 --- a/metadata-io/metadata-io-api/build.gradle +++ b/metadata-io/metadata-io-api/build.gradle @@ -8,4 +8,11 @@ dependencies { implementation project(':metadata-utils') compileOnly externalDependency.lombok annotationProcessor externalDependency.lombok + + testImplementation(externalDependency.testng) + testImplementation(externalDependency.mockito) + testImplementation(testFixtures(project(":entity-registry"))) + testImplementation project(':metadata-operation-context') + testImplementation externalDependency.lombok + testAnnotationProcessor externalDependency.lombok } diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java index 0914df744e413a..a23f6ab175046b 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java @@ -8,6 +8,7 @@ import 
com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.batch.MCPItem; import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.util.Pair; @@ -18,6 +19,7 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nonnull; import lombok.Builder; import lombok.Getter; @@ -44,9 +46,20 @@ public class AspectsBatchImpl implements AspectsBatch { public Pair>, List> toUpsertBatchItems( final Map> latestAspects) { + // Process proposals to change items + Stream mutatedProposalsStream = + proposedItemsToChangeItemStream( + items.stream() + .filter(item -> item instanceof ProposedItem) + .map(item -> (MCPItem) item) + .collect(Collectors.toList())); + // Regular change items + Stream changeMCPStream = + items.stream().filter(item -> !(item instanceof ProposedItem)); + // Convert patches to upserts if needed LinkedList upsertBatchItems = - items.stream() + Stream.concat(mutatedProposalsStream, changeMCPStream) .map( item -> { final String urnStr = item.getUrn().toString(); @@ -85,6 +98,17 @@ public Pair>, List> toUpsertBatchItems( return Pair.of(newUrnAspectNames, upsertBatchItems); } + private Stream proposedItemsToChangeItemStream(List proposedItems) { + return applyProposalMutationHooks(proposedItems, retrieverContext) + .filter(mcpItem -> mcpItem.getMetadataChangeProposal() != null) + .map( + mcpItem -> + ChangeItemImpl.ChangeItemImplBuilder.build( + mcpItem.getMetadataChangeProposal(), + mcpItem.getAuditStamp(), + retrieverContext.getAspectRetriever())); + } + public static class AspectsBatchImplBuilder { /** * Just one aspect record template diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java 
b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java new file mode 100644 index 00000000000000..452ed39ddf3174 --- /dev/null +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/ProposedItem.java @@ -0,0 +1,80 @@ +package com.linkedin.metadata.entity.ebean.batch; + +import com.linkedin.common.AuditStamp; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.batch.MCPItem; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.mxe.SystemMetadata; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** Represents an unvalidated wrapped MCP */ +@Slf4j +@Getter +@Builder(toBuilder = true) +public class ProposedItem implements MCPItem { + @Nonnull private final MetadataChangeProposal metadataChangeProposal; + @Nonnull private final AuditStamp auditStamp; + // derived + @Nonnull private EntitySpec entitySpec; + @Nullable private AspectSpec aspectSpec; + + @Nonnull + @Override + public String getAspectName() { + if (metadataChangeProposal.getAspectName() != null) { + return metadataChangeProposal.getAspectName(); + } else { + return MCPItem.super.getAspectName(); + } + } + + @Nullable + public AspectSpec getAspectSpec() { + if (aspectSpec != null) { + return aspectSpec; + } + if (entitySpec.getAspectSpecMap().containsKey(getAspectName())) { + return entitySpec.getAspectSpecMap().get(getAspectName()); + } + return null; + } + + @Nullable + @Override + public RecordTemplate getRecordTemplate() { + if (getAspectSpec() != null) { + return GenericRecordUtils.deserializeAspect( + 
getMetadataChangeProposal().getAspect().getValue(), + getMetadataChangeProposal().getAspect().getContentType(), + getAspectSpec()); + } + return null; + } + + @Nonnull + @Override + public Urn getUrn() { + return metadataChangeProposal.getEntityUrn(); + } + + @Nullable + @Override + public SystemMetadata getSystemMetadata() { + return metadataChangeProposal.getSystemMetadata(); + } + + @Nonnull + @Override + public ChangeType getChangeType() { + return metadataChangeProposal.getChangeType(); + } +} diff --git a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java new file mode 100644 index 00000000000000..d2e7243d045604 --- /dev/null +++ b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java @@ -0,0 +1,320 @@ +package com.linkedin.metadata.entity.ebean.batch; + +import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; +import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; +import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTIES_ASPECT_NAME; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +import com.linkedin.common.Status; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.ByteString; +import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.AspectRetriever; +import com.linkedin.metadata.aspect.GraphRetriever; +import com.linkedin.metadata.aspect.batch.MCPItem; +import com.linkedin.metadata.aspect.patch.GenericJsonPatch; +import com.linkedin.metadata.aspect.patch.PatchOperationType; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import 
com.linkedin.metadata.aspect.plugins.hooks.MutationHook; +import com.linkedin.metadata.entity.SearchRetriever; +import com.linkedin.metadata.models.registry.ConfigEntityRegistry; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.models.registry.EntityRegistryException; +import com.linkedin.metadata.models.registry.MergedEntityRegistry; +import com.linkedin.metadata.models.registry.SnapshotEntityRegistry; +import com.linkedin.metadata.snapshot.Snapshot; +import com.linkedin.metadata.utils.AuditStampUtils; +import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.mxe.GenericAspect; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.mxe.SystemMetadata; +import com.linkedin.structured.StructuredProperties; +import com.linkedin.structured.StructuredPropertyValueAssignmentArray; +import com.linkedin.util.Pair; +import io.datahubproject.metadata.context.RetrieverContext; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +public class AspectsBatchImplTest { + private EntityRegistry testRegistry; + private AspectRetriever mockAspectRetriever; + private RetrieverContext retrieverContext; + + @BeforeTest + public void beforeTest() throws EntityRegistryException { + PathSpecBasedSchemaAnnotationVisitor.class + .getClassLoader() + .setClassAssertionStatus(PathSpecBasedSchemaAnnotationVisitor.class.getName(), false); + + EntityRegistry snapshotEntityRegistry = new SnapshotEntityRegistry(); + EntityRegistry configEntityRegistry = + new ConfigEntityRegistry( + Snapshot.class.getClassLoader().getResourceAsStream("AspectsBatchImplTest.yaml")); + 
this.testRegistry = + new MergedEntityRegistry(snapshotEntityRegistry).apply(configEntityRegistry); + } + + @BeforeMethod + public void setup() { + this.mockAspectRetriever = mock(AspectRetriever.class); + when(this.mockAspectRetriever.getEntityRegistry()).thenReturn(testRegistry); + this.retrieverContext = + RetrieverContext.builder() + .searchRetriever(mock(SearchRetriever.class)) + .aspectRetriever(mockAspectRetriever) + .graphRetriever(mock(GraphRetriever.class)) + .build(); + } + + @Test + public void toUpsertBatchItemsChangeItemTest() { + List testItems = + List.of( + ChangeItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)")) + .changeType(ChangeType.UPSERT) + .aspectName(STATUS_ASPECT_NAME) + .entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectSpec( + testRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(STATUS_ASPECT_NAME)) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .recordTemplate(new Status().setRemoved(true)) + .build(mockAspectRetriever), + ChangeItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)")) + .changeType(ChangeType.UPSERT) + .aspectName(STATUS_ASPECT_NAME) + .entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectSpec( + testRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(STATUS_ASPECT_NAME)) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .recordTemplate(new Status().setRemoved(false)) + .build(mockAspectRetriever)); + + AspectsBatchImpl testBatch = + AspectsBatchImpl.builder().items(testItems).retrieverContext(retrieverContext).build(); + + assertEquals( + testBatch.toUpsertBatchItems(Map.of()), + Pair.of(Map.of(), testItems), + "Expected noop, pass through with no additional MCPs or changes"); + } + + @Test + public void toUpsertBatchItemsPatchItemTest() { + GenericJsonPatch.PatchOp testPatchOp = new GenericJsonPatch.PatchOp(); + 
testPatchOp.setOp(PatchOperationType.REMOVE.getValue()); + testPatchOp.setPath( + String.format( + "/properties/%s", "urn:li:structuredProperty:io.acryl.privacy.retentionTime")); + + List testItems = + List.of( + PatchItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)")) + .entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectName(STRUCTURED_PROPERTIES_ASPECT_NAME) + .aspectSpec( + testRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(STRUCTURED_PROPERTIES_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder() + .arrayPrimaryKeys(Map.of("properties", List.of("propertyUrn"))) + .patch(List.of(testPatchOp)) + .build() + .getJsonPatch()) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(retrieverContext.getAspectRetriever().getEntityRegistry()), + PatchItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)")) + .entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectName(STRUCTURED_PROPERTIES_ASPECT_NAME) + .aspectSpec( + testRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(STRUCTURED_PROPERTIES_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder() + .arrayPrimaryKeys(Map.of("properties", List.of("propertyUrn"))) + .patch(List.of(testPatchOp)) + .build() + .getJsonPatch()) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(retrieverContext.getAspectRetriever().getEntityRegistry())); + + AspectsBatchImpl testBatch = + AspectsBatchImpl.builder().items(testItems).retrieverContext(retrieverContext).build(); + + assertEquals( + testBatch.toUpsertBatchItems(Map.of()), + Pair.of( + Map.of(), + List.of( + ChangeItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)")) + .changeType(ChangeType.UPSERT) + .aspectName(STRUCTURED_PROPERTIES_ASPECT_NAME) + .entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + 
.aspectSpec( + testRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(STRUCTURED_PROPERTIES_ASPECT_NAME)) + .auditStamp(testItems.get(0).getAuditStamp()) + .recordTemplate( + new StructuredProperties() + .setProperties(new StructuredPropertyValueAssignmentArray())) + .systemMetadata(testItems.get(0).getSystemMetadata()) + .build(mockAspectRetriever), + ChangeItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)")) + .changeType(ChangeType.UPSERT) + .aspectName(STRUCTURED_PROPERTIES_ASPECT_NAME) + .entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectSpec( + testRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(STRUCTURED_PROPERTIES_ASPECT_NAME)) + .auditStamp(testItems.get(1).getAuditStamp()) + .recordTemplate( + new StructuredProperties() + .setProperties(new StructuredPropertyValueAssignmentArray())) + .systemMetadata(testItems.get(1).getSystemMetadata()) + .build(mockAspectRetriever))), + "Expected patch items converted to upsert change items"); + } + + @Test + public void toUpsertBatchItemsProposedItemTest() { + List testItems = + List.of( + ProposedItem.builder() + .entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .metadataChangeProposal( + new MetadataChangeProposal() + .setEntityUrn( + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)")) + .setAspectName("my-custom-aspect") + .setEntityType(DATASET_ENTITY_NAME) + .setChangeType(ChangeType.UPSERT) + .setAspect( + new GenericAspect() + .setContentType("application/json") + .setValue( + ByteString.copyString( + "{\"foo\":\"bar\"}", StandardCharsets.UTF_8))) + .setSystemMetadata(new SystemMetadata())) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(), + ProposedItem.builder() + .entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .metadataChangeProposal( + new MetadataChangeProposal() + .setEntityUrn( + UrnUtils.getUrn( + 
"urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)")) + .setAspectName("my-custom-aspect") + .setEntityType(DATASET_ENTITY_NAME) + .setChangeType(ChangeType.UPSERT) + .setAspect( + new GenericAspect() + .setContentType("application/json") + .setValue( + ByteString.copyString( + "{\"foo\":\"bar\"}", StandardCharsets.UTF_8))) + .setSystemMetadata(new SystemMetadata())) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build()); + + AspectsBatchImpl testBatch = + AspectsBatchImpl.builder().items(testItems).retrieverContext(retrieverContext).build(); + + assertEquals( + testBatch.toUpsertBatchItems(Map.of()), + Pair.of( + Map.of(), + List.of( + ChangeItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)")) + .changeType(ChangeType.UPSERT) + .aspectName(STATUS_ASPECT_NAME) + .entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectSpec( + testRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(STATUS_ASPECT_NAME)) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .systemMetadata(testItems.get(0).getSystemMetadata()) + .recordTemplate(new Status().setRemoved(false)) + .build(mockAspectRetriever), + ChangeItemImpl.builder() + .urn( + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)")) + .changeType(ChangeType.UPSERT) + .aspectName(STATUS_ASPECT_NAME) + .entitySpec(testRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectSpec( + testRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(STATUS_ASPECT_NAME)) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .systemMetadata(testItems.get(1).getSystemMetadata()) + .recordTemplate(new Status().setRemoved(false)) + .build(mockAspectRetriever))), + "Mutation to status aspect"); + } + + /** Converts unsupported to status aspect */ + @Getter + @Setter + @Accessors(chain = true) + public static class TestMutator extends MutationHook { + private 
AspectPluginConfig config; + + @Override + protected Stream proposalMutation( + @Nonnull Collection mcpItems, + @Nonnull com.linkedin.metadata.aspect.RetrieverContext retrieverContext) { + return mcpItems.stream() + .peek( + item -> + item.getMetadataChangeProposal() + .setAspectName(STATUS_ASPECT_NAME) + .setAspect( + GenericRecordUtils.serializeAspect(new Status().setRemoved(false)))); + } + } +} diff --git a/metadata-io/metadata-io-api/src/test/resources/AspectsBatchImplTest.yaml b/metadata-io/metadata-io-api/src/test/resources/AspectsBatchImplTest.yaml new file mode 100644 index 00000000000000..9716b0cab9b2f9 --- /dev/null +++ b/metadata-io/metadata-io-api/src/test/resources/AspectsBatchImplTest.yaml @@ -0,0 +1,19 @@ +entities: + - name: dataset + doc: Datasets represent logical or physical data assets stored or represented in various data platforms. Tables, Views, Streams are all instances of datasets. + category: core + keyAspect: datasetKey + aspects: + - status + - structuredProperties +plugins: + mutationHooks: + - className: 'com.linkedin.metadata.entity.ebean.batch.AspectsBatchImplTest$TestMutator' + packageScan: + - 'com.linkedin.metadata.entity.ebean.batch' + enabled: true + supportedOperations: + - UPSERT + supportedEntityAspectNames: + - entityName: 'dataset' + aspectName: '*' \ No newline at end of file diff --git a/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java b/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java new file mode 100644 index 00000000000000..8d6bdffceacb93 --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java @@ -0,0 +1,80 @@ +package com.linkedin.metadata.aspect.hooks; + +import com.datahub.util.exception.ModelConversionException; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.data.transform.filter.request.MaskTree; +import com.linkedin.metadata.aspect.RetrieverContext; +import 
com.linkedin.metadata.aspect.batch.MCPItem; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.hooks.MutationHook; +import com.linkedin.metadata.entity.validation.ValidationApiUtils; +import com.linkedin.metadata.entity.validation.ValidationException; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.mxe.GenericAspect; +import com.linkedin.restli.internal.server.util.RestUtils; +import java.util.Collection; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +/** This mutator will log and drop unknown aspects. It will also log and drop unknown fields. */ +@Slf4j +@Setter +@Getter +@Accessors(chain = true) +public class IgnoreUnknownMutator extends MutationHook { + @Nonnull private AspectPluginConfig config; + + @Override + protected Stream proposalMutation( + @Nonnull Collection mcpItems, @Nonnull RetrieverContext retrieverContext) { + return mcpItems.stream() + .filter( + item -> { + if (item.getEntitySpec().getAspectSpec(item.getAspectName()) == null) { + log.warn( + "Dropping unknown aspect {} on entity {}", + item.getAspectName(), + item.getEntitySpec().getName()); + return false; + } + if (!"application/json" + .equals(item.getMetadataChangeProposal().getAspect().getContentType())) { + log.warn( + "Dropping unknown content type {} for aspect {} on entity {}", + item.getMetadataChangeProposal().getAspect().getContentType(), + item.getAspectName(), + item.getEntitySpec().getName()); + return false; + } + return true; + }) + .peek( + item -> { + try { + AspectSpec aspectSpec = item.getEntitySpec().getAspectSpec(item.getAspectName()); + GenericAspect aspect = item.getMetadataChangeProposal().getAspect(); + RecordTemplate recordTemplate = + GenericRecordUtils.deserializeAspect( + 
aspect.getValue(), aspect.getContentType(), aspectSpec); + try { + ValidationApiUtils.validateOrThrow(recordTemplate); + } catch (ValidationException | ModelConversionException e) { + log.warn( + "Failed to validate aspect. Coercing aspect {} on entity {}", + item.getAspectName(), + item.getEntitySpec().getName()); + RestUtils.trimRecordTemplate(recordTemplate, new MaskTree(), false); + item.getMetadataChangeProposal() + .setAspect(GenericRecordUtils.serializeAspect(recordTemplate)); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutatorTest.java b/metadata-io/src/test/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutatorTest.java new file mode 100644 index 00000000000000..11a3153abcaeed --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutatorTest.java @@ -0,0 +1,143 @@ +package com.linkedin.metadata.aspect.hooks; + +import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; +import static com.linkedin.metadata.Constants.DATASET_PROPERTIES_ASPECT_NAME; +import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; + +import com.linkedin.common.GlobalTags; +import com.linkedin.common.TagAssociation; +import com.linkedin.common.TagAssociationArray; +import com.linkedin.common.urn.TagUrn; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.ByteString; +import com.linkedin.data.template.StringMap; +import com.linkedin.dataset.DatasetProperties; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.AspectRetriever; +import com.linkedin.metadata.aspect.batch.MCPItem; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.entity.SearchRetriever; +import 
com.linkedin.metadata.entity.ebean.batch.ProposedItem; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.utils.AuditStampUtils; +import com.linkedin.mxe.GenericAspect; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.mxe.SystemMetadata; +import com.linkedin.test.metadata.aspect.TestEntityRegistry; +import io.datahubproject.metadata.context.RetrieverContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class IgnoreUnknownMutatorTest { + private static final EntityRegistry TEST_REGISTRY = new TestEntityRegistry(); + private static final AspectPluginConfig TEST_PLUGIN_CONFIG = + AspectPluginConfig.builder() + .className(IgnoreUnknownMutator.class.getName()) + .enabled(true) + .supportedOperations(List.of("UPSERT")) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName(DATASET_ENTITY_NAME) + .aspectName("*") + .build())) + .build(); + private static final Urn TEST_DATASET_URN = + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:postgres,calm-pagoda-323403.jaffle_shop.customers,PROD)"); + private AspectRetriever mockAspectRetriever; + private RetrieverContext retrieverContext; + + @BeforeMethod + public void setup() { + mockAspectRetriever = mock(AspectRetriever.class); + retrieverContext = + RetrieverContext.builder() + .searchRetriever(mock(SearchRetriever.class)) + .aspectRetriever(mockAspectRetriever) + .graphRetriever(TestOperationContexts.emptyGraphRetriever) + .build(); + } + + @Test + public void testUnknownFieldInTagAssociationArray() throws URISyntaxException { + IgnoreUnknownMutator test = new IgnoreUnknownMutator(); + test.setConfig(TEST_PLUGIN_CONFIG); + + List testItems = + List.of( + 
ProposedItem.builder() + .entitySpec(TEST_REGISTRY.getEntitySpec(DATASET_ENTITY_NAME)) + .metadataChangeProposal( + new MetadataChangeProposal() + .setEntityUrn(TEST_DATASET_URN) + .setAspectName(GLOBAL_TAGS_ASPECT_NAME) + .setEntityType(DATASET_ENTITY_NAME) + .setChangeType(ChangeType.UPSERT) + .setAspect( + new GenericAspect() + .setContentType("application/json") + .setValue( + ByteString.copyString( + "{\"tags\":[{\"tag\":\"urn:li:tag:Legacy\",\"foo\":\"bar\"}]}", + StandardCharsets.UTF_8))) + .setSystemMetadata(new SystemMetadata())) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build()); + + List result = test.proposalMutation(testItems, retrieverContext).toList(); + + assertEquals(result.size(), 1); + assertEquals( + result.get(0).getAspect(GlobalTags.class), + new GlobalTags() + .setTags( + new TagAssociationArray( + List.of( + new TagAssociation() + .setTag(TagUrn.createFromString("urn:li:tag:Legacy")))))); + } + + @Test + public void testUnknownFieldDatasetProperties() throws URISyntaxException { + IgnoreUnknownMutator test = new IgnoreUnknownMutator(); + test.setConfig(TEST_PLUGIN_CONFIG); + + List testItems = + List.of( + ProposedItem.builder() + .entitySpec(TEST_REGISTRY.getEntitySpec(DATASET_ENTITY_NAME)) + .metadataChangeProposal( + new MetadataChangeProposal() + .setEntityUrn(TEST_DATASET_URN) + .setAspectName(DATASET_PROPERTIES_ASPECT_NAME) + .setEntityType(DATASET_ENTITY_NAME) + .setChangeType(ChangeType.UPSERT) + .setAspect( + new GenericAspect() + .setContentType("application/json") + .setValue( + ByteString.copyString( + "{\"foo\":\"bar\",\"customProperties\":{\"prop2\":\"pikachu\",\"prop1\":\"fakeprop\"}}", + StandardCharsets.UTF_8))) + .setSystemMetadata(new SystemMetadata())) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build()); + + List result = test.proposalMutation(testItems, retrieverContext).toList(); + + assertEquals(result.size(), 1); + assertEquals( + result.get(0).getAspect(DatasetProperties.class), +
new DatasetProperties() + .setCustomProperties(new StringMap(Map.of("prop1", "fakeprop", "prop2", "pikachu")))); + } +} diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java index 9a4c01dabf9a77..f6533a6ac1d8a9 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java +++ b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MaeConsumerApplication.java @@ -34,6 +34,7 @@ "com.linkedin.gms.factory.context", "com.linkedin.gms.factory.timeseries", "com.linkedin.gms.factory.assertion", + "com.linkedin.gms.factory.plugins" }, excludeFilters = { @ComponentScan.Filter( diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java index 2666f58de862ef..f6f71a12a6951f 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java @@ -6,6 +6,7 @@ import com.datahub.authentication.Authentication; import com.datahub.metadata.ingestion.IngestionScheduler; import com.linkedin.entity.client.SystemEntityClient; +import com.linkedin.gms.factory.plugins.SpringStandardPluginConfiguration; import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener; import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; import com.linkedin.metadata.models.registry.EntityRegistry; @@ -85,4 +86,6 @@ public OperationContext operationContext( indexConvention, mock(RetrieverContext.class)); } + + @MockBean SpringStandardPluginConfiguration 
springStandardPluginConfiguration; } diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java index af3caecba865c1..4ea5e6ea34d5b3 100644 --- a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java +++ b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java @@ -33,7 +33,8 @@ "com.linkedin.gms.factory.form", "com.linkedin.metadata.dao.producer", "io.datahubproject.metadata.jobs.common.health.kafka", - "com.linkedin.gms.factory.context" + "com.linkedin.gms.factory.context", + "com.linkedin.gms.factory.plugins" }, excludeFilters = { @ComponentScan.Filter( diff --git a/metadata-models/src/main/resources/entity-registry.yml b/metadata-models/src/main/resources/entity-registry.yml index c8344b7de1e127..6006ca179d162c 100644 --- a/metadata-models/src/main/resources/entity-registry.yml +++ b/metadata-models/src/main/resources/entity-registry.yml @@ -665,3 +665,9 @@ plugins: aspectName: 'schemaMetadata' - entityName: '*' aspectName: 'editableSchemaMetadata' + - className: 'com.linkedin.metadata.aspect.plugins.hooks.MutationHook' + enabled: true + spring: + enabled: true + packageScan: + - com.linkedin.gms.factory.plugins \ No newline at end of file diff --git a/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java b/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java index dcea185fcbc7ca..1eee0498f112a6 100644 --- a/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java +++ b/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java @@ -35,6 +35,7 @@ public class RequestContext implements ContextInterface { @Nonnull private final String requestID; @Nonnull 
private final String userAgent; + @Builder.Default private boolean validated = true; public RequestContext( @Nonnull String actorUrn, diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index 599f7e7be344fd..1d5b7c7904f978 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -466,6 +466,8 @@ businessAttribute: keepAliveTime: ${BUSINESS_ATTRIBUTE_PROPAGATION_CONCURRENCY_KEEP_ALIVE:60} # Number of seconds to keep inactive threads alive metadataChangeProposal: + validation: + ignoreUnknown: ${MCP_VALIDATION_IGNORE_UNKNOWN:true} throttle: updateIntervalMs: ${MCP_THROTTLE_UPDATE_INTERVAL_MS:60000} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/ConfigEntityRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/ConfigEntityRegistryFactory.java index f1518f9c8f9d74..9f4dfb86c0fcd4 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/ConfigEntityRegistryFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/ConfigEntityRegistryFactory.java @@ -1,6 +1,7 @@ package com.linkedin.gms.factory.entityregistry; import com.datahub.plugins.metadata.aspect.SpringPluginFactory; +import com.linkedin.gms.factory.plugins.SpringStandardPluginConfiguration; import com.linkedin.metadata.aspect.plugins.PluginFactory; import com.linkedin.metadata.aspect.plugins.config.PluginConfiguration; import com.linkedin.metadata.models.registry.ConfigEntityRegistry; @@ -29,7 +30,9 @@ public class ConfigEntityRegistryFactory { @Bean(name = "configEntityRegistry") @Nonnull - protected ConfigEntityRegistry getInstance() throws IOException, EntityRegistryException { + protected ConfigEntityRegistry getInstance( + 
SpringStandardPluginConfiguration springStandardPluginConfiguration) + throws IOException, EntityRegistryException { BiFunction, PluginFactory> pluginFactoryProvider = (config, loaders) -> new SpringPluginFactory(applicationContext, config, loaders); if (entityRegistryConfigPath != null) { diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java new file mode 100644 index 00000000000000..fa4f520dc88c7c --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java @@ -0,0 +1,33 @@ +package com.linkedin.gms.factory.plugins; + +import com.linkedin.metadata.aspect.hooks.IgnoreUnknownMutator; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.hooks.MutationHook; +import java.util.List; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class SpringStandardPluginConfiguration { + + @Value("${metadataChangeProposal.validation.ignoreUnknown}") + private boolean ignoreUnknownEnabled; + + @Bean + public MutationHook ignoreUnknownMutator() { + return new IgnoreUnknownMutator() + .setConfig( + AspectPluginConfig.builder() + .className(IgnoreUnknownMutator.class.getName()) + .enabled(ignoreUnknownEnabled) + .supportedOperations(List.of("CREATE", "CREATE_ENTITY", "UPSERT")) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName("*") + .aspectName("*") + .build())) + .build()); + } +} diff --git a/metadata-service/plugin/src/main/java/com/datahub/plugins/metadata/aspect/SpringPluginFactory.java 
b/metadata-service/plugin/src/main/java/com/datahub/plugins/metadata/aspect/SpringPluginFactory.java index 043b0016abaaae..f7e911c2629088 100644 --- a/metadata-service/plugin/src/main/java/com/datahub/plugins/metadata/aspect/SpringPluginFactory.java +++ b/metadata-service/plugin/src/main/java/com/datahub/plugins/metadata/aspect/SpringPluginFactory.java @@ -78,6 +78,15 @@ private static Stream filterSpringConfigs( config -> config.getSpring() != null && config.getSpring().isEnabled()); } + @Nonnull + @Override + public List getClassLoaders() { + if (!super.getClassLoaders().isEmpty()) { + return super.getClassLoaders(); + } + return List.of(SpringPluginFactory.class.getClassLoader()); + } + /** * Override to inject classes from Spring * @@ -137,7 +146,8 @@ protected List build( log.warn( "Failed to load class {} from loader {}", config.getClassName(), - classLoader.getName()); + classLoader.getName(), + e); } } diff --git a/metadata-service/war/src/main/java/com/linkedin/gms/CommonApplicationConfig.java b/metadata-service/war/src/main/java/com/linkedin/gms/CommonApplicationConfig.java index c44cb4eaa1ac3b..bc623c3cc983c2 100644 --- a/metadata-service/war/src/main/java/com/linkedin/gms/CommonApplicationConfig.java +++ b/metadata-service/war/src/main/java/com/linkedin/gms/CommonApplicationConfig.java @@ -37,6 +37,7 @@ "com.linkedin.gms.factory.search", "com.linkedin.gms.factory.secret", "com.linkedin.gms.factory.timeseries", + "com.linkedin.gms.factory.plugins" }) @PropertySource(value = "classpath:/application.yaml", factory = YamlPropertySourceFactory.class) @Configuration