Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mcp-mutator): new mcp mutator plugin #10904

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.SystemMetadata;
import java.lang.reflect.InvocationTargetException;
import javax.annotation.Nonnull;
Expand All @@ -26,6 +27,9 @@ public interface ReadItem {
*/
@Nonnull
default String getAspectName() {
if (getAspectSpec() == null) {
return GenericAspect.dataSchema().getName();
}
return getAspectSpec().getName();
}

Expand Down Expand Up @@ -72,6 +76,6 @@ static <T> T getAspect(Class<T> clazz, @Nullable RecordTemplate recordTemplate)
*
* @return aspect's specification
*/
@Nonnull
@Nullable
AspectSpec getAspectSpec();
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ static void applyWriteMutationHooks(
}
}

default Stream<MCPItem> applyProposalMutationHooks(
Collection<MCPItem> proposedItems, @Nonnull RetrieverContext retrieverContext) {
return retrieverContext.getAspectRetriever().getEntityRegistry().getAllMutationHooks().stream()
.flatMap(
mutationHook -> mutationHook.applyProposalMutation(proposedItems, retrieverContext));
}
Comment on lines +87 to +92
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approve the method design and suggest additional checks or optimizations.

The method applyProposalMutationHooks is well-designed and integrates seamlessly with existing functionality. Consider adding checks for null or empty collections to prevent unnecessary processing.

+ if (proposedItems == null || proposedItems.isEmpty()) {
+     return Stream.empty();
+ }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
default Stream<MCPItem> applyProposalMutationHooks(
Collection<MCPItem> proposedItems, @Nonnull RetrieverContext retrieverContext) {
return retrieverContext.getAspectRetriever().getEntityRegistry().getAllMutationHooks().stream()
.flatMap(
mutationHook -> mutationHook.applyProposalMutation(proposedItems, retrieverContext));
}
default Stream<MCPItem> applyProposalMutationHooks(
Collection<MCPItem> proposedItems, @Nonnull RetrieverContext retrieverContext) {
if (proposedItems == null || proposedItems.isEmpty()) {
return Stream.empty();
}
return retrieverContext.getAspectRetriever().getEntityRegistry().getAllMutationHooks().stream()
.flatMap(
mutationHook -> mutationHook.applyProposalMutation(proposedItems, retrieverContext));
}


default <T extends BatchItem> ValidationExceptionCollection validateProposed(
Collection<T> mcpItems) {
return validateProposed(mcpItems, getRetrieverContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -25,20 +24,13 @@ public boolean enabled() {
}

public boolean shouldApply(
@Nullable ChangeType changeType, @Nonnull Urn entityUrn, @Nonnull AspectSpec aspectSpec) {
return shouldApply(changeType, entityUrn.getEntityType(), aspectSpec);
@Nullable ChangeType changeType, @Nonnull Urn entityUrn, @Nonnull String aspectName) {
return shouldApply(changeType, entityUrn.getEntityType(), aspectName);
}

public boolean shouldApply(
@Nullable ChangeType changeType,
@Nonnull EntitySpec entitySpec,
@Nonnull AspectSpec aspectSpec) {
return shouldApply(changeType, entitySpec.getName(), aspectSpec.getName());
}

public boolean shouldApply(
@Nullable ChangeType changeType, @Nonnull String entityName, @Nonnull AspectSpec aspectSpec) {
return shouldApply(changeType, entityName, aspectSpec.getName());
@Nullable ChangeType changeType, @Nonnull EntitySpec entitySpec, @Nonnull String aspectName) {
return shouldApply(changeType, entitySpec.getName(), aspectName);
}

public boolean shouldApply(
Expand All @@ -49,8 +41,8 @@ && isChangeTypeSupported(changeType)
}

protected boolean isEntityAspectSupported(
@Nonnull EntitySpec entitySpec, @Nonnull AspectSpec aspectSpec) {
return isEntityAspectSupported(entitySpec.getName(), aspectSpec.getName());
@Nonnull EntitySpec entitySpec, @Nonnull String aspectName) {
return isEntityAspectSupported(entitySpec.getName(), aspectName);
}

protected boolean isEntityAspectSupported(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public final Stream<MCLItem> apply(
@Nonnull Collection<MCLItem> batchItems, @Nonnull RetrieverContext retrieverContext) {
return applyMCLSideEffect(
batchItems.stream()
.filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectSpec()))
.filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public final Stream<ChangeMCP> apply(
Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
return applyMCPSideEffect(
changeMCPS.stream()
.filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectSpec()))
.filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}
Expand All @@ -41,7 +41,7 @@ public final Stream<MCPItem> postApply(
Collection<MCLItem> mclItems, @Nonnull RetrieverContext retrieverContext) {
return postMCPSideEffect(
mclItems.stream()
.filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectSpec()))
.filter(item -> shouldApply(item.getChangeType(), item.getUrn(), item.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.metadata.aspect.ReadItem;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.plugins.PluginSpec;
import com.linkedin.util.Pair;
import java.util.Collection;
Expand All @@ -24,7 +25,7 @@ public final Stream<Pair<ChangeMCP, Boolean>> applyWriteMutation(
@Nonnull Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
return writeMutation(
changeMCPS.stream()
.filter(i -> shouldApply(i.getChangeType(), i.getEntitySpec(), i.getAspectSpec()))
.filter(i -> shouldApply(i.getChangeType(), i.getEntitySpec(), i.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}
Expand All @@ -34,7 +35,23 @@ public final Stream<Pair<ReadItem, Boolean>> applyReadMutation(
@Nonnull Collection<ReadItem> items, @Nonnull RetrieverContext retrieverContext) {
return readMutation(
items.stream()
.filter(i -> isEntityAspectSupported(i.getEntitySpec(), i.getAspectSpec()))
.filter(i -> isEntityAspectSupported(i.getEntitySpec(), i.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}

/**
* Apply Proposal mutations prior to validation
*
* @param mcpItems wrapper for MCP
* @param retrieverContext retriever context
* @return stream of mutated Proposal items
*/
public final Stream<MCPItem> applyProposalMutation(
@Nonnull Collection<MCPItem> mcpItems, @Nonnull RetrieverContext retrieverContext) {
return proposalMutation(
mcpItems.stream()
.filter(i -> shouldApply(i.getChangeType(), i.getEntitySpec(), i.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}
Expand All @@ -48,4 +65,9 @@ protected Stream<Pair<ChangeMCP, Boolean>> writeMutation(
@Nonnull Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
return changeMCPS.stream().map(i -> Pair.of(i, false));
}

protected Stream<MCPItem> proposalMutation(
@Nonnull Collection<MCPItem> mcpItems, @Nonnull RetrieverContext retrieverContext) {
return Stream.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public final Stream<AspectValidationException> validateProposed(
@Nonnull RetrieverContext retrieverContext) {
return validateProposedAspects(
mcpItems.stream()
.filter(i -> shouldApply(i.getChangeType(), i.getUrn(), i.getAspectSpec()))
.filter(i -> shouldApply(i.getChangeType(), i.getUrn(), i.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}
Expand All @@ -37,7 +37,7 @@ public final Stream<AspectValidationException> validatePreCommit(
@Nonnull Collection<ChangeMCP> changeMCPs, @Nonnull RetrieverContext retrieverContext) {
return validatePreCommitAspects(
changeMCPs.stream()
.filter(i -> shouldApply(i.getChangeType(), i.getUrn(), i.getAspectSpec()))
.filter(i -> shouldApply(i.getChangeType(), i.getUrn(), i.getAspectName()))
.collect(Collectors.toList()),
retrieverContext);
}
Expand Down
1 change: 1 addition & 0 deletions metadata-io/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
api project(':metadata-service:services')
api project(':metadata-operation-context')

implementation spec.product.pegasus.restliServer
implementation spec.product.pegasus.data
implementation spec.product.pegasus.generator

Expand Down
7 changes: 7 additions & 0 deletions metadata-io/metadata-io-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,11 @@ dependencies {
implementation project(':metadata-utils')
compileOnly externalDependency.lombok
annotationProcessor externalDependency.lombok

testImplementation(externalDependency.testng)
testImplementation(externalDependency.mockito)
testImplementation(testFixtures(project(":entity-registry")))
testImplementation project(':metadata-operation-context')
testImplementation externalDependency.lombok
testAnnotationProcessor externalDependency.lombok
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.util.Pair;
Expand All @@ -18,6 +19,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -44,9 +46,20 @@ public class AspectsBatchImpl implements AspectsBatch {
public Pair<Map<String, Set<String>>, List<ChangeMCP>> toUpsertBatchItems(
final Map<String, Map<String, SystemAspect>> latestAspects) {

// Process proposals to change items
Stream<ChangeMCP> mutatedProposalsStream =
proposedItemsToChangeItemStream(
items.stream()
.filter(item -> item instanceof ProposedItem)
.map(item -> (MCPItem) item)
.collect(Collectors.toList()));
// Regular change items
Stream<? extends BatchItem> changeMCPStream =
items.stream().filter(item -> !(item instanceof ProposedItem));

// Convert patches to upserts if needed
LinkedList<ChangeMCP> upsertBatchItems =
items.stream()
Stream.concat(mutatedProposalsStream, changeMCPStream)
.map(
item -> {
final String urnStr = item.getUrn().toString();
Expand Down Expand Up @@ -85,6 +98,17 @@ public Pair<Map<String, Set<String>>, List<ChangeMCP>> toUpsertBatchItems(
return Pair.of(newUrnAspectNames, upsertBatchItems);
}

private Stream<ChangeMCP> proposedItemsToChangeItemStream(List<MCPItem> proposedItems) {
return applyProposalMutationHooks(proposedItems, retrieverContext)
.filter(mcpItem -> mcpItem.getMetadataChangeProposal() != null)
.map(
mcpItem ->
ChangeItemImpl.ChangeItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
retrieverContext.getAspectRetriever()));
}

public static class AspectsBatchImplBuilder {
/**
* Just one aspect record template
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.linkedin.metadata.entity.ebean.batch;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/** Represents an unvalidated wrapped MCP */
@Slf4j
@Getter
@Builder(toBuilder = true)
public class ProposedItem implements MCPItem {
@Nonnull private final MetadataChangeProposal metadataChangeProposal;
@Nonnull private final AuditStamp auditStamp;
// derived
@Nonnull private EntitySpec entitySpec;
@Nullable private AspectSpec aspectSpec;

@Nonnull
@Override
public String getAspectName() {
if (metadataChangeProposal.getAspectName() != null) {
return metadataChangeProposal.getAspectName();
} else {
return MCPItem.super.getAspectName();
}
}

@Nullable
public AspectSpec getAspectSpec() {
if (aspectSpec != null) {
return aspectSpec;
}
if (entitySpec.getAspectSpecMap().containsKey(getAspectName())) {
return entitySpec.getAspectSpecMap().get(getAspectName());
}
return null;
Comment on lines +40 to +48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimize getAspectSpec() method.

The method getAspectSpec() can be optimized by caching the result of entitySpec.getAspectSpecMap().get(getAspectName()) to avoid repeated map lookups.

- if (entitySpec.getAspectSpecMap().containsKey(getAspectName())) {
-   return entitySpec.getAspectSpecMap().get(getAspectName());
- }
+ AspectSpec aspect = entitySpec.getAspectSpecMap().get(getAspectName());
+ return aspect;
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@Nullable
public AspectSpec getAspectSpec() {
if (aspectSpec != null) {
return aspectSpec;
}
if (entitySpec.getAspectSpecMap().containsKey(getAspectName())) {
return entitySpec.getAspectSpecMap().get(getAspectName());
}
return null;
@Nullable
public AspectSpec getAspectSpec() {
if (aspectSpec != null) {
return aspectSpec;
}
AspectSpec aspect = entitySpec.getAspectSpecMap().get(getAspectName());
return aspect;

}

@Nullable
@Override
public RecordTemplate getRecordTemplate() {
if (getAspectSpec() != null) {
return GenericRecordUtils.deserializeAspect(
getMetadataChangeProposal().getAspect().getValue(),
getMetadataChangeProposal().getAspect().getContentType(),
getAspectSpec());
}
return null;
}

@Nonnull
@Override
public Urn getUrn() {
return metadataChangeProposal.getEntityUrn();
}

@Nullable
@Override
public SystemMetadata getSystemMetadata() {
return metadataChangeProposal.getSystemMetadata();
}

@Nonnull
@Override
public ChangeType getChangeType() {
return metadataChangeProposal.getChangeType();
}
}
Loading
Loading