[HUDI-7974] Adding support for empty cleans #12799

Open · wants to merge 3 commits into base: master
@@ -26,6 +26,7 @@
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -37,13 +38,16 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.MetadataConversionUtils;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.versioning.v1.ActiveTimelineV1;
import org.apache.hudi.common.table.timeline.versioning.v1.ArchivedTimelineV1;
import org.apache.hudi.common.table.timeline.versioning.v1.InstantComparatorV1;
import org.apache.hudi.common.table.timeline.versioning.v1.InstantFileNameGeneratorV1;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
@@ -227,6 +231,7 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
// unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
Set<String> savepointTimestamps = table.getSavepointTimestamps();
Option<String> earliestCommitToNotArchiveOpt = getEarliestCommitToNotArchive(table.getActiveTimeline(), table.getMetaClient());
if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
// For Merge-On-Read table, inline or async compaction is enabled
// We need to make sure that there are enough delta commits in the active timeline
@@ -254,9 +259,16 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
// skip savepoint commits and proceed further
return !savepointTimestamps.contains(s.requestedTime());
} else {
// if no savepoint present, then don't filter
// stop at first savepoint commit
return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().requestedTime(), LESSER_THAN_OR_EQUALS, s.requestedTime()));
Option<String> firstSavepointOpt = table.getCompletedSavepointTimeline().firstInstant().map(HoodieInstant::requestedTime);
Option<String> commitToNotArchiveOpt = earliestCommitToNotArchiveOpt;
if (firstSavepointOpt.isPresent() && earliestCommitToNotArchiveOpt.isPresent()
&& compareTimestamps(firstSavepointOpt.get(), LESSER_THAN, earliestCommitToNotArchiveOpt.get())) {
LOG.error("earliestCommitToNotArchive {} is greater than first savepoint {}, using first savepoint as the commitToNotArchive",
earliestCommitToNotArchiveOpt.get(), firstSavepointOpt.get());
commitToNotArchiveOpt = firstSavepointOpt;
}
// stop at earliest commit to not archive
return !(commitToNotArchiveOpt.isPresent() && compareTimestamps(commitToNotArchiveOpt.get(), LESSER_THAN_OR_EQUALS, s.requestedTime()));
}
}).filter(s -> {
// oldestCommitToRetain is the highest completed commit instant that is less than the oldest inflight instant.
@@ -280,6 +292,29 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
}
}

private Option<String> getEarliestCommitToNotArchive(HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient) throws IOException {
// If the clean policy is based on file versions, the earliest commit to not archive may not be set in the cleaner plan,
// so we explicitly check the savepoint timeline and guard against the first savepoint.
InstantGenerator factory = metaClient.getInstantGenerator();
String earliestInstantToNotArchive = table.getCompletedSavepointTimeline().firstInstant().map(HoodieInstant::requestedTime).orElse(null);
if (config.getCleanerPolicy() != HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
Option<HoodieInstant> cleanInstantOpt =
activeTimeline.getCleanerTimeline().filterCompletedInstants().lastInstant();
if (cleanInstantOpt.isPresent()) {
HoodieInstant cleanInstant = cleanInstantOpt.get();
Map<String, String> extraMetadata = CleanerUtils.getCleanerPlan(metaClient, cleanInstant.isRequested()
? cleanInstant
: factory.getCleanRequestedInstant(cleanInstant.requestedTime()))
.getExtraMetadata();
if (extraMetadata != null) {
String cleanerEarliestInstantToNotArchive = extraMetadata.getOrDefault(CleanerUtils.EARLIEST_COMMIT_TO_NOT_ARCHIVE, null);
earliestInstantToNotArchive = InstantComparison.minTimestamp(earliestInstantToNotArchive, cleanerEarliestInstantToNotArchive);
}
}
}
return Option.ofNullable(earliestInstantToNotArchive);
}
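
To make the boundary selection above easier to follow, here is a minimal, self-contained sketch (plain Java, not the Hudi implementation) of how the archival boundary is chosen: the smaller of the first savepoint time and the cleaner-recorded earliest-commit-to-not-archive, where a missing value imposes no constraint. It assumes `InstantComparison.minTimestamp` treats a null argument this way and that Hudi instant times order lexicographically.

```java
// Minimal sketch of the boundary choice in getEarliestCommitToNotArchive (illustrative only).
public class ArchiveBoundarySketch {

  // Assumed behaviour of InstantComparison.minTimestamp: a null side imposes no constraint.
  static String minTimestamp(String a, String b) {
    if (a == null) {
      return b;
    }
    if (b == null) {
      return a;
    }
    // Hudi instant times (yyyyMMddHHmmss...) compare correctly as strings.
    return a.compareTo(b) <= 0 ? a : b;
  }

  public static void main(String[] args) {
    String firstSavepoint = "20240101000000";   // hypothetical first savepoint instant
    String cleanerBoundary = "20240301000000";  // hypothetical EARLIEST_COMMIT_TO_NOT_ARCHIVE from the last clean

    System.out.println(minTimestamp(firstSavepoint, cleanerBoundary)); // 20240101000000 -> stop at the savepoint
    System.out.println(minTimestamp(null, cleanerBoundary));           // 20240301000000 -> only the cleaner boundary applies
    System.out.println(minTimestamp(firstSavepoint, null));            // 20240101000000 -> only the savepoint applies
  }
}
```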

private List<HoodieInstant> getInstantsToArchive() throws IOException {
if (config.isMetaserverEnabled()) {
return Collections.emptyList();
@@ -634,6 +634,17 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("0.14.0")
.withDocumentation("Maximum number of times to retry a batch on conflict failure.");

public static final ConfigProperty<Long> MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS = ConfigProperty
.key("hoodie.write.empty.clean.create.duration.ms")
.defaultValue(-1L)
.markAdvanced()
.withDocumentation("In some cases empty clean commit needs to be created to ensure the clean planner "
+ "does not look through entire dataset if there are no clean plans. This is possible for append-only "
+ "dataset. Also, for these datasets we cannot ignore clean completely since in the future there could "
+ "be upsert or replace operations. By creating empty clean commit, earliest_commit_to_retain value "
+ "will be updated so that now clean planner can only check for partitions that are modified after the "
+ "last empty clean's earliest_commit_toRetain value there by optimizing the clean planning");

public static final ConfigProperty<String> WRITE_SCHEMA_OVERRIDE = ConfigProperty
.key("hoodie.write.schema")
.noDefaultValue()
@@ -2727,6 +2738,10 @@ public Boolean doSkipDefaultPartitionValidation() {
return getBoolean(SKIP_DEFAULT_PARTITION_VALIDATION);
}

public long maxDurationToCreateEmptyCleanMs() {
return getLong(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS);
}

/**
* Are any table services configured to run inline for both scheduling and execution?
*
@@ -3344,6 +3359,11 @@ public Builder withWriteConcurrencyMode(WriteConcurrencyMode concurrencyMode) {
return this;
}

public Builder withMaxDurationToCreateEmptyClean(long duration) {
writeConfig.setValue(MAX_DURATION_TO_CREATE_EMPTY_CLEAN_MS, String.valueOf(duration));
return this;
}
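
As a usage note, here is a hedged sketch of wiring this up through the new builder method; `newBuilder()`, `withPath(...)` and `forTable(...)` are assumed from the existing HoodieWriteConfig builder API, and the base path, table name and 6-hour threshold are hypothetical.

```java
import org.apache.hudi.config.HoodieWriteConfig;

// Hedged usage sketch, not part of this PR.
public class EmptyCleanConfigSketch {
  public static void main(String[] args) {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/append_only_table")
        .forTable("append_only_table")
        // Presumably: issue an empty clean once this much time has passed without any clean being created.
        .withMaxDurationToCreateEmptyClean(6 * 60 * 60 * 1000L)
        .build();

    System.out.println(writeConfig.maxDurationToCreateEmptyCleanMs()); // 21600000
  }
}
```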

public Builder withPopulateMetaFields(boolean populateMetaFields) {
writeConfig.setValue(HoodieTableConfig.POPULATE_META_FIELDS, Boolean.toString(populateMetaFields));
return this;
@@ -47,13 +47,15 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.util.CleanerUtils.CLEAN_METADATA_VERSION_2;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;

public class CleanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, HoodieCleanMetadata> {
@@ -133,10 +135,10 @@ private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator
* @throws IllegalArgumentException if unknown cleaning policy is provided
*/
List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
int cleanerParallelism = Math.min(
int cleanerParallelism = Math.max(1, Math.min(
cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum(),
config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
config.getCleanerParallelism()));
LOG.info("Using cleanerParallelism: {}", cleanerParallelism);

context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of table: " + config.getTableName());

@@ -154,14 +156,14 @@ List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan clean

List<String> partitionsToBeDeleted = table.getMetaClient().getTableConfig().isTablePartitioned() && cleanerPlan.getPartitionsToBeDeleted() != null
? cleanerPlan.getPartitionsToBeDeleted()
: new ArrayList<>();
: Collections.emptyList();
partitionsToBeDeleted.forEach(entry -> {
try {
if (!isNullOrEmpty(entry)) {
deleteFileAndGetResult(table.getStorage(), table.getMetaClient().getBasePath() + "/" + entry);
}
} catch (IOException e) {
LOG.warn("Partition deletion failed " + entry);
LOG.warn("Partition deletion failed {}", entry);
}
});

@@ -217,17 +219,17 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
}

List<HoodieCleanStat> cleanStats = clean(context, cleanerPlan);
HoodieCleanMetadata metadata;
if (cleanStats.isEmpty()) {
return HoodieCleanMetadata.newBuilder().build();
metadata = createEmptyCleanMetadata(cleanerPlan, inflightInstant, timer.endTimer());
} else {
table.getMetaClient().reloadActiveTimeline();
metadata = CleanerUtils.convertCleanMetadata(
inflightInstant.requestedTime(),
Option.of(timer.endTimer()),
cleanStats,
cleanerPlan.getExtraMetadata());
}

table.getMetaClient().reloadActiveTimeline();
HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
inflightInstant.requestedTime(),
Option.of(timer.endTimer()),
cleanStats,
cleanerPlan.getExtraMetadata()
);
if (!skipLocking) {
this.txnManager.beginTransaction(Option.of(inflightInstant), Option.empty());
}
Expand All @@ -245,6 +247,20 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
}
}

private static HoodieCleanMetadata createEmptyCleanMetadata(HoodieCleanerPlan cleanerPlan, HoodieInstant inflightInstant, long timeTakenMillis) {
return HoodieCleanMetadata.newBuilder()
.setStartCleanTime(inflightInstant.requestedTime())
.setTimeTakenInMillis(timeTakenMillis)
.setTotalFilesDeleted(0)
.setLastCompletedCommitTimestamp(cleanerPlan.getLastCompletedCommitTimestamp())
.setEarliestCommitToRetain(cleanerPlan.getEarliestInstantToRetain().getTimestamp())
.setVersion(CLEAN_METADATA_VERSION_2)
.setPartitionMetadata(Collections.emptyMap())
.setExtraMetadata(cleanerPlan.getExtraMetadata())
.setBootstrapPartitionMetadata(Collections.emptyMap())
.build();
}

@Override
public HoodieCleanMetadata execute() {
List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();