Skip to content

Commit

Permalink
[HUDI-4914] Managed memory weight should be set when sort clustering …
Browse files Browse the repository at this point in the history
…is enabled (apache#6792)
  • Loading branch information
SteNicholas authored and voonhous committed Oct 7, 2022
1 parent d6bbcec commit 1e08f9c
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
private transient int[] requiredPos;
private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
private transient HoodieFlinkWriteClient writeClient;
private transient BulkInsertWriterHelper writerHelper;

private transient BinaryExternalSorter sorter;
private transient StreamRecordCollector<ClusteringCommitEvent> collector;
private transient BinaryRowDataSerializer binarySerializer;

Expand Down Expand Up @@ -153,10 +150,6 @@ public void open() throws Exception {
this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType);
this.binarySerializer = new BinaryRowDataSerializer(rowType.getFieldCount());

if (this.sortClusteringEnabled) {
initSorter();
}

if (this.asyncClustering) {
this.executor = NonThrownExecutor.builder(LOG).build();
}
Expand Down Expand Up @@ -186,6 +179,7 @@ public void close() {
if (this.writeClient != null) {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
this.writeClient = null;
}
}

Expand All @@ -203,7 +197,9 @@ public void endInput() {
private void doClustering(String instantTime, ClusteringPlanEvent event) throws Exception {
final ClusteringGroupInfo clusteringGroupInfo = event.getClusteringGroupInfo();

initWriterHelper(instantTime);
BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
this.rowType);

List<ClusteringOperation> clusteringOps = clusteringGroupInfo.getOperations();
boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
Expand All @@ -220,33 +216,27 @@ private void doClustering(String instantTime, ClusteringPlanEvent event) throws
RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);

if (this.sortClusteringEnabled) {
BinaryExternalSorter sorter = initSorter();
while (iterator.hasNext()) {
RowData rowData = iterator.next();
BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy();
this.sorter.write(binaryRowData);
sorter.write(binaryRowData);
}

BinaryRowData row = binarySerializer.createInstance();
while ((row = sorter.getIterator().next(row)) != null) {
this.writerHelper.write(row);
writerHelper.write(row);
}
sorter.close();
} else {
while (iterator.hasNext()) {
this.writerHelper.write(iterator.next());
writerHelper.write(iterator.next());
}
}

List<WriteStatus> writeStatuses = this.writerHelper.getWriteStatuses(this.taskID);
List<WriteStatus> writeStatuses = writerHelper.getWriteStatuses(this.taskID);
collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, this.taskID));
this.writerHelper = null;
}

private void initWriterHelper(String clusteringInstantTime) {
if (this.writerHelper == null) {
this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
clusteringInstantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
this.rowType);
}
writerHelper.close();
}

/**
Expand Down Expand Up @@ -338,13 +328,13 @@ private int[] getRequiredPositions() {
.toArray();
}

private void initSorter() {
private BinaryExternalSorter initSorter() {
ClassLoader cl = getContainingTask().getUserCodeClassLoader();
NormalizedKeyComputer computer = createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl);
RecordComparator comparator = createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl);

MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager();
this.sorter =
BinaryExternalSorter sorter =
new BinaryExternalSorter(
this.getContainingTask(),
memManager,
Expand All @@ -355,12 +345,13 @@ private void initSorter() {
computer,
comparator,
getContainingTask().getJobConfiguration());
this.sorter.startThreads();
sorter.startThreads();

// register the metrics.
getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
return sorter;
}

private SortCodeGenerator createSortCodeGenerator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf,
* @return the clustering pipeline
*/
public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf, RowType rowType, DataStream<Object> dataStream) {
return dataStream.transform("cluster_plan_generate",
DataStream<ClusteringCommitEvent> clusteringStream = dataStream.transform("cluster_plan_generate",
TypeInformation.of(ClusteringPlanEvent.class),
new ClusteringPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
Expand All @@ -413,8 +413,12 @@ public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf,
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))
.setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS))
.addSink(new ClusteringCommitSink(conf))
.setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS));
if (OptionsResolver.sortClusteringEnabled(conf)) {
ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
return clusteringStream.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.setParallelism(1); // compaction commit should be singleton
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,22 @@ public void testWriteMergeOnReadWithCompaction(String indexType) throws Exceptio

@Test
public void testWriteCopyOnWriteWithClustering() throws Exception {
testWriteCopyOnWriteWithClustering(false);
}

@Test
public void testWriteCopyOnWriteWithSortClustering() throws Exception {
testWriteCopyOnWriteWithClustering(true);
}

private void testWriteCopyOnWriteWithClustering(boolean sortClusteringEnabled) throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.OPERATION, "insert");
if (sortClusteringEnabled) {
conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, "uuid");
}

testWriteToHoodieWithCluster(conf, "cow_write_with_cluster", 1, EXPECTED);
}
Expand Down

0 comments on commit 1e08f9c

Please sign in to comment.