diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java index 140b026768a..378ea21cfb9 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java @@ -70,7 +70,7 @@ public byte[] serialize(SourceSplitBase split) throws IOException { boolean useCatalogBeforeSchema = SerializerUtils.shouldUseCatalogBeforeSchema(snapshotSplit.getTableId()); out.writeBoolean(useCatalogBeforeSchema); - out.writeUTF(snapshotSplit.getTableId().toString()); + out.writeUTF(snapshotSplit.getTableId().toDoubleQuotedString()); out.writeUTF(snapshotSplit.splitId()); out.writeUTF(snapshotSplit.getSplitKeyType().asSerializableString()); diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java new file mode 100644 index 00000000000..978c13804e3 --- /dev/null +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializerTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.base.source.meta.split; + +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.RowType; + +import com.ververica.cdc.connectors.base.source.meta.offset.Offset; +import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory; +import io.debezium.relational.TableId; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** Tests for {@link SourceSplitSerializer}. */ +public class SourceSplitSerializerTest { + + @Test + public void testSnapshotTableIdSerializeAndDeserialize() throws IOException { + SnapshotSplit snapshotSplitBefore = + new SnapshotSplit( + new TableId("cata`log\"", "s\"che`ma", "ta\"ble.1`"), + "test", + new RowType( + Collections.singletonList( + new RowType.RowField("id", new BigIntType()))), + null, + null, + null, + new HashMap<>()); + + SourceSplitSerializer sourceSplitSerializer = + new SourceSplitSerializer() { + @Override + public OffsetFactory getOffsetFactory() { + return new OffsetFactory() { + @Override + public Offset newOffset(Map offset) { + return null; + } + + @Override + public Offset newOffset(String filename, Long position) { + return null; + } + + @Override + public Offset newOffset(Long position) { + return null; + } + + @Override + public Offset createTimestampOffset(long timestampMillis) { + return null; + } + + @Override + public Offset createInitialOffset() { + return null; + } + + @Override + public Offset createNoStoppingOffset() { + return null; + } + }; + } + }; + + SnapshotSplit snapshotSplitAfter = + (SnapshotSplit) + sourceSplitSerializer.deserialize( + sourceSplitSerializer.getVersion(), + sourceSplitSerializer.serialize(snapshotSplitBefore)); + + assertEquals(snapshotSplitBefore.getTableId(), snapshotSplitAfter.getTableId()); + } +}