[HUDI-113]: Use Pair over # delimited string #672

Merged
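
This change replaces the fileName#recordKey strings that previously keyed the bloom index's explode and shuffle steps with a typed Pair, so the two parts are never re-encoded with String.format and re-parsed with split("#") along the way. A minimal sketch of why the typed key is the safer shape (the Pair class below is a hypothetical stand-in, and the delimiter-in-key hazard is an illustration, not a case cited by the PR):

```java
// Sketch: a "#"-joined key corrupts record keys that contain '#';
// a typed pair carries both parts losslessly.
public final class PairVsDelimitedKey {

  // Hypothetical minimal stand-in for com.uber.hoodie.common.util.collection.Pair.
  static final class Pair<L, R> {
    final L left;
    final R right;
    Pair(L left, R right) { this.left = left; this.right = right; }
    L getLeft() { return left; }
    R getRight() { return right; }
  }

  public static void main(String[] args) {
    String fileName = "f1";
    String recordKey = "id#42"; // a record key that happens to contain the delimiter

    // Before: encode both parts into one string, decode with split().
    String delimited = String.format("%s#%s", fileName, recordKey);
    String[] parts = delimited.split("#");
    System.out.println(parts[1]); // prints "id" -- the record key was silently truncated

    // After: nothing to encode, nothing to mis-parse.
    Pair<String, String> key = new Pair<>(fileName, recordKey);
    System.out.println(key.getRight()); // prints "id#42"
  }
}
```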
BucketizedBloomCheckPartitioner.java
@@ -19,12 +19,14 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Hashing;
+import com.uber.hoodie.common.util.collection.Pair;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -139,11 +141,10 @@ public int numPartitions() {

@Override
public int getPartition(Object key) {
-    String[] parts = ((String) key).split("#");
-    String fileName = parts[0];
-    final long hashOfKey = Hashing.md5().hashString(parts[1], StandardCharsets.UTF_8).asLong();
-    List<Integer> candidatePartitions = fileGroupToPartitions.get(fileName);
-    int idx = (int) Math.floorMod(hashOfKey, candidatePartitions.size());
+    final Pair<String, String> parts = (Pair<String, String>) key;
+    final long hashOfKey = Hashing.md5().hashString(parts.getRight(), StandardCharsets.UTF_8).asLong();
+    final List<Integer> candidatePartitions = fileGroupToPartitions.get(parts.getLeft());
+    final int idx = (int) Math.floorMod(hashOfKey, candidatePartitions.size());
assert idx >= 0;
return candidatePartitions.get(idx);
}
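
With the key arriving as a Pair, getPartition no longer parses a string: the left element selects the file group's candidate partition list, and the MD5 hash of the right element (the record key) picks a slot in it. A self-contained sketch of that routing, assuming hypothetical map contents (Guava supplies the hash, as in the class above):

```java
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

class GetPartitionSketch {
  static int getPartition(Map<String, List<Integer>> fileGroupToPartitions,
                          String fileName, String recordKey) {
    // Hash only the record key; the file name just selects the candidate list.
    long hashOfKey = Hashing.md5().hashString(recordKey, StandardCharsets.UTF_8).asLong();
    List<Integer> candidates = fileGroupToPartitions.get(fileName);
    // floorMod keeps the index non-negative even when the hash is negative.
    int idx = (int) Math.floorMod(hashOfKey, (long) candidates.size());
    return candidates.get(idx);
  }

  public static void main(String[] args) {
    // Hypothetical file group spread over Spark partitions 0, 3 and 7.
    Map<String, List<Integer>> fileGroupToPartitions = Map.of("f1", List.of(0, 3, 7));
    System.out.println(getPartition(fileGroupToPartitions, "f1", "record-001")); // 0, 3 or 7
  }
}
```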
HoodieBloomIndex.java
@@ -34,6 +34,7 @@
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ParquetUtils;
+import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.MetadataNotFoundException;
import com.uber.hoodie.index.HoodieIndex;
@@ -42,9 +43,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;

import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -171,7 +175,7 @@ private Map<String, Long> computeComparisonsPerFileGroup(final Map<String, Long>
// we will just try exploding the input and then count to determine comparisons
// FIX(vc): Only do sampling here and extrapolate?
fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo,
-        partitionRecordKeyPairRDD).mapToPair(t -> t._2()).countByKey();
+        partitionRecordKeyPairRDD).mapToPair(t -> t).countByKey();
} else {
fileToComparisons = new HashMap<>();
partitionToFileInfo.entrySet().stream().forEach(e -> {
@@ -290,8 +294,6 @@ public boolean isImplicitWithStorage() {
return true;
}



/**
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
* checked. For datasets, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
@@ -301,24 +303,21 @@
* recordKey ranges in the index info.
*/
@VisibleForTesting
-  JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
+  JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
IndexFileFilter indexFileFilter = config.useBloomIndexTreebasedFilter()
? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedIndexFileFilter(partitionToFileIndexInfo);

return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair._2();
String partitionPath = partitionRecordKeyPair._1();
-      List<Tuple2<String, Tuple2<String, HoodieKey>>> recordComparisons = new ArrayList<>();
-      indexFileFilter.getMatchingFiles(partitionPath, recordKey).forEach(matchingFile -> {
-        recordComparisons.add(
-            new Tuple2<>(String.format("%s#%s", matchingFile, recordKey),
-                new Tuple2<>(matchingFile,
-                    new HoodieKey(recordKey, partitionPath))));
-      });
-      return recordComparisons;
-    }).flatMapToPair(List::iterator);
+
+      return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream()
+          .map(matchingFile -> new Tuple2<>(matchingFile, new HoodieKey(recordKey, partitionPath)))
+          .collect(Collectors.toList());
+    }).flatMap(List::iterator);
}

/**
@@ -332,28 +331,32 @@ JavaPairRDD<String, String> findMatchingFilesForRecordKeys(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTableMetaClient metaClient,
Map<String, Long> fileGroupToComparisons) {
-    JavaPairRDD<String, Tuple2<String, HoodieKey>> fileSortedTripletRDD =
+    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);

if (config.useBloomIndexBucketizedChecking()) {
-      BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism,
-          fileGroupToComparisons, config.getBloomIndexKeysPerBucket());
-      fileSortedTripletRDD = fileSortedTripletRDD.repartitionAndSortWithinPartitions(partitioner);
+      Partitioner partitioner = new BucketizedBloomCheckPartitioner(
+          shuffleParallelism,
+          fileGroupToComparisons,
+          config.getBloomIndexKeysPerBucket()
+      );
+
+      fileComparisonsRDD = fileComparisonsRDD
+          .mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
+          .repartitionAndSortWithinPartitions(partitioner)
+          .map(Tuple2::_2);
} else {
// sort further based on filename, such that all checking for the file can happen within
// a single partition, on-the-fly
-      fileSortedTripletRDD = fileSortedTripletRDD.sortByKey(true, shuffleParallelism);
+      fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
}
-    return fileSortedTripletRDD.mapPartitionsWithIndex(
-        new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true)
+
+    return fileComparisonsRDD
+        .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true)
         .flatMap(List::iterator)
-        .filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0)
-        .flatMapToPair(lookupResult -> {
-          List<Tuple2<String, String>> vals = new ArrayList<>();
-          for (String recordKey : lookupResult.getMatchingRecordKeys()) {
-            vals.add(new Tuple2<>(recordKey, lookupResult.getFileName()));
-          }
-          return vals.iterator();
-        });
+        .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
+        .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
+            .map(recordKey -> new Tuple2<>(recordKey, lookupResult.getFileName()))
+            .collect(Collectors.toList())
+            .iterator());
}

/**
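
On the bucketized path, a Comparable key now exists only for the shuffle itself: each (fileName, HoodieKey) element is wrapped as (Pair(fileName, recordKey), element), repartitioned and sorted, then unwrapped with map(Tuple2::_2). A runnable sketch of that wrap/shuffle/unwrap pattern (commons-lang3 Pair standing in for Hudi's Comparable Pair, a HashPartitioner standing in for BucketizedBloomCheckPartitioner, and plain Strings in place of HoodieKey):

```java
import java.util.Arrays;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class BucketizedShuffleSketch {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local[2]", "bucketized-shuffle-sketch");

    // Stand-in for JavaRDD<Tuple2<String, HoodieKey>>: (fileName, recordKey) elements.
    JavaRDD<Tuple2<String, String>> fileComparisonsRDD = jsc.parallelize(Arrays.asList(
        new Tuple2<>("f1", "003"), new Tuple2<>("f3", "002"), new Tuple2<>("f1", "002")));

    JavaRDD<Tuple2<String, String>> shuffled = fileComparisonsRDD
        // wrap: the (file, key) pair exists only as a shuffle key
        .mapToPair(t -> new Tuple2<>(Pair.of(t._1(), t._2()), t))
        // commons-lang3 Pair is Comparable, so sort-within-partitions can order it
        .repartitionAndSortWithinPartitions(new HashPartitioner(2))
        // unwrap: downstream code sees the original elements again
        .map(Tuple2::_2);

    shuffled.collect().forEach(System.out::println);
    jsc.stop();
  }
}
```

The non-bucketized branch needs no wrapping at all, since sortBy takes a key-extractor function (Tuple2::_1) directly.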
HoodieBloomIndexCheckFunction.java
@@ -43,7 +43,7 @@
* actual files
*/
public class HoodieBloomIndexCheckFunction implements
-    Function2<Integer, Iterator<Tuple2<String, Tuple2<String, HoodieKey>>>,
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
Iterator<List<KeyLookupResult>>> {

private static Logger logger = LogManager.getLogger(HoodieBloomIndexCheckFunction.class);
@@ -84,13 +84,13 @@ public static List<String> checkCandidatesAgainstFile(Configuration configuratio

@Override
public Iterator<List<KeyLookupResult>> call(Integer partition,
-      Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> fileParitionRecordKeyTripletItr)
+      Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr)
throws Exception {
return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
}

class LazyKeyCheckIterator extends
-      LazyIterableIterator<Tuple2<String, Tuple2<String, HoodieKey>>, List<KeyLookupResult>> {
+      LazyIterableIterator<Tuple2<String, HoodieKey>, List<KeyLookupResult>> {

private List<String> candidateRecordKeys;

@@ -103,7 +103,7 @@ class LazyKeyCheckIterator extends
private long totalKeysChecked;

LazyKeyCheckIterator(
-        Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> filePartitionRecordKeyTripletItr) {
+        Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
super(filePartitionRecordKeyTripletItr);
currentFile = null;
candidateRecordKeys = new ArrayList<>();
@@ -162,10 +162,10 @@ protected List<KeyLookupResult> computeNext() {
try {
// process one file in each go.
while (inputItr.hasNext()) {
-          Tuple2<String, Tuple2<String, HoodieKey>> currentTuple = inputItr.next();
-          String fileName = currentTuple._2._1;
-          String partitionPath = currentTuple._2._2.getPartitionPath();
-          String recordKey = currentTuple._2._2.getRecordKey();
+          Tuple2<String, HoodieKey> currentTuple = inputItr.next();
+          String fileName = currentTuple._1;
+          String partitionPath = currentTuple._2.getPartitionPath();
+          String recordKey = currentTuple._2.getRecordKey();

// lazily init state
if (currentFile == null) {
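
Because the upstream shuffle still sorts by (file, recordKey), each file's tuples arrive contiguously, and the lazy iterator only had to switch from the nested currentTuple._2._1 access to the flattened tuple. A stripped-down sketch of the one-file-per-batch loop the class implements (plain string arrays in place of the tuple and HoodieKey types; illustrative only):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PerFileBatchSketch {
  public static void main(String[] args) {
    // File-sorted input, as produced by the shuffle: (fileName, recordKey).
    Iterator<String[]> sorted = Arrays.asList(
        new String[]{"f1", "002"},
        new String[]{"f1", "003"},
        new String[]{"f3", "002"}).iterator();

    String currentFile = null;
    List<String> candidateRecordKeys = new ArrayList<>();
    while (sorted.hasNext()) {
      String[] tuple = sorted.next();
      if (currentFile == null) {
        currentFile = tuple[0]; // lazily init state, as in the real iterator
      } else if (!currentFile.equals(tuple[0])) {
        // A new file begins: flush the finished file's batch of candidate keys.
        System.out.println(currentFile + " -> " + candidateRecordKeys);
        currentFile = tuple[0];
        candidateRecordKeys = new ArrayList<>();
      }
      candidateRecordKeys.add(tuple[1]);
    }
    if (currentFile != null) {
      System.out.println(currentFile + " -> " + candidateRecordKeys); // last batch
    }
  }
}
```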
HoodieGlobalBloomIndex.java
@@ -27,12 +27,14 @@
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.stream.Collectors;

import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

@@ -76,7 +78,7 @@ List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitio

@Override
@VisibleForTesting
-  JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
+  JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
Map<String, String> indexToPartitionMap = new HashMap<>();
@@ -87,17 +89,14 @@ JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileCompariso
IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges()
? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedGlobalIndexFileFilter(partitionToFileIndexInfo);

return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair._2();
String partitionPath = partitionRecordKeyPair._1();
-      List<Tuple2<String, Tuple2<String, HoodieKey>>> recordComparisons = new ArrayList<>();
-      indexFileFilter.getMatchingFiles(partitionPath, recordKey).forEach(matchingFile -> {
-        recordComparisons.add(
-            new Tuple2<>(String.format("%s#%s", matchingFile, recordKey),
-                new Tuple2<>(matchingFile,
-                    new HoodieKey(recordKey, indexToPartitionMap.get(matchingFile)))));
-      });
-      return recordComparisons;
-    }).flatMapToPair(List::iterator);
+
+      return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream()
+          .map(file -> new Tuple2<>(file, new HoodieKey(recordKey, indexToPartitionMap.get(file))))
+          .collect(Collectors.toList());
+    }).flatMap(List::iterator);

Review comment (Member): nice clean up
}
}
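
The global variant differs from HoodieBloomIndex's explode in one detail: the HoodieKey it emits takes its partition path from the matched file's own partition, via indexToPartitionMap, rather than from the incoming record. A tiny sketch of that lookup with hypothetical map contents (Map.entry standing in for Tuple2/HoodieKey):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GlobalExplodeSketch {
  public static void main(String[] args) {
    // Hypothetical index state: each file lives in a known partition.
    Map<String, String> indexToPartitionMap = Map.of("f1", "2017/10/22", "f4", "2017/10/23");

    // One incoming record key, with two candidate files anywhere in the dataset.
    String recordKey = "003";
    List<String> matchingFiles = List.of("f1", "f4");

    // Emit (file, partition-of-that-file) rather than (file, partition-of-the-record).
    List<Map.Entry<String, String>> comparisons = matchingFiles.stream()
        .map(file -> Map.entry(file, indexToPartitionMap.get(file)))
        .collect(Collectors.toList());
    System.out.println(comparisons); // [f1=2017/10/22, f4=2017/10/23]
  }
}
```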
TestHoodieBloomIndex.java
@@ -259,12 +259,12 @@ public void testRangePruning() {
new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"),
new Tuple2<>("2017/10/22", "004"))).mapToPair(t -> t);

-    List<Tuple2<String, Tuple2<String, HoodieKey>>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons(
+    List<Tuple2<String, HoodieKey>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons(
partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect();

assertEquals(10, comparisonKeyList.size());
Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy(
-        t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList())));
+        t -> t._2.getRecordKey(), Collectors.mapping(t -> t._1, Collectors.toList())));

assertEquals(4, recordKeyToFileComps.size());
assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("002")));
TestHoodieGlobalBloomIndex.java
@@ -190,25 +190,25 @@ public void testExplodeRecordRDDWithFileComparisons() {
new Tuple2<>("2017/10/21", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"),
new Tuple2<>("2017/10/23", "004"))).mapToPair(t -> t);

-    List<Tuple2<String, Tuple2<String, HoodieKey>>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons(
-        partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect();
-
-    /* epecting:
-     f4#003, f4, HoodieKey { recordKey=003 partitionPath=2017/10/23}
-     f1#003, f1, HoodieKey { recordKey=003 partitionPath=2017/10/22}
-     f3#003, f3, HoodieKey { recordKey=003 partitionPath=2017/10/22}
-     f4#002, f4, HoodieKey { recordKey=002 partitionPath=2017/10/23}
-     f1#002, f1, HoodieKey { recordKey=002 partitionPath=2017/10/22}
-     f3#002, f3, HoodieKey { recordKey=002 partitionPath=2017/10/22}
-     f4#005, f4, HoodieKey { recordKey=005 partitionPath=2017/10/23}
-     f1#005, f1, HoodieKey { recordKey=005 partitionPath=2017/10/22}
-     f4#004, f4, HoodieKey { recordKey=004 partitionPath=2017/10/23}
-     f1#004, f1, HoodieKey { recordKey=004 partitionPath=2017/10/22}
-    */
+    List<Tuple2<String, HoodieKey>> comparisonKeyList =
+        index.explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect();
+
+    /* expecting:
+     f4, HoodieKey { recordKey=003 partitionPath=2017/10/23}
+     f1, HoodieKey { recordKey=003 partitionPath=2017/10/22}
+     f3, HoodieKey { recordKey=003 partitionPath=2017/10/22}
+     f4, HoodieKey { recordKey=002 partitionPath=2017/10/23}
+     f1, HoodieKey { recordKey=002 partitionPath=2017/10/22}
+     f3, HoodieKey { recordKey=002 partitionPath=2017/10/22}
+     f4, HoodieKey { recordKey=005 partitionPath=2017/10/23}
+     f1, HoodieKey { recordKey=005 partitionPath=2017/10/22}
+     f4, HoodieKey { recordKey=004 partitionPath=2017/10/23}
+     f1, HoodieKey { recordKey=004 partitionPath=2017/10/22}
+    */
assertEquals(10, comparisonKeyList.size());

-    Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy(
-        t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList())));
+    Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
+        .collect(Collectors.groupingBy(t -> t._2.getRecordKey(), Collectors.mapping(Tuple2::_1, Collectors.toList())));

assertEquals(4, recordKeyToFileComps.size());
assertEquals(new HashSet<>(Arrays.asList("f4", "f1", "f3")), new HashSet<>(recordKeyToFileComps.get("002")));