Skip to content

Commit

Permalink
Iceberg and Delta target: sync schema field comments
Browse files Browse the repository at this point in the history
It fixed the issue that the comments are not synced correctly when the target table is Iceberg or Delta.
For Delta target, extract the comment from internal schema and add to Spark schema representative.
For Iceberg target, make sure any difference of comments is captured during syncSchema() and the schema is updated with new comments.
  • Loading branch information
Hanzhi Wang authored and the-other-tim-brown committed Nov 4, 2024
1 parent fb08e2d commit ced9223
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaSchemaExtractor {
private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id";
private static final String COMMENT = "comment";
private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor();

public static DeltaSchemaExtractor getInstance() {
Expand All @@ -74,7 +75,7 @@ public StructType fromInternalSchema(InternalSchema internalSchema) {
field.getName(),
convertFieldType(field),
field.getSchema().isNullable(),
getMetaData(field.getSchema().getDataType())))
getMetaData(field.getSchema())))
.toArray(StructField[]::new);
return new StructType(fields);
}
Expand Down Expand Up @@ -144,12 +145,16 @@ private DataType convertFieldType(InternalField field) {
}
}

private Metadata getMetaData(InternalType type) {
private Metadata getMetaData(InternalSchema schema) {
InternalType type = schema.getDataType();
MetadataBuilder metadataBuilder = new MetadataBuilder();
if (type == InternalType.UUID) {
return new MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid").build();
} else {
return Metadata.empty();
metadataBuilder.putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid");
}
if (schema.getComment() != null) {
metadataBuilder.putString(COMMENT, schema.getComment());
}
return metadataBuilder.build();
}

public InternalSchema toInternalSchema(StructType structType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;

Expand Down Expand Up @@ -132,6 +133,12 @@ private static Map<Integer, Supplier<UpdateSchema>> updateColumn(
latestColumn.fieldId(), () -> updateSchema.requireColumn(latestColumn.name()));
}
}
// update the comment of the column
if (!Objects.equals(currentColumn.doc(), latestColumn.doc())) {
updates.put(
latestColumn.fieldId(),
() -> updateSchema.updateColumnDoc(latestColumn.name(), latestColumn.doc()));
}
if (latestColumn.type().isStructType()) {
updates.putAll(
addUpdates(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void testPrimitiveTypes() {
.name("boolean")
.dataType(InternalType.BOOLEAN)
.isNullable(false)
.comment("requiredBooleanComment")
.build())
.build(),
InternalField.builder()
Expand Down Expand Up @@ -226,7 +227,7 @@ public void testPrimitiveTypes() {

StructType structRepresentation =
new StructType()
.add("requiredBoolean", DataTypes.BooleanType, false)
.add("requiredBoolean", DataTypes.BooleanType, false, "requiredBooleanComment")
.add("optionalBoolean", DataTypes.BooleanType, true)
.add("requiredInt", DataTypes.IntegerType, false)
.add("optionalInt", DataTypes.IntegerType, true)
Expand Down Expand Up @@ -268,6 +269,7 @@ public void testFixedBytes() {
.name("fixed")
.dataType(InternalType.FIXED)
.isNullable(false)
.comment("comment")
.build())
.build(),
InternalField.builder()
Expand Down Expand Up @@ -296,6 +298,7 @@ public void testFixedBytes() {
.name("binary")
.dataType(InternalType.BYTES)
.isNullable(false)
.comment("comment")
.build())
.build(),
InternalField.builder()
Expand All @@ -311,7 +314,7 @@ public void testFixedBytes() {
.build();
StructType structRepresentation =
new StructType()
.add("requiredFixed", DataTypes.BinaryType, false)
.add("requiredFixed", DataTypes.BinaryType, false, "comment")
.add("optionalFixed", DataTypes.BinaryType, true);

Assertions.assertEquals(
Expand Down Expand Up @@ -681,6 +684,7 @@ public void testNestedRecords() {
.name("struct")
.dataType(InternalType.RECORD)
.isNullable(true)
.comment("comment")
.fields(
Arrays.asList(
InternalField.builder()
Expand All @@ -691,6 +695,7 @@ public void testNestedRecords() {
.name("integer")
.dataType(InternalType.INT)
.isNullable(true)
.comment("nestedOptionalIntComment")
.build())
.defaultValue(
InternalField.Constants.NULL_DEFAULT_VALUE)
Expand Down Expand Up @@ -740,13 +745,18 @@ public void testNestedRecords() {
.add(
"nestedOne",
new StructType()
.add("nestedOptionalInt", DataTypes.IntegerType, true)
.add(
"nestedOptionalInt",
DataTypes.IntegerType,
true,
"nestedOptionalIntComment")
.add("nestedRequiredDouble", DataTypes.DoubleType, false)
.add(
"nestedTwo",
new StructType().add("doublyNestedString", DataTypes.StringType, true),
false),
true);
true,
"comment");
Assertions.assertEquals(
structRepresentation,
DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ public class TestIcebergSchemaSync {

private static final Schema SCHEMA =
new Schema(
Types.NestedField.required(1, "timestamp_field", Types.TimestampType.withoutZone()),
Types.NestedField.required(
1, "timestamp_field", Types.TimestampType.withoutZone(), "doc"),
Types.NestedField.optional(2, "date_field", Types.DateType.get()),
Types.NestedField.required(3, "group_id", Types.IntegerType.get()),
Types.NestedField.required(
4,
"record",
Types.StructType.of(
Types.NestedField.required(5, "string_field", Types.StringType.get()),
Types.NestedField.required(5, "string_field", Types.StringType.get(), "doc"),
Types.NestedField.required(6, "int_field", Types.IntegerType.get()))),
Types.NestedField.required(
7,
Expand Down Expand Up @@ -228,6 +229,112 @@ public void testMakeExistingColumnRequired() {
verify(mockUpdateSchema).commit();
}

@Test
void testAddFieldComment() {
UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class);
when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema);
Types.NestedField updated =
Types.NestedField.optional(2, "date_field", Types.DateType.get(), "doc");
Schema latest = addCommentToDefault(updated, 2);

schemaSync.sync(SCHEMA, latest, mockTransaction);

verify(mockUpdateSchema).updateColumnDoc("date_field", "doc");
verify(mockUpdateSchema).commit();
}

@Test
void testDropFieldComment() {
UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class);
when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema);
Types.NestedField updated =
Types.NestedField.optional(1, "timestamp_field", Types.DateType.get());
Schema latest = addCommentToDefault(updated, 1);

schemaSync.sync(SCHEMA, latest, mockTransaction);

verify(mockUpdateSchema).updateColumnDoc("timestamp_field", null);
verify(mockUpdateSchema).commit();
}

@Test
void tesUpdatedFieldComment() {
UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class);
when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema);
Types.NestedField updated =
Types.NestedField.optional(1, "timestamp_field", Types.DateType.get(), "new comment");
Schema latest = addCommentToDefault(updated, 1);

schemaSync.sync(SCHEMA, latest, mockTransaction);

verify(mockUpdateSchema).updateColumnDoc("timestamp_field", "new comment");
verify(mockUpdateSchema).commit();
}

@Test
void testAddNestedFieldComment() {
UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class);
when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema);
Types.NestedField updated =
Types.NestedField.required(
4,
"record",
Types.StructType.of(
Types.NestedField.required(5, "string_field", Types.StringType.get(), "doc"),
Types.NestedField.required(6, "int_field", Types.IntegerType.get(), "doc")));
Schema latest = addCommentToDefault(updated, 4);

schemaSync.sync(SCHEMA, latest, mockTransaction);

verify(mockUpdateSchema).updateColumnDoc("int_field", "doc");
verify(mockUpdateSchema).commit();
}

@Test
void testAddListFieldComment() {
UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class);
when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema);
Types.NestedField updated =
Types.NestedField.required(
10,
"array_field",
Types.ListType.ofRequired(
11,
Types.StructType.of(
Types.NestedField.required(15, "element_string", Types.StringType.get(), "doc"),
Types.NestedField.optional(16, "element_int", Types.IntegerType.get()))));
Schema latest = addCommentToDefault(updated, 10);

schemaSync.sync(SCHEMA, latest, mockTransaction);

verify(mockUpdateSchema).updateColumnDoc("element_string", "doc");
verify(mockUpdateSchema).commit();
}

@Test
void testAddMapFieldComment() {
UpdateSchema mockUpdateSchema = Mockito.mock(UpdateSchema.class);
when(mockTransaction.updateSchema()).thenReturn(mockUpdateSchema);
Types.NestedField updated =
Types.NestedField.required(
7,
"map_field",
Types.MapType.ofRequired(
8,
9,
Types.StructType.of(
Types.NestedField.required(12, "key_string", Types.StringType.get())),
Types.StructType.of(
Types.NestedField.required(13, "value_string", Types.StringType.get(), "doc"),
Types.NestedField.optional(14, "value_int", Types.IntegerType.get()))));
Schema latest = addCommentToDefault(updated, 7);

schemaSync.sync(SCHEMA, latest, mockTransaction);

verify(mockUpdateSchema).updateColumnDoc("value_string", "doc");
verify(mockUpdateSchema).commit();
}

private Schema addColumnToDefault(Schema schema, Types.NestedField field, Integer parentId) {
List<Types.NestedField> fields = new ArrayList<>();
for (Types.NestedField existingField : schema.columns()) {
Expand All @@ -251,6 +358,18 @@ private Schema addColumnToDefault(Schema schema, Types.NestedField field, Intege
return new Schema(fields);
}

private Schema addCommentToDefault(Types.NestedField updated, int fieldId) {
List<Types.NestedField> fields = new ArrayList<>();
for (Types.NestedField existingField : SCHEMA.columns()) {
if (existingField.fieldId() == fieldId) {
fields.add(updated);
} else {
fields.add(existingField);
}
}
return new Schema(fields);
}

private Schema updateFieldRequired(int fieldId) {
List<Types.NestedField> fields = new ArrayList<>();
for (Types.NestedField existingField : SCHEMA.columns()) {
Expand All @@ -260,7 +379,8 @@ private Schema updateFieldRequired(int fieldId) {
existingField.fieldId(),
!existingField.isOptional(),
existingField.name(),
existingField.type()));
existingField.type(),
existingField.doc()));
} else {
fields.add(existingField);
}
Expand Down

0 comments on commit ced9223

Please sign in to comment.