From 64bf871cfc3cfc08478cf04e02d2f7086f72548e Mon Sep 17 00:00:00 2001 From: voonhous Date: Mon, 17 Apr 2023 21:12:06 +0800 Subject: [PATCH] [HUDI-6052] Standardise TIMESTAMP(6) format when writing to Parquet files (#8418) Co-authored-by: hbgstc123 --- .../row/HoodieRowDataParquetWriteSupport.java | 2 +- .../row/parquet/ParquetRowDataWriter.java | 63 ++++-- .../row/parquet/ParquetSchemaConverter.java | 10 +- .../parquet/TestParquetSchemaConverter.java | 2 +- .../apache/hudi/util/AvroSchemaConverter.java | 4 +- .../hudi/util/AvroToRowDataConverters.java | 2 +- .../cluster/ITTestHoodieFlinkClustering.java | 180 ++++++++++++++++++ .../reader/Int64TimestampColumnReader.java | 2 +- .../reader/Int64TimestampColumnReader.java | 2 +- .../reader/Int64TimestampColumnReader.java | 2 +- .../reader/Int64TimestampColumnReader.java | 2 +- 11 files changed, 244 insertions(+), 27 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java index b939498c3e240..4a3109db60a33 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java @@ -20,11 +20,11 @@ import org.apache.hudi.avro.HoodieBloomFilterWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.util.Option; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.conf.Configuration; -import org.apache.hudi.common.util.Option; import org.apache.parquet.hadoop.api.WriteSupport; import java.nio.charset.StandardCharsets; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java index 3d9524eaa30e9..e5b9509d8798a 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java @@ -18,6 +18,8 @@ package org.apache.hudi.io.storage.row.parquet; +import org.apache.hudi.common.util.ValidationUtils; + import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalDataUtils; import org.apache.flink.table.data.MapData; @@ -124,17 +126,19 @@ private FieldWriter createWriter(LogicalType t) { return new DoubleWriter(); case TIMESTAMP_WITHOUT_TIME_ZONE: TimestampType timestampType = (TimestampType) t; - if (timestampType.getPrecision() == 3) { - return new Timestamp64Writer(); + final int tsPrecision = timestampType.getPrecision(); + if (tsPrecision == 3 || tsPrecision == 6) { + return new Timestamp64Writer(tsPrecision); } else { - return new Timestamp96Writer(timestampType.getPrecision()); + return new Timestamp96Writer(tsPrecision); } case TIMESTAMP_WITH_LOCAL_TIME_ZONE: LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) t; - if (localZonedTimestampType.getPrecision() == 3) { - return new Timestamp64Writer(); + final int tsLtzPrecision = localZonedTimestampType.getPrecision(); + if (tsLtzPrecision == 3 || tsLtzPrecision == 6) { + return new Timestamp64Writer(tsLtzPrecision); } else { - return new 
Timestamp96Writer(localZonedTimestampType.getPrecision()); + return new Timestamp96Writer(tsLtzPrecision); } case ARRAY: ArrayType arrayType = (ArrayType) t; @@ -284,33 +288,64 @@ public void write(ArrayData array, int ordinal) { } /** - * Timestamp of INT96 bytes, julianDay(4) + nanosOfDay(8). See + * TIMESTAMP_MILLIS and TIMESTAMP_MICROS is the deprecated ConvertedType of TIMESTAMP with the MILLIS and MICROS + * precision respectively. See * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp - * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType. */ private class Timestamp64Writer implements FieldWriter { - private Timestamp64Writer() { + private final int precision; + private Timestamp64Writer(int precision) { + ValidationUtils.checkArgument(precision == 3 || precision == 6, + "Timestamp64Writer is only able to support precisions of {3, 6}"); + this.precision = precision; } @Override public void write(RowData row, int ordinal) { - recordConsumer.addLong(timestampToInt64(row.getTimestamp(ordinal, 3))); + TimestampData timestampData = row.getTimestamp(ordinal, precision); + recordConsumer.addLong(timestampToInt64(timestampData, precision)); } @Override public void write(ArrayData array, int ordinal) { - recordConsumer.addLong(timestampToInt64(array.getTimestamp(ordinal, 3))); + TimestampData timestampData = array.getTimestamp(ordinal, precision); + recordConsumer.addLong(timestampToInt64(timestampData, precision)); } } - private long timestampToInt64(TimestampData timestampData) { - return utcTimestamp ? timestampData.getMillisecond() : timestampData.toTimestamp().getTime(); + /** + * Converts a {@code TimestampData} to its corresponding int64 value. This function only accepts TimestampData of + * precision 3 or 6. Special attention will need to be given to a TimestampData of precision = 6. + *

+   * For example representing `1970-01-01T00:00:03.100001` of precision 6 will have:
+   *   millisecond = 3100
+   *   nanoOfMillisecond = 1000
+   *
+   * As such, the int64 value will be:
+   *
+ * millisecond * 1000 + nanoOfMillisecond / 1000 + * + * @param timestampData TimestampData to be converted to int64 format + * @param precision the precision of the TimestampData + * @return int64 value of the TimestampData + */ + private long timestampToInt64(TimestampData timestampData, int precision) { + if (precision == 3) { + return utcTimestamp ? timestampData.getMillisecond() : timestampData.toTimestamp().getTime(); + } else { + // using an else clause here as precision has been validated to be {3, 6} in the constructor + // convert timestampData to microseconds format + return utcTimestamp ? timestampData.getMillisecond() * 1000 + timestampData.getNanoOfMillisecond() / 1000 : + timestampData.toTimestamp().getTime() * 1000; + } } /** * Timestamp of INT96 bytes, julianDay(4) + nanosOfDay(8). See * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp - * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType. + *

+ * TODO: Leaving this here as there might be a requirement to support TIMESTAMP(9) in the future */ private class Timestamp96Writer implements FieldWriter { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java index 5fb76f9418c86..7cd7c300670e7 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java @@ -600,9 +600,10 @@ private static Type convertToParquetType( .named(name); case TIMESTAMP_WITHOUT_TIME_ZONE: TimestampType timestampType = (TimestampType) type; - if (timestampType.getPrecision() == 3) { + if (timestampType.getPrecision() == 3 || timestampType.getPrecision() == 6) { + TimeUnit timeunit = timestampType.getPrecision() == 3 ? TimeUnit.MILLIS : TimeUnit.MICROS; return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition) - .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS)) + .as(LogicalTypeAnnotation.timestampType(true, timeunit)) .named(name); } else { return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition) @@ -610,9 +611,10 @@ private static Type convertToParquetType( } case TIMESTAMP_WITH_LOCAL_TIME_ZONE: LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type; - if (localZonedTimestampType.getPrecision() == 3) { + if (localZonedTimestampType.getPrecision() == 3 || localZonedTimestampType.getPrecision() == 6) { + TimeUnit timeunit = localZonedTimestampType.getPrecision() == 3 ? TimeUnit.MILLIS : TimeUnit.MICROS; return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition) - .as(LogicalTypeAnnotation.timestampType(false, TimeUnit.MILLIS)) + .as(LogicalTypeAnnotation.timestampType(false, timeunit)) .named(name); } else { return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition) diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java index a1a07a65f9931..3d5012b73b371 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java @@ -83,7 +83,7 @@ void testConvertTimestampTypes() { assertThat(messageType.getColumns().size(), is(3)); final String expected = "message converted {\n" + " optional int64 ts_3 (TIMESTAMP(MILLIS,true));\n" - + " optional int96 ts_6;\n" + + " optional int64 ts_6 (TIMESTAMP(MICROS,true));\n" + " optional int96 ts_9;\n" + "}\n"; assertThat(messageType.toString(), is(expected)); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java index 44253e3732991..6e4c3ec613767 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java @@ -256,7 +256,7 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) { throw new 
IllegalArgumentException( "Avro does not support TIMESTAMP type with precision: " + precision - + ", it only supports precision less than 6."); + + ", it only support precisions <= 6."); } Schema timestamp = timestampLogicalType.addToSchema(SchemaBuilder.builder().longType()); return nullable ? nullableSchema(timestamp) : timestamp; @@ -273,7 +273,7 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) { throw new IllegalArgumentException( "Avro does not support LOCAL TIMESTAMP type with precision: " + precision - + ", it only supports precision less than 6."); + + ", it only support precisions <= 6."); } Schema localZonedTimestamp = localZonedTimestampLogicalType.addToSchema(SchemaBuilder.builder().longType()); return nullable ? nullableSchema(localZonedTimestamp) : localZonedTimestamp; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java index 5c9988dc0b2ed..38633b8ad9e77 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java @@ -215,7 +215,7 @@ private static AvroToRowDataConverter createTimestampConverter(int precision) { throw new IllegalArgumentException( "Avro does not support TIMESTAMP type with precision: " + precision - + ", it only supports precision less than 6."); + + ", it only support precisions <= 6."); } return avroObject -> { final Instant instant; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java index 9fdc6fc8b108f..095fe1d369e6f 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java @@ -52,8 +52,11 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; @@ -68,8 +71,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -419,4 +424,179 @@ public void testHoodieFlinkClusteringScheduleAfterArchive() throws Exception { .stream().anyMatch(fg -> fg.getSlices() .stream().anyMatch(s -> s.getDataFilePath().contains(firstClusteringInstant)))); } + + /** + * Test to ensure that creating a table with a column of TIMESTAMP(9) will throw errors + * @throws Exception + */ + @Test + public void 
testHoodieFlinkClusteringWithTimestampNanos() { + // create hoodie table and insert into data + EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); + TableEnvironment tableEnv = TableEnvironmentImpl.create(settings); + tableEnv.getConfig().getConfiguration() + .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4); + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + + // use append mode + options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value()); + options.put(FlinkOptions.INSERT_CLUSTER.key(), "false"); + + // row schema + final DataType dataType = DataTypes.ROW( + DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key + DataTypes.FIELD("name", DataTypes.VARCHAR(10)), + DataTypes.FIELD("age", DataTypes.INT()), + DataTypes.FIELD("ts", DataTypes.TIMESTAMP(9)), // precombine field + DataTypes.FIELD("partition", DataTypes.VARCHAR(10))) + .notNull(); + + final RowType rowType = (RowType) dataType.getLogicalType(); + final List fields = rowType.getFields().stream() + .map(RowType.RowField::asSummaryString).collect(Collectors.toList()); + + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL( + "t1", fields, options, true, "uuid", "partition"); + TableResult tableResult = tableEnv.executeSql(hoodieTableDDL); + + // insert rows with timestamp of microseconds precision; timestamp(6) + final String insertSql = "insert into t1 values\n" + + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01.100001001','par1'),\n" + + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02.100001001','par1'),\n" + + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03.100001001','par2'),\n" + + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04.100001001','par2'),\n" + + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05.100001001','par3'),\n" + + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06.100001001','par3'),\n" + + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07.100001001','par4'),\n" + + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08.100001001','par4')"; + + assertThrows(ValidationException.class, () -> tableEnv.executeSql(insertSql), + "Avro does not support TIMESTAMP type with precision: 9, it only support precisions <= 6."); + } + + @Test + public void testHoodieFlinkClusteringWithTimestampMicros() throws Exception { + // create hoodie table and insert into data + EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); + TableEnvironment tableEnv = TableEnvironmentImpl.create(settings); + tableEnv.getConfig().getConfiguration() + .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4); + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + + // use append mode + options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value()); + + // row schema + final DataType dataType = DataTypes.ROW( + DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key + DataTypes.FIELD("name", DataTypes.VARCHAR(10)), + DataTypes.FIELD("age", DataTypes.INT()), + DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)), // precombine field + DataTypes.FIELD("partition", DataTypes.VARCHAR(10))) + .notNull(); + final RowType rowType = (RowType) dataType.getLogicalType(); + final List fields = rowType.getFields().stream() + .map(RowType.RowField::asSummaryString).collect(Collectors.toList()); + + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL( + "t1", fields, 
options, true, "uuid", "partition"); + tableEnv.executeSql(hoodieTableDDL); + + // insert rows with timestamp of microseconds precision; timestamp(6) + final String insertSql = "insert into t1 values\n" + + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01.100001','par1'),\n" + + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02.100001','par1'),\n" + + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03.100001','par2'),\n" + + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04.100001','par2'),\n" + + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05.100001','par3'),\n" + + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06.100001','par3'),\n" + + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07.100001','par4'),\n" + + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08.100001','par4')"; + tableEnv.executeSql(insertSql).await(); + + // wait for the asynchronous commit to finish + TimeUnit.SECONDS.sleep(3); + + // make configuration and setAvroSchema. + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + FlinkClusteringConfig cfg = new FlinkClusteringConfig(); + cfg.path = tempFile.getAbsolutePath(); + cfg.targetPartitions = 4; + Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg); + + // create metaClient + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); + + // set the table name + conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); + + // set record key field + conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp()); + // set partition field + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp()); + + long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout(); + conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition"); + + // set table schema + CompactionUtil.setAvroSchema(conf, metaClient); + + // judge whether have operation + // To compute the clustering instant time and do clustering. 
+ String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime(); + + HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf); + HoodieFlinkTable table = writeClient.getHoodieTable(); + + boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); + + assertTrue(scheduled, "The clustering plan should be scheduled"); + + // fetch the instant based on the configured execution sequence + table.getMetaClient().reloadActiveTimeline(); + HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline() + .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED); + + // generate clustering plan + // should support configurable commit metadata + Option> clusteringPlanOption = ClusteringUtils.getClusteringPlan( + table.getMetaClient(), timeline.lastInstant().get()); + + HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight(); + + // Mark instant as clustering inflight + HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime); + table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty()); + + DataStream dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan)) + .name("clustering_source") + .uid("uid_clustering_source") + .rebalance() + .transform("clustering_task", + TypeInformation.of(ClusteringCommitEvent.class), + new ClusteringOperator(conf, rowType)) + .setParallelism(clusteringPlan.getInputGroups().size()); + + ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(), + conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); + + dataStream + .addSink(new ClusteringCommitSink(conf)) + .name("clustering_commit") + .uid("uid_clustering_commit") + .setParallelism(1); + + env.execute("flink_hudi_clustering"); + + // test output + final Map expected = new HashMap<>(); + expected.put("par1", "[id1,par1,id1,Danny,23,1100001,par1, id2,par1,id2,Stephen,33,2100001,par1]"); + expected.put("par2", "[id3,par2,id3,Julian,53,3100001,par2, id4,par2,id4,Fabian,31,4100001,par2]"); + expected.put("par3", "[id5,par3,id5,Sophia,18,5100001,par3, id6,par3,id6,Emma,20,6100001,par3]"); + expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4, id8,par4,id8,Han,56,8100001,par4]"); + TestData.checkWrittenData(tempFile, expected, 4); + } } diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java index 555853bda6bd8..70638a9c43200 100644 --- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java +++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java @@ -59,7 +59,7 @@ public Int64TimestampColumnReader( throw new IllegalArgumentException( "Avro does not support TIMESTAMP type with precision: " + precision - + ", it only supports precision less than 6."); + + ", it only support precisions <= 6."); } checkTypeName(PrimitiveType.PrimitiveTypeName.INT64); } diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java 
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java index 555853bda6bd8..70638a9c43200 100644 --- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java +++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java @@ -59,7 +59,7 @@ public Int64TimestampColumnReader( throw new IllegalArgumentException( "Avro does not support TIMESTAMP type with precision: " + precision - + ", it only supports precision less than 6."); + + ", it only support precisions <= 6."); } checkTypeName(PrimitiveType.PrimitiveTypeName.INT64); } diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java index 417b1155bbd7b..b44273b57ca26 100644 --- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java +++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java @@ -59,7 +59,7 @@ public Int64TimestampColumnReader( throw new IllegalArgumentException( "Avro does not support TIMESTAMP type with precision: " + precision - + ", it only supports precision less than 6."); + + ", it only support precisions <= 6."); } checkTypeName(PrimitiveType.PrimitiveTypeName.INT64); } diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java index 417b1155bbd7b..b44273b57ca26 100644 --- a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java +++ b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java @@ -59,7 +59,7 @@ public Int64TimestampColumnReader( throw new IllegalArgumentException( "Avro does not support TIMESTAMP type with precision: " + precision - + ", it only supports precision less than 6."); + + ", it only support precisions <= 6."); } checkTypeName(PrimitiveType.PrimitiveTypeName.INT64); }
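
For reference, below is a minimal standalone sketch (not part of the patch) of the precision-6 conversion that the new Timestamp64Writer performs on its UTC path. The class name TimestampToInt64Sketch and the sample value are illustrative only; the TimestampData calls are the same Flink APIs the writer itself uses.

import org.apache.flink.table.data.TimestampData;

import java.time.LocalDateTime;

public class TimestampToInt64Sketch {
  public static void main(String[] args) {
    // 1970-01-01T00:00:03.100001 is held by TimestampData as
    // millisecond = 3100 and nanoOfMillisecond = 1000.
    TimestampData ts = TimestampData.fromLocalDateTime(
        LocalDateTime.of(1970, 1, 1, 0, 0, 3, 100_001_000));

    // millisecond * 1000 + nanoOfMillisecond / 1000 -> microseconds since epoch,
    // i.e. the int64 written for a TIMESTAMP(6) column.
    long micros = ts.getMillisecond() * 1_000 + ts.getNanoOfMillisecond() / 1_000;

    // Prints 3100001, matching the per-row values asserted in
    // testHoodieFlinkClusteringWithTimestampMicros.
    System.out.println(micros);
  }
}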