Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(changeGenerator): fix logic around descriptions and make execution more efficient #11539

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.datahub.util.RecordUtils;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
Expand Down Expand Up @@ -173,7 +174,7 @@ private static List<ChangeEvent> getFieldPropertyChangeEvents(
SchemaField baseField,
SchemaField targetField,
Urn datasetUrn,
ChangeCategory changeCategory,
Set<ChangeCategory> changeCategories,
AuditStamp auditStamp) {
List<ChangeEvent> propChangeEvents = new ArrayList<>();
String datasetFieldUrn;
Expand All @@ -184,7 +185,7 @@ private static List<ChangeEvent> getFieldPropertyChangeEvents(
}

// Description Change.
if (ChangeCategory.DOCUMENTATION.equals(changeCategory)) {
if (changeCategories != null && changeCategories.contains(ChangeCategory.DOCUMENTATION)) {
ChangeEvent descriptionChangeEvent =
getDescriptionChange(baseField, targetField, datasetFieldUrn, auditStamp);
if (descriptionChangeEvent != null) {
Expand All @@ -193,14 +194,14 @@ private static List<ChangeEvent> getFieldPropertyChangeEvents(
}

// Global Tags
if (ChangeCategory.TAG.equals(changeCategory)) {
if (changeCategories != null && changeCategories.contains(ChangeCategory.TAG)) {
propChangeEvents.addAll(
getGlobalTagChangeEvents(
baseField, targetField, datasetUrn.toString(), datasetFieldUrn, auditStamp));
}

// Glossary terms.
if (ChangeCategory.GLOSSARY_TERM.equals(changeCategory)) {
if (changeCategories != null && changeCategories.contains(ChangeCategory.GLOSSARY_TERM)) {
propChangeEvents.addAll(
getGlossaryTermsChangeEvents(
baseField, targetField, datasetUrn.toString(), datasetFieldUrn, auditStamp));
Expand All @@ -213,7 +214,7 @@ private static List<ChangeEvent> computeDiffs(
SchemaMetadata baseSchema,
SchemaMetadata targetSchema,
Urn datasetUrn,
ChangeCategory changeCategory,
Set<ChangeCategory> changeCategories,
AuditStamp auditStamp) {
// Sort the fields by their field path. This aligns both sets of fields based on field paths for
// comparisons.
Expand Down Expand Up @@ -247,11 +248,11 @@ private static List<ChangeEvent> computeDiffs(
// This is the same field. Check for change events from property changes.
if (!curBaseField.getNativeDataType().equals(curTargetField.getNativeDataType())) {
processNativeTypeChange(
changeCategory, changeEvents, datasetUrn, curBaseField, curTargetField, auditStamp);
changeCategories, changeEvents, datasetUrn, curBaseField, curTargetField, auditStamp);
}
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(
curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
curBaseField, curTargetField, datasetUrn, changeCategories, auditStamp);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
++targetFieldIdx;
Expand All @@ -268,16 +269,17 @@ private static List<ChangeEvent> computeDiffs(
targetFields.subList(targetFieldIdx, targetFields.size()),
renamedFields);
if (renamedField == null) {
processRemoval(changeCategory, changeEvents, datasetUrn, curBaseField, auditStamp);
processRemoval(changeCategories, changeEvents, datasetUrn, curBaseField, auditStamp);
++baseFieldIdx;
} else {
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
if (changeCategories != null
&& changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) {
changeEvents.add(
generateRenameEvent(datasetUrn, curBaseField, renamedField, auditStamp));
}
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(
curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
curBaseField, renamedField, datasetUrn, changeCategories, auditStamp);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
renamedFields.add(renamedField);
Expand All @@ -289,16 +291,17 @@ private static List<ChangeEvent> computeDiffs(
findRenamedField(
curTargetField, baseFields.subList(baseFieldIdx, baseFields.size()), renamedFields);
if (renamedField == null) {
processAdd(changeCategory, changeEvents, datasetUrn, curTargetField, auditStamp);
processAdd(changeCategories, changeEvents, datasetUrn, curTargetField, auditStamp);
++targetFieldIdx;
} else {
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
if (changeCategories != null
&& changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) {
changeEvents.add(
generateRenameEvent(datasetUrn, renamedField, curTargetField, auditStamp));
}
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(
curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
renamedField, curTargetField, datasetUrn, changeCategories, auditStamp);
changeEvents.addAll(propChangeEvents);
++targetFieldIdx;
renamedFields.add(renamedField);
Expand All @@ -309,22 +312,23 @@ private static List<ChangeEvent> computeDiffs(
// Handle removed fields. Non-backward compatible change + major version bump
SchemaField baseField = baseFields.get(baseFieldIdx);
if (!renamedFields.contains(baseField)) {
processRemoval(changeCategory, changeEvents, datasetUrn, baseField, auditStamp);
processRemoval(changeCategories, changeEvents, datasetUrn, baseField, auditStamp);
}
++baseFieldIdx;
}
while (targetFieldIdx < targetFields.size()) {
// Newly added fields. Forwards & backwards compatible change + minor version bump.
SchemaField targetField = targetFields.get(targetFieldIdx);
if (!renamedFields.contains(targetField)) {
processAdd(changeCategory, changeEvents, datasetUrn, targetField, auditStamp);
processAdd(changeCategories, changeEvents, datasetUrn, targetField, auditStamp);
}
++targetFieldIdx;
}

// Handle primary key constraint change events.
List<ChangeEvent> primaryKeyChangeEvents =
getPrimaryKeyChangeEvents(changeCategory, baseSchema, targetSchema, datasetUrn, auditStamp);
getPrimaryKeyChangeEvents(
changeCategories, baseSchema, targetSchema, datasetUrn, auditStamp);
changeEvents.addAll(primaryKeyChangeEvents);

// Handle foreign key constraint change events, currently no-op due to field not being utilized.
Expand Down Expand Up @@ -375,12 +379,12 @@ private static boolean descriptionsMatch(SchemaField curField, SchemaField schem
}

private static void processRemoval(
ChangeCategory changeCategory,
Set<ChangeCategory> changeCategories,
List<ChangeEvent> changeEvents,
Urn datasetUrn,
SchemaField baseField,
AuditStamp auditStamp) {
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) {
changeEvents.add(
DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder()
.modifier(getSchemaFieldUrn(datasetUrn, baseField).toString())
Expand All @@ -401,17 +405,17 @@ private static void processRemoval(
.build());
}
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(baseField, null, datasetUrn, changeCategory, auditStamp);
getFieldPropertyChangeEvents(baseField, null, datasetUrn, changeCategories, auditStamp);
changeEvents.addAll(propChangeEvents);
}

private static void processAdd(
ChangeCategory changeCategory,
Set<ChangeCategory> changeCategories,
List<ChangeEvent> changeEvents,
Urn datasetUrn,
SchemaField targetField,
AuditStamp auditStamp) {
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) {
changeEvents.add(
DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder()
.modifier(getSchemaFieldUrn(datasetUrn, targetField).toString())
Expand All @@ -428,22 +432,23 @@ private static void processAdd(
.fieldUrn(getSchemaFieldUrn(datasetUrn, targetField))
.nullable(targetField.isNullable())
.auditStamp(auditStamp)
.modificationCategory(SchemaFieldModificationCategory.OTHER)
.build());
}
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(null, targetField, datasetUrn, changeCategory, auditStamp);
getFieldPropertyChangeEvents(null, targetField, datasetUrn, changeCategories, auditStamp);
changeEvents.addAll(propChangeEvents);
}

private static void processNativeTypeChange(
ChangeCategory changeCategory,
Set<ChangeCategory> changeCategories,
List<ChangeEvent> changeEvents,
Urn datasetUrn,
SchemaField curBaseField,
SchemaField curTargetField,
AuditStamp auditStamp) {
// Non-backward compatible change + Major version bump
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) {
changeEvents.add(
DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder()
.category(ChangeCategory.TECHNICAL_SCHEMA)
Expand Down Expand Up @@ -505,12 +510,12 @@ private static List<ChangeEvent> getForeignKeyChangeEvents() {
}

private static List<ChangeEvent> getPrimaryKeyChangeEvents(
ChangeCategory changeCategory,
Set<ChangeCategory> changeCategories,
SchemaMetadata baseSchema,
SchemaMetadata targetSchema,
Urn datasetUrn,
AuditStamp auditStamp) {
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) {
List<ChangeEvent> primaryKeyChangeEvents = new ArrayList<>();
Set<String> basePrimaryKeys =
(baseSchema != null && baseSchema.getPrimaryKeys() != null)
Expand Down Expand Up @@ -598,7 +603,7 @@ public ChangeTransaction getSemanticDiff(
baseSchema,
targetSchema,
DatasetUrn.createFromString(currentValue.getUrn()),
changeCategory,
Collections.singleton(changeCategory),
null));
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Malformed DatasetUrn " + currentValue.getUrn());
Expand Down Expand Up @@ -632,18 +637,16 @@ public List<ChangeEvent> getChangeEvents(
@Nonnull Aspect<SchemaMetadata> from,
@Nonnull Aspect<SchemaMetadata> to,
@Nonnull AuditStamp auditStamp) {
final List<ChangeEvent> changeEvents = new ArrayList<>();
changeEvents.addAll(
computeDiffs(
from.getValue(), to.getValue(), urn, ChangeCategory.DOCUMENTATION, auditStamp));
changeEvents.addAll(
computeDiffs(from.getValue(), to.getValue(), urn, ChangeCategory.TAG, auditStamp));
changeEvents.addAll(
return new ArrayList<>(
computeDiffs(
from.getValue(), to.getValue(), urn, ChangeCategory.TECHNICAL_SCHEMA, auditStamp));
changeEvents.addAll(
computeDiffs(
from.getValue(), to.getValue(), urn, ChangeCategory.GLOSSARY_TERM, auditStamp));
return changeEvents;
from.getValue(),
to.getValue(),
urn,
ImmutableSet.of(
ChangeCategory.DOCUMENTATION,
ChangeCategory.TAG,
ChangeCategory.TECHNICAL_SCHEMA,
ChangeCategory.GLOSSARY_TERM),
auditStamp));
}
}
Loading
Loading