[HUDI-3304] review
jian.feng committed Sep 10, 2022
1 parent f8963a6 commit cfd03d2
Showing 2 changed files with 4 additions and 5 deletions.
File 1 of 2:

@@ -75,8 +75,8 @@ private static Stream<Arguments> writeLogTest() {
     return Stream.of(data).map(Arguments::of);
   }
 
-  private static Stream<Arguments> writeTest() {
-    // enable metadata table, enable embedded time line server
+  private static Stream<Arguments> writePayloadTest() {
+    // Payload class
     Object[] data = new Object[] {
         DefaultHoodieRecordPayload.class.getName(),
         PartialUpdateAvroPayload.class.getName()
@@ -94,7 +94,7 @@ public void setup() {
   }
 
   @ParameterizedTest
-  @MethodSource("writeTest")
+  @MethodSource("writePayloadTest")
   public void testWriteDuringCompaction(String payloadClass) throws IOException {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .forTable("test-trip-table")
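The first file renames the JUnit 5 method source from writeTest to writePayloadTest (and fixes its comment to say what the parameters actually are: payload class names), and updates the @MethodSource annotation on testWriteDuringCompaction to match, since the annotation string must name the factory method exactly for the arguments to resolve. A minimal, self-contained sketch of that wiring, with plain strings standing in for the payload class names the real test obtains via Class.getName():

```java
import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.assertNotNull;

class WritePayloadTestSketch {

  // Factory method: each Arguments instance becomes one invocation of the test below.
  // The real Hudi test returns DefaultHoodieRecordPayload.class.getName() and
  // PartialUpdateAvroPayload.class.getName(); plain strings keep this sketch standalone.
  private static Stream<Arguments> writePayloadTest() {
    Object[] data = new Object[] {
        "DefaultHoodieRecordPayload",
        "PartialUpdateAvroPayload"
    };
    return Stream.of(data).map(Arguments::of);
  }

  @ParameterizedTest
  @MethodSource("writePayloadTest") // must match the factory method's name exactly
  void testWriteDuringCompaction(String payloadClass) {
    // Placeholder body: the real test builds a HoodieWriteConfig for the given payload class.
    assertNotNull(payloadClass);
  }
}
```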
File 2 of 2:

@@ -424,8 +424,7 @@ private boolean flushBucket(DataBucket bucket) {
     List<HoodieRecord> records = bucket.writeBuffer();
     ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
     if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
-      HoodieWriteConfig writeConfig = getHoodieClientConfig(config);
-      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, writeConfig.getSchema());
+      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema());
     }
     bucket.preWrite(records);
     final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
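The second file changes the Flink write path's flushBucket: when pre-combine is enabled, the schema passed to FlinkWriteHelper.deduplicateRecords was previously read from a HoodieWriteConfig built on the spot via getHoodieClientConfig(config); it is now read from the config the write client already holds (this.writeClient.getConfig().getSchema()), so the flush reuses the client's existing config instead of constructing a new one. Conceptually, pre-combine deduplication collapses buffered records that share a key, keeping the one with the larger ordering value, so only one version per key is flushed. A rough, self-contained illustration of that idea with a hypothetical Rec type; this is not the actual FlinkWriteHelper.deduplicateRecords implementation, which operates on HoodieRecord payloads and the Avro schema:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch only: reduce buffered records per key, keeping the larger ordering value.
final class PreCombineSketch {

  // Hypothetical record shape: a key plus an ordering value (e.g. an event-time field).
  record Rec(String key, long orderingValue, String payload) {}

  static List<Rec> deduplicate(List<Rec> buffered) {
    Map<String, Rec> byKey = new HashMap<>();
    for (Rec r : buffered) {
      // Keep whichever record for this key has the greater ordering value.
      byKey.merge(r.key(), r, (a, b) -> a.orderingValue() >= b.orderingValue() ? a : b);
    }
    return new ArrayList<>(byKey.values());
  }
}
```

Either way the schema describes the same table, but reading it from the write client's config keeps deduplication aligned with the config the client actually writes with and skips one config construction per flushed bucket.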
