diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java index 1d81313ed625..4368ba7af564 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java @@ -19,6 +19,8 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.Struct; +import com.google.cloud.spanner.Type; +import com.google.cloud.spanner.Value; import java.util.HashSet; import java.util.List; import java.util.stream.Collectors; @@ -277,22 +279,25 @@ private ChildPartitionsRecord toChildPartitionsRecord( } private ColumnType columnTypeFrom(Struct struct) { + // TODO: Move to type struct.getJson when backend is fully migrated + final String type = getJsonString(struct.getValue(TYPE_COLUMN)); return new ColumnType( struct.getString(NAME_COLUMN), - new TypeCode(struct.getString(TYPE_COLUMN)), + new TypeCode(type), struct.getBoolean(IS_PRIMARY_KEY_COLUMN), struct.getLong(ORDINAL_POSITION_COLUMN)); } private Mod modFrom(Struct struct) { - final String keysJson = struct.getString(KEYS_COLUMN); - final String oldValuesJson = - struct.isNull(OLD_VALUES_COLUMN) ? null : struct.getString(OLD_VALUES_COLUMN); - final String newValuesJson = - struct.isNull(NEW_VALUES_COLUMN) - ? null - : struct.getString(ChangeStreamRecordMapper.NEW_VALUES_COLUMN); - return new Mod(keysJson, oldValuesJson, newValuesJson); + // TODO: Move to keys struct.getJson when backend is fully migrated + final String keys = getJsonString(struct.getValue(KEYS_COLUMN)); + // TODO: Move to oldValues struct.getJson when backend is fully migrated + final String oldValues = + struct.isNull(OLD_VALUES_COLUMN) ? null : getJsonString(struct.getValue(OLD_VALUES_COLUMN)); + // TODO: Move to newValues struct.getJson when backend is fully migrated + final String newValues = + struct.isNull(NEW_VALUES_COLUMN) ? null : getJsonString(struct.getValue(NEW_VALUES_COLUMN)); + return new Mod(keys, oldValues, newValues); } private ChildPartition childPartitionFrom(String partitionToken, Struct struct) { @@ -324,4 +329,15 @@ private ChangeStreamRecordMetadata changeStreamRecordMetadataFrom( .withNumberOfRecordsRead(resultSetMetadata.getNumberOfRecordsRead()) .build(); } + + // TODO: Remove when backend is fully migrated to JSON + private String getJsonString(Value value) { + if (value.getType().equals(Type.json())) { + return value.getJson(); + } else if (value.getType().equals(Type.string())) { + return value.getString(); + } else { + throw new IllegalArgumentException("Can not extract string from value " + value); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java index 26609f650d77..50f42927e58f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java @@ -17,7 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper; -import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStruct; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithJson; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithStrings; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -98,11 +99,15 @@ public void testMappingUpdateStructRowToDataChangeRecord() { 10L, 2L, null); - final Struct struct = recordsToStruct(dataChangeRecord); + final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord); + final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord); assertEquals( Collections.singletonList(dataChangeRecord), - mapper.toChangeStreamRecords(partition, struct, resultSetMetadata)); + mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata)); + assertEquals( + Collections.singletonList(dataChangeRecord), + mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata)); } @Test @@ -125,11 +130,15 @@ public void testMappingInsertStructRowToDataChangeRecord() { 10L, 2L, null); - final Struct struct = recordsToStruct(dataChangeRecord); + final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord); + final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord); assertEquals( Collections.singletonList(dataChangeRecord), - mapper.toChangeStreamRecords(partition, struct, resultSetMetadata)); + mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata)); + assertEquals( + Collections.singletonList(dataChangeRecord), + mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata)); } @Test @@ -152,18 +161,22 @@ public void testMappingDeleteStructRowToDataChangeRecord() { 10L, 2L, null); - final Struct struct = recordsToStruct(dataChangeRecord); + final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord); + final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord); assertEquals( Collections.singletonList(dataChangeRecord), - mapper.toChangeStreamRecords(partition, struct, resultSetMetadata)); + mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata)); + assertEquals( + Collections.singletonList(dataChangeRecord), + mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata)); } @Test public void testMappingStructRowToHeartbeatRecord() { final HeartbeatRecord heartbeatRecord = new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null); - final Struct struct = recordsToStruct(heartbeatRecord); + final Struct struct = recordsToStructWithStrings(heartbeatRecord); assertEquals( Collections.singletonList(heartbeatRecord), @@ -180,7 +193,7 @@ public void testMappingStructRowToChildPartitionRecord() { new ChildPartition("childToken1", Sets.newHashSet("parentToken1", "parentToken2")), new ChildPartition("childToken2", Sets.newHashSet("parentToken1", "parentToken2"))), null); - final Struct struct = recordsToStruct(childPartitionsRecord); + final Struct struct = recordsToStructWithStrings(childPartitionsRecord); assertEquals( Collections.singletonList(childPartitionsRecord), @@ -191,7 +204,7 @@ public void testMappingStructRowToChildPartitionRecord() { @Test public void testMappingStructRowFromInitialPartitionToChildPartitionRecord() { final Struct struct = - recordsToStruct( + recordsToStructWithStrings( new ChildPartitionsRecord( Timestamp.ofTimeSecondsAndNanos(10L, 20), "1", @@ -217,7 +230,4 @@ public void testMappingStructRowFromInitialPartitionToChildPartitionRecord() { Collections.singletonList(expected), mapper.toChangeStreamRecords(initialPartition, struct, resultSetMetadata)); } - - // TODO: Add test case for unknown record type - // TODO: Add test case for malformed record } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java index b699e0a98663..a3a434c6eed3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java @@ -38,26 +38,53 @@ public class TestStructMapper { Type.struct( StructField.of("token", Type.string()), StructField.of("parent_partition_tokens", Type.array(Type.string()))); - private static final Type COLUMN_TYPE_TYPE = + // TODO: Remove COLUMN_TYPE_STRING_TYPE when backend has fully migrated to use JSON + private static final Type COLUMN_TYPE_STRING_TYPE = Type.struct( StructField.of("name", Type.string()), StructField.of("type", Type.string()), StructField.of("is_primary_key", Type.bool()), StructField.of("ordinal_position", Type.int64())); - private static final Type MOD_TYPE = + private static final Type COLUMN_TYPE_JSON_TYPE = + Type.struct( + StructField.of("name", Type.string()), + StructField.of("type", Type.json()), + StructField.of("is_primary_key", Type.bool()), + StructField.of("ordinal_position", Type.int64())); + // TODO: Remove MOD_STRING_TYPE when backend has fully migrated to use JSON + private static final Type MOD_STRING_TYPE = Type.struct( StructField.of("keys", Type.string()), StructField.of("new_values", Type.string()), StructField.of("old_values", Type.string())); - private static final Type DATA_CHANGE_RECORD_TYPE = + private static final Type MOD_JSON_TYPE = + Type.struct( + StructField.of("keys", Type.json()), + StructField.of("new_values", Type.json()), + StructField.of("old_values", Type.json())); + // TODO: Remove DATA_CHANGE_RECORD_STRING_TYPE when backend has fully migrated to use JSON + private static final Type DATA_CHANGE_RECORD_STRING_TYPE = + Type.struct( + StructField.of("commit_timestamp", Type.timestamp()), + StructField.of("record_sequence", Type.string()), + StructField.of("server_transaction_id", Type.string()), + StructField.of("is_last_record_in_transaction_in_partition", Type.bool()), + StructField.of("table_name", Type.string()), + StructField.of("column_types", Type.array(COLUMN_TYPE_STRING_TYPE)), + StructField.of("mods", Type.array(MOD_STRING_TYPE)), + StructField.of("mod_type", Type.string()), + StructField.of("value_capture_type", Type.string()), + StructField.of("number_of_records_in_transaction", Type.int64()), + StructField.of("number_of_partitions_in_transaction", Type.int64())); + private static final Type DATA_CHANGE_RECORD_JSON_TYPE = Type.struct( StructField.of("commit_timestamp", Type.timestamp()), StructField.of("record_sequence", Type.string()), StructField.of("server_transaction_id", Type.string()), StructField.of("is_last_record_in_transaction_in_partition", Type.bool()), StructField.of("table_name", Type.string()), - StructField.of("column_types", Type.array(COLUMN_TYPE_TYPE)), - StructField.of("mods", Type.array(MOD_TYPE)), + StructField.of("column_types", Type.array(COLUMN_TYPE_JSON_TYPE)), + StructField.of("mods", Type.array(MOD_JSON_TYPE)), StructField.of("mod_type", Type.string()), StructField.of("value_capture_type", Type.string()), StructField.of("number_of_records_in_transaction", Type.int64()), @@ -69,39 +96,59 @@ public class TestStructMapper { StructField.of("start_timestamp", Type.timestamp()), StructField.of("record_sequence", Type.string()), StructField.of("child_partitions", Type.array(CHILD_PARTITION_TYPE))); - private static final Type STREAM_RECORD_TYPE = + // TODO: Remove STREAM_RECORD_STRING_TYPE when backend has fully migrated to use JSON + private static final Type STREAM_RECORD_STRING_TYPE = Type.struct( - StructField.of("data_change_record", Type.array(DATA_CHANGE_RECORD_TYPE)), + StructField.of("data_change_record", Type.array(DATA_CHANGE_RECORD_STRING_TYPE)), StructField.of("heartbeat_record", Type.array(HEARTBEAT_RECORD_TYPE)), StructField.of("child_partitions_record", Type.array(CHILD_PARTITIONS_RECORD_TYPE))); + private static final Type STREAM_RECORD_JSON_TYPE = + Type.struct( + StructField.of("data_change_record", Type.array(DATA_CHANGE_RECORD_JSON_TYPE)), + StructField.of("heartbeat_record", Type.array(HEARTBEAT_RECORD_TYPE)), + StructField.of("child_partitions_record", Type.array(CHILD_PARTITIONS_RECORD_TYPE))); + + public static Struct recordsToStructWithJson(ChangeStreamRecord... records) { + return recordsToStruct(true, records); + } + + // TODO: Remove when backend is fully migrated to JSON + public static Struct recordsToStructWithStrings(ChangeStreamRecord... records) { + return recordsToStruct(false, records); + } - public static Struct recordsToStruct(ChangeStreamRecord... records) { + private static Struct recordsToStruct(boolean useJsonFields, ChangeStreamRecord... records) { + final Type streamRecordType = + useJsonFields ? STREAM_RECORD_JSON_TYPE : STREAM_RECORD_STRING_TYPE; return Struct.newBuilder() .add( Value.structArray( - STREAM_RECORD_TYPE, + streamRecordType, Arrays.stream(records) - .map(TestStructMapper::streamRecordStructFrom) + .map(record -> TestStructMapper.streamRecordStructFrom(record, useJsonFields)) .collect(Collectors.toList()))) .build(); } - private static Struct streamRecordStructFrom(ChangeStreamRecord record) { + private static Struct streamRecordStructFrom(ChangeStreamRecord record, boolean useJsonFields) { if (record instanceof DataChangeRecord) { - return streamRecordStructFrom((DataChangeRecord) record); + return streamRecordStructFrom((DataChangeRecord) record, useJsonFields); } else if (record instanceof HeartbeatRecord) { - return streamRecordStructFrom((HeartbeatRecord) record); + return streamRecordStructFrom((HeartbeatRecord) record, useJsonFields); } else if (record instanceof ChildPartitionsRecord) { - return streamRecordStructFrom((ChildPartitionsRecord) record); + return streamRecordStructFrom((ChildPartitionsRecord) record, useJsonFields); } else { throw new UnsupportedOperationException("Unimplemented mapping for " + record.getClass()); } } - private static Struct streamRecordStructFrom(ChildPartitionsRecord record) { + private static Struct streamRecordStructFrom( + ChildPartitionsRecord record, boolean useJsonFields) { + final Type dataChangeRecordType = + useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : DATA_CHANGE_RECORD_STRING_TYPE; return Struct.newBuilder() .set("data_change_record") - .to(Value.structArray(DATA_CHANGE_RECORD_TYPE, Collections.emptyList())) + .to(Value.structArray(dataChangeRecordType, Collections.emptyList())) .set("heartbeat_record") .to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList())) .set("child_partitions_record") @@ -128,10 +175,12 @@ private static Struct recordStructFrom(ChildPartitionsRecord record) { .build(); } - private static Struct streamRecordStructFrom(HeartbeatRecord record) { + private static Struct streamRecordStructFrom(HeartbeatRecord record, boolean useJsonFields) { + final Type dataChangeRecordType = + useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : DATA_CHANGE_RECORD_STRING_TYPE; return Struct.newBuilder() .set("data_change_record") - .to(Value.structArray(DATA_CHANGE_RECORD_TYPE, Collections.emptyList())) + .to(Value.structArray(dataChangeRecordType, Collections.emptyList())) .set("heartbeat_record") .to( Value.structArray( @@ -145,12 +194,15 @@ private static Struct recordStructFrom(HeartbeatRecord record) { return Struct.newBuilder().set("timestamp").to(record.getTimestamp()).build(); } - private static Struct streamRecordStructFrom(DataChangeRecord record) { + private static Struct streamRecordStructFrom(DataChangeRecord record, boolean useJsonFields) { + final Type dataChangeRecordType = + useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : DATA_CHANGE_RECORD_STRING_TYPE; return Struct.newBuilder() .set("data_change_record") .to( Value.structArray( - DATA_CHANGE_RECORD_TYPE, Collections.singletonList(recordStructFrom(record)))) + dataChangeRecordType, + Collections.singletonList(recordStructFrom(record, useJsonFields)))) .set("heartbeat_record") .to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList())) .set("child_partitions_record") @@ -158,18 +210,20 @@ private static Struct streamRecordStructFrom(DataChangeRecord record) { .build(); } - private static Struct recordStructFrom(DataChangeRecord record) { + private static Struct recordStructFrom(DataChangeRecord record, boolean useJsonFields) { + final Type columnTypeType = useJsonFields ? COLUMN_TYPE_JSON_TYPE : COLUMN_TYPE_STRING_TYPE; + final Type modType = useJsonFields ? MOD_JSON_TYPE : MOD_STRING_TYPE; final Value columnTypes = Value.structArray( - COLUMN_TYPE_TYPE, + columnTypeType, record.getRowType().stream() - .map(TestStructMapper::columnTypeStructFrom) + .map(rowType -> TestStructMapper.columnTypeStructFrom(rowType, useJsonFields)) .collect(Collectors.toList())); final Value mods = Value.structArray( - MOD_TYPE, + modType, record.getMods().stream() - .map(TestStructMapper::modStructFrom) + .map(mod -> TestStructMapper.modStructFrom(mod, useJsonFields)) .collect(Collectors.toList())); return Struct.newBuilder() .set("commit_timestamp") @@ -197,12 +251,16 @@ private static Struct recordStructFrom(DataChangeRecord record) { .build(); } - private static Struct columnTypeStructFrom(ColumnType columnType) { + private static Struct columnTypeStructFrom(ColumnType columnType, boolean useJsonFields) { + final Value type = + useJsonFields + ? Value.json(columnType.getType().getCode()) + : Value.string(columnType.getType().getCode()); return Struct.newBuilder() .set("name") .to(columnType.getName()) .set("type") - .to(columnType.getType().getCode()) + .to(type) .set("is_primary_key") .to(columnType.isPrimaryKey()) .set("ordinal_position") @@ -210,14 +268,20 @@ private static Struct columnTypeStructFrom(ColumnType columnType) { .build(); } - private static Struct modStructFrom(Mod mod) { + private static Struct modStructFrom(Mod mod, boolean useJsonFields) { + final Value keys = + useJsonFields ? Value.json(mod.getKeysJson()) : Value.string(mod.getKeysJson()); + final Value newValues = + useJsonFields ? Value.json(mod.getNewValuesJson()) : Value.string(mod.getNewValuesJson()); + final Value oldValues = + useJsonFields ? Value.json(mod.getOldValuesJson()) : Value.string(mod.getOldValuesJson()); return Struct.newBuilder() .set("keys") - .to(mod.getKeysJson()) + .to(keys) .set("new_values") - .to(mod.getNewValuesJson()) + .to(newValues) .set("old_values") - .to(mod.getOldValuesJson()) + .to(oldValues) .build(); }