[HUDI-3558] Consistent bucket index: bucket resizing (split&merge) & concurrent write during resizing (#4958)

RFC-42 implementation
- Implement bucket resizing for consistent hashing index.
- Support concurrent write during bucket resizing.

This change adds tests and can be verified as follows:
- The existing consistent bucket index tests are extended to cover bucket resizing.
- Tests of different bucket resizing cases (split and merge).
- Tests of concurrent resizing, and of concurrent writes during resizing.
YuweiXiao authored Sep 12, 2022
1 parent 70a724a commit 6badae4
Showing 52 changed files with 2,196 additions and 186 deletions.
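
For orientation, the sketch below shows how a writer could opt into the consistent-hashing bucket index with the new resizing bounds introduced here. It is a minimal sketch, not part of the commit: withBucketNum, withBucketMaxNum, withBucketMinNum, and withIndexKeyField appear in this diff, while withPath, withIndexConfig, withIndexType, and withBucketIndexEngineType are assumed from the existing Hudi builders, and other required write options are omitted.

import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

public class ConsistentBucketConfigSketch {
  // Builds a write config whose bucket count starts at 8 and may be resized between 4 and 32.
  public static HoodieWriteConfig buildConfig(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.BUCKET)                                      // assumed builder method
            .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)  // assumed builder method
            .withBucketNum("8")
            .withBucketMinNum(4)        // new in this commit: lower bound for resizing
            .withBucketMaxNum(32)       // new in this commit: upper bound for resizing
            .withIndexKeyField("uuid")  // hypothetical record key / index field
            .build())
        .build();
  }
}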
@@ -28,6 +28,7 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;

import javax.annotation.Nonnull;
@@ -53,10 +54,14 @@ public class HoodieClusteringConfig extends HoodieConfig {
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
public static final String FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy";
public static final String SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy";
public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
public static final String SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy";
public static final String JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy";
public static final String PLAN_PARTITION_FILTER_MODE =
@@ -589,18 +594,46 @@ public Builder withDataOptimizeBuildCurveSampleNumber(int sampleNumber) {
}

public HoodieClusteringConfig build() {
setDefaults();
validate();

return clusteringConfig;
}

private void setDefaults() {
// Consistent hashing bucket index
if (clusteringConfig.contains(HoodieIndexConfig.INDEX_TYPE.key())
&& clusteringConfig.contains(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key())
&& clusteringConfig.getString(HoodieIndexConfig.INDEX_TYPE.key()).equalsIgnoreCase(HoodieIndex.IndexType.BUCKET.name())
&& clusteringConfig.getString(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key()).equalsIgnoreCase(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name())) {
clusteringConfig.setDefaultValue(PLAN_STRATEGY_CLASS_NAME, SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
clusteringConfig.setDefaultValue(EXECUTION_STRATEGY_CLASS_NAME, SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY);
} else {
clusteringConfig.setDefaultValue(
PLAN_STRATEGY_CLASS_NAME, getDefaultPlanStrategyClassName(engineType));
clusteringConfig.setDefaultValue(
EXECUTION_STRATEGY_CLASS_NAME, getDefaultExecutionStrategyClassName(engineType));
}
clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName());
}

private void validate() {
boolean inlineCluster = clusteringConfig.getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING);
boolean inlineClusterSchedule = clusteringConfig.getBoolean(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING);
ValidationUtils.checkArgument(!(inlineCluster && inlineClusterSchedule), String.format("Either of inline clustering (%s) or "
+ "schedule inline clustering (%s) can be enabled. Both can't be set to true at the same time. %s,%s", HoodieClusteringConfig.INLINE_CLUSTERING.key(),
HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), inlineCluster, inlineClusterSchedule));

// Consistent hashing bucket index
if (clusteringConfig.contains(HoodieIndexConfig.INDEX_TYPE.key())
&& clusteringConfig.contains(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key())
&& clusteringConfig.getString(HoodieIndexConfig.INDEX_TYPE.key()).equalsIgnoreCase(HoodieIndex.IndexType.BUCKET.name())
&& clusteringConfig.getString(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key()).equalsIgnoreCase(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name())) {
ValidationUtils.checkArgument(clusteringConfig.getString(PLAN_STRATEGY_CLASS_NAME).equals(SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY),
"Consistent hashing bucket index only supports clustering plan strategy : " + SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
ValidationUtils.checkArgument(clusteringConfig.getString(EXECUTION_STRATEGY_CLASS_NAME).equals(SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY),
"Consistent hashing bucket index only supports clustering execution strategy : " + SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY);
}
}

private String getDefaultPlanStrategyClassName(EngineType engineType) {
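
The hunk above changes the builder so that declaring the consistent-hashing bucket index is enough to pick up the Spark consistent-bucket clustering strategies by default, while a conflicting explicit strategy is rejected in validate(). A hedged illustration follows; the newBuilder()/fromProperties() entry points and the hoodie.index.bucket.engine key are assumed from existing Hudi conventions, not taken from this diff.

import java.util.Properties;
import org.apache.hudi.config.HoodieClusteringConfig;

public class ClusteringDefaultsSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.index.type", "BUCKET");
    props.setProperty("hoodie.index.bucket.engine", "CONSISTENT_HASHING"); // key assumed
    HoodieClusteringConfig config = HoodieClusteringConfig.newBuilder()
        .fromProperties(props)
        .build();
    // setDefaults() should now have picked:
    //   plan strategy      -> SparkConsistentBucketClusteringPlanStrategy
    //   execution strategy -> SparkConsistentBucketClusteringExecutionStrategy
    // Explicitly configuring any other plan/execution strategy would trip the
    // checkArgument calls in validate().
    System.out.println(config.getString(HoodieClusteringConfig.PLAN_STRATEGY_CLASS_NAME));
  }
}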
@@ -30,6 +30,9 @@
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.concurrent.Immutable;

import java.io.File;
@@ -62,6 +65,8 @@
+ "which tags incoming records as either inserts or updates to older records.")
public class HoodieIndexConfig extends HoodieConfig {

private static final Logger LOG = LogManager.getLogger(HoodieIndexConfig.class);

public static final ConfigProperty<String> INDEX_TYPE = ConfigProperty
.key("hoodie.index.type")
// Builder#getDefaultIndexType has already set it according to engine type
@@ -263,12 +268,37 @@ public class HoodieIndexConfig extends HoodieConfig {
.withDocumentation("Only applies if index type is BUCKET. Determine the number of buckets in the hudi table, "
+ "and each partition is divided to N buckets.");

public static final ConfigProperty<String> BUCKET_INDEX_MAX_NUM_BUCKETS = ConfigProperty
.key("hoodie.bucket.index.max.num.buckets")
.noDefaultValue()
.withDocumentation("Only applies if bucket index engine is consistent hashing. Determine the upper bound of "
+ "the number of buckets in the hudi table. Bucket resizing cannot be done higher than this max limit.");

public static final ConfigProperty<String> BUCKET_INDEX_MIN_NUM_BUCKETS = ConfigProperty
.key("hoodie.bucket.index.min.num.buckets")
.noDefaultValue()
.withDocumentation("Only applies if bucket index engine is consistent hashing. Determine the lower bound of "
+ "the number of buckets in the hudi table. Bucket resizing cannot be done lower than this min limit.");

public static final ConfigProperty<String> BUCKET_INDEX_HASH_FIELD = ConfigProperty
.key("hoodie.bucket.index.hash.field")
.noDefaultValue()
.withDocumentation("Index key. It is used to index the record and find its file group. "
+ "If not set, use record key field as default");

public static final ConfigProperty<Double> BUCKET_SPLIT_THRESHOLD = ConfigProperty
.key("hoodie.bucket.index.split.threshold")
.defaultValue(2.0)
.withDocumentation("Control if the bucket should be split when using consistent hashing bucket index."
+ "Specifically, if a file slice size reaches `hoodie.xxxx.max.file.size` * threshold, then split will be carried out.");

public static final ConfigProperty<Double> BUCKET_MERGE_THRESHOLD = ConfigProperty
.key("hoodie.bucket.index.merge.threshold")
.defaultValue(0.2)
.withDocumentation("Control if buckets should be merged when using consistent hashing bucket index"
+ "Specifically, if a file slice size is smaller than `hoodie.xxxx.max.file.size` * threshold, then it will be considered"
+ "as a merge candidate.");

/**
* Deprecated configs. These are now part of {@link HoodieHBaseIndexConfig}.
*/
@@ -600,6 +630,16 @@ public Builder withBucketNum(String bucketNum) {
return this;
}

public Builder withBucketMaxNum(int bucketMaxNum) {
hoodieIndexConfig.setValue(BUCKET_INDEX_MAX_NUM_BUCKETS, String.valueOf(bucketMaxNum));
return this;
}

public Builder withBucketMinNum(int bucketMinNum) {
hoodieIndexConfig.setValue(BUCKET_INDEX_MIN_NUM_BUCKETS, String.valueOf(bucketMinNum));
return this;
}

public Builder withIndexKeyField(String keyField) {
hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD, keyField);
return this;
@@ -650,6 +690,20 @@ private void validateBucketIndexConfig() {
if (hoodieIndexConfig.getIntOrDefault(BUCKET_INDEX_NUM_BUCKETS) <= 0) {
throw new HoodieIndexException("When using bucket index, hoodie.bucket.index.num.buckets cannot be negative.");
}
int bucketNum = hoodieIndexConfig.getInt(BUCKET_INDEX_NUM_BUCKETS);
if (StringUtils.isNullOrEmpty(hoodieIndexConfig.getString(BUCKET_INDEX_MAX_NUM_BUCKETS))) {
hoodieIndexConfig.setValue(BUCKET_INDEX_MAX_NUM_BUCKETS, Integer.toString(bucketNum));
} else if (hoodieIndexConfig.getInt(BUCKET_INDEX_MAX_NUM_BUCKETS) < bucketNum) {
LOG.warn("Maximum bucket number is smaller than bucket number, maximum: " + hoodieIndexConfig.getInt(BUCKET_INDEX_MAX_NUM_BUCKETS) + ", bucketNum: " + bucketNum);
hoodieIndexConfig.setValue(BUCKET_INDEX_MAX_NUM_BUCKETS, Integer.toString(bucketNum));
}

if (StringUtils.isNullOrEmpty(hoodieIndexConfig.getString(BUCKET_INDEX_MIN_NUM_BUCKETS))) {
hoodieIndexConfig.setValue(BUCKET_INDEX_MIN_NUM_BUCKETS, Integer.toString(bucketNum));
} else if (hoodieIndexConfig.getInt(BUCKET_INDEX_MIN_NUM_BUCKETS) > bucketNum) {
LOG.warn("Minimum bucket number is larger than the bucket number, minimum: " + hoodieIndexConfig.getInt(BUCKET_INDEX_MIN_NUM_BUCKETS) + ", bucketNum: " + bucketNum);
hoodieIndexConfig.setValue(BUCKET_INDEX_MIN_NUM_BUCKETS, Integer.toString(bucketNum));
}
}
}
}
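
To make the new threshold and bound options above concrete, here is a small worked example with illustrative numbers (the 120 MB figure is a stand-in for whatever hoodie.parquet.max.file.size is configured to; none of this code is from the commit):

public class BucketResizeThresholdSketch {
  public static void main(String[] args) {
    long maxFileSize = 120L * 1024 * 1024;   // stand-in for hoodie.parquet.max.file.size
    double splitThreshold = 2.0;             // hoodie.bucket.index.split.threshold (default)
    double mergeThreshold = 0.2;             // hoodie.bucket.index.merge.threshold (default)

    // A file slice above ~240 MB becomes a split candidate; one below ~24 MB becomes a merge candidate.
    long splitAbove = (long) (maxFileSize * splitThreshold);
    long mergeBelow = (long) (maxFileSize * mergeThreshold);
    System.out.println("split above " + splitAbove + " bytes, merge below " + mergeBelow + " bytes");

    // validateBucketIndexConfig() above also clamps the resizing bounds: if max < num.buckets or
    // min > num.buckets, the offending bound is reset to num.buckets and a warning is logged.
  }
}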
@@ -1644,13 +1644,42 @@ public int getBucketIndexNumBuckets() {
return getIntOrDefault(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS);
}

public int getBucketIndexMaxNumBuckets() {
return getInt(HoodieIndexConfig.BUCKET_INDEX_MAX_NUM_BUCKETS);
}

public int getBucketIndexMinNumBuckets() {
return getInt(HoodieIndexConfig.BUCKET_INDEX_MIN_NUM_BUCKETS);
}

public double getBucketSplitThreshold() {
return getDouble(HoodieIndexConfig.BUCKET_SPLIT_THRESHOLD);
}

public double getBucketMergeThreshold() {
return getDouble(HoodieIndexConfig.BUCKET_MERGE_THRESHOLD);
}

public String getBucketIndexHashField() {
return getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD);
}

/**
* storage properties.
*/
public long getMaxFileSize(HoodieFileFormat format) {
switch (format) {
case PARQUET:
return getParquetMaxFileSize();
case HFILE:
return getHFileMaxFileSize();
case ORC:
return getOrcMaxFileSize();
default:
throw new HoodieNotSupportedException("Unknown file format: " + format);
}
}

public long getParquetMaxFileSize() {
return getLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE);
}
@@ -88,6 +88,18 @@ public abstract HoodieData<WriteStatus> updateLocation(
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException;


/**
* Extracts the location of written records, and updates the index.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public HoodieData<WriteStatus> updateLocation(
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
HoodieTable hoodieTable, String instant) throws HoodieIndexException {
return updateLocation(writeStatuses, context, hoodieTable);
}


/**
* Rollback the effects of the commit made at instantTime.
*/
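
The new four-argument updateLocation hook above defaults to the original three-argument behaviour, so existing indexes keep working while instant-aware indexes (such as the consistent-hashing bucket index) can override it. A hedged caller sketch follows; the table.getIndex() accessor and the class and parameter names used here are assumptions, not code from this commit.

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.table.HoodieTable;

public class UpdateLocationSketch {
  // Passes the committing instant to the new hook; indexes that do not override the
  // four-argument method fall back to the original three-argument updateLocation.
  public static HoodieData<WriteStatus> updateIndex(HoodieTable table,
                                                    HoodieData<WriteStatus> writeStatuses,
                                                    HoodieEngineContext context,
                                                    String instantTime) {
    return table.getIndex().updateLocation(writeStatuses, context, table, instantTime);
  }
}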
@@ -28,8 +28,8 @@
public interface BucketIndexLocationMapper extends Serializable {

/**
* Get record location given hoodie key
*/
Option<HoodieRecordLocation> getRecordLocation(HoodieKey key);

}
@@ -22,14 +22,22 @@
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.hash.HashID;
import org.apache.hudi.exception.HoodieClusteringException;

import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ConsistentBucketIdentifier extends BucketIdentifier {

@@ -68,7 +76,7 @@ public int getNumBuckets() {
/**
* Get bucket of the given file group
*
* @param fileId the file group id. NOTE: not filePrefix (i.e., uuid)
*/
public ConsistentHashingNode getBucketByFileId(String fileId) {
return fileIdToBucket.get(fileId);
@@ -88,17 +96,107 @@ protected ConsistentHashingNode getBucket(int hashValue) {
return tailMap.isEmpty() ? ring.firstEntry().getValue() : tailMap.get(tailMap.firstKey());
}

/**
* Get the former node of the given node (inferred from file id).
*/
public ConsistentHashingNode getFormerBucket(String fileId) {
return getFormerBucket(getBucketByFileId(fileId).getValue());
}

/**
* Get the former node of the given node (inferred from hash value).
*/
public ConsistentHashingNode getFormerBucket(int hashValue) {
SortedMap<Integer, ConsistentHashingNode> headMap = ring.headMap(hashValue);
return headMap.isEmpty() ? ring.lastEntry().getValue() : headMap.get(headMap.lastKey());
}

public List<ConsistentHashingNode> mergeBucket(List<String> fileIds) {
ValidationUtils.checkArgument(fileIds.size() >= 2, "At least two file groups should be provided for merging");
// Get nodes using fileIds
List<ConsistentHashingNode> nodes = fileIds.stream().map(this::getBucketByFileId).collect(Collectors.toList());

// Validate the input
for (int i = 0; i < nodes.size() - 1; ++i) {
ValidationUtils.checkState(getFormerBucket(nodes.get(i + 1).getValue()).getValue() == nodes.get(i).getValue(), "Cannot merge discontinuous hash range");
}

// Create child nodes with proper tag (keep the last one and delete other nodes)
List<ConsistentHashingNode> childNodes = new ArrayList<>(nodes.size());
for (int i = 0; i < nodes.size() - 1; ++i) {
childNodes.add(new ConsistentHashingNode(nodes.get(i).getValue(), null, ConsistentHashingNode.NodeTag.DELETE));
}
childNodes.add(new ConsistentHashingNode(nodes.get(nodes.size() - 1).getValue(), FSUtils.createNewFileIdPfx(), ConsistentHashingNode.NodeTag.REPLACE));
return childNodes;
}

public Option<List<ConsistentHashingNode>> splitBucket(String fileId) {
ConsistentHashingNode bucket = getBucketByFileId(fileId);
ValidationUtils.checkState(bucket != null, "FileId has no corresponding bucket");
return splitBucket(bucket);
}

/**
* Split bucket in the range middle, also generate the corresponding file ids
*
* TODO support different split criteria, e.g., distribute records evenly using statistics
*
* @param bucket parent bucket
* @return lists of children buckets
*/
public Option<List<ConsistentHashingNode>> splitBucket(@NotNull ConsistentHashingNode bucket) {
ConsistentHashingNode formerBucket = getFormerBucket(bucket.getValue());

long mid = (long) formerBucket.getValue() + bucket.getValue()
+ (formerBucket.getValue() < bucket.getValue() ? 0 : (HoodieConsistentHashingMetadata.HASH_VALUE_MASK + 1L));
mid = (mid >> 1) & HoodieConsistentHashingMetadata.HASH_VALUE_MASK;

// Cannot split as it already is the smallest bucket range
if (mid == formerBucket.getValue() || mid == bucket.getValue()) {
return Option.empty();
}

return Option.of(Arrays.asList(
new ConsistentHashingNode((int) mid, FSUtils.createNewFileIdPfx(), ConsistentHashingNode.NodeTag.REPLACE),
new ConsistentHashingNode(bucket.getValue(), FSUtils.createNewFileIdPfx(), ConsistentHashingNode.NodeTag.REPLACE))
);
}

/**
* Initialize necessary data structure to facilitate bucket identifying.
* Specifically, we construct:
* - An in-memory tree (ring) to speed up range mapping searching.
* - A hash table (fileIdToBucket) to allow lookup of bucket using fileId.
* <p>
* Children nodes are also considered, and will override the original nodes,
* which is used during bucket resizing (i.e., children nodes take the place
* of the original nodes)
*/
private void initialize() {
for (ConsistentHashingNode p : metadata.getNodes()) {
ring.put(p.getValue(), p);
// One bucket has only one file group, so append 0 directly
fileIdToBucket.put(FSUtils.createNewFileId(p.getFileIdPrefix(), 0), p);
}

// Handle children nodes, i.e., replace or delete the original nodes
ConsistentHashingNode tmp;
for (ConsistentHashingNode p : metadata.getChildrenNodes()) {
switch (p.getTag()) {
case REPLACE:
tmp = ring.put(p.getValue(), p);
if (tmp != null) {
fileIdToBucket.remove(FSUtils.createNewFileId(tmp.getFileIdPrefix(), 0));
}
fileIdToBucket.put(FSUtils.createNewFileId(p.getFileIdPrefix(), 0), p);
break;
case DELETE:
tmp = ring.remove(p.getValue());
fileIdToBucket.remove(FSUtils.createNewFileId(tmp.getFileIdPrefix(), 0));
break;
default:
throw new HoodieClusteringException("Children node is tagged as NORMAL or unknown tag: " + p);
}
}
}
}
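
To illustrate the midpoint arithmetic used by splitBucket above, here is a standalone sketch; HASH_VALUE_MASK is assumed to equal Integer.MAX_VALUE as a stand-in for HoodieConsistentHashingMetadata.HASH_VALUE_MASK, and the code is not taken from the commit.

public class SplitMidpointSketch {
  // Stand-in for HoodieConsistentHashingMetadata.HASH_VALUE_MASK (assumed Integer.MAX_VALUE).
  static final long HASH_VALUE_MASK = Integer.MAX_VALUE;

  // Midpoint of the ring range (former, current], wrapping around the end of the ring if needed.
  static long midpoint(int formerValue, int currentValue) {
    long mid = (long) formerValue + currentValue
        + (formerValue < currentValue ? 0 : (HASH_VALUE_MASK + 1L));
    return (mid >> 1) & HASH_VALUE_MASK;
  }

  public static void main(String[] args) {
    System.out.println(midpoint(100, 200));                   // ordinary range: midpoint is 150
    System.out.println(midpoint(Integer.MAX_VALUE - 10, 5));  // wrap-around range: midpoint stays inside the range
  }
}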