-
Notifications
You must be signed in to change notification settings - Fork 242
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cleanup async state when multi-threaded shuffle readers fail #10637
Cleanup async state when multi-threaded shuffle readers fail #10637
Conversation
Signed-off-by: Alessandro Bellina <[email protected]>
@@ -770,20 +831,27 @@ abstract class RapidsShuffleThreadedReaderBase[K, C]( | |||
|
|||
private def deserializeTask(blockState: BlockState): Unit = { | |||
val slot = RapidsShuffleInternalManagerBase.getNextReaderSlot | |||
val tc = TaskContext.get() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not used and is showing as a compile error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whoops, thanks
|
||
// close any materialized BlockState objects that are holding onto netty buffers or | ||
// file descriptors | ||
pendingIts.foreach(_.close) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
safeClose #10502
build |
build |
} | ||
futures.clear() | ||
if (fallbackIter != null) { | ||
fallbackIter.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this throws and failedFuture was not empty at the time, we are potentially masking the root cause
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah good point, I can add it as suppressed, sec.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gerashegalov should be taken care of here: d58b471
build |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Closes #10631
We had some reports of file descriptors left open from the multi-threaded shuffle reader, likely due to a wave of other exceptions. I started to look at it and found that it did not handle cases when tasks were cancelled correctly, nor errors such as stream close. I added code in the task completion callback to close out any accumulated future or to-be-processed batch.
I also fixed an issue where we would leak a batch if a stream closes after we materialize the batch, but before we read the next header. I found that one because I was testing this by introducing exceptions manually.
I am going to add a test in
RapidsShuffleThreadedReaderSuite
shortly.