Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-3836] Improve the way of fetching metadata partitions from table #5286

Merged
merged 18 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;

/**
Expand Down Expand Up @@ -143,7 +142,7 @@ private List<Pair<String, BloomIndexFileInfo>> getBloomIndexFileInfoForPartition
if (config.getBloomIndexPruneByRanges()) {
// load column ranges from metadata index if column stats index is enabled and column_stats metadata partition is available
if (config.getBloomIndexUseMetadata()
&& getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig()).contains(COLUMN_STATS.getPartitionPath())) {
&& hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())) {
fileInfoList = loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable);
}
// fallback to loading column ranges from files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.ArrayList;
import java.util.List;

import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;

/**
Expand All @@ -64,7 +63,7 @@ private BloomFilter getBloomFilter() {
HoodieTimer timer = new HoodieTimer().startTimer();
try {
if (config.getBloomIndexUseMetadata()
&& getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig())
&& hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
.contains(BLOOM_FILTERS.getPartitionPath())) {
bloomFilter = hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight())
.orElseThrow(() -> new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;

Expand Down Expand Up @@ -579,7 +578,7 @@ private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Opti
}

private void updateInitializedPartitionsInTableConfig(List<MetadataPartitionType> partitionTypes) {
Set<String> completedPartitions = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
Set<String> completedPartitions = dataMetaClient.getTableConfig().getMetadataPartitions();
completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
Expand Down Expand Up @@ -716,7 +715,7 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
}

public void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException {
Set<String> completedIndexes = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
Set<String> completedIndexes = dataMetaClient.getTableConfig().getMetadataPartitions();
Set<String> inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig());

for (MetadataPartitionType partitionType : metadataPartitions) {
Expand Down Expand Up @@ -806,7 +805,7 @@ private <T> void processAndCommit(String instantTime, ConvertMetadataFunction co

private Set<String> getMetadataPartitionsToUpdate() {
// fetch partitions to update from table config
Set<String> partitionsToUpdate = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
Set<String> partitionsToUpdate = dataMetaClient.getTableConfig().getMetadataPartitions();
// add inflight indexes as well because the file groups have already been initialized, so writers can log updates
// NOTE: Async HoodieIndexer can move some partition to inflight. While that partition is still being built,
// the regular ingestion writers should not be blocked. They can go ahead and log updates to the metadata partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;

/**
Expand Down Expand Up @@ -873,7 +872,7 @@ private boolean shouldDeleteMetadataPartition(MetadataPartitionType partitionTyp
return false;
}
return metadataIndexDisabled
&& getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(partitionType.getPartitionPath());
&& metaClient.getTableConfig().getMetadataPartitions().contains(partitionType.getPartitionPath());
}

private boolean shouldExecuteMetadataTableDeletion() {
Expand All @@ -899,7 +898,7 @@ private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> pa
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
return;
}
Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
Set<String> completedPartitions = metaClient.getTableConfig().getMetadataPartitions();
completedPartitions.remove(partitionType.get().getPartitionPath());
metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
Expand Down Expand Up @@ -172,7 +171,7 @@ public Option<HoodieIndexCommitMetadata> execute() {

private void abort(HoodieInstant indexInstant, Set<String> requestedPartitions) {
Set<String> inflightPartitions = getInflightMetadataPartitions(table.getMetaClient().getTableConfig());
Set<String> completedPartitions = getCompletedMetadataPartitions(table.getMetaClient().getTableConfig());
Set<String> completedPartitions = table.getMetaClient().getTableConfig().getMetadataPartitions();
// update table config
requestedPartitions.forEach(partition -> {
inflightPartitions.remove(partition);
Expand Down Expand Up @@ -282,7 +281,7 @@ private static List<HoodieInstant> getCompletedArchivedAndActiveInstantsAfter(St
private void updateMetadataPartitionsTableConfig(HoodieTableMetaClient metaClient, Set<String> metadataPartitions) {
// remove from inflight and update completed indexes
Set<String> inflightPartitions = getInflightMetadataPartitions(metaClient.getTableConfig());
Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
Set<String> completedPartitions = metaClient.getTableConfig().getMetadataPartitions();
inflightPartitions.removeAll(metadataPartitions);
completedPartitions.addAll(metadataPartitions);
// update table config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@

import scala.Tuple2;

import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;

/**
Expand Down Expand Up @@ -81,7 +80,7 @@ public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecor

JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
if (config.getBloomIndexUseMetadata()
&& getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig())
&& hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
.contains(BLOOM_FILTERS.getPartitionPath())) {
// Step 1: Sort by file id
JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
Expand Down Expand Up @@ -240,9 +239,9 @@ public void testTurnOffMetadataIndexAfterEnable() throws Exception {
HoodieTableMetaClient.reload(metaClient);
HoodieTableConfig tableConfig = metaClient.getTableConfig();
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
assertFalse(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
assertFalse(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
assertFalse(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));

// enable column stats and run 1 upserts
HoodieWriteConfig cfgWithColStatsEnabled = HoodieWriteConfig.newBuilder()
Expand All @@ -265,9 +264,9 @@ public void testTurnOffMetadataIndexAfterEnable() throws Exception {
HoodieTableMetaClient.reload(metaClient);
tableConfig = metaClient.getTableConfig();
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
assertTrue(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
assertTrue(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
assertFalse(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));

// disable column stats and run 1 upsert
HoodieWriteConfig cfgWithColStatsDisabled = HoodieWriteConfig.newBuilder()
Expand All @@ -291,9 +290,9 @@ public void testTurnOffMetadataIndexAfterEnable() throws Exception {
HoodieTableMetaClient.reload(metaClient);
tableConfig = metaClient.getTableConfig();
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
assertFalse(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
assertFalse(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
assertFalse(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
assertFalse(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));

// enable bloom filter as well as column stats and run 1 upsert
HoodieWriteConfig cfgWithBloomFilterEnabled = HoodieWriteConfig.newBuilder()
Expand All @@ -317,9 +316,9 @@ public void testTurnOffMetadataIndexAfterEnable() throws Exception {
HoodieTableMetaClient.reload(metaClient);
tableConfig = metaClient.getTableConfig();
assertFalse(tableConfig.getMetadataPartitions().isEmpty());
assertTrue(getCompletedMetadataPartitions(tableConfig).contains(FILES.getPartitionPath()));
assertTrue(getCompletedMetadataPartitions(tableConfig).contains(COLUMN_STATS.getPartitionPath()));
assertTrue(getCompletedMetadataPartitions(tableConfig).contains(BLOOM_FILTERS.getPartitionPath()));
assertTrue(tableConfig.getMetadataPartitions().contains(FILES.getPartitionPath()));
assertTrue(tableConfig.getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));
assertTrue(tableConfig.getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath()));
}

@Test
Expand Down Expand Up @@ -360,7 +359,7 @@ public void testTurnOffMetadataTableAfterEnable() throws Exception {

HoodieTableConfig hoodieTableConfig2 =
new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig2.getPayloadClass());
assertEquals(Collections.emptyList(), hoodieTableConfig2.getMetadataPartitions());
assertEquals(Collections.emptySet(), hoodieTableConfig2.getMetadataPartitions());
// Assert metadata table folder is deleted
assertFalse(metaClient.getFs().exists(
new Path(HoodieTableMetadata.getMetadataTableBasePath(writeConfig2.getBasePath()))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@

import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -231,7 +230,7 @@ public void testLookupIndexWithOrWithoutColumnStats() throws Exception {
// check column_stats partition exists
metaClient = HoodieTableMetaClient.reload(metaClient);
assertTrue(metadataPartitionExists(metaClient.getBasePath(), context, COLUMN_STATS));
assertTrue(getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
assertTrue(metaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));

// delete the column_stats partition
deleteMetadataPartition(metaClient.getBasePath(), context, COLUMN_STATS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.HashSet;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -236,6 +237,8 @@ public class HoodieTableConfig extends HoodieConfig {

private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // <database_name>.<table_name>

private static Set<String> metadataPartition = null;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping the result in a static variable is not a good approach. Either we keep the code as it is, or we introduce a proper caching tool (like Guava interners) and initialize it when building the table config (i.e., in the constructor).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. Agreed that a static variable is not a good approach — could we instead keep the result in an instance-level Set and initialize it lazily on the first call to the getter?

public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) {
super();
Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
Expand Down Expand Up @@ -625,11 +628,14 @@ public List<String> getMetadataPartitionsInflight() {
);
}

public List<String> getMetadataPartitions() {
return StringUtils.split(
getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING),
CONFIG_VALUES_DELIMITER
);
/**
 * Returns the set of metadata partitions that have been completely built,
 * as recorded under {@link HoodieTableConfig#TABLE_METADATA_PARTITIONS} in
 * the table config (comma-separated partition paths).
 *
 * <p>A fresh mutable {@link Set} is computed on every call. The static
 * {@code metadataPartition} cache field is intentionally NOT used: it was
 * never assigned (the null-check branch was dead code), it would be shared
 * across all {@code HoodieTableConfig} instances and never invalidated when
 * the config value changes, and callers mutate the returned set (e.g. via
 * {@code addAll}/{@code remove} before persisting an updated config), which
 * would corrupt a shared cached instance.
 *
 * @return mutable set of completed metadata partition paths; empty if none
 */
public Set<String> getMetadataPartitions() {
  return new HashSet<>(
      StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING),
          CONFIG_VALUES_DELIMITER));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1353,13 +1353,9 @@ public static Set<String> getInflightMetadataPartitions(HoodieTableConfig tableC
return new HashSet<>(tableConfig.getMetadataPartitionsInflight());
}

public static Set<String> getCompletedMetadataPartitions(HoodieTableConfig tableConfig) {
return new HashSet<>(tableConfig.getMetadataPartitions());
}

/**
 * Returns the union of metadata partitions that are either inflight (still
 * being built) or already completed, per the given table config.
 *
 * @param tableConfig table config to read partition state from
 * @return mutable set containing both inflight and completed partition paths
 */
public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableConfig tableConfig) {
  Set<String> partitions = new HashSet<>(tableConfig.getMetadataPartitions());
  partitions.addAll(getInflightMetadataPartitions(tableConfig));
  return partitions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ case class HoodieFileIndex(spark: SparkSession,
override def sizeInBytes: Long = cachedFileSize

private def isColumnStatsIndexAvailable =
HoodieTableMetadataUtil.getCompletedMetadataPartitions(metaClient.getTableConfig)
metaClient.getTableConfig.getMetadataPartitions
.contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)

private def isDataSkippingEnabled: Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
Expand Down Expand Up @@ -239,7 +238,7 @@ private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> clien
}

private boolean indexExists(List<MetadataPartitionType> partitionTypes) {
Set<String> indexedMetadataPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
Set<String> indexedMetadataPartitions = metaClient.getTableConfig().getMetadataPartitions();
Set<String> requestedIndexPartitionPaths = partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
requestedIndexPartitionPaths.retainAll(indexedMetadataPartitions);
if (!requestedIndexPartitionPaths.isEmpty()) {
Expand Down
Loading