[HUDI-4385] Support to trigger the compaction in the flink batch mode write.
LinMingQiang committed Jul 22, 2022
1 parent 51b5783 commit dfda12d
Showing 4 changed files with 41 additions and 1 deletion.
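The first hunk below makes the write coordinator schedule a compaction when a bounded (batch mode) write finishes, which means a plain batch INSERT into a MERGE_ON_READ table can now compact inline when the job ends. A minimal sketch of such a job, assuming the Flink Table API and Hudi's Flink SQL connector; the table schema, path, and option values are illustrative, not part of this commit:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchModeWriteExample {
  public static void main(String[] args) throws Exception {
    // bounded (batch) execution: no checkpoints run, so compaction can only
    // be triggered once, when the input ends.
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
    TableEnvironment tableEnv = TableEnvironment.create(settings);

    tableEnv.executeSql(
        "CREATE TABLE t1 ("
            + "  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,"
            + "  name VARCHAR(10),"
            + "  age INT,"
            + "  ts TIMESTAMP(3)"
            + ") WITH ("
            + "  'connector' = 'hudi',"
            + "  'path' = '/tmp/hudi/t1',"                // illustrative path
            + "  'table.type' = 'MERGE_ON_READ',"
            + "  'compaction.schedule.enabled' = 'true'," // FlinkOptions.COMPACTION_SCHEDULE_ENABLED
            + "  'compaction.delta_commits' = '1'"
            + ")");

    // the bounded insert ends the input; with this change the coordinator
    // schedules a compaction plan and it is executed before the job exits.
    tableEnv.executeSql(
        "INSERT INTO t1 VALUES ('id1', 'Danny', 23, TIMESTAMP '1970-01-01 00:00:01')").await();
  }
}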
@@ -439,6 +439,12 @@ private void handleEndInputEvent(WriteMetadataEvent event) {
       Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
       // sync Hive synchronously if it is enabled in batch mode.
       syncHive();
+      // in batch mode, end of input is the only chance to trigger compaction online.
+      if (tableState.scheduleCompaction) {
+        LOG.info("Schedule compaction for batch mode write.");
+        // schedule the compaction plan; the downstream compact operators execute it.
+        CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
+      }
     }
   }
 }
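The coordinator itself only delegates; plan creation goes through the Flink write client. The commit shows the call to CompactionUtil.scheduleCompaction but not its body, so the following is a hedged sketch of roughly what such a helper reduces to, using the known BaseHoodieWriteClient#scheduleCompaction API; the guards on isDeltaTimeCompaction and the committed flag are elided:

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;

public final class CompactionSchedulingSketch {
  public static void schedule(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
    // pick up the instants written by the just-finished batch job
    metaClient.reloadActiveTimeline();
    // write a compaction plan onto the timeline; the compact operators
    // (or a later job) execute it.
    Option<String> instant = writeClient.scheduleCompaction(Option.empty());
    if (instant.isPresent()) {
      System.out.println("Scheduled compaction at instant " + instant.get());
    }
  }
}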
@@ -18,6 +18,7 @@

 package org.apache.hudi.sink.compact;
 
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -47,7 +48,7 @@
  * <p>It should be singleton to avoid conflicts.
  */
 public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPlanEvent>
-    implements OneInputStreamOperator<Object, CompactionPlanEvent> {
+    implements OneInputStreamOperator<Object, CompactionPlanEvent>, BoundedOneInput {
 
   /**
    * Config options.
@@ -141,4 +142,10 @@ private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
   public void setOutput(Output<StreamRecord<CompactionPlanEvent>> output) {
     this.output = output;
   }
+
+  @Override
+  public void endInput() throws Exception {
+    // the bounded input has ended (batch mode only): schedule the final compaction plan.
+    notifyCheckpointComplete(-1);
+  }
 }
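BoundedOneInput is the Flink hook that makes the batch path work: in bounded execution the runtime calls endInput() exactly once after the operator has received its last record, and never triggers checkpoints. A small standalone operator, not part of this commit, illustrating the contract the plan operator now relies on:

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class FlushOnEndOfInput extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String>, BoundedOneInput {

  @Override
  public void processElement(StreamRecord<String> element) {
    // forward records as usual while input is flowing
    output.collect(element);
  }

  @Override
  public void endInput() {
    // no more input will arrive: flush whatever is still pending. This is the
    // same trick CompactionPlanOperator uses, except it invokes its checkpoint
    // hook with the placeholder id -1 because batch jobs never checkpoint.
    output.collect(new StreamRecord<>("end-of-input"));
  }
}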
@@ -95,6 +95,10 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
       pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
       // compaction
       if (OptionsResolver.needsAsyncCompaction(conf)) {
+        // a bounded (batch mode) write has no checkpoints, so it must compact synchronously.
+        if (context.isBounded()) {
+          conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+        }
         return Pipelines.compact(conf, pipeline);
       } else {
         return Pipelines.clean(conf, pipeline);
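This sink-side guard keeps the two modes consistent: compaction.async.enabled defaults to true, but a bounded job has no checkpoints to drive the async path, so the flag is flipped before the compact stage is wired. A hedged sketch of the effective configuration, using the real FlinkOptions keys; the bounded variable stands in for context.isBounded():

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class BoundedCompactionConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);

    boolean bounded = true; // stands in for context.isBounded()
    if (bounded) {
      // mirror the guard in the sink: force synchronous compaction.
      conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
    }

    // prints false: the compact stage will run inline before the job exits.
    System.out.println(conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED));
  }
}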
@@ -288,6 +288,29 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangelog) throws Exception {
     TestData.checkWrittenDataCOW(tempFile, EXPECTED3);
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testHoodieFlinkCompactorInBatchModeWrite(boolean compactionOnBatchModeEnable) throws Exception {
+    // create the hoodie table and insert data into it.
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2");
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), String.valueOf(compactionOnBatchModeEnable));
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+    tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await();
+    Map<String, List<String>> expected = compactionOnBatchModeEnable ? EXPECTED2 : new HashMap<>();
+    // wait for the asynchronous commit to finish; with scheduling disabled no compaction runs, so no compacted data is expected.
+    TimeUnit.SECONDS.sleep(3);
+    TestData.checkWrittenDataCOW(tempFile, expected);
+  }
+
   private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
     boolean scheduled = false;
     // check whether there are any compaction operations.
