Skip to content

Commit

Permalink
[HUDI-7528] Fixing RowCustomColumnsSortPartitioner to use repartition instead of coalesce (#608)
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Feb 5, 2025
1 parent 14c292c commit 85d4893
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public RowCustomColumnsSortPartitioner(String[] columnNames, HoodieWriteConfig c
// Sorts the incoming rows by the partitioner's configured sort columns, then
// redistributes them into the requested number of output Spark partitions.
// NOTE(review): this span is commit-diff residue — both the removed and the added
// line are present below. `.coalesce(...)` is the pre-fix line (coalesce can only
// REDUCE the partition count, so a higher `outputSparkPartitions` was silently
// ignored); `.repartition(...)` is its replacement, which honors both lower and
// higher targets at the cost of a shuffle.
public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
return records
.sort(Arrays.stream(sortColumnNames).map(Column::new).toArray(Column[]::new))
.coalesce(outputSparkPartitions);
.repartition(outputSparkPartitions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ public void testCustomColumnSortPartitionerWithRows(boolean suffixRecordKey) {
.build();

testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns, config),
records, true, true, true, generateExpectedPartitionNumRecords(records), Option.of(comparator), true);
records, true, false, true, generateExpectedPartitionNumRecords(records), Option.of(comparator), true);

testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config),
records, true, true, true, generateExpectedPartitionNumRecords(records), Option.of(comparator), true);
records, true, false, true, generateExpectedPartitionNumRecords(records), Option.of(comparator), true);
}

private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows;
import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
Expand Down Expand Up @@ -199,6 +200,35 @@ public void testBulkInsertHelperNoMetaFields() {
assertTrue(dataset.except(trimmedOutput).count() == 0);
}

/**
 * Verifies that {@link RowCustomColumnsSortPartitioner#repartitionRecords} honors the
 * requested output partition count regardless of the input dataset's current
 * partitioning — i.e. it can both shrink and grow the number of partitions
 * (repartition semantics, not coalesce semantics).
 */
@Test
public void testRowCustomSortPartitioner() {
  // 100 random input rows, with the partitioner configured to sort by the "ts" column.
  List<Row> inputRows = DataSourceTestUtils.generateRandomRows(100);
  Map<String, String> writerProps = getPropsAllSet("_row_key");
  writerProps.put(HoodieWriteConfig.BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS.key(), "ts");
  HoodieWriteConfig writeConfig = getConfigBuilder(schemaStr).withProps(writerProps)
      .withPopulateMetaFields(false).build();
  Dataset<Row> inputDf = sqlContext.createDataFrame(inputRows, structType);
  RowCustomColumnsSortPartitioner sortPartitioner = new RowCustomColumnsSortPartitioner(writeConfig);

  // Collapse everything down to a single partition.
  Dataset<Row> repartitioned = sortPartitioner.repartitionRecords(inputDf, 1);
  repartitioned.cache();

  assertEquals(repartitioned.count(), 100);
  assertEquals(repartitioned.rdd().getNumPartitions(), 1);

  // Input has more partitions than requested: the lower target must be honored.
  repartitioned = sortPartitioner.repartitionRecords(inputDf.repartition(5), 3);
  repartitioned.cache();

  assertEquals(repartitioned.count(), 100);
  assertEquals(repartitioned.rdd().getNumPartitions(), 3);

  // Input has fewer partitions than requested: the higher target must also be honored
  // (this is the case coalesce could not satisfy).
  repartitioned = sortPartitioner.repartitionRecords(inputDf.repartition(3), 6);
  repartitioned.cache();
  assertEquals(repartitioned.count(), 100);
  assertEquals(repartitioned.rdd().getNumPartitions(), 6);
}

@ParameterizedTest
@MethodSource("providePreCombineArgs")
public void testBulkInsertPreCombine(boolean enablePreCombine) {
Expand Down

0 comments on commit 85d4893

Please sign in to comment.