Skip to content

Commit

Permalink
[pipeline-connector][starrocks] Fix char/varchar length inconsistency…
Browse files Browse the repository at this point in the history
… between cdc and starrocks (#2830)

Co-authored-by: PengFei Li <[email protected]>
  • Loading branch information
lvyanquan and banmoy authored Dec 6, 2023
1 parent f7dee46 commit b18d29a
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,14 @@ fieldPos, getPrecision(fieldType))
public static final String DATETIME = "DATETIME";
public static final String JSON = "JSON";

/**
 * Max size of char type of StarRocks, expressed in bytes. A CDC char type whose converted
 * length exceeds this limit is mapped to the StarRocks varchar type instead.
 */
public static final int MAX_CHAR_SIZE = 255;

/**
 * Max size of varchar type of StarRocks, expressed in bytes. Converted lengths larger than
 * this limit are clamped to it.
 */
public static final int MAX_VARCHAR_SIZE = 1048576;

/** Transforms CDC {@link DataType} to StarRocks data type. */
private static class CdcDataTypeTransformer
public static class CdcDataTypeTransformer
extends DataTypeDefaultVisitor<StarRocksColumn.Builder> {

private final StarRocksColumn.Builder builder;
Expand Down Expand Up @@ -298,17 +301,37 @@ public StarRocksColumn.Builder visit(DecimalType decimalType) {

/**
 * Maps a CDC {@link CharType} to a StarRocks CHAR column, or to a VARCHAR column when the
 * converted length does not fit into CHAR.
 *
 * @param charType the CDC char type to convert
 * @return the builder passed to this transformer, with data type, nullability and column
 *     size set
 */
@Override
public StarRocksColumn.Builder visit(CharType charType) {
    // CDC and StarRocks use different units for the length: it is the number of
    // characters in CDC and the number of bytes in StarRocks. One Chinese character
    // takes 3 bytes in UTF-8, so the StarRocks length must be three times the CDC
    // length. The multiplication is done in long so that it cannot overflow int.
    long starRocksLength = charType.getLength() * 3L;
    if (starRocksLength <= MAX_CHAR_SIZE) {
        builder.setDataType(CHAR);
        builder.setNullable(charType.isNullable());
        builder.setColumnSize((int) starRocksLength);
    } else {
        // The converted length exceeds what StarRocks CHAR can hold, so fall back to
        // VARCHAR and clamp the size to the VARCHAR limit.
        builder.setDataType(VARCHAR);
        builder.setNullable(charType.isNullable());
        builder.setColumnSize((int) Math.min(starRocksLength, MAX_VARCHAR_SIZE));
    }
    return builder;
}

/**
 * Maps a CDC {@link VarCharType} to a StarRocks VARCHAR column.
 *
 * @param varCharType the CDC varchar type to convert
 * @return the builder passed to this transformer, with data type, nullability and column
 *     size set
 */
@Override
public StarRocksColumn.Builder visit(VarCharType varCharType) {
    // CDC and StarRocks use different units for the length: it is the number of
    // characters in CDC and the number of bytes in StarRocks. One Chinese character
    // takes 3 bytes in UTF-8, so the StarRocks length must be three times the CDC
    // length. The multiplication is done in long so that it cannot overflow int.
    long starRocksLength = varCharType.getLength() * 3L;
    builder.setDataType(VARCHAR);
    builder.setNullable(varCharType.isNullable());
    // Clamp to the largest size StarRocks VARCHAR accepts.
    builder.setColumnSize((int) Math.min(starRocksLength, MAX_VARCHAR_SIZE));
    return builder;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververical.cdc.connectors.starrocks.sink;

import com.starrocks.connector.flink.catalog.StarRocksColumn;
import com.ververica.cdc.common.types.CharType;
import com.ververica.cdc.common.types.VarCharType;
import com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Tests for {@link StarRocksUtils.CdcDataTypeTransformer}.
 *
 * <p>Verifies that CDC char/varchar lengths (counted in characters) are converted to
 * StarRocks lengths (counted in bytes) by multiplying by 3, and that oversized lengths fall
 * back to VARCHAR or are clamped to {@link StarRocksUtils#MAX_VARCHAR_SIZE}.
 */
public class CdcDataTypeTransformerTest {

    @Test
    public void testCharType() {
        // map to char of StarRocks if CDC length * 3 <= StarRocksUtils.MAX_CHAR_SIZE
        StarRocksColumn.Builder smallLengthBuilder =
                new StarRocksColumn.Builder().setColumnName("small_char").setOrdinalPosition(0);
        new CharType(1).accept(new StarRocksUtils.CdcDataTypeTransformer(smallLengthBuilder));
        StarRocksColumn smallLengthColumn = smallLengthBuilder.build();
        assertEquals("small_char", smallLengthColumn.getColumnName());
        assertEquals(0, smallLengthColumn.getOrdinalPosition());
        assertEquals(StarRocksUtils.CHAR, smallLengthColumn.getDataType());
        assertEquals(Integer.valueOf(3), smallLengthColumn.getColumnSize().orElse(null));
        assertTrue(smallLengthColumn.isNullable());

        // map to varchar of StarRocks if CDC length * 3 > StarRocksUtils.MAX_CHAR_SIZE
        StarRocksColumn.Builder largeLengthBuilder =
                new StarRocksColumn.Builder().setColumnName("large_char").setOrdinalPosition(1);
        new CharType(StarRocksUtils.MAX_CHAR_SIZE)
                .accept(new StarRocksUtils.CdcDataTypeTransformer(largeLengthBuilder));
        StarRocksColumn largeLengthColumn = largeLengthBuilder.build();
        assertEquals("large_char", largeLengthColumn.getColumnName());
        assertEquals(1, largeLengthColumn.getOrdinalPosition());
        assertEquals(StarRocksUtils.VARCHAR, largeLengthColumn.getDataType());
        assertEquals(
                Integer.valueOf(StarRocksUtils.MAX_CHAR_SIZE * 3),
                largeLengthColumn.getColumnSize().orElse(null));
        assertTrue(largeLengthColumn.isNullable());
    }

    @Test
    public void testVarCharType() {
        // the length of StarRocks should be 3 times as that of CDC if CDC length * 3 <=
        // StarRocksUtils.MAX_VARCHAR_SIZE
        StarRocksColumn.Builder smallLengthBuilder =
                new StarRocksColumn.Builder()
                        .setColumnName("small_varchar")
                        .setOrdinalPosition(0);
        new VarCharType(3)
                .accept(new StarRocksUtils.CdcDataTypeTransformer(smallLengthBuilder));
        StarRocksColumn smallLengthColumn = smallLengthBuilder.build();
        assertEquals("small_varchar", smallLengthColumn.getColumnName());
        assertEquals(0, smallLengthColumn.getOrdinalPosition());
        assertEquals(StarRocksUtils.VARCHAR, smallLengthColumn.getDataType());
        assertEquals(Integer.valueOf(9), smallLengthColumn.getColumnSize().orElse(null));
        assertTrue(smallLengthColumn.isNullable());

        // the length of StarRocks should be StarRocksUtils.MAX_VARCHAR_SIZE if CDC length * 3 >
        // StarRocksUtils.MAX_VARCHAR_SIZE
        StarRocksColumn.Builder largeLengthBuilder =
                new StarRocksColumn.Builder()
                        .setColumnName("large_varchar")
                        .setOrdinalPosition(1);
        // Use VarCharType here so that the varchar visitor (not the char fallback path) is
        // the code under test.
        new VarCharType(StarRocksUtils.MAX_VARCHAR_SIZE + 1)
                .accept(new StarRocksUtils.CdcDataTypeTransformer(largeLengthBuilder));
        StarRocksColumn largeLengthColumn = largeLengthBuilder.build();
        assertEquals("large_varchar", largeLengthColumn.getColumnName());
        assertEquals(1, largeLengthColumn.getOrdinalPosition());
        assertEquals(StarRocksUtils.VARCHAR, largeLengthColumn.getDataType());
        assertEquals(
                Integer.valueOf(StarRocksUtils.MAX_VARCHAR_SIZE),
                largeLengthColumn.getColumnSize().orElse(null));
        assertTrue(largeLengthColumn.isNullable());
    }
}

0 comments on commit b18d29a

Please sign in to comment.