Skip to content

Commit

Permalink
[flink] Redistribute splits after rescaling (apache#31313)
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Jan 16, 2025
1 parent 7680207 commit adaaacf
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,15 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname

@Override
public void addSplitsBack(List<FlinkSourceSplit<T>> splits, int subtaskId) {
  // Keep the returning subtask in the log: after a failure/rescale it is the only
  // clue tying a split back to the reader that previously owned it.
  LOG.info("Adding splits {} back from subtask {}", splits, subtaskId);
  // Redistribute the returned splits across the *current* parallelism instead of
  // queueing them all for the original subtask. This is needed after rescaling:
  // the returning subtask may no longer exist, and the modulo assignment keeps
  // split-to-subtask mapping deterministic for the new parallelism.
  splits.forEach(
      split -> {
        int target = split.splitIndex() % context.currentParallelism();
        pendingSplits.computeIfAbsent(target, ignored -> new ArrayList<>()).add(split);
      });
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestBoundedCountingSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
Expand Down Expand Up @@ -130,6 +132,49 @@ public void testAddSplitsBack() throws IOException {
}
}

@Test
public void testAddSplitsBackAfterRescale() throws Exception {
  final int initialParallelism = 2;
  final int splitCount = 8; // divisible by 2 and 4
  final int recordCount = 8;
  TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>> enumContext =
      new TestingSplitEnumeratorContext<>(initialParallelism);
  TestBoundedCountingSource source = new TestBoundedCountingSource(splitCount, recordCount);

  // Phase 1: run the enumerator at the initial parallelism and capture which
  // splits were handed to which subtask.
  final Map<Integer, List<FlinkSourceSplit<KV<Integer, Integer>>>> firstAssignment;
  try (FlinkSourceSplitEnumerator<KV<Integer, Integer>> enumerator =
      new FlinkSourceSplitEnumerator<>(
          enumContext, source, FlinkPipelineOptions.defaults(), splitCount)) {
    enumerator.start();
    for (int subtask = 0; subtask < initialParallelism; subtask++) {
      enumContext.registerReader(subtask, String.valueOf(subtask));
      enumerator.addReader(subtask);
    }
    enumContext.getExecutorService().triggerAll();
    firstAssignment =
        enumContext.getSplitAssignments().entrySet().stream()
            .collect(
                Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getAssignedSplits()));
  }

  // Phase 2: simulate a rescale to a higher parallelism and return every split.
  enumContext = new TestingSplitEnumeratorContext<>(initialParallelism + 2);
  try (FlinkSourceSplitEnumerator<KV<Integer, Integer>> enumerator =
      new FlinkSourceSplitEnumerator<>(
          enumContext, source, FlinkPipelineOptions.defaults(), splitCount, true)) {
    enumerator.start();
    for (Map.Entry<Integer, List<FlinkSourceSplit<KV<Integer, Integer>>>> entry :
        firstAssignment.entrySet()) {
      enumerator.addSplitsBack(entry.getValue(), entry.getKey());
    }
    enumContext.registerReader(0, "0");
    enumerator.addReader(0);
    enumContext.getExecutorService().triggerAll();

    // After redistribution, subtask 0 should receive exactly its even share of
    // the splits under the new parallelism.
    List<FlinkSourceSplit<KV<Integer, Integer>>> assignedToFirstReader =
        enumContext.getSplitAssignments().get(0).getAssignedSplits();
    assertEquals(splitCount / (initialParallelism + 2), assignedToFirstReader.size());
  }
}

private void assignSplits(
TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>> context,
Source<KV<Integer, Integer>> source,
Expand Down

0 comments on commit adaaacf

Please sign in to comment.