Skip to content

Commit

Permalink
[HUDI-8491] Handle empty commits for concurrent schema evolution (#1013)
Browse files Browse the repository at this point in the history
  • Loading branch information
Davis-Zhang-Onehouse committed Feb 6, 2025
1 parent 59b4b48 commit cb46d26
Show file tree
Hide file tree
Showing 9 changed files with 457 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Option<Schema> resolveConcurrentSchemaEvolution(
Option<HoodieInstant> currTxnOwnerInstant);

static void throwConcurrentSchemaEvolutionException(
Option<Schema> tableSchemaAtTxnStart, Schema tableSchemaAtTxnValidation, Schema writerSchemaOfTxn,
Option<Schema> tableSchemaAtTxnStart, Option<Schema> tableSchemaAtTxnValidation, Schema writerSchemaOfTxn,
Option<HoodieInstant> lastCompletedTxnOwnerInstant,
Option<HoodieInstant> currTxnOwnerInstant) throws HoodieWriteConflictException {
String errMsg = String.format(
Expand All @@ -66,7 +66,7 @@ static void throwConcurrentSchemaEvolutionException(
+ "schema the transaction tries to commit with: %s. lastCompletedTxnOwnerInstant is %s "
+ " and currTxnOwnerInstant is %s.",
tableSchemaAtTxnStart.isPresent() ? tableSchemaAtTxnStart : "Not exists as no commited txn at that time",
tableSchemaAtTxnValidation,
tableSchemaAtTxnValidation.isPresent() ? tableSchemaAtTxnValidation : "Not exists",
writerSchemaOfTxn,
lastCompletedTxnOwnerInstant.isPresent() ? lastCompletedTxnOwnerInstant : "Not exists",
currTxnOwnerInstant.isPresent() ? currTxnOwnerInstant : "Not exists");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,28 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hudi.avro.HoodieAvroUtils.isSchemaNull;
import static org.apache.hudi.client.transaction.SchemaConflictResolutionStrategy.throwConcurrentSchemaEvolutionException;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;

/**
* The implementation of SchemaConflictResolutionStrategy that detects incompatible
* schema evolution from multiple writers
*/
public class SimpleSchemaConflictResolutionStrategy implements SchemaConflictResolutionStrategy {
private static final Logger LOG = LoggerFactory.getLogger(SimpleSchemaConflictResolutionStrategy.class);

@Override
public Option<Schema> resolveConcurrentSchemaEvolution(
Expand All @@ -55,64 +62,88 @@ public Option<Schema> resolveConcurrentSchemaEvolution(
return Option.empty();
}

// Guard against unrecognized cases where writers do not come with a writer schema.
if (StringUtils.isNullOrEmpty(config.getWriteSchema())) {
LOG.warn(StringUtils.join("Writer config does not come with a valid writer schema. Writer config: ",
config.toString(), ". Owner instant: ", currTxnOwnerInstant.get().toString()));
return Option.empty();
}

Schema writerSchemaOfTxn = new Schema.Parser().parse(config.getWriteSchema());
// Only consider instants that change the table schema
HoodieTimeline schemaEvolutionTimeline = table.getMetaClient().getSchemaEvolutionTimeline();
// If lastCompletedInstantsAtTxnStart is null it means no committed txn when the current one starts.
HoodieInstant lastCompletedInstantsAtTxnStart = lastCompletedTxnOwnerInstant.isPresent()
? schemaEvolutionTimeline.findInstantsBeforeOrEquals(
lastCompletedTxnOwnerInstant.get().requestedTime()).lastInstant().orElseGet(() -> null)
// If a writer does not come with a meaningful schema, skip the schema resolution.
if (isSchemaNull(writerSchemaOfTxn)) {
return getTableSchemaAtInstant(new TableSchemaResolver(table.getMetaClient()), currTxnOwnerInstant.get());
}

// Fast path: We can tell there is no schema conflict by just comparing the instants without involving table/writer schema comparison.
TableSchemaResolver schemaResolver = new TableSchemaResolver(table.getMetaClient());
HoodieTimeline reverseOrderTimeline = schemaResolver.getCachedSchemaEvolutionTimelineReverseOrder();

// Latest instant in the schema evolution timeline at or before the last completed txn seen at txn start (null if none).
HoodieInstant lastCompletedInstantAtTxnStart = lastCompletedTxnOwnerInstant.isPresent()
? getInstantInTimelineImmediatelyPriorToTimestamp(lastCompletedTxnOwnerInstant.get().requestedTime(), reverseOrderTimeline).orElse(null)
: null;
// If lastCompletedInstantsAtTxnValidation is null there are 2 possibilities:
// If lastCompletedInstantAtTxnValidation is null there are 2 possibilities:
// - No committed txn at validation starts
// - [Almost impossible, so we ignore it] there is a committed txn, but it has been archived and so cannot be found
// in the active timeline.
HoodieInstant lastCompletedInstantsAtTxnValidation = schemaEvolutionTimeline.lastInstant().orElse(null);

HoodieInstant lastCompletedInstantAtTxnValidation = reverseOrderTimeline.firstInstant().orElse(null);
// Please refer to RFC 82 for details of the case numbers.
// Case 1:
// We (curr txn) are the first to commit ever on this table, no conflict could happen.
if (lastCompletedInstantsAtTxnValidation == null) {
// Implies lastCompletedInstantsAtTxnStart is null as well.
if (lastCompletedInstantAtTxnValidation == null) {
// Implies lastCompletedInstantAtTxnStart is null as well.
return Option.of(writerSchemaOfTxn);
}

// Optional optimization: if no concurrent writes happen at all, no conflict could happen.
if (lastCompletedInstantsAtTxnValidation.equals(lastCompletedInstantsAtTxnStart)) {
if (lastCompletedInstantAtTxnValidation.equals(lastCompletedInstantAtTxnStart)) {
return Option.of(writerSchemaOfTxn);
}

TableSchemaResolver resolver = new TableSchemaResolver(table.getMetaClient());
Option<Schema> tableSchemaAtTxnValidation = getTableSchemaAtInstant(resolver, lastCompletedInstantAtTxnValidation);
// If table schema is not defined, it's still case 1. There can be cases where there are commits but they didn't
// write any data.
if (!tableSchemaAtTxnValidation.isPresent()) {
return Option.of(writerSchemaOfTxn);
}
// Case 2, 4, 7: Both writers try to evolve to the same schema or neither evolves schema.
TableSchemaResolver schemaResolver = getSchemaResolver(table);
Schema tableSchemaAtTxnValidation = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantsAtTxnValidation);
if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnValidation)) {
boolean writerSchemaIsCurrentTableSchema = AvroSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnValidation.get());
if (writerSchemaIsCurrentTableSchema) {
return Option.of(writerSchemaOfTxn);
}

// Case 3:
// We (curr txn) are the second to commit, and there is one commit that is done concurrently after this commit has started.
// txn 1: |-----read write-----|validate & commit|
// curr txn: -----------------|--------read write--------|--validate & commit--|
// lastCompletedInstantsAtTxnValidation != null is implied.
// lastCompletedInstantAtTxnValidation != null is implied.
// Populate configs regardless of what's the case we are trying to handle.
if (lastCompletedInstantsAtTxnStart == null) {
if (lastCompletedInstantAtTxnStart == null) {
// If they don't share the same schema, we simply abort as a naive way of handling without considering
// that they might be potentially compatible.
throwConcurrentSchemaEvolutionException(
Option.empty(), tableSchemaAtTxnValidation, writerSchemaOfTxn, lastCompletedTxnOwnerInstant, currTxnOwnerInstant);
}
Option<Schema> tableSchemaAtTxnStart = getTableSchemaAtInstant(resolver, lastCompletedInstantAtTxnStart);
// If no table schema is defined, fall back to case 3.
if (!tableSchemaAtTxnStart.isPresent()) {
throwConcurrentSchemaEvolutionException(
Option.empty(), tableSchemaAtTxnValidation, writerSchemaOfTxn, lastCompletedTxnOwnerInstant, currTxnOwnerInstant);
}

// the transaction Compatible case 5
// Case 5:
// Table schema has not changed from the start of the transaction till the pre-commit validation
Schema tableSchemaAtTxnStart = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantsAtTxnStart);
if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(tableSchemaAtTxnStart, tableSchemaAtTxnValidation)) {
// If table schema parsing failed we will blindly go with writer schema. use option.empty
if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(tableSchemaAtTxnStart.get(), tableSchemaAtTxnValidation.get())) {
return Option.of(writerSchemaOfTxn);
}

// Case 6: Current txn does not evolve schema, the tableSchema we saw at validation phase
// might be an evolved one, use it.
if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnStart)) {
return Option.of(tableSchemaAtTxnValidation);
if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnStart.get())) {
return tableSchemaAtTxnValidation;
}

// Incompatible case 8: Initial table schema is S1, there is a concurrent txn evolves schema to S2,
Expand All @@ -124,21 +155,25 @@ public Option<Schema> resolveConcurrentSchemaEvolution(
// txn 2(S2): --------|-----read write-------|validate & commit|
// curr txn(S3): --------------------------|--------read write--------|--validate X
throwConcurrentSchemaEvolutionException(
Option.of(tableSchemaAtTxnStart), tableSchemaAtTxnValidation, writerSchemaOfTxn,
tableSchemaAtTxnStart, tableSchemaAtTxnValidation, writerSchemaOfTxn,
lastCompletedTxnOwnerInstant, currTxnOwnerInstant);
// Not reachable
return Option.empty();
}

TableSchemaResolver getSchemaResolver(HoodieTable table) {
return new TableSchemaResolver(table.getMetaClient());
/**
 * Returns the first instant encountered in {@code reverseOrderTimeline} whose requested time is
 * less than or equal to {@code timestamp}.
 *
 * <p>NOTE(review): the parameter name implies the timeline is iterated newest-first, in which case
 * the first match is the latest instant at or before {@code timestamp} - confirm against the caller.
 *
 * @param timestamp            upper bound (inclusive) for the requested time of the instant
 * @param reverseOrderTimeline timeline whose instants are scanned in stream order
 * @return the first matching instant, or an empty option when no instant qualifies
 */
private Option<HoodieInstant> getInstantInTimelineImmediatelyPriorToTimestamp(
    String timestamp, HoodieTimeline reverseOrderTimeline) {
  java.util.Optional<HoodieInstant> firstMatch = reverseOrderTimeline.getInstantsAsStream()
      .filter(candidate -> compareTimestamps(candidate.requestedTime(), LESSER_THAN_OR_EQUALS, timestamp))
      .findFirst();
  return Option.fromJavaOptional(firstMatch);
}

private static Schema getTableSchemaAtInstant(TableSchemaResolver schemaResolver, HoodieInstant instant) {
private static Option<Schema> getTableSchemaAtInstant(TableSchemaResolver schemaResolver, HoodieInstant instant) {
try {
return schemaResolver.getTableAvroSchema(instant, false);
} catch (Exception e) {
throw new HoodieException("Unable to get table schema", e);
return schemaResolver.getTableAvroSchemaIfPresent(false, Option.of(instant));
} catch (Exception ex) {
LOG.error("Cannot get table schema for instant {}", instant);
throw new HoodieException("Unable to get table schema", ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
table.getMetaClient().reloadActiveTimeline();
}
Stream<HoodieInstant> completedInstantsDuringCurrentWriteOperation =
getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), pendingInstants);
getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), pendingInstants);
ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy();
Option<Schema> newTableSchema = resolveSchemaConflictIfNeeded(table, config, lastCompletedTxnOwnerInstant, currentTxnOwnerInstant);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
Expand Down Expand Up @@ -156,6 +157,19 @@ protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, Hoo
this.taskContextSupplier = context.getTaskContextSupplier();
}

/**
 * Builds a table from explicitly supplied collaborators. Annotated {@code @VisibleForTesting}:
 * the view manager and task context supplier are taken as arguments and stored as given.
 *
 * @param config      write config; its base path decides whether this is a metadata table
 * @param context     engine context
 * @param metaClient  meta client; also the source of the instant generator and the instant
 *                    file name generator/parser
 * @param viewManager file system view manager to use as-is
 * @param supplier    task context supplier to use as-is
 */
@VisibleForTesting
protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient,
    FileSystemViewManager viewManager, TaskContextSupplier supplier) {
  // Collaborators stored exactly as passed in.
  this.config = config;
  this.context = context;
  this.metaClient = metaClient;
  this.viewManager = viewManager;
  this.taskContextSupplier = supplier;

  // State derived from the config and the meta client.
  this.isMetadataTable = HoodieTableMetadata.isMetadataTable(config.getBasePath());
  this.instantGenerator = metaClient.getInstantGenerator();
  this.instantFileNameGenerator = metaClient.getInstantFileNameGenerator();
  this.instantFileNameParser = metaClient.getInstantFileNameParser();
}

/**
 * Returns the {@code isMetadataTable} flag that was set when this table was constructed.
 *
 * @return {@code true} if this table was flagged as a metadata table
 */
public boolean isMetadataTable() {
  return this.isMetadataTable;
}
Expand Down
Loading

0 comments on commit cb46d26

Please sign in to comment.