# [#1844] fix(spark): Reassign shuffle servers when retrying stage (#1845)
### What changes were proposed in this pull request?

If the shuffle servers are not reassigned after a stage retry is triggered, data will be lost. Therefore, this change reassigns the shuffle servers after the retry.
Observed failure without the fix (some records are lost on stage retry):

Error:  Failures:  Error:    RSSStageDynamicServerReWriteTest.testRSSStageResubmit:119->SparkIntegrationTestBase.run:64->SparkIntegrationTestBase.verifyTestResult:149 expected: <1000> but was: <970>.

### Why are the changes needed?

Fix: #1844

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing integration test (`RSSStageDynamicServerReWriteTest`).
yl09099 authored Dec 9, 2024
1 parent 4ce1aa8 commit 0552d3b
Showing 35 changed files with 596 additions and 391 deletions.
@@ -17,29 +17,21 @@

package org.apache.spark.shuffle;

import java.util.Map;
import java.util.Set;

import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.util.JavaUtils;

public class RssStageResubmitManager {

private static final Logger LOG = LoggerFactory.getLogger(RssStageResubmitManager.class);

/** Blacklist of shuffle servers whose writes have failed. */
private Set<String> serverIdBlackList;
/**
* Prevents multiple tasks that report FetchFailed from each triggering a separate shuffle
* server reassignment. Keyed by shuffle ID; the value records the stage ID / attempt number
* combination and whether it has already been reassigned.
*/
private Map<Integer, RssStageInfo> serverAssignedInfos;

public RssStageResubmitManager() {
this.serverIdBlackList = Sets.newConcurrentHashSet();
this.serverAssignedInfos = JavaUtils.newConcurrentMap();
}

public Set<String> getServerIdBlackList() {
@@ -53,17 +45,4 @@ public void resetServerIdBlackList(Set<String> failuresShuffleServerIds) {
public void recordFailuresShuffleServer(String shuffleServerId) {
serverIdBlackList.add(shuffleServerId);
}

public RssStageInfo recordAndGetServerAssignedInfo(int shuffleId, String stageIdAndAttempt) {

return serverAssignedInfos.computeIfAbsent(
shuffleId, id -> new RssStageInfo(stageIdAndAttempt, false));
}

public void recordAndGetServerAssignedInfo(
int shuffleId, String stageIdAndAttempt, boolean isRetried) {
serverAssignedInfos
.computeIfAbsent(shuffleId, id -> new RssStageInfo(stageIdAndAttempt, false))
.setReassigned(isRetried);
}
}
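
For orientation, here is a minimal sketch (not code from this PR) of how the blacklist kept by `RssStageResubmitManager` feeds a stage-retry reassignment; only the no-arg constructor and the methods visible in this diff are used, and the server ID is made up for the example:

```java
import java.util.Set;

import org.apache.spark.shuffle.RssStageResubmitManager;

// Sketch only: record a failed shuffle server, then consult the blacklist
// when the stage is retried so the new assignment avoids that server.
public class StageRetrySketch {
  public static void main(String[] args) {
    RssStageResubmitManager manager = new RssStageResubmitManager();

    // A write to this server failed, so it is blacklisted ("shuffle-server-1" is hypothetical).
    manager.recordFailuresShuffleServer("shuffle-server-1");

    // On stage retry, these servers must be excluded from the new assignment;
    // before this fix the old assignment was simply reused, which lost data.
    Set<String> excluded = manager.getServerIdBlackList();
    System.out.println("Servers excluded from the reassignment: " + excluded);
  }
}
```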
@@ -102,6 +102,7 @@ public class WriteBufferManager extends MemoryConsumer {
private BlockIdLayout blockIdLayout;
private double bufferSpillRatio;
private Function<Integer, List<ShuffleServerInfo>> partitionAssignmentRetrieveFunc;
private int stageAttemptNumber;

public WriteBufferManager(
int shuffleId,
@@ -122,7 +123,8 @@ public WriteBufferManager(
taskMemoryManager,
shuffleWriteMetrics,
rssConf,
null);
null,
0);
}

public WriteBufferManager(
@@ -136,6 +138,32 @@ public WriteBufferManager(
RssConf rssConf,
Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc,
Function<Integer, List<ShuffleServerInfo>> partitionAssignmentRetrieveFunc) {
this(
shuffleId,
taskId,
taskAttemptId,
bufferManagerOptions,
serializer,
taskMemoryManager,
shuffleWriteMetrics,
rssConf,
spillFunc,
partitionAssignmentRetrieveFunc,
0);
}

public WriteBufferManager(
int shuffleId,
String taskId,
long taskAttemptId,
BufferManagerOptions bufferManagerOptions,
Serializer serializer,
TaskMemoryManager taskMemoryManager,
ShuffleWriteMetrics shuffleWriteMetrics,
RssConf rssConf,
Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc,
Function<Integer, List<ShuffleServerInfo>> partitionAssignmentRetrieveFunc,
int stageAttemptNumber) {
super(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP);
this.bufferSize = bufferManagerOptions.getBufferSize();
this.spillSize = bufferManagerOptions.getBufferSpillThreshold();
@@ -169,6 +197,7 @@ public WriteBufferManager(
this.bufferSpillRatio = rssConf.get(RssSparkConfig.RSS_MEMORY_SPILL_RATIO);
this.blockIdLayout = BlockIdLayout.from(rssConf);
this.partitionAssignmentRetrieveFunc = partitionAssignmentRetrieveFunc;
this.stageAttemptNumber = stageAttemptNumber;
}

public WriteBufferManager(
@@ -181,7 +210,8 @@ public WriteBufferManager(
TaskMemoryManager taskMemoryManager,
ShuffleWriteMetrics shuffleWriteMetrics,
RssConf rssConf,
Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc) {
Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc,
int stageAttemptNumber) {
this(
shuffleId,
taskId,
@@ -192,7 +222,8 @@ public WriteBufferManager(
shuffleWriteMetrics,
rssConf,
spillFunc,
partitionId -> partitionToServers.get(partitionId));
partitionId -> partitionToServers.get(partitionId),
stageAttemptNumber);
}

/** add serialized columnar data directly when integrate with gluten */
@@ -492,7 +523,7 @@ public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockI
+ totalSize
+ " bytes");
}
events.add(new AddBlockEvent(taskId, shuffleBlockInfosPerEvent));
events.add(new AddBlockEvent(taskId, stageAttemptNumber, shuffleBlockInfosPerEvent));
shuffleBlockInfosPerEvent = Lists.newArrayList();
totalSize = 0;
}
@@ -507,7 +538,7 @@ public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockI
+ " bytes");
}
// Use final temporary variables for closures
events.add(new AddBlockEvent(taskId, shuffleBlockInfosPerEvent));
events.add(new AddBlockEvent(taskId, stageAttemptNumber, shuffleBlockInfosPerEvent));
}
return events;
}
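
As used above, each `AddBlockEvent` now carries the stage attempt number via the new three-argument constructor. A minimal, self-contained illustration follows; the import paths and the empty block list are assumptions of this sketch, not taken from the diff:

```java
import java.util.Collections;
import java.util.List;

import org.apache.spark.shuffle.writer.AddBlockEvent;
import org.apache.uniffle.common.ShuffleBlockInfo;

public class AddBlockEventSketch {
  public static void main(String[] args) {
    // The WriteBufferManager stamps its stageAttemptNumber (here 1, i.e. the first retry)
    // onto every event it builds, so the server side can tell stage attempts apart.
    List<ShuffleBlockInfo> blocks = Collections.emptyList();
    AddBlockEvent event = new AddBlockEvent("task_1", /* stageAttemptNumber */ 1, blocks);
    System.out.println(event);
  }
}
```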
@@ -40,11 +40,12 @@ public BlockIdManager() {
}

public void add(int shuffleId, int partitionId, List<Long> ids) {

if (CollectionUtils.isEmpty(ids)) {
return;
}
Map<Integer, Roaring64NavigableMap> partitionedBlockIds =
blockIds.computeIfAbsent(shuffleId, (k) -> JavaUtils.newConcurrentMap());
blockIds.computeIfAbsent(shuffleId, k -> JavaUtils.newConcurrentMap());
partitionedBlockIds.compute(
partitionId,
(id, bitmap) -> {
@@ -77,6 +77,31 @@ public void reportShuffleResult(
managerClientSupplier.get().reportShuffleResult(request);
}

@Override
public void reportShuffleResult(
Map<ShuffleServerInfo, Map<Integer, Set<Long>>> serverToPartitionToBlockIds,
String appId,
int shuffleId,
long taskAttemptId,
int bitmapNum,
Set<ShuffleServerInfo> reportFailureServers,
boolean enableWriteFailureRetry) {
Map<Integer, List<Long>> partitionToBlockIds = new HashMap<>();
for (Map<Integer, Set<Long>> k : serverToPartitionToBlockIds.values()) {
for (Map.Entry<Integer, Set<Long>> entry : k.entrySet()) {
int partitionId = entry.getKey();
partitionToBlockIds
.computeIfAbsent(partitionId, x -> new ArrayList<>())
.addAll(entry.getValue());
}
}

RssReportShuffleResultRequest request =
new RssReportShuffleResultRequest(
appId, shuffleId, taskAttemptId, partitionToBlockIds, bitmapNum);
managerClientSupplier.get().reportShuffleResult(request);
}
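
To make the merge above concrete, here is a self-contained sketch of the same flattening with plain `String` keys standing in for `ShuffleServerInfo`; the input values are made up for the example:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FlattenBlockIdsSketch {
  public static void main(String[] args) {
    // Hypothetical input: two servers holding block IDs for partitions 0 and 1.
    Map<String, Map<Integer, Set<Long>>> serverToPartitionToBlockIds = new HashMap<>();
    serverToPartitionToBlockIds.put("server-a", new HashMap<>());
    serverToPartitionToBlockIds.get("server-a").put(0, new HashSet<>(Arrays.asList(1L, 2L)));
    serverToPartitionToBlockIds.get("server-a").put(1, new HashSet<>(Arrays.asList(3L)));
    serverToPartitionToBlockIds.put("server-b", new HashMap<>());
    serverToPartitionToBlockIds.get("server-b").put(0, new HashSet<>(Arrays.asList(4L)));

    // Same merge logic as in the diff above: per-server, per-partition sets are
    // collapsed into a single partition -> block-ID list before reporting.
    Map<Integer, List<Long>> partitionToBlockIds = new HashMap<>();
    for (Map<Integer, Set<Long>> perServer : serverToPartitionToBlockIds.values()) {
      for (Map.Entry<Integer, Set<Long>> entry : perServer.entrySet()) {
        partitionToBlockIds
            .computeIfAbsent(entry.getKey(), x -> new ArrayList<>())
            .addAll(entry.getValue());
      }
    }

    // Expected output: {0=[1, 2, 4], 1=[3]} (map and list ordering may vary).
    System.out.println(partitionToBlockIds);
  }
}
```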

@Override
public Roaring64NavigableMap getShuffleResult(
String clientType,