Skip to content

Commit

Permalink
Merge pull request #16726 from [BEAM-12164]: Parses change streams fields as json / strings
Browse files Browse the repository at this point in the history

The backend is migrating from returning strings for certain fields to
json. We need to change the parsing logic to accommodate both until
the migration is completed.
  • Loading branch information
thiagotnunes authored Feb 7, 2022
1 parent 30a667d commit 7bafabc
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -277,22 +279,25 @@ private ChildPartitionsRecord toChildPartitionsRecord(
}

/**
 * Maps a change stream {@code column_types} element to a {@link ColumnType}.
 *
 * <p>The {@code type} field may arrive as either STRING or JSON while the backend migration is
 * in flight, so it is read through {@link #getJsonString(Value)}.
 *
 * @param struct the struct holding name, type, primary-key flag and ordinal position
 * @return the parsed {@link ColumnType}
 */
private ColumnType columnTypeFrom(Struct struct) {
  // TODO: Move to struct.getJson(TYPE_COLUMN) when backend is fully migrated
  final String type = getJsonString(struct.getValue(TYPE_COLUMN));
  return new ColumnType(
      struct.getString(NAME_COLUMN),
      new TypeCode(type),
      struct.getBoolean(IS_PRIMARY_KEY_COLUMN),
      struct.getLong(ORDINAL_POSITION_COLUMN));
}

/**
 * Maps a change stream {@code mods} element to a {@link Mod}.
 *
 * <p>Keys, old values and new values may each arrive as either STRING or JSON while the backend
 * migration is in flight, so they are read through {@link #getJsonString(Value)}. Old and new
 * values are nullable (e.g. absent for inserts / deletes respectively).
 *
 * @param struct the struct holding keys, old values and new values
 * @return the parsed {@link Mod}
 */
private Mod modFrom(Struct struct) {
  // TODO: Move to struct.getJson when backend is fully migrated
  final String keys = getJsonString(struct.getValue(KEYS_COLUMN));
  final String oldValues =
      struct.isNull(OLD_VALUES_COLUMN) ? null : getJsonString(struct.getValue(OLD_VALUES_COLUMN));
  final String newValues =
      struct.isNull(NEW_VALUES_COLUMN) ? null : getJsonString(struct.getValue(NEW_VALUES_COLUMN));
  return new Mod(keys, oldValues, newValues);
}

private ChildPartition childPartitionFrom(String partitionToken, Struct struct) {
Expand Down Expand Up @@ -324,4 +329,15 @@ private ChangeStreamRecordMetadata changeStreamRecordMetadataFrom(
.withNumberOfRecordsRead(resultSetMetadata.getNumberOfRecordsRead())
.build();
}

// TODO: Remove when backend is fully migrated to JSON
/**
 * Extracts a string payload from a {@link Value} that may be typed as either JSON or STRING,
 * supporting both representations during the backend migration.
 *
 * @param value the column value to read
 * @return the JSON text (for JSON-typed values) or the raw string (for STRING-typed values)
 * @throws IllegalArgumentException if the value is neither JSON nor STRING typed
 */
private String getJsonString(Value value) {
  final Type valueType = value.getType();
  if (valueType.equals(Type.json())) {
    return value.getJson();
  }
  if (valueType.equals(Type.string())) {
    return value.getString();
  }
  throw new IllegalArgumentException("Can not extract string from value " + value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;

import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStruct;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithJson;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithStrings;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -98,11 +99,15 @@ public void testMappingUpdateStructRowToDataChangeRecord() {
10L,
2L,
null);
final Struct struct = recordsToStruct(dataChangeRecord);
final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);

assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
}

@Test
Expand All @@ -125,11 +130,15 @@ public void testMappingInsertStructRowToDataChangeRecord() {
10L,
2L,
null);
final Struct struct = recordsToStruct(dataChangeRecord);
final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);

assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
}

@Test
Expand All @@ -152,18 +161,22 @@ public void testMappingDeleteStructRowToDataChangeRecord() {
10L,
2L,
null);
final Struct struct = recordsToStruct(dataChangeRecord);
final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);

assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
}

@Test
public void testMappingStructRowToHeartbeatRecord() {
final HeartbeatRecord heartbeatRecord =
new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null);
final Struct struct = recordsToStruct(heartbeatRecord);
final Struct struct = recordsToStructWithStrings(heartbeatRecord);

assertEquals(
Collections.singletonList(heartbeatRecord),
Expand All @@ -180,7 +193,7 @@ public void testMappingStructRowToChildPartitionRecord() {
new ChildPartition("childToken1", Sets.newHashSet("parentToken1", "parentToken2")),
new ChildPartition("childToken2", Sets.newHashSet("parentToken1", "parentToken2"))),
null);
final Struct struct = recordsToStruct(childPartitionsRecord);
final Struct struct = recordsToStructWithStrings(childPartitionsRecord);

assertEquals(
Collections.singletonList(childPartitionsRecord),
Expand All @@ -191,7 +204,7 @@ public void testMappingStructRowToChildPartitionRecord() {
@Test
public void testMappingStructRowFromInitialPartitionToChildPartitionRecord() {
final Struct struct =
recordsToStruct(
recordsToStructWithStrings(
new ChildPartitionsRecord(
Timestamp.ofTimeSecondsAndNanos(10L, 20),
"1",
Expand All @@ -217,7 +230,4 @@ public void testMappingStructRowFromInitialPartitionToChildPartitionRecord() {
Collections.singletonList(expected),
mapper.toChangeStreamRecords(initialPartition, struct, resultSetMetadata));
}

// TODO: Add test case for unknown record type
// TODO: Add test case for malformed record
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,53 @@ public class TestStructMapper {
Type.struct(
StructField.of("token", Type.string()),
StructField.of("parent_partition_tokens", Type.array(Type.string())));
private static final Type COLUMN_TYPE_TYPE =
// TODO: Remove COLUMN_TYPE_STRING_TYPE when backend has fully migrated to use JSON
private static final Type COLUMN_TYPE_STRING_TYPE =
Type.struct(
StructField.of("name", Type.string()),
StructField.of("type", Type.string()),
StructField.of("is_primary_key", Type.bool()),
StructField.of("ordinal_position", Type.int64()));
private static final Type MOD_TYPE =
private static final Type COLUMN_TYPE_JSON_TYPE =
Type.struct(
StructField.of("name", Type.string()),
StructField.of("type", Type.json()),
StructField.of("is_primary_key", Type.bool()),
StructField.of("ordinal_position", Type.int64()));
// TODO: Remove MOD_STRING_TYPE when backend has fully migrated to use JSON
private static final Type MOD_STRING_TYPE =
Type.struct(
StructField.of("keys", Type.string()),
StructField.of("new_values", Type.string()),
StructField.of("old_values", Type.string()));
private static final Type DATA_CHANGE_RECORD_TYPE =
private static final Type MOD_JSON_TYPE =
Type.struct(
StructField.of("keys", Type.json()),
StructField.of("new_values", Type.json()),
StructField.of("old_values", Type.json()));
// TODO: Remove DATA_CHANGE_RECORD_STRING_TYPE when backend has fully migrated to use JSON
private static final Type DATA_CHANGE_RECORD_STRING_TYPE =
Type.struct(
StructField.of("commit_timestamp", Type.timestamp()),
StructField.of("record_sequence", Type.string()),
StructField.of("server_transaction_id", Type.string()),
StructField.of("is_last_record_in_transaction_in_partition", Type.bool()),
StructField.of("table_name", Type.string()),
StructField.of("column_types", Type.array(COLUMN_TYPE_STRING_TYPE)),
StructField.of("mods", Type.array(MOD_STRING_TYPE)),
StructField.of("mod_type", Type.string()),
StructField.of("value_capture_type", Type.string()),
StructField.of("number_of_records_in_transaction", Type.int64()),
StructField.of("number_of_partitions_in_transaction", Type.int64()));
private static final Type DATA_CHANGE_RECORD_JSON_TYPE =
Type.struct(
StructField.of("commit_timestamp", Type.timestamp()),
StructField.of("record_sequence", Type.string()),
StructField.of("server_transaction_id", Type.string()),
StructField.of("is_last_record_in_transaction_in_partition", Type.bool()),
StructField.of("table_name", Type.string()),
StructField.of("column_types", Type.array(COLUMN_TYPE_TYPE)),
StructField.of("mods", Type.array(MOD_TYPE)),
StructField.of("column_types", Type.array(COLUMN_TYPE_JSON_TYPE)),
StructField.of("mods", Type.array(MOD_JSON_TYPE)),
StructField.of("mod_type", Type.string()),
StructField.of("value_capture_type", Type.string()),
StructField.of("number_of_records_in_transaction", Type.int64()),
Expand All @@ -69,39 +96,59 @@ public class TestStructMapper {
StructField.of("start_timestamp", Type.timestamp()),
StructField.of("record_sequence", Type.string()),
StructField.of("child_partitions", Type.array(CHILD_PARTITION_TYPE)));
private static final Type STREAM_RECORD_TYPE =
// TODO: Remove STREAM_RECORD_STRING_TYPE when backend has fully migrated to use JSON
private static final Type STREAM_RECORD_STRING_TYPE =
Type.struct(
StructField.of("data_change_record", Type.array(DATA_CHANGE_RECORD_TYPE)),
StructField.of("data_change_record", Type.array(DATA_CHANGE_RECORD_STRING_TYPE)),
StructField.of("heartbeat_record", Type.array(HEARTBEAT_RECORD_TYPE)),
StructField.of("child_partitions_record", Type.array(CHILD_PARTITIONS_RECORD_TYPE)));
private static final Type STREAM_RECORD_JSON_TYPE =
Type.struct(
StructField.of("data_change_record", Type.array(DATA_CHANGE_RECORD_JSON_TYPE)),
StructField.of("heartbeat_record", Type.array(HEARTBEAT_RECORD_TYPE)),
StructField.of("child_partitions_record", Type.array(CHILD_PARTITIONS_RECORD_TYPE)));

/**
 * Builds a test {@link Struct} for the given records using JSON-typed fields for column types
 * and mods, mirroring the post-migration backend representation.
 *
 * @param records the change stream records to encode
 * @return a struct wrapping the records with JSON-typed fields
 */
public static Struct recordsToStructWithJson(ChangeStreamRecord... records) {
  return recordsToStruct(true, records);
}

// TODO: Remove when backend is fully migrated to JSON
/**
 * Builds a test {@link Struct} for the given records using STRING-typed fields for column types
 * and mods, mirroring the pre-migration backend representation.
 *
 * @param records the change stream records to encode
 * @return a struct wrapping the records with STRING-typed fields
 */
public static Struct recordsToStructWithStrings(ChangeStreamRecord... records) {
  return recordsToStruct(false, records);
}

public static Struct recordsToStruct(ChangeStreamRecord... records) {
private static Struct recordsToStruct(boolean useJsonFields, ChangeStreamRecord... records) {
final Type streamRecordType =
useJsonFields ? STREAM_RECORD_JSON_TYPE : STREAM_RECORD_STRING_TYPE;
return Struct.newBuilder()
.add(
Value.structArray(
STREAM_RECORD_TYPE,
streamRecordType,
Arrays.stream(records)
.map(TestStructMapper::streamRecordStructFrom)
.map(record -> TestStructMapper.streamRecordStructFrom(record, useJsonFields))
.collect(Collectors.toList())))
.build();
}

private static Struct streamRecordStructFrom(ChangeStreamRecord record) {
private static Struct streamRecordStructFrom(ChangeStreamRecord record, boolean useJsonFields) {
if (record instanceof DataChangeRecord) {
return streamRecordStructFrom((DataChangeRecord) record);
return streamRecordStructFrom((DataChangeRecord) record, useJsonFields);
} else if (record instanceof HeartbeatRecord) {
return streamRecordStructFrom((HeartbeatRecord) record);
return streamRecordStructFrom((HeartbeatRecord) record, useJsonFields);
} else if (record instanceof ChildPartitionsRecord) {
return streamRecordStructFrom((ChildPartitionsRecord) record);
return streamRecordStructFrom((ChildPartitionsRecord) record, useJsonFields);
} else {
throw new UnsupportedOperationException("Unimplemented mapping for " + record.getClass());
}
}

private static Struct streamRecordStructFrom(ChildPartitionsRecord record) {
private static Struct streamRecordStructFrom(
ChildPartitionsRecord record, boolean useJsonFields) {
final Type dataChangeRecordType =
useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : DATA_CHANGE_RECORD_STRING_TYPE;
return Struct.newBuilder()
.set("data_change_record")
.to(Value.structArray(DATA_CHANGE_RECORD_TYPE, Collections.emptyList()))
.to(Value.structArray(dataChangeRecordType, Collections.emptyList()))
.set("heartbeat_record")
.to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
.set("child_partitions_record")
Expand All @@ -128,10 +175,12 @@ private static Struct recordStructFrom(ChildPartitionsRecord record) {
.build();
}

private static Struct streamRecordStructFrom(HeartbeatRecord record) {
private static Struct streamRecordStructFrom(HeartbeatRecord record, boolean useJsonFields) {
final Type dataChangeRecordType =
useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : DATA_CHANGE_RECORD_STRING_TYPE;
return Struct.newBuilder()
.set("data_change_record")
.to(Value.structArray(DATA_CHANGE_RECORD_TYPE, Collections.emptyList()))
.to(Value.structArray(dataChangeRecordType, Collections.emptyList()))
.set("heartbeat_record")
.to(
Value.structArray(
Expand All @@ -145,31 +194,36 @@ private static Struct recordStructFrom(HeartbeatRecord record) {
return Struct.newBuilder().set("timestamp").to(record.getTimestamp()).build();
}

private static Struct streamRecordStructFrom(DataChangeRecord record) {
private static Struct streamRecordStructFrom(DataChangeRecord record, boolean useJsonFields) {
final Type dataChangeRecordType =
useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : DATA_CHANGE_RECORD_STRING_TYPE;
return Struct.newBuilder()
.set("data_change_record")
.to(
Value.structArray(
DATA_CHANGE_RECORD_TYPE, Collections.singletonList(recordStructFrom(record))))
dataChangeRecordType,
Collections.singletonList(recordStructFrom(record, useJsonFields))))
.set("heartbeat_record")
.to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
.set("child_partitions_record")
.to(Value.structArray(CHILD_PARTITIONS_RECORD_TYPE, Collections.emptyList()))
.build();
}

private static Struct recordStructFrom(DataChangeRecord record) {
private static Struct recordStructFrom(DataChangeRecord record, boolean useJsonFields) {
final Type columnTypeType = useJsonFields ? COLUMN_TYPE_JSON_TYPE : COLUMN_TYPE_STRING_TYPE;
final Type modType = useJsonFields ? MOD_JSON_TYPE : MOD_STRING_TYPE;
final Value columnTypes =
Value.structArray(
COLUMN_TYPE_TYPE,
columnTypeType,
record.getRowType().stream()
.map(TestStructMapper::columnTypeStructFrom)
.map(rowType -> TestStructMapper.columnTypeStructFrom(rowType, useJsonFields))
.collect(Collectors.toList()));
final Value mods =
Value.structArray(
MOD_TYPE,
modType,
record.getMods().stream()
.map(TestStructMapper::modStructFrom)
.map(mod -> TestStructMapper.modStructFrom(mod, useJsonFields))
.collect(Collectors.toList()));
return Struct.newBuilder()
.set("commit_timestamp")
Expand Down Expand Up @@ -197,27 +251,37 @@ private static Struct recordStructFrom(DataChangeRecord record) {
.build();
}

private static Struct columnTypeStructFrom(ColumnType columnType) {
private static Struct columnTypeStructFrom(ColumnType columnType, boolean useJsonFields) {
final Value type =
useJsonFields
? Value.json(columnType.getType().getCode())
: Value.string(columnType.getType().getCode());
return Struct.newBuilder()
.set("name")
.to(columnType.getName())
.set("type")
.to(columnType.getType().getCode())
.to(type)
.set("is_primary_key")
.to(columnType.isPrimaryKey())
.set("ordinal_position")
.to(columnType.getOrdinalPosition())
.build();
}

private static Struct modStructFrom(Mod mod) {
private static Struct modStructFrom(Mod mod, boolean useJsonFields) {
final Value keys =
useJsonFields ? Value.json(mod.getKeysJson()) : Value.string(mod.getKeysJson());
final Value newValues =
useJsonFields ? Value.json(mod.getNewValuesJson()) : Value.string(mod.getNewValuesJson());
final Value oldValues =
useJsonFields ? Value.json(mod.getOldValuesJson()) : Value.string(mod.getOldValuesJson());
return Struct.newBuilder()
.set("keys")
.to(mod.getKeysJson())
.to(keys)
.set("new_values")
.to(mod.getNewValuesJson())
.to(newValues)
.set("old_values")
.to(mod.getOldValuesJson())
.to(oldValues)
.build();
}

Expand Down

0 comments on commit 7bafabc

Please sign in to comment.