Skip to content

Commit

Permalink
Add suppressed failures in Callback failed (#11435)
Browse files Browse the repository at this point in the history
If an exception is thrown during failure handling, then record the original failure as a suppressed Throwable on the thrown exception
  • Loading branch information
gregw authored Feb 29, 2024
1 parent 56e05a9 commit 4155e7b
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ private Runnable lockedFailWrite(Throwable x, boolean fatal)
}
if (writeCallback == null)
return null;
return () -> writeCallback.failed(x);
return () -> HttpChannelState.failed(writeCallback, x);
}

public long getContentBytesWritten()
Expand Down Expand Up @@ -1229,7 +1229,7 @@ else if (last && !(totalWritten == 0 && HttpMethod.HEAD.is(_request.getMethod())
if (writeFailure != null)
{
Throwable failure = writeFailure;
httpChannelState._writeInvoker.run(() -> callback.failed(failure));
httpChannelState._writeInvoker.run(() -> HttpChannelState.failed(callback, failure));
return;
}

Expand Down Expand Up @@ -1297,7 +1297,7 @@ public void failed(Throwable x)
httpChannel.lockedStreamSendCompleted(false);
}
if (callback != null)
httpChannel._writeInvoker.run(() -> callback.failed(x));
httpChannel._writeInvoker.run(() -> HttpChannelState.failed(callback, x));
}

@Override
Expand Down Expand Up @@ -1513,41 +1513,49 @@ else if (LOG.isDebugEnabled())
@Override
public void failed(Throwable failure)
{
// Called when the request/response cycle is completing with a failure.
HttpStream stream;
ChannelRequest request;
HttpChannelState httpChannelState;
ErrorResponse errorResponse = null;
try (AutoLock ignored = _request._lock.lock())
try
{
if (lockedCompleteCallback())
return;
httpChannelState = _request._httpChannelState;
stream = httpChannelState._stream;
request = _request;
// Called when the request/response cycle is completing with a failure.
HttpStream stream;
ChannelRequest request;
HttpChannelState httpChannelState;
ErrorResponse errorResponse = null;
try (AutoLock ignored = _request._lock.lock())
{
if (lockedCompleteCallback())
return;
httpChannelState = _request._httpChannelState;
stream = httpChannelState._stream;
request = _request;

assert httpChannelState._callbackFailure == null;
assert httpChannelState._callbackFailure == null;

httpChannelState._callbackFailure = failure;
httpChannelState._callbackFailure = failure;

// Consume any input.
Throwable unconsumed = stream.consumeAvailable();
ExceptionUtil.addSuppressedIfNotAssociated(failure, unconsumed);
// Consume any input.
Throwable unconsumed = stream.consumeAvailable();
ExceptionUtil.addSuppressedIfNotAssociated(failure, unconsumed);

ChannelResponse response = httpChannelState._response;
if (LOG.isDebugEnabled())
LOG.debug("failed stream.isCommitted={}, response.isCommitted={} {}", stream.isCommitted(), response.isCommitted(), this);
ChannelResponse response = httpChannelState._response;
if (LOG.isDebugEnabled())
LOG.debug("failed stream.isCommitted={}, response.isCommitted={} {}", stream.isCommitted(), response.isCommitted(), this);

// There may have been an attempt to write an error response that failed.
// Do not try to write again an error response if already committed.
if (!stream.isCommitted())
errorResponse = new ErrorResponse(request);
}
// There may have been an attempt to write an error response that failed.
// Do not try to write again an error response if already committed.
if (!stream.isCommitted())
errorResponse = new ErrorResponse(request);
}

if (errorResponse != null)
Response.writeError(request, errorResponse, new ErrorCallback(request, errorResponse, stream, failure), failure);
else
_request.getHttpChannelState()._handlerInvoker.failed(failure);
if (errorResponse != null)
Response.writeError(request, errorResponse, new ErrorCallback(request, errorResponse, stream, failure), failure);
else
_request.getHttpChannelState()._handlerInvoker.failed(failure);
}
catch (Throwable t)
{
ExceptionUtil.addSuppressedIfNotAssociated(t, failure);
throw t;
}
}

private boolean lockedCompleteCallback()
Expand Down Expand Up @@ -1683,7 +1691,7 @@ public void succeeded()
}
else
{
httpChannelState._handlerInvoker.failed(failure);
HttpChannelState.failed(httpChannelState._handlerInvoker, failure);
}
}

Expand All @@ -1705,9 +1713,8 @@ public void failed(Throwable x)
httpChannelState = _request.lockedGetHttpChannelState();
httpChannelState._response._status = _errorResponse._status;
}
if (ExceptionUtil.areNotAssociated(failure, x))
failure.addSuppressed(x);
httpChannelState._handlerInvoker.failed(failure);
ExceptionUtil.addSuppressedIfNotAssociated(failure, x);
HttpChannelState.failed(httpChannelState._handlerInvoker, failure);
}

@Override
Expand Down Expand Up @@ -1736,8 +1743,16 @@ public void succeeded()
@Override
public void failed(Throwable x)
{
completing();
super.failed(x);
try
{
completing();
super.failed(x);
}
catch (Throwable t)
{
ExceptionUtil.addSuppressedIfNotAssociated(t, x);
throw t;
}
}

private void completing()
Expand Down Expand Up @@ -1868,4 +1883,25 @@ public void onComplianceViolation(ComplianceViolation.Event event)
}
}
}

/**
* Invoke a callback failure, handling any {@link Throwable} thrown
* by adding the passed {@code failure} as a suppressed with
* {@link ExceptionUtil#addSuppressedIfNotAssociated(Throwable, Throwable)}.
* @param callback The callback to fail
* @param failure The failure
* @throws RuntimeException If thrown, will have the {@code failure} added as a suppressed.
*/
private static void failed(Callback callback, Throwable failure)
{
try
{
callback.failed(failure);
}
catch (Throwable t)
{
ExceptionUtil.addSuppressedIfNotAssociated(t, failure);
throw t;
}
}
}
Loading

0 comments on commit 4155e7b

Please sign in to comment.