Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37300][common] Add RawDataType to represent the type in data sources #3923

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public DataType getElementType() {
}

@Override
public DataType copy(boolean isNullable) {
protected DataType copy(boolean isNullable) {
return new ArrayType(isNullable, elementType.copy());
}

Expand All @@ -63,7 +63,7 @@ public String asSummaryString() {
}

@Override
public String asSerializableString() {
protected String asSerializableString() {
return withNullability(FORMAT, elementType.asSerializableString());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ public BigIntType() {
}

@Override
public DataType copy(boolean isNullable) {
protected DataType copy(boolean isNullable) {
return new BigIntType(isNullable);
}

@Override
public String asSerializableString() {
protected String asSerializableString() {
return withNullability(FORMAT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public BinaryType() {
this(DEFAULT_LENGTH);
}

/** Helper constructor for {@link #ofEmptyLiteral()} and {@link #copy(boolean)}. */
/** Helper constructor for {@link #ofEmptyLiteral()} and {@link DataType#copy(boolean)}. */
private BinaryType(int length, boolean isNullable) {
super(isNullable, DataTypeRoot.BINARY);
this.length = length;
Expand All @@ -89,12 +89,12 @@ public int getLength() {
}

@Override
public DataType copy(boolean isNullable) {
protected DataType copy(boolean isNullable) {
return new BinaryType(length, isNullable);
}

@Override
public String asSerializableString() {
protected String asSerializableString() {
if (length == EMPTY_LITERAL_LENGTH) {
throw new IllegalArgumentException(
"Zero-length binary strings have no serializable string representation.");
Expand All @@ -104,7 +104,8 @@ public String asSerializableString() {

@Override
public String asSummaryString() {
return withNullability(FORMAT, length);
return withNullability(FORMAT, length)
+ (getRawDataType() == null ? "" : " " + getRawDataType());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ public BooleanType() {
}

@Override
public DataType copy(boolean isNullable) {
protected DataType copy(boolean isNullable) {
return new BooleanType(isNullable);
}

@Override
public String asSerializableString() {
protected String asSerializableString() {
return withNullability(FORMAT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public CharType() {
this(DEFAULT_LENGTH);
}

/** Helper constructor for {@link #ofEmptyLiteral()} and {@link #copy(boolean)}. */
/** Helper constructor for {@link #ofEmptyLiteral()} and {@link DataType#copy(boolean)}. */
private CharType(int length, boolean isNullable) {
super(isNullable, DataTypeRoot.CHAR);
this.length = length;
Expand All @@ -88,12 +88,12 @@ public int getLength() {
}

@Override
public DataType copy(boolean isNullable) {
protected DataType copy(boolean isNullable) {
return new CharType(length, isNullable);
}

@Override
public String asSerializableString() {
protected String asSerializableString() {
if (length == EMPTY_LITERAL_LENGTH) {
throw new IllegalArgumentException(
"Zero-length character strings have no serializable string representation.");
Expand All @@ -103,7 +103,8 @@ public String asSerializableString() {

@Override
public String asSummaryString() {
return withNullability(FORMAT, length);
return withNullability(FORMAT, length)
+ (getRawDataType() == null ? "" : " " + getRawDataType());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.flink.cdc.common.utils.Preconditions;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
Expand All @@ -33,6 +35,8 @@ public abstract class DataType implements Serializable {

private final DataTypeRoot typeRoot;

@Nullable private RawDataType rawDataType;

public DataType(boolean isNullable, DataTypeRoot typeRoot) {
this.isNullable = isNullable;
this.typeRoot = Preconditions.checkNotNull(typeRoot);
Expand All @@ -50,6 +54,15 @@ public DataTypeRoot getTypeRoot() {
return typeRoot;
}

public void setRawDataType(@Nullable RawDataType rawDataType) {
this.rawDataType = rawDataType;
}

@Nullable
public RawDataType getRawDataType() {
return rawDataType;
}

/**
* Returns whether the root of the type equals to the {@code typeRoot} or not.
*
Expand Down Expand Up @@ -93,24 +106,47 @@ public boolean is(DataTypeFamily family) {
* @param isNullable the intended nullability of the copied type
* @return a deep copy
*/
public abstract DataType copy(boolean isNullable);
protected abstract DataType copy(boolean isNullable);

/**
* Returns a deep copy of this type. It requires an implementation of {@link #copy(boolean)}.
*
* @return a deep copy
*/
public final DataType copy() {
return copy(isNullable);
return copy(isNullable, rawDataType);
}

/**
* Returns a string that fully serializes this instance. The serialized string can be used for
* transmitting or persisting a type.
* Returns a deep copy of this type with possibly different nullability and raw data type. It
* requires an implementation of {@link #copy(boolean)}.
*
* @param isNullable the intended nullability of the copied type
* @param rawDataType the intended raw data type of the copied type
* @return a deep copy
*/
public final DataType copy(boolean isNullable, RawDataType rawDataType) {
DataType dataType = copy(isNullable);
dataType.setRawDataType(rawDataType);
return dataType;
}

/**
* Returns a string that fully serializes this instance except for the raw type.
*
* @return detailed string for transmission or persistence
*/
public abstract String asSerializableString();
protected abstract String asSerializableString();

/**
* Returns a string that fully serializes this instance, including the raw type. The serialized
* string can be used for transmitting or persisting a type.
*
* @return detailed string for transmission or persistence
*/
public String asSerializableStringWithRawDataType() {
return asSerializableString() + (rawDataType == null ? "" : " " + rawDataType);
}

/**
* Returns a string that summarizes this type for printing to a console. An implementation might
Expand All @@ -121,7 +157,7 @@ public final DataType copy() {
* @return summary string of this type for debugging purposes
*/
public String asSummaryString() {
return asSerializableString();
return asSerializableStringWithRawDataType();
}

public abstract List<DataType> getChildren();
Expand All @@ -142,20 +178,22 @@ public boolean equals(Object o) {
return false;
}
DataType that = (DataType) o;
return isNullable == that.isNullable && typeRoot == that.typeRoot;
return isNullable == that.isNullable
&& typeRoot == that.typeRoot
&& Objects.equals(rawDataType, that.rawDataType);
}

@Override
public int hashCode() {
return Objects.hash(isNullable, typeRoot);
return Objects.hash(isNullable, typeRoot, rawDataType);
}

public DataType notNull() {
return copy(false);
return copy(false, rawDataType);
}

public DataType nullable() {
return copy(true);
return copy(true, rawDataType);
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public DateType() {
}

@Override
public DataType copy(boolean isNullable) {
protected DataType copy(boolean isNullable) {
return new DateType(isNullable);
}

@Override
public String asSerializableString() {
protected String asSerializableString() {
return withNullability(FORMAT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ public int getScale() {
}

@Override
public DataType copy(boolean isNullable) {
protected DataType copy(boolean isNullable) {
return new DecimalType(isNullable, precision, scale);
}

@Override
public String asSerializableString() {
protected String asSerializableString() {
return withNullability(FORMAT, precision, scale);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ public DoubleType() {
}

@Override
public DataType copy(boolean isNullable) {
protected DataType copy(boolean isNullable) {
return new DoubleType(isNullable);
}

@Override
public String asSerializableString() {
protected String asSerializableString() {
return withNullability(FORMAT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ public FloatType() {
}

@Override
public DataType copy(boolean isNullable) {
protected DataType copy(boolean isNullable) {
return new FloatType(isNullable);
}

@Override
public String asSerializableString() {
protected String asSerializableString() {
return withNullability(FORMAT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ public IntType() {
}

@Override
public DataType copy(boolean isNullable) {
protected DataType copy(boolean isNullable) {
return new IntType(isNullable);
}

@Override
public String asSerializableString() {
protected String asSerializableString() {
return withNullability(FORMAT);
}

Expand Down
Loading
Loading