[BEAM-12164]: Parses change streams fields as json / strings #16726

Merged 2 commits on Feb 7, 2022
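This change updates the Cloud Spanner change streams connector so the TYPE, KEYS, OLD_VALUES and NEW_VALUES fields of a change stream record can be read whether the backend returns them as STRING or as JSON columns. ChangeStreamRecordMapper now routes those reads through a getJsonString(Value) helper that accepts both types, and the TestStructMapper test utility can build record structs with either string or JSON fields so both paths are exercised. Below is a minimal, self-contained sketch of that fallback read; the class and method names (JsonOrStringReadSketch, jsonString) are illustrative only and not part of the connector, while the Value/Type calls come from the google-cloud-spanner client as used in the diff.

import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;

// Illustrative sketch only (not connector code): read a field that may arrive
// as either a STRING or a JSON column while the backend migration is in flight.
public class JsonOrStringReadSketch {

  // Same idea as the new ChangeStreamRecordMapper#getJsonString helper:
  // accept both representations, fail loudly on anything else.
  static String jsonString(Value value) {
    if (value.getType().equals(Type.json())) {
      return value.getJson();
    } else if (value.getType().equals(Type.string())) {
      return value.getString();
    }
    throw new IllegalArgumentException("Can not extract string from value " + value);
  }

  public static void main(String[] args) {
    // Pre-migration backends return the field as a STRING column...
    Value asString = Value.string("{\"column\":\"value\"}");
    // ...post-migration backends return a JSON column; both yield the same payload.
    Value asJson = Value.json("{\"column\":\"value\"}");
    System.out.println(jsonString(asString)); // {"column":"value"}
    System.out.println(jsonString(asJson));   // {"column":"value"}
  }
}

Once the backend is fully migrated to JSON (see the TODOs in the diff), the string branch and the helper itself can be dropped in favour of struct.getJson.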
@@ -19,6 +19,8 @@

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
@@ -277,22 +279,25 @@ private ChildPartitionsRecord toChildPartitionsRecord(
}

private ColumnType columnTypeFrom(Struct struct) {
// TODO: Move to type struct.getJson when backend is fully migrated
final String type = getJsonString(struct.getValue(TYPE_COLUMN));
return new ColumnType(
struct.getString(NAME_COLUMN),
new TypeCode(struct.getString(TYPE_COLUMN)),
new TypeCode(type),
struct.getBoolean(IS_PRIMARY_KEY_COLUMN),
struct.getLong(ORDINAL_POSITION_COLUMN));
}

private Mod modFrom(Struct struct) {
final String keysJson = struct.getString(KEYS_COLUMN);
final String oldValuesJson =
struct.isNull(OLD_VALUES_COLUMN) ? null : struct.getString(OLD_VALUES_COLUMN);
final String newValuesJson =
struct.isNull(NEW_VALUES_COLUMN)
? null
: struct.getString(ChangeStreamRecordMapper.NEW_VALUES_COLUMN);
return new Mod(keysJson, oldValuesJson, newValuesJson);
// TODO: Move to keys struct.getJson when backend is fully migrated
final String keys = getJsonString(struct.getValue(KEYS_COLUMN));
// TODO: Move to oldValues struct.getJson when backend is fully migrated
final String oldValues =
struct.isNull(OLD_VALUES_COLUMN) ? null : getJsonString(struct.getValue(OLD_VALUES_COLUMN));
// TODO: Move to newValues struct.getJson when backend is fully migrated
final String newValues =
struct.isNull(NEW_VALUES_COLUMN) ? null : getJsonString(struct.getValue(NEW_VALUES_COLUMN));
return new Mod(keys, oldValues, newValues);
}

private ChildPartition childPartitionFrom(String partitionToken, Struct struct) {
@@ -324,4 +329,15 @@ private ChangeStreamRecordMetadata changeStreamRecordMetadataFrom(
.withNumberOfRecordsRead(resultSetMetadata.getNumberOfRecordsRead())
.build();
}

// TODO: Remove when backend is fully migrated to JSON
private String getJsonString(Value value) {
if (value.getType().equals(Type.json())) {
return value.getJson();
} else if (value.getType().equals(Type.string())) {
return value.getString();
} else {
throw new IllegalArgumentException("Can not extract string from value " + value);
}
}
}
@@ -17,7 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;

import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStruct;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithJson;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithStrings;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -98,11 +99,15 @@ public void testMappingUpdateStructRowToDataChangeRecord() {
10L,
2L,
null);
final Struct struct = recordsToStruct(dataChangeRecord);
final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);

assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
}

@Test
@@ -125,11 +130,15 @@ public void testMappingInsertStructRowToDataChangeRecord() {
10L,
2L,
null);
final Struct struct = recordsToStruct(dataChangeRecord);
final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);

assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
}

@Test
@@ -152,18 +161,22 @@ public void testMappingDeleteStructRowToDataChangeRecord() {
10L,
2L,
null);
final Struct struct = recordsToStruct(dataChangeRecord);
final Struct stringFieldsStruct = recordsToStructWithStrings(dataChangeRecord);
final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);

assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
mapper.toChangeStreamRecords(partition, stringFieldsStruct, resultSetMetadata));
assertEquals(
Collections.singletonList(dataChangeRecord),
mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
}

@Test
public void testMappingStructRowToHeartbeatRecord() {
final HeartbeatRecord heartbeatRecord =
new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null);
final Struct struct = recordsToStruct(heartbeatRecord);
final Struct struct = recordsToStructWithStrings(heartbeatRecord);

assertEquals(
Collections.singletonList(heartbeatRecord),
@@ -180,7 +193,7 @@ public void testMappingStructRowToChildPartitionRecord() {
new ChildPartition("childToken1", Sets.newHashSet("parentToken1", "parentToken2")),
new ChildPartition("childToken2", Sets.newHashSet("parentToken1", "parentToken2"))),
null);
final Struct struct = recordsToStruct(childPartitionsRecord);
final Struct struct = recordsToStructWithStrings(childPartitionsRecord);

assertEquals(
Collections.singletonList(childPartitionsRecord),
@@ -191,7 +204,7 @@
@Test
public void testMappingStructRowFromInitialPartitionToChildPartitionRecord() {
final Struct struct =
recordsToStruct(
recordsToStructWithStrings(
new ChildPartitionsRecord(
Timestamp.ofTimeSecondsAndNanos(10L, 20),
"1",
@@ -217,7 +230,4 @@ public void testMappingStructRowFromInitialPartitionToChildPartitionRecord() {
Collections.singletonList(expected),
mapper.toChangeStreamRecords(initialPartition, struct, resultSetMetadata));
}

// TODO: Add test case for unknown record type
// TODO: Add test case for malformed record
}
@@ -38,26 +38,53 @@ public class TestStructMapper {
Type.struct(
StructField.of("token", Type.string()),
StructField.of("parent_partition_tokens", Type.array(Type.string())));
private static final Type COLUMN_TYPE_TYPE =
// TODO: Remove COLUMN_TYPE_STRING_TYPE when backend has fully migrated to use JSON
private static final Type COLUMN_TYPE_STRING_TYPE =
Type.struct(
StructField.of("name", Type.string()),
StructField.of("type", Type.string()),
StructField.of("is_primary_key", Type.bool()),
StructField.of("ordinal_position", Type.int64()));
private static final Type MOD_TYPE =
private static final Type COLUMN_TYPE_JSON_TYPE =
Type.struct(
StructField.of("name", Type.string()),
StructField.of("type", Type.json()),
StructField.of("is_primary_key", Type.bool()),
StructField.of("ordinal_position", Type.int64()));
// TODO: Remove MOD_STRING_TYPE when backend has fully migrated to use JSON
private static final Type MOD_STRING_TYPE =
Type.struct(
StructField.of("keys", Type.string()),
StructField.of("new_values", Type.string()),
StructField.of("old_values", Type.string()));
private static final Type DATA_CHANGE_RECORD_TYPE =
private static final Type MOD_JSON_TYPE =
Type.struct(
StructField.of("keys", Type.json()),
StructField.of("new_values", Type.json()),
StructField.of("old_values", Type.json()));
// TODO: Remove DATA_CHANGE_RECORD_STRING_TYPE when backend has fully migrated to use JSON
private static final Type DATA_CHANGE_RECORD_STRING_TYPE =
Type.struct(
StructField.of("commit_timestamp", Type.timestamp()),
StructField.of("record_sequence", Type.string()),
StructField.of("server_transaction_id", Type.string()),
StructField.of("is_last_record_in_transaction_in_partition", Type.bool()),
StructField.of("table_name", Type.string()),
StructField.of("column_types", Type.array(COLUMN_TYPE_STRING_TYPE)),
StructField.of("mods", Type.array(MOD_STRING_TYPE)),
StructField.of("mod_type", Type.string()),
StructField.of("value_capture_type", Type.string()),
StructField.of("number_of_records_in_transaction", Type.int64()),
StructField.of("number_of_partitions_in_transaction", Type.int64()));
private static final Type DATA_CHANGE_RECORD_JSON_TYPE =
Type.struct(
StructField.of("commit_timestamp", Type.timestamp()),
StructField.of("record_sequence", Type.string()),
StructField.of("server_transaction_id", Type.string()),
StructField.of("is_last_record_in_transaction_in_partition", Type.bool()),
StructField.of("table_name", Type.string()),
StructField.of("column_types", Type.array(COLUMN_TYPE_TYPE)),
StructField.of("mods", Type.array(MOD_TYPE)),
StructField.of("column_types", Type.array(COLUMN_TYPE_JSON_TYPE)),
StructField.of("mods", Type.array(MOD_JSON_TYPE)),
StructField.of("mod_type", Type.string()),
StructField.of("value_capture_type", Type.string()),
StructField.of("number_of_records_in_transaction", Type.int64()),
@@ -69,39 +96,59 @@ public class TestStructMapper {
StructField.of("start_timestamp", Type.timestamp()),
StructField.of("record_sequence", Type.string()),
StructField.of("child_partitions", Type.array(CHILD_PARTITION_TYPE)));
private static final Type STREAM_RECORD_TYPE =
// TODO: Remove STREAM_RECORD_STRING_TYPE when backend has fully migrated to use JSON
private static final Type STREAM_RECORD_STRING_TYPE =
Type.struct(
StructField.of("data_change_record", Type.array(DATA_CHANGE_RECORD_TYPE)),
StructField.of("data_change_record", Type.array(DATA_CHANGE_RECORD_STRING_TYPE)),
StructField.of("heartbeat_record", Type.array(HEARTBEAT_RECORD_TYPE)),
StructField.of("child_partitions_record", Type.array(CHILD_PARTITIONS_RECORD_TYPE)));
private static final Type STREAM_RECORD_JSON_TYPE =
Type.struct(
StructField.of("data_change_record", Type.array(DATA_CHANGE_RECORD_JSON_TYPE)),
StructField.of("heartbeat_record", Type.array(HEARTBEAT_RECORD_TYPE)),
StructField.of("child_partitions_record", Type.array(CHILD_PARTITIONS_RECORD_TYPE)));

public static Struct recordsToStructWithJson(ChangeStreamRecord... records) {
return recordsToStruct(true, records);
}

// TODO: Remove when backend is fully migrated to JSON
public static Struct recordsToStructWithStrings(ChangeStreamRecord... records) {
return recordsToStruct(false, records);
}

public static Struct recordsToStruct(ChangeStreamRecord... records) {
private static Struct recordsToStruct(boolean useJsonFields, ChangeStreamRecord... records) {
final Type streamRecordType =
useJsonFields ? STREAM_RECORD_JSON_TYPE : STREAM_RECORD_STRING_TYPE;
return Struct.newBuilder()
.add(
Value.structArray(
STREAM_RECORD_TYPE,
streamRecordType,
Arrays.stream(records)
.map(TestStructMapper::streamRecordStructFrom)
.map(record -> TestStructMapper.streamRecordStructFrom(record, useJsonFields))
.collect(Collectors.toList())))
.build();
}

private static Struct streamRecordStructFrom(ChangeStreamRecord record) {
private static Struct streamRecordStructFrom(ChangeStreamRecord record, boolean useJsonFields) {
if (record instanceof DataChangeRecord) {
return streamRecordStructFrom((DataChangeRecord) record);
return streamRecordStructFrom((DataChangeRecord) record, useJsonFields);
} else if (record instanceof HeartbeatRecord) {
return streamRecordStructFrom((HeartbeatRecord) record);
return streamRecordStructFrom((HeartbeatRecord) record, useJsonFields);
} else if (record instanceof ChildPartitionsRecord) {
return streamRecordStructFrom((ChildPartitionsRecord) record);
return streamRecordStructFrom((ChildPartitionsRecord) record, useJsonFields);
} else {
throw new UnsupportedOperationException("Unimplemented mapping for " + record.getClass());
}
}

private static Struct streamRecordStructFrom(ChildPartitionsRecord record) {
private static Struct streamRecordStructFrom(
ChildPartitionsRecord record, boolean useJsonFields) {
final Type dataChangeRecordType =
useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : DATA_CHANGE_RECORD_STRING_TYPE;
return Struct.newBuilder()
.set("data_change_record")
.to(Value.structArray(DATA_CHANGE_RECORD_TYPE, Collections.emptyList()))
.to(Value.structArray(dataChangeRecordType, Collections.emptyList()))
.set("heartbeat_record")
.to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
.set("child_partitions_record")
@@ -128,10 +175,12 @@ private static Struct recordStructFrom(ChildPartitionsRecord record) {
.build();
}

private static Struct streamRecordStructFrom(HeartbeatRecord record) {
private static Struct streamRecordStructFrom(HeartbeatRecord record, boolean useJsonFields) {
final Type dataChangeRecordType =
useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : DATA_CHANGE_RECORD_STRING_TYPE;
return Struct.newBuilder()
.set("data_change_record")
.to(Value.structArray(DATA_CHANGE_RECORD_TYPE, Collections.emptyList()))
.to(Value.structArray(dataChangeRecordType, Collections.emptyList()))
.set("heartbeat_record")
.to(
Value.structArray(
@@ -145,31 +194,36 @@ private static Struct recordStructFrom(HeartbeatRecord record) {
return Struct.newBuilder().set("timestamp").to(record.getTimestamp()).build();
}

private static Struct streamRecordStructFrom(DataChangeRecord record) {
private static Struct streamRecordStructFrom(DataChangeRecord record, boolean useJsonFields) {
final Type dataChangeRecordType =
useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : DATA_CHANGE_RECORD_STRING_TYPE;
return Struct.newBuilder()
.set("data_change_record")
.to(
Value.structArray(
DATA_CHANGE_RECORD_TYPE, Collections.singletonList(recordStructFrom(record))))
dataChangeRecordType,
Collections.singletonList(recordStructFrom(record, useJsonFields))))
.set("heartbeat_record")
.to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
.set("child_partitions_record")
.to(Value.structArray(CHILD_PARTITIONS_RECORD_TYPE, Collections.emptyList()))
.build();
}

private static Struct recordStructFrom(DataChangeRecord record) {
private static Struct recordStructFrom(DataChangeRecord record, boolean useJsonFields) {
final Type columnTypeType = useJsonFields ? COLUMN_TYPE_JSON_TYPE : COLUMN_TYPE_STRING_TYPE;
final Type modType = useJsonFields ? MOD_JSON_TYPE : MOD_STRING_TYPE;
final Value columnTypes =
Value.structArray(
COLUMN_TYPE_TYPE,
columnTypeType,
record.getRowType().stream()
.map(TestStructMapper::columnTypeStructFrom)
.map(rowType -> TestStructMapper.columnTypeStructFrom(rowType, useJsonFields))
.collect(Collectors.toList()));
final Value mods =
Value.structArray(
MOD_TYPE,
modType,
record.getMods().stream()
.map(TestStructMapper::modStructFrom)
.map(mod -> TestStructMapper.modStructFrom(mod, useJsonFields))
.collect(Collectors.toList()));
return Struct.newBuilder()
.set("commit_timestamp")
@@ -197,27 +251,37 @@ private static Struct recordStructFrom(DataChangeRecord record) {
.build();
}

private static Struct columnTypeStructFrom(ColumnType columnType) {
private static Struct columnTypeStructFrom(ColumnType columnType, boolean useJsonFields) {
final Value type =
useJsonFields
? Value.json(columnType.getType().getCode())
: Value.string(columnType.getType().getCode());
return Struct.newBuilder()
.set("name")
.to(columnType.getName())
.set("type")
.to(columnType.getType().getCode())
.to(type)
.set("is_primary_key")
.to(columnType.isPrimaryKey())
.set("ordinal_position")
.to(columnType.getOrdinalPosition())
.build();
}

private static Struct modStructFrom(Mod mod) {
private static Struct modStructFrom(Mod mod, boolean useJsonFields) {
final Value keys =
useJsonFields ? Value.json(mod.getKeysJson()) : Value.string(mod.getKeysJson());
final Value newValues =
useJsonFields ? Value.json(mod.getNewValuesJson()) : Value.string(mod.getNewValuesJson());
final Value oldValues =
useJsonFields ? Value.json(mod.getOldValuesJson()) : Value.string(mod.getOldValuesJson());
return Struct.newBuilder()
.set("keys")
.to(mod.getKeysJson())
.to(keys)
.set("new_values")
.to(mod.getNewValuesJson())
.to(newValues)
.set("old_values")
.to(mod.getOldValuesJson())
.to(oldValues)
.build();
}
