Commit 23c9c5c

[HUDI-3836] Improve the way of fetching metadata partitions from table (#5286)

Co-authored-by: xicm <[email protected]>
xicm authored Jul 5, 2022
1 parent fbda4ad · commit 23c9c5c

Showing 13 changed files with 49 additions and 63 deletions.
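
At its core, the change deletes the static helper HoodieTableMetadataUtil.getCompletedMetadataPartitions and has call sites read the completed metadata partitions straight off the table config, whose getMetadataPartitions() now returns a Set<String> instead of a List<String>. A minimal before/after sketch of a typical call site (names follow the diffs below):

    // before: wrap the config's List<String> into a Set via a static helper
    Set<String> completed =
        getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig());

    // after: the table config returns a Set<String> directly
    Set<String> completed =
        hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions();

    // either way, availability checks read the same:
    boolean colStatsAvailable =
        completed.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath());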
File 1 of 13

@@ -56,7 +56,6 @@
 import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
 import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;

 /**
@@ -143,7 +142,7 @@ private List<Pair<String, BloomIndexFileInfo>> getBloomIndexFileInfoForPartition
     if (config.getBloomIndexPruneByRanges()) {
       // load column ranges from metadata index if column stats index is enabled and column_stats metadata partition is available
       if (config.getBloomIndexUseMetadata()
-          && getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig()).contains(COLUMN_STATS.getPartitionPath())) {
+          && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())) {
         fileInfoList = loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable);
       }
       // fallback to loading column ranges from files
File 2 of 13

@@ -37,7 +37,6 @@
 import java.util.ArrayList;
 import java.util.List;

-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;

 /**
@@ -64,7 +63,7 @@ private BloomFilter getBloomFilter() {
     HoodieTimer timer = new HoodieTimer().startTimer();
     try {
       if (config.getBloomIndexUseMetadata()
-          && getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig())
+          && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
              .contains(BLOOM_FILTERS.getPartitionPath())) {
         bloomFilter = hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight())
             .orElseThrow(() -> new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight()));
File 3 of 13

@@ -96,7 +96,6 @@
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;

@@ -579,7 +578,7 @@ private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Opti
   }

   private void updateInitializedPartitionsInTableConfig(List<MetadataPartitionType> partitionTypes) {
-    Set<String> completedPartitions = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+    Set<String> completedPartitions = dataMetaClient.getTableConfig().getMetadataPartitions();
     completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
     dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
     HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
@@ -716,7 +715,7 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
   }

   public void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException {
-    Set<String> completedIndexes = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+    Set<String> completedIndexes = dataMetaClient.getTableConfig().getMetadataPartitions();
     Set<String> inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig());

     for (MetadataPartitionType partitionType : metadataPartitions) {
@@ -806,7 +805,7 @@ private <T> void processAndCommit(String instantTime, ConvertMetadataFunction co

   private Set<String> getMetadataPartitionsToUpdate() {
     // fetch partitions to update from table config
-    Set<String> partitionsToUpdate = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+    Set<String> partitionsToUpdate = dataMetaClient.getTableConfig().getMetadataPartitions();
     // add inflight indexes as well because the file groups have already been initialized, so writers can log updates
     // NOTE: Async HoodieIndexer can move some partition to inflight. While that partition is still being built,
     // the regular ingestion writers should not be blocked. They can go ahead and log updates to the metadata partition.
File 4 of 13

@@ -102,7 +102,6 @@
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;

 /**
@@ -900,7 +899,7 @@ private boolean shouldDeleteMetadataPartition(MetadataPartitionType partitionTyp
       return false;
     }
     return metadataIndexDisabled
-        && getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(partitionType.getPartitionPath());
+        && metaClient.getTableConfig().getMetadataPartitions().contains(partitionType.getPartitionPath());
   }

   private boolean shouldExecuteMetadataTableDeletion() {
@@ -919,7 +918,7 @@ private boolean shouldExecuteMetadataTableDeletion() {
    * Clears hoodie.table.metadata.partitions in hoodie.properties
    */
   private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> partitionType, boolean clearAll) {
-    Set<String> partitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
+    Set<String> partitions = metaClient.getTableConfig().getMetadataPartitions();
     if (clearAll && partitions.size() > 0) {
       LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
       metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING);
File 5 of 13

@@ -74,7 +74,6 @@
 import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
 import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
@@ -192,7 +191,7 @@ public Option<HoodieIndexCommitMetadata> execute() {

   private void abort(HoodieInstant indexInstant, Set<String> requestedPartitions) {
     Set<String> inflightPartitions = getInflightMetadataPartitions(table.getMetaClient().getTableConfig());
-    Set<String> completedPartitions = getCompletedMetadataPartitions(table.getMetaClient().getTableConfig());
+    Set<String> completedPartitions = table.getMetaClient().getTableConfig().getMetadataPartitions();
     // update table config
     requestedPartitions.forEach(partition -> {
       inflightPartitions.remove(partition);
@@ -302,7 +301,7 @@ private static List<HoodieInstant> getCompletedArchivedAndActiveInstantsAfter(St
   private void updateMetadataPartitionsTableConfig(HoodieTableMetaClient metaClient, Set<String> metadataPartitions) {
     // remove from inflight and update completed indexes
     Set<String> inflightPartitions = getInflightMetadataPartitions(metaClient.getTableConfig());
-    Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
+    Set<String> completedPartitions = metaClient.getTableConfig().getMetadataPartitions();
     inflightPartitions.removeAll(metadataPartitions);
     completedPartitions.addAll(metadataPartitions);
     // update table config
File 6 of 13

@@ -43,7 +43,6 @@

 import scala.Tuple2;

-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;

 /**
@@ -81,7 +80,7 @@ public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecor

     JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
     if (config.getBloomIndexUseMetadata()
-        && getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig())
+        && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
            .contains(BLOOM_FILTERS.getPartitionPath())) {
       // Step 1: Sort by file id
       JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
File 7 of 13

@@ -149,7 +149,6 @@
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
 import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
@@ -240,9 +239,9 @@ public void testTurnOffMetadataIndexAfterEnable() throws Exception {
     HoodieTableMetaClient.reload(metaClient);
     HoodieTableConfig tableConfig = metaClient.getTableConfig();
     assertFalse(tableConfig.getMetadataPartitions().isEmpty());
-    assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
-    assertFalse(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
-    assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
+    assertFalse(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
+    assertFalse(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));

     // enable column stats and run 1 upserts
     HoodieWriteConfig cfgWithColStatsEnabled = HoodieWriteConfig.newBuilder()
@@ -265,9 +264,9 @@ public void testTurnOffMetadataIndexAfterEnable() throws Exception {
     HoodieTableMetaClient.reload(metaClient);
     tableConfig = metaClient.getTableConfig();
     assertFalse(tableConfig.getMetadataPartitions().isEmpty());
-    assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
-    assertTrue(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
-    assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
+    assertTrue(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
+    assertFalse(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));

     // disable column stats and run 1 upsert
     HoodieWriteConfig cfgWithColStatsDisabled = HoodieWriteConfig.newBuilder()
@@ -291,9 +290,9 @@ public void testTurnOffMetadataIndexAfterEnable() throws Exception {
     HoodieTableMetaClient.reload(metaClient);
     tableConfig = metaClient.getTableConfig();
     assertFalse(tableConfig.getMetadataPartitions().isEmpty());
-    assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
-    assertFalse(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
-    assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
+    assertFalse(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
+    assertFalse(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));

     // enable bloom filter as well as column stats and run 1 upsert
     HoodieWriteConfig cfgWithBloomFilterEnabled = HoodieWriteConfig.newBuilder()
@@ -317,9 +316,9 @@ public void testTurnOffMetadataIndexAfterEnable() throws Exception {
     HoodieTableMetaClient.reload(metaClient);
     tableConfig = metaClient.getTableConfig();
     assertFalse(tableConfig.getMetadataPartitions().isEmpty());
-    assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
-    assertTrue(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
-    assertTrue(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
+    assertTrue(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
+    assertTrue(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
   }

   @Test
@@ -360,7 +359,7 @@ public void testTurnOffMetadataTableAfterEnable() throws Exception {

     HoodieTableConfig hoodieTableConfig2 =
         new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig2.getPayloadClass());
-    assertEquals(Collections.emptyList(), hoodieTableConfig2.getMetadataPartitions());
+    assertEquals(Collections.emptySet(), hoodieTableConfig2.getMetadataPartitions());
     // Assert metadata table folder is deleted
     assertFalse(metaClient.getFs().exists(
         new Path(HoodieTableMetadata.getMetadataTableBasePath(writeConfig2.getBasePath()))));
File 8 of 13

@@ -79,7 +79,6 @@

 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -234,7 +233,7 @@ public void testLookupIndexWithOrWithoutColumnStats() throws Exception {
     // check column_stats partition exists
     metaClient = HoodieTableMetaClient.reload(metaClient);
     assertTrue(metadataPartitionExists(metaClient.getBasePath(), context, COLUMN_STATS));
-    assertTrue(getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
+    assertTrue(metaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));

     // delete the column_stats partition
     deleteMetadataPartition(metaClient.getBasePath(), context, COLUMN_STATS);
File 9 of 13

@@ -57,6 +57,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.HashSet;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;

@@ -618,11 +619,10 @@ public List<String> getMetadataPartitionsInflight() {
     );
   }

-  public List<String> getMetadataPartitions() {
-    return StringUtils.split(
-        getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING),
-        CONFIG_VALUES_DELIMITER
-    );
+  public Set<String> getMetadataPartitions() {
+    return new HashSet<>(
+        StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING),
+            CONFIG_VALUES_DELIMITER));
   }

 /**
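
Because the rewritten accessor copies the delimited config value into a fresh HashSet on every call, callers can mutate the returned set freely; persisting a change still goes through setValue and HoodieTableConfig.update, as updateInitializedPartitionsInTableConfig in File 3 shows. A short usage sketch (the added partition is illustrative):

    Set<String> partitions = dataMetaClient.getTableConfig().getMetadataPartitions();
    partitions.add(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()); // mutates only the copy
    dataMetaClient.getTableConfig().setValue(
        HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", partitions));
    HoodieTableConfig.update(dataMetaClient.getFs(),
        new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());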
File 10 of 13

@@ -1353,13 +1353,9 @@ public static Set<String> getInflightMetadataPartitions(HoodieTableConfig tableC
     return new HashSet<>(tableConfig.getMetadataPartitionsInflight());
   }

-  public static Set<String> getCompletedMetadataPartitions(HoodieTableConfig tableConfig) {
-    return new HashSet<>(tableConfig.getMetadataPartitions());
-  }
-
   public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableConfig tableConfig) {
     Set<String> inflightAndCompletedPartitions = getInflightMetadataPartitions(tableConfig);
-    inflightAndCompletedPartitions.addAll(getCompletedMetadataPartitions(tableConfig));
+    inflightAndCompletedPartitions.addAll(tableConfig.getMetadataPartitions());
     return inflightAndCompletedPartitions;
   }
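
With the helper gone, the inflight/completed union relies on getInflightMetadataPartitions returning a mutable HashSet copy, so addAll never aliases table state. A toy illustration, with hypothetical partition-path values:

    // hypothetical values, for illustration only
    Set<String> inflight = new HashSet<>(Arrays.asList("column_stats"));
    Set<String> completed = new HashSet<>(Arrays.asList("files", "bloom_filters"));
    inflight.addAll(completed); // union in the copy: files, bloom_filters, column_stats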
File 11 of 13

@@ -251,7 +251,7 @@ case class HoodieFileIndex(spark: SparkSession,
   override def sizeInBytes: Long = cachedFileSize

   private def isColumnStatsIndexAvailable =
-    HoodieTableMetadataUtil.getCompletedMetadataPartitions(metaClient.getTableConfig)
+    metaClient.getTableConfig.getMetadataPartitions
       .contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)

   private def isDataSkippingEnabled: Boolean =
File 12 of 13

@@ -53,7 +53,6 @@
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
 import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
 import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
@@ -243,7 +242,7 @@ private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> clien
   }

   private boolean indexExists(List<MetadataPartitionType> partitionTypes) {
-    Set<String> indexedMetadataPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
+    Set<String> indexedMetadataPartitions = metaClient.getTableConfig().getMetadataPartitions();
     Set<String> requestedIndexPartitionPaths = partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
     requestedIndexPartitionPaths.retainAll(indexedMetadataPartitions);
     if (!requestedIndexPartitionPaths.isEmpty()) {
@@ -254,7 +253,7 @@ private boolean indexExists(List<MetadataPartitionType> partitionTypes) {
   }

   private boolean isMetadataInitialized() {
-    Set<String> indexedMetadataPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
+    Set<String> indexedMetadataPartitions = metaClient.getTableConfig().getMetadataPartitions();
     return !indexedMetadataPartitions.isEmpty();
   }