[HUDI-4914] Managed memory weight should be set when sort clustering is enabled #6792

Merged · 1 commit · Sep 26, 2022

ClusteringOperator.java:

```diff
@@ -105,9 +105,6 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEvent>
   private transient int[] requiredPos;
   private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
   private transient HoodieFlinkWriteClient writeClient;
-  private transient BulkInsertWriterHelper writerHelper;
-
-  private transient BinaryExternalSorter sorter;
   private transient StreamRecordCollector<ClusteringCommitEvent> collector;
   private transient BinaryRowDataSerializer binarySerializer;
 
@@ -153,10 +150,6 @@ public void open() throws Exception {
     this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType);
     this.binarySerializer = new BinaryRowDataSerializer(rowType.getFieldCount());
 
-    if (this.sortClusteringEnabled) {
-      initSorter();
-    }
-
     if (this.asyncClustering) {
       this.executor = NonThrownExecutor.builder(LOG).build();
     }
@@ -186,6 +179,7 @@ public void close() {
     if (this.writeClient != null) {
       this.writeClient.cleanHandlesGracefully();
       this.writeClient.close();
+      this.writeClient = null;
     }
   }
 
@@ -203,7 +197,9 @@ public void endInput() {
   private void doClustering(String instantTime, ClusteringPlanEvent event) throws Exception {
     final ClusteringGroupInfo clusteringGroupInfo = event.getClusteringGroupInfo();
 
-    initWriterHelper(instantTime);
+    BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
+        instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
+        this.rowType);
 
     List<ClusteringOperation> clusteringOps = clusteringGroupInfo.getOperations();
     boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
@@ -220,33 +216,27 @@ private void doClustering(String instantTime, ClusteringPlanEvent event) throws Exception {
     RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
 
     if (this.sortClusteringEnabled) {
+      BinaryExternalSorter sorter = initSorter();
       while (iterator.hasNext()) {
         RowData rowData = iterator.next();
         BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy();
-        this.sorter.write(binaryRowData);
+        sorter.write(binaryRowData);
       }
 
       BinaryRowData row = binarySerializer.createInstance();
       while ((row = sorter.getIterator().next(row)) != null) {
-        this.writerHelper.write(row);
+        writerHelper.write(row);
       }
+      sorter.close();
     } else {
       while (iterator.hasNext()) {
-        this.writerHelper.write(iterator.next());
+        writerHelper.write(iterator.next());
       }
     }
 
-    List<WriteStatus> writeStatuses = this.writerHelper.getWriteStatuses(this.taskID);
+    List<WriteStatus> writeStatuses = writerHelper.getWriteStatuses(this.taskID);
     collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, this.taskID));
-    this.writerHelper = null;
-  }
-
-  private void initWriterHelper(String clusteringInstantTime) {
-    if (this.writerHelper == null) {
-      this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
-          clusteringInstantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
-          this.rowType);
-    }
+    writerHelper.close();
   }
 
   /**
@@ -338,13 +328,13 @@ private int[] getRequiredPositions() {
         .toArray();
   }
 
-  private void initSorter() {
+  private BinaryExternalSorter initSorter() {
     ClassLoader cl = getContainingTask().getUserCodeClassLoader();
     NormalizedKeyComputer computer = createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl);
     RecordComparator comparator = createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl);
 
     MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager();
-    this.sorter =
+    BinaryExternalSorter sorter =
         new BinaryExternalSorter(
             this.getContainingTask(),
             memManager,
@@ -355,12 +345,13 @@ private void initSorter() {
             computer,
             comparator,
             getContainingTask().getJobConfiguration());
-    this.sorter.startThreads();
+    sorter.startThreads();
 
     // register the metrics.
     getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
     getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
     getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
+    return sorter;
   }
 
   private SortCodeGenerator createSortCodeGenerator() {
```
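For context on why a missing weight breaks sort clustering: `BinaryExternalSorter` does not allocate from the JVM heap. It requests pages from Flink's `MemoryManager`, and the size of that request is derived from the managed-memory fraction assigned to the operator at scheduling time. Below is a rough paraphrase of the sizing logic in Flink's `TableStreamOperator#computeMemorySize` (a sketch only; the exact method signatures vary across Flink versions):

```java
// Sketch (Flink-internal APIs, paraphrased): how a table operator turns its
// declared managed-memory weight into a byte budget for the sorter.
double fraction = getOperatorConfig().getManagedMemoryFractionOperatorUseCaseOfSlot(
    ManagedMemoryUseCase.OPERATOR,
    getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(),
    getContainingTask().getEnvironment().getUserCodeClassLoader().asClassLoader());
// If the transformation never declared the OPERATOR use case, the fraction
// resolves to 0 and the sorter is handed no managed memory at all, which is
// the failure this PR fixes.
long sortMemoryInBytes = getContainingTask().getEnvironment().getMemoryManager().computeMemorySize(fraction);
```

Making `writerHelper` and `sorter` method-local instead of operator fields is a complementary cleanup: each clustering event now gets a freshly constructed writer and sorter that are explicitly closed once the event is processed, so nothing leaks between events.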

Pipelines.java:

```diff
@@ -401,7 +401,7 @@ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf,
    * @return the clustering pipeline
    */
   public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf, RowType rowType, DataStream<Object> dataStream) {
-    return dataStream.transform("cluster_plan_generate",
+    DataStream<ClusteringCommitEvent> clusteringStream = dataStream.transform("cluster_plan_generate",
         TypeInformation.of(ClusteringPlanEvent.class),
         new ClusteringPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
@@ -413,8 +413,12 @@ public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf,
         .transform("clustering_task",
             TypeInformation.of(ClusteringCommitEvent.class),
             new ClusteringOperator(conf, rowType))
-        .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS))
-        .addSink(new ClusteringCommitSink(conf))
+        .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS));
+    if (OptionsResolver.sortClusteringEnabled(conf)) {
+      ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
+          conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+    }
+    return clusteringStream.addSink(new ClusteringCommitSink(conf))
         .name("clustering_commit")
         .setParallelism(1); // compaction commit should be singleton
   }
```
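`ExecNodeUtil.setManagedMemoryWeight` is the same planner utility Flink uses for its own sort, join, and aggregate exec nodes. Note the unit conversion: `FlinkOptions.WRITE_SORT_MEMORY` is configured in megabytes, hence the `* 1024L * 1024L` to hand the utility a byte count. Under the hood it reduces to declaring an operator-scoped managed-memory use case on the transformation; a minimal sketch of the equivalent call, continuing from `cluster(...)` above (assuming the Flink 1.12+ `Transformation` API, with the byte-to-MiB weight conversion following the planner convention):

```java
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.core.memory.ManagedMemoryUseCase;

// Roughly equivalent to ExecNodeUtil.setManagedMemoryWeight(transformation, bytes):
// declare a relative weight (by planner convention, in MiB) for the OPERATOR use
// case; the scheduler later converts each operator's declared weight into a
// fraction of the slot's managed memory.
Transformation<?> transformation = clusteringStream.getTransformation();
transformation.declareManagedMemoryUseCaseAtOperatorScope(
    ManagedMemoryUseCase.OPERATOR, conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY));
```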

ITTestDataStreamWrite.java:

```diff
@@ -158,10 +158,22 @@ public void testWriteMergeOnReadWithCompaction(String indexType) throws Exception
 
   @Test
   public void testWriteCopyOnWriteWithClustering() throws Exception {
+    testWriteCopyOnWriteWithClustering(false);
+  }
+
+  @Test
+  public void testWriteCopyOnWriteWithSortClustering() throws Exception {
+    testWriteCopyOnWriteWithClustering(true);
+  }
+
+  private void testWriteCopyOnWriteWithClustering(boolean sortClusteringEnabled) throws Exception {
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
     conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.OPERATION, "insert");
+    if (sortClusteringEnabled) {
+      conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, "uuid");
+    }
 
     testWriteToHoodieWithCluster(conf, "cow_write_with_cluster", 1, EXPECTED);
   }
```
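For completeness, here is a sketch of a writer configuration that exercises the new code path end to end. The option constants are real `FlinkOptions` keys (most appear in the diffs above); the path and values are illustrative, and it assumes, as the test change suggests, that a non-empty sort-columns option is what makes `OptionsResolver.sortClusteringEnabled(conf)` return true:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class SortClusteringConfigExample {
  public static Configuration sortClusteringConf(String basePath) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, basePath);                     // Hudi table base path
    conf.setString(FlinkOptions.OPERATION, "insert");                // clustering targets insert mode
    conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true); // schedule clustering plans
    conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);       // cluster after every delta commit
    conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, "uuid");    // non-empty columns enable the sort path
    conf.setInteger(FlinkOptions.WRITE_SORT_MEMORY, 128);            // sorter budget in MB, becomes the managed-memory weight
    return conf;
  }
}
```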