Skip to content

Commit

Permalink
[HUDI-8219] add concurrent schema evolution conflict detection (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
Davis-Zhang-Onehouse committed Feb 14, 2025
1 parent 85749c5 commit b14034d
Show file tree
Hide file tree
Showing 15 changed files with 1,837 additions and 18 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ target/
metastore_db/
.metals/
.mvn/
.vscode/
*.bloop/
*.vscode/
*.metals/

# OS Files #
.DS_Store
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction;

import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieSchemaEvolutionConflictException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;

/**
* Strategy interface for schema conflict resolution with multiple writers.
* Users can provide pluggable implementations for different kinds of strategies to resolve conflicts when multiple
* writers are mutating the schema of hudi table.
*/
@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
public interface SchemaConflictResolutionStrategy {

/**
* Resolves schema conflicts when multiple writers are mutating the schema of the Hudi table concurrently.
* NOTE: WE ASSUME the meta client of the table is already reloaded with the latest timeline before this method call.
*
* @param table Hoodie table
* @param config Hoodie write config
* @param lastCompletedTxnOwnerInstant Last completed instant when the current transaction started
* @param currTxnOwnerInstant Instant of the current transaction
* @return Resolved schema if hoodie.write.concurrency.schema.conflict.resolution.enable is turned on and the resolution is successful; Option.empty otherwise.
* @throws HoodieWriteConflictException if schema conflicts cannot be resolved.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
Option<Schema> resolveConcurrentSchemaEvolution(
HoodieTable table,
HoodieWriteConfig config,
Option<HoodieInstant> lastCompletedTxnOwnerInstant,
Option<HoodieInstant> currTxnOwnerInstant);

static void throwConcurrentSchemaEvolutionException(
Option<Schema> tableSchemaAtTxnStart, Schema tableSchemaAtTxnValidation, Schema writerSchemaOfTxn,
Option<HoodieInstant> lastCompletedTxnOwnerInstant,
Option<HoodieInstant> currTxnOwnerInstant) throws HoodieWriteConflictException {
String errMsg = String.format(
"Detected incompatible concurrent schema evolution. Schema when transaction starts: %s, "
+ "schema when transaction enters validation phase: %s tableSchemaAtTxnValidation, "
+ "schema the transaction tries to commit with: %s. lastCompletedTxnOwnerInstant is %s "
+ " and currTxnOwnerInstant is %s.",
tableSchemaAtTxnStart.isPresent() ? tableSchemaAtTxnStart : "Not exists as no commited txn at that time",
tableSchemaAtTxnValidation,
writerSchemaOfTxn,
lastCompletedTxnOwnerInstant.isPresent() ? lastCompletedTxnOwnerInstant : "Not exists",
currTxnOwnerInstant.isPresent() ? currTxnOwnerInstant : "Not exists");

throw new HoodieSchemaEvolutionConflictException(errMsg);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction;

import org.apache.hudi.avro.AvroSchemaComparatorForSchemaEvolution;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;

import static org.apache.hudi.client.transaction.SchemaConflictResolutionStrategy.throwConcurrentSchemaEvolutionException;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;

/**
* The implementation of SchemaConflictResolutionStrategy that detects incompatible
* schema evolution from multiple writers
*/
public class SimpleSchemaConflictResolutionStrategy implements SchemaConflictResolutionStrategy {

@Override
public Option<Schema> resolveConcurrentSchemaEvolution(
HoodieTable table,
HoodieWriteConfig config,
Option<HoodieInstant> lastCompletedTxnOwnerInstant,
Option<HoodieInstant> currTxnOwnerInstant) {

// If this is compaction table service, skip schema evolution check as it does not evolve schema.
if (!currTxnOwnerInstant.isPresent()
|| currTxnOwnerInstant.get().getAction().equals(COMPACTION_ACTION)
|| (currTxnOwnerInstant.get().getAction().equals(REPLACE_COMMIT_ACTION)
&& ClusteringUtils.isClusteringInstant(table.getMetaClient().getActiveTimeline(), currTxnOwnerInstant.get(), table.getMetaClient().getInstantGenerator()))) {
return Option.empty();
}

Schema writerSchemaOfTxn = new Schema.Parser().parse(config.getWriteSchema());
// Only consider instants that change the table schema
HoodieTimeline schemaEvolutionTimeline = table.getMetaClient().getSchemaEvolutionTimeline();
// If lastCompletedInstantsAtTxnStart is null it means no committed txn when the current one starts.
HoodieInstant lastCompletedInstantsAtTxnStart = lastCompletedTxnOwnerInstant.isPresent()
? schemaEvolutionTimeline.findInstantsBeforeOrEquals(
lastCompletedTxnOwnerInstant.get().requestedTime()).lastInstant().orElseGet(() -> null)
: null;
// If lastCompletedInstantsAtTxnValidation is null there are 2 possibilities:
// - No committed txn at validation starts
// - [Almost impossible, so we ignore it] there is a commited txn, yet it is archived which cannot be found
// in the active timeline.
HoodieInstant lastCompletedInstantsAtTxnValidation = schemaEvolutionTimeline.lastInstant().orElse(null);

// Please refer to RFC 82 for details of the case numbers.
// Case 1:
// We (curr txn) are the first to commit ever on this table, no conflict could happen.
if (lastCompletedInstantsAtTxnValidation == null) {
// Implies lastCompletedInstantsAtTxnStart is null as well.
return Option.of(writerSchemaOfTxn);
}

// Optional optimization: if no concurrent writes happen at all, no conflict could happen.
if (lastCompletedInstantsAtTxnValidation.equals(lastCompletedInstantsAtTxnStart)) {
return Option.of(writerSchemaOfTxn);
}

// Case 2, 4, 7: Both writers try to evolve to the same schema or neither evolves schema.
TableSchemaResolver schemaResolver = getSchemaResolver(table);
Schema tableSchemaAtTxnValidation = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantsAtTxnValidation);
if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnValidation)) {
return Option.of(writerSchemaOfTxn);
}

// Case 3:
// We (curr txn) are the second to commit, and there is one commit that is done concurrently after this commit has started.
// txn 1: |-----read write-----|validate & commit|
// curr txn: -----------------|--------read write--------|--validate & commit--|
// lastCompletedInstantsAtTxnValidation != null is implied.
// Populate configs regardless of what's the case we are trying to handle.
if (lastCompletedInstantsAtTxnStart == null) {
// If they don't share the same schema, we simply abort as a naive way of handling without considering
// that they might be potentially compatible.
throwConcurrentSchemaEvolutionException(
Option.empty(), tableSchemaAtTxnValidation, writerSchemaOfTxn, lastCompletedTxnOwnerInstant, currTxnOwnerInstant);
}

// the transaction Compatible case 5
// Table schema has not changed from the start of the transaction till the pre-commit validation
Schema tableSchemaAtTxnStart = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantsAtTxnStart);
if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(tableSchemaAtTxnStart, tableSchemaAtTxnValidation)) {
return Option.of(writerSchemaOfTxn);
}

// Case 6: Current txn does not evolve schema, the tableSchema we saw at validation phase
// might be an evolved one, use it.
if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnStart)) {
return Option.of(tableSchemaAtTxnValidation);
}

// Incompatible case 8: Initial table schema is S1, there is a concurrent txn evolves schema to S2,
// current writer schema is S3.
// Before the curr txn started, there are commited txn, with optional txn that commited during the
// read-write phase of the curr txn (they can lead to concurrently schema evolution along with the curr txn).
// table schema: ----------------S1----------------------------S2-------------------------
// txn 1(S1): |validate & commit|
// txn 2(S2): --------|-----read write-------|validate & commit|
// curr txn(S3): --------------------------|--------read write--------|--validate X
throwConcurrentSchemaEvolutionException(
Option.of(tableSchemaAtTxnStart), tableSchemaAtTxnValidation, writerSchemaOfTxn,
lastCompletedTxnOwnerInstant, currTxnOwnerInstant);
// Not reachable
return Option.empty();
}

TableSchemaResolver getSchemaResolver(HoodieTable table) {
return new TableSchemaResolver(table.getMetaClient());
}

private static Schema getTableSchemaAtInstant(TableSchemaResolver schemaResolver, HoodieInstant instant) {
try {
return schemaResolver.getTableAvroSchema(instant, false);
} catch (Exception e) {
throw new HoodieException("Unable to get table schema", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.client.transaction.ConcurrentOperation;
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
import org.apache.hudi.client.transaction.SimpleSchemaConflictResolutionStrategy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
Expand All @@ -34,6 +35,7 @@
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,6 +45,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.config.HoodieWriteConfig.ENABLE_SCHEMA_CONFLICT_RESOLUTION;

public class TransactionUtils {

private static final Logger LOG = LoggerFactory.getLogger(TransactionUtils.class);
Expand All @@ -56,7 +60,6 @@ public class TransactionUtils {
* @param config
* @param lastCompletedTxnOwnerInstant
* @param pendingInstants
*
* @return
* @throws HoodieWriteConflictException
*/
Expand All @@ -74,12 +77,14 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
if (!timelineRefreshedWithinTransaction) {
table.getMetaClient().reloadActiveTimeline();
}
Stream<HoodieInstant> completedInstantsDuringCurrentWriteOperation = getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), pendingInstants);
Stream<HoodieInstant> completedInstantsDuringCurrentWriteOperation =
getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), pendingInstants);
ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy();
Option<Schema> newTableSchema = resolveSchemaConflictIfNeeded(table, config, lastCompletedTxnOwnerInstant, currentTxnOwnerInstant);

Stream<HoodieInstant> instantStream = Stream.concat(resolutionStrategy.getCandidateInstants(
table.getMetaClient(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant),
completedInstantsDuringCurrentWriteOperation);
table.getMetaClient(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant),
completedInstantsDuringCurrentWriteOperation);

final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElseGet(HoodieCommitMetadata::new));
instantStream.forEach(instant -> {
Expand All @@ -96,11 +101,35 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
});
LOG.info("Successfully resolved conflicts, if any");

if (newTableSchema.isPresent()) {
thisOperation.getCommitMetadataOption().get().addMetadata(
HoodieCommitMetadata.SCHEMA_KEY, newTableSchema.get().toString());
}
return thisOperation.getCommitMetadataOption();
}
return thisCommitMetadata;
}

/**
* Resolves conflict of schema evolution if there is any.
*
* @param table {@link HoodieTable} instance
* @param config write config
* @param lastCompletedTxnOwnerInstant last completed instant
* @param currentTxnOwnerInstant current instant
* @return new table schema after successful schema resolution; empty if nothing to be resolved.
*/
public static Option<Schema> resolveSchemaConflictIfNeeded(final HoodieTable table,
final HoodieWriteConfig config,
final Option<HoodieInstant> lastCompletedTxnOwnerInstant,
final Option<HoodieInstant> currentTxnOwnerInstant) {
if (config.getBoolean(ENABLE_SCHEMA_CONFLICT_RESOLUTION)) {
return new SimpleSchemaConflictResolutionStrategy().resolveConcurrentSchemaEvolution(
table, config, lastCompletedTxnOwnerInstant, currentTxnOwnerInstant);
}
return Option.empty();
}

/**
* Get the last completed transaction hoodie instant and {@link HoodieCommitMetadata#getExtraMetadata()}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,13 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Schema string representing the latest schema of the table. Hudi passes this to "
+ "implementations of evolution of schema");

public static final ConfigProperty<Boolean> ENABLE_SCHEMA_CONFLICT_RESOLUTION = ConfigProperty
.key(CONCURRENCY_PREFIX + "schema.conflict.resolution.enable")
.defaultValue(true)
.markAdvanced()
.sinceVersion("1.0.2")
.withDocumentation("If turned on, we detect and abort incompatible concurrent schema evolution.");

public static final ConfigProperty<String> AVRO_SCHEMA_VALIDATE_ENABLE = ConfigProperty
.key("hoodie.avro.schema.validate")
.defaultValue("false")
Expand Down
Loading

0 comments on commit b14034d

Please sign in to comment.