Skip to content

Commit

Permalink
Add validation of number of RDD partitions
Browse files (browse the repository at this point in the history)
  • Loading branch information
yihua committed Sep 27, 2022
1 parent 4b207ae commit f29b765
Showing 1 changed file with 8 additions and 3 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -461,22 +461,27 @@ private void testDeduplication(
HoodieData<HoodieRecord<RawTripTestPayload>> records = HoodieJavaRDD.of(
jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1));
HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
.combineInput(true, true);
.combineInput(true, true);
addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
HoodieWriteConfig writeConfig = configBuilder.build();

// Global dedup should be done based on recordKey only
HoodieIndex index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(true);
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, writeConfig.getSchema()).collectAsList();
int dedupParallelism = records.getNumPartitions() + 100;
HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema());
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
assertEquals(1, dedupedRecs.size());
assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
assertNodupesWithinPartition(dedupedRecs);

// non-Global dedup should be done based on both recordKey and partitionPath
index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(false);
dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, writeConfig.getSchema()).collectAsList();
dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema());
dedupedRecs = dedupedRecsRdd.collectAsList();
assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
assertEquals(2, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);

Expand Down

0 comments on commit f29b765

Please sign in to comment.