Commit 0bd1c53

[HUDI-4808] Fix HoodieSimpleBucketIndex not consider bucket num in log file issue (#6630)

* [HUDI-4808] Fix HoodieSimpleBucketIndex not consider bucket num in log file issue

Co-authored-by: xiaoxingstack <[email protected]>
2 people authored and yuzhaojing committed Sep 29, 2022
1 parent e524a62 commit 0bd1c53
Showing 5 changed files with 62 additions and 26 deletions.
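
In short: HoodieSimpleBucketIndex used to build its bucket-id-to-file mapping from the latest base files only, so on a merge-on-read table a bucket whose file group so far consists only of log files never entered the mapping and its records could not be tagged to the existing bucket. The change has the index scan the latest file slices instead (a slice covers the base file plus its log files), and extends the test-table helpers so the tag-location test also covers the log-file-only case. A minimal sketch of the corrected mapping, paraphrased from the index change below (putIfAbsent stands in for the commit's explicit containsKey check; not a verbatim excerpt):

    // Sketch only: file slices include log-only file groups, so a bucket that
    // exists purely as log files still contributes its fileId, and hence its
    // bucket id, to the mapping used by tagLocation.
    Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = new HashMap<>();
    HoodieIndexUtils.getLatestFileSlicesForPartition(partition, hoodieTable)
        .forEach(fileSlice -> {
          int bucketId = BucketIdentifier.bucketIdFromFileId(fileSlice.getFileId());
          bucketIdToFileIdMapping.putIfAbsent(bucketId,
              new HoodieRecordLocation(fileSlice.getBaseInstantTime(), fileSlice.getFileId()));
        });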
Changed file 1 of 5:

@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -72,6 +73,26 @@ public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
     return Collections.emptyList();
   }

+  /**
+   * Fetches Pair of partition path and {@link FileSlice}s for interested partitions.
+   *
+   * @param partition Partition of interest
+   * @param hoodieTable Instance of {@link HoodieTable} of interest
+   * @return the list of {@link FileSlice}
+   */
+  public static List<FileSlice> getLatestFileSlicesForPartition(
+      final String partition,
+      final HoodieTable hoodieTable) {
+    Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
+        .filterCompletedInstants().lastInstant();
+    if (latestCommitTime.isPresent()) {
+      return hoodieTable.getHoodieView()
+          .getLatestFileSlicesBeforeOrOn(partition, latestCommitTime.get().getTimestamp(), true)
+          .collect(toList());
+    }
+    return Collections.emptyList();
+  }
+
   /**
    * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
    *
Changed file 2 of 5:

@@ -25,7 +25,6 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

@@ -52,10 +51,11 @@ private Map<Integer, HoodieRecordLocation> loadPartitionBucketIdFileIdMapping(
     Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = new HashMap<>();
     hoodieTable.getMetaClient().reloadActiveTimeline();
     HoodieIndexUtils
-        .getLatestBaseFilesForPartition(partition, hoodieTable)
-        .forEach(file -> {
-          String fileId = file.getFileId();
-          String commitTime = file.getCommitTime();
+        .getLatestFileSlicesForPartition(partition, hoodieTable)
+        .forEach(fileSlice -> {
+          String fileId = fileSlice.getFileId();
+          String commitTime = fileSlice.getBaseInstantTime();
+
           int bucketId = BucketIdentifier.bucketIdFromFileId(fileId);
           if (!bucketIdToFileIdMapping.containsKey(bucketId)) {
             bucketIdToFileIdMapping.put(bucketId, new HoodieRecordLocation(commitTime, fileId));
Changed file 3 of 5:

@@ -32,7 +32,6 @@
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -45,9 +44,9 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
-import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.io.storage.HoodieOrcWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -152,27 +151,21 @@ public Path withInserts(String partition, String fileId, List<HoodieRecord> reco
     return baseFilePath;
   }

-  public Map<String, List<HoodieLogFile>> withLogAppends(List<HoodieRecord> records) throws Exception {
+  public Map<String, List<HoodieLogFile>> withLogAppends(String partition, String fileId, List<HoodieRecord> records) throws Exception {
     Map<String, List<HoodieLogFile>> partitionToLogfilesMap = new HashMap<>();
-    for (List<HoodieRecord> groupedRecords : records.stream()
-        .collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)).values()) {
-      final Pair<String, HoodieLogFile> appendedLogFile = appendRecordsToLogFile(groupedRecords);
-      partitionToLogfilesMap.computeIfAbsent(
-          appendedLogFile.getKey(), k -> new ArrayList<>()).add(appendedLogFile.getValue());
-    }
+    final Pair<String, HoodieLogFile> appendedLogFile = appendRecordsToLogFile(partition, fileId, records);
+    partitionToLogfilesMap.computeIfAbsent(appendedLogFile.getKey(), k -> new ArrayList<>()).add(appendedLogFile.getValue());
     return partitionToLogfilesMap;
   }

-  private Pair<String, HoodieLogFile> appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
-    String partitionPath = groupedRecords.get(0).getPartitionPath();
-    HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation();
+  private Pair<String, HoodieLogFile> appendRecordsToLogFile(String partitionPath, String fileId, List<HoodieRecord> records) throws Exception {
     try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
-        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
-        .overBaseCommit(location.getInstantTime()).withFs(fs).build()) {
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+        .overBaseCommit(currentInstantTime).withFs(fs).build()) {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, currentInstantTime);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-      logWriter.appendBlock(new HoodieAvroDataBlock(groupedRecords.stream().map(r -> {
+      logWriter.appendBlock(new HoodieAvroDataBlock(records.stream().map(r -> {
         try {
           GenericRecord val = (GenericRecord) ((HoodieRecordPayload) r.getData()).getInsertValue(schema).get();
           HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), "");
Changed file 4 of 5:

@@ -42,6 +42,8 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;

 import java.util.Arrays;
 import java.util.Properties;
@@ -89,8 +91,9 @@ public void testBucketIndexValidityCheck() {
         .withBucketNum("8").build();
   }

-  @Test
-  public void testTagLocation() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testTagLocation(boolean isInsert) throws Exception {
     String rowKey1 = UUID.randomUUID().toString();
     String rowKey2 = UUID.randomUUID().toString();
     String rowKey3 = UUID.randomUUID().toString();
@@ -119,9 +122,17 @@ public void testTagLocation() throws Exception {
     assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(r -> r.isCurrentLocationKnown()));

     HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(table, SCHEMA);
-    testTable.addCommit("001").withInserts("2016/01/31", getRecordFileId(record1), record1);
-    testTable.addCommit("002").withInserts("2016/01/31", getRecordFileId(record2), record2);
-    testTable.addCommit("003").withInserts("2016/01/31", getRecordFileId(record3), record3);
+
+    if (isInsert) {
+      testTable.addCommit("001").withInserts("2016/01/31", getRecordFileId(record1), record1);
+      testTable.addCommit("002").withInserts("2016/01/31", getRecordFileId(record2), record2);
+      testTable.addCommit("003").withInserts("2016/01/31", getRecordFileId(record3), record3);
+    } else {
+      testTable.addCommit("001").withLogAppends("2016/01/31", getRecordFileId(record1), record1);
+      testTable.addCommit("002").withLogAppends("2016/01/31", getRecordFileId(record2), record2);
+      testTable.addCommit("003").withLogAppends("2016/01/31", getRecordFileId(record3), record3);
+    }
+
     taggedRecordRDD = bucketIndex.tagLocation(HoodieJavaRDD.of(recordRDD), context,
         HoodieSparkTable.create(config, context, metaClient));
     assertFalse(taggedRecordRDD.collectAsList().stream().filter(r -> r.isCurrentLocationKnown())
Changed file 5 of 5:

@@ -23,6 +23,7 @@
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -36,6 +37,7 @@

 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;

 public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable {
@@ -116,4 +118,13 @@ public HoodieSparkWriteableTestTable withInserts(String partition, String fileId
   public Path withInserts(String partition, String fileId, List<HoodieRecord> records) throws Exception {
     return super.withInserts(partition, fileId, records, new SparkTaskContextSupplier());
   }
+
+  public HoodieSparkWriteableTestTable withLogAppends(String partition, String fileId, HoodieRecord... records) throws Exception {
+    withLogAppends(partition, fileId, Arrays.asList(records));
+    return this;
+  }
+
+  public Map<String, List<HoodieLogFile>> withLogAppends(String partition, String fileId, List<HoodieRecord> records) throws Exception {
+    return super.withLogAppends(partition, fileId, records);
+  }
 }
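
For reference, a usage sketch of the new test helper, mirroring the parameterized test above (record1, getRecordFileId and the 2016/01/31 partition are that test's fixtures):

    // Write a record's bucket as a log file only, with no base file, so that
    // tagLocation has to discover the bucket through file slices.
    HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(table, SCHEMA);
    testTable.addCommit("001").withLogAppends("2016/01/31", getRecordFileId(record1), record1);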
