[HUDI-6052] Standardise TIMESTAMP(6) format when writing to Parquet files (apache#8418)

Co-authored-by: hbgstc123 <[email protected]>
voonhous and hbgstc123 authored Apr 17, 2023
1 parent 036bba8 commit 64bf871
Showing 11 changed files with 244 additions and 27 deletions.
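In essence, the patch changes how Hudi's row-data Parquet writer for the Flink engine maps timestamp precisions: previously only precision-3 TIMESTAMP / TIMESTAMP_LTZ columns were written as INT64 (annotated MILLIS) and every other precision fell back to INT96; after this commit precision 6 is also written as INT64, annotated MICROS. Below is a minimal, illustrative sketch of the resulting mapping using the same parquet-mr schema builder the diff relies on; the ts_3/ts_6/ts_9 field names simply mirror the unit test further down, and the printed output in the comments is approximate.

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    public class TimestampMappingSketch {
      public static void main(String[] args) {
        // TIMESTAMP(3): INT64 annotated as millisecond timestamps (unchanged by this commit)
        Type ts3 = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
            .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
            .named("ts_3");
        // TIMESTAMP(6): now INT64 annotated as microsecond timestamps (previously INT96)
        Type ts6 = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
            .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MICROS))
            .named("ts_6");
        // any other precision, e.g. TIMESTAMP(9): still INT96 with no logical-type annotation
        Type ts9 = Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, Type.Repetition.OPTIONAL)
            .named("ts_9");
        System.out.println(ts3); // optional int64 ts_3 (TIMESTAMP(MILLIS,true))
        System.out.println(ts6); // optional int64 ts_6 (TIMESTAMP(MICROS,true))
        System.out.println(ts9); // optional int96 ts_9
      }
    }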
@@ -20,11 +20,11 @@

import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.util.Option;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.util.Option;
import org.apache.parquet.hadoop.api.WriteSupport;

import java.nio.charset.StandardCharsets;
@@ -18,6 +18,8 @@

package org.apache.hudi.io.storage.row.parquet;

import org.apache.hudi.common.util.ValidationUtils;

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalDataUtils;
import org.apache.flink.table.data.MapData;
@@ -124,17 +126,19 @@ private FieldWriter createWriter(LogicalType t) {
return new DoubleWriter();
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) t;
if (timestampType.getPrecision() == 3) {
return new Timestamp64Writer();
final int tsPrecision = timestampType.getPrecision();
if (tsPrecision == 3 || tsPrecision == 6) {
return new Timestamp64Writer(tsPrecision);
} else {
return new Timestamp96Writer(timestampType.getPrecision());
return new Timestamp96Writer(tsPrecision);
}
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) t;
if (localZonedTimestampType.getPrecision() == 3) {
return new Timestamp64Writer();
final int tsLtzPrecision = localZonedTimestampType.getPrecision();
if (tsLtzPrecision == 3 || tsLtzPrecision == 6) {
return new Timestamp64Writer(tsLtzPrecision);
} else {
return new Timestamp96Writer(localZonedTimestampType.getPrecision());
return new Timestamp96Writer(tsLtzPrecision);
}
case ARRAY:
ArrayType arrayType = (ArrayType) t;
@@ -284,33 +288,64 @@ public void write(ArrayData array, int ordinal) {
}

/**
* Timestamp of INT96 bytes, julianDay(4) + nanosOfDay(8). See
* TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedTypes of TIMESTAMP with the MILLIS and MICROS
* precision respectively. See
* https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
* TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType.
*/
private class Timestamp64Writer implements FieldWriter {
private Timestamp64Writer() {
private final int precision;
private Timestamp64Writer(int precision) {
ValidationUtils.checkArgument(precision == 3 || precision == 6,
"Timestamp64Writer is only able to support precisions of {3, 6}");
this.precision = precision;
}

@Override
public void write(RowData row, int ordinal) {
recordConsumer.addLong(timestampToInt64(row.getTimestamp(ordinal, 3)));
TimestampData timestampData = row.getTimestamp(ordinal, precision);
recordConsumer.addLong(timestampToInt64(timestampData, precision));
}

@Override
public void write(ArrayData array, int ordinal) {
recordConsumer.addLong(timestampToInt64(array.getTimestamp(ordinal, 3)));
TimestampData timestampData = array.getTimestamp(ordinal, precision);
recordConsumer.addLong(timestampToInt64(timestampData, precision));
}
}

private long timestampToInt64(TimestampData timestampData) {
return utcTimestamp ? timestampData.getMillisecond() : timestampData.toTimestamp().getTime();
/**
* Converts a {@code TimestampData} to its corresponding int64 value. This function only accepts TimestampData of
* precision 3 or 6. Special attention will need to be given to a TimestampData of precision = 6.
* <p>
* For example, a TimestampData representing `1970-01-01T00:00:03.100001` at precision 6 will have:
* <ul>
* <li>millisecond = 3100</li>
* <li>nanoOfMillisecond = 1000</li>
* </ul>
* As such, the int64 value will be:
* <p>
* millisecond * 1000 + nanoOfMillisecond / 1000
*
* @param timestampData TimestampData to be converted to int64 format
* @param precision the precision of the TimestampData
* @return int64 value of the TimestampData
*/
private long timestampToInt64(TimestampData timestampData, int precision) {
if (precision == 3) {
return utcTimestamp ? timestampData.getMillisecond() : timestampData.toTimestamp().getTime();
} else {
// using an else clause here as precision has been validated to be {3, 6} in the constructor
// convert timestampData to microseconds format
return utcTimestamp ? timestampData.getMillisecond() * 1000 + timestampData.getNanoOfMillisecond() / 1000 :
timestampData.toTimestamp().getTime() * 1000;
}
}

/**
* Timestamp of INT96 bytes, julianDay(4) + nanosOfDay(8). See
* https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
* TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType.
* <p>
* TODO: Leaving this here as there might be a requirement to support TIMESTAMP(9) in the future
*/
private class Timestamp96Writer implements FieldWriter {

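The key arithmetic in the new timestampToInt64 is the precision-6 branch, which packs TimestampData's millisecond and nanoOfMillisecond components into epoch microseconds. A small stand-alone sketch of that branch follows, assuming the UTC path; the class and method names here are illustrative only, not part of the patch.

    import java.time.LocalDateTime;

    import org.apache.flink.table.data.TimestampData;

    public class MicrosConversionSketch {

      // same arithmetic as the utcTimestamp branch for precision 6:
      // epoch micros = epoch millis * 1000 + nanos-of-millisecond / 1000
      static long toEpochMicros(TimestampData ts) {
        return ts.getMillisecond() * 1000 + ts.getNanoOfMillisecond() / 1000;
      }

      public static void main(String[] args) {
        // 1970-01-01T00:00:03.100001 is stored as millisecond = 3100, nanoOfMillisecond = 1000
        TimestampData ts = TimestampData.fromLocalDateTime(
            LocalDateTime.of(1970, 1, 1, 0, 0, 3, 100_001_000));
        System.out.println(toEpochMicros(ts)); // 3100001
      }
    }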
@@ -600,19 +600,21 @@ private static Type convertToParquetType(
.named(name);
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) type;
if (timestampType.getPrecision() == 3) {
if (timestampType.getPrecision() == 3 || timestampType.getPrecision() == 6) {
TimeUnit timeunit = timestampType.getPrecision() == 3 ? TimeUnit.MILLIS : TimeUnit.MICROS;
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
.as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
.as(LogicalTypeAnnotation.timestampType(true, timeunit))
.named(name);
} else {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
.named(name);
}
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
if (localZonedTimestampType.getPrecision() == 3) {
if (localZonedTimestampType.getPrecision() == 3 || localZonedTimestampType.getPrecision() == 6) {
TimeUnit timeunit = localZonedTimestampType.getPrecision() == 3 ? TimeUnit.MILLIS : TimeUnit.MICROS;
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
.as(LogicalTypeAnnotation.timestampType(false, TimeUnit.MILLIS))
.as(LogicalTypeAnnotation.timestampType(false, timeunit))
.named(name);
} else {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
@@ -83,7 +83,7 @@ void testConvertTimestampTypes() {
assertThat(messageType.getColumns().size(), is(3));
final String expected = "message converted {\n"
+ " optional int64 ts_3 (TIMESTAMP(MILLIS,true));\n"
+ " optional int96 ts_6;\n"
+ " optional int64 ts_6 (TIMESTAMP(MICROS,true));\n"
+ " optional int96 ts_9;\n"
+ "}\n";
assertThat(messageType.toString(), is(expected));
@@ -256,7 +256,7 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
+ ", it only supports precision less than 6.");
+ ", it only support precisions <= 6.");
}
Schema timestamp = timestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(timestamp) : timestamp;
@@ -273,7 +273,7 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
throw new IllegalArgumentException(
"Avro does not support LOCAL TIMESTAMP type with precision: "
+ precision
+ ", it only supports precision less than 6.");
+ ", it only support precisions <= 6.");
}
Schema localZonedTimestamp = localZonedTimestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(localZonedTimestamp) : localZonedTimestamp;
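For context on the precision limit above: Avro has no logical type for nanosecond timestamps, so the converter rejects precisions above 6, and precision 6 maps onto Avro's timestamp-micros logical type. A minimal sketch of what that micros schema looks like, built in the same SchemaBuilder style as the diff (non-nullable case assumed, names illustrative):

    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class AvroTimestampMicrosSketch {
      public static void main(String[] args) {
        // Avro long carrying the timestamp-micros logical type, e.g. for a TIMESTAMP(6) column
        Schema timestampMicros =
            LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
        System.out.println(timestampMicros);
        // {"type":"long","logicalType":"timestamp-micros"}
      }
    }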
@@ -215,7 +215,7 @@ private static AvroToRowDataConverter createTimestampConverter(int precision) {
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
+ ", it only supports precision less than 6.");
+ ", it only support precisions <= 6.");
}
return avroObject -> {
final Instant instant;
@@ -52,8 +52,11 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -68,8 +71,10 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
@@ -419,4 +424,179 @@ public void testHoodieFlinkClusteringScheduleAfterArchive() throws Exception {
.stream().anyMatch(fg -> fg.getSlices()
.stream().anyMatch(s -> s.getDataFilePath().contains(firstClusteringInstant))));
}

/**
* Test to ensure that creating a table with a column of TIMESTAMP(9) throws an error.
*/
@Test
public void testHoodieFlinkClusteringWithTimestampNanos() {
// create hoodie table and insert into data
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());

// use append mode
options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");

// row schema
final DataType dataType = DataTypes.ROW(
DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(9)), // precombine field
DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
.notNull();

final RowType rowType = (RowType) dataType.getLogicalType();
final List<String> fields = rowType.getFields().stream()
.map(RowType.RowField::asSummaryString).collect(Collectors.toList());

String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL(
"t1", fields, options, true, "uuid", "partition");
TableResult tableResult = tableEnv.executeSql(hoodieTableDDL);

// insert rows with timestamps of nanosecond precision; timestamp(9)
final String insertSql = "insert into t1 values\n"
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01.100001001','par1'),\n"
+ "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02.100001001','par1'),\n"
+ "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03.100001001','par2'),\n"
+ "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04.100001001','par2'),\n"
+ "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05.100001001','par3'),\n"
+ "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06.100001001','par3'),\n"
+ "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07.100001001','par4'),\n"
+ "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08.100001001','par4')";

assertThrows(ValidationException.class, () -> tableEnv.executeSql(insertSql),
"Avro does not support TIMESTAMP type with precision: 9, it only support precisions <= 6.");
}

@Test
public void testHoodieFlinkClusteringWithTimestampMicros() throws Exception {
// create hoodie table and insert into data
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());

// use append mode
options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());

// row schema
final DataType dataType = DataTypes.ROW(
DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)), // precombine field
DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
.notNull();
final RowType rowType = (RowType) dataType.getLogicalType();
final List<String> fields = rowType.getFields().stream()
.map(RowType.RowField::asSummaryString).collect(Collectors.toList());

String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL(
"t1", fields, options, true, "uuid", "partition");
tableEnv.executeSql(hoodieTableDDL);

// insert rows with timestamp of microseconds precision; timestamp(6)
final String insertSql = "insert into t1 values\n"
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01.100001','par1'),\n"
+ "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02.100001','par1'),\n"
+ "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03.100001','par2'),\n"
+ "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04.100001','par2'),\n"
+ "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05.100001','par3'),\n"
+ "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06.100001','par3'),\n"
+ "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07.100001','par4'),\n"
+ "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08.100001','par4')";
tableEnv.executeSql(insertSql).await();

// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(3);

// make configuration and setAvroSchema.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkClusteringConfig cfg = new FlinkClusteringConfig();
cfg.path = tempFile.getAbsolutePath();
cfg.targetPartitions = 4;
Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);

// create metaClient
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);

// set the table name
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());

// set record key field
conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
// set partition field
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());

long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");

// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);

// check whether there is an operation to schedule:
// compute the clustering instant time and do the clustering.
String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();

HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf);
HoodieFlinkTable<?> table = writeClient.getHoodieTable();

boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());

assertTrue(scheduled, "The clustering plan should be scheduled");

// fetch the instant based on the configured execution sequence
table.getMetaClient().reloadActiveTimeline();
HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);

// generate clustering plan
// should support configurable commit metadata
Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
table.getMetaClient(), timeline.lastInstant().get());

HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();

// Mark instant as clustering inflight
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());

DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan))
.name("clustering_source")
.uid("uid_clustering_source")
.rebalance()
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))
.setParallelism(clusteringPlan.getInputGroups().size());

ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);

dataStream
.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.uid("uid_clustering_commit")
.setParallelism(1);

env.execute("flink_hudi_clustering");

// test output
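// the expected ts values below are epoch microseconds, e.g. 1970-01-01 00:00:01.100001 -> 1100001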
final Map<String, String> expected = new HashMap<>();
expected.put("par1", "[id1,par1,id1,Danny,23,1100001,par1, id2,par1,id2,Stephen,33,2100001,par1]");
expected.put("par2", "[id3,par2,id3,Julian,53,3100001,par2, id4,par2,id4,Fabian,31,4100001,par2]");
expected.put("par3", "[id5,par3,id5,Sophia,18,5100001,par3, id6,par3,id6,Emma,20,6100001,par3]");
expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4, id8,par4,id8,Han,56,8100001,par4]");
TestData.checkWrittenData(tempFile, expected, 4);
}
}
@@ -59,7 +59,7 @@ public Int64TimestampColumnReader(
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
+ ", it only supports precision less than 6.");
+ ", it only support precisions <= 6.");
}
checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
}
(The diffs for the remaining changed files are not shown here.)
