Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to HttpSender. #12111

Merged
merged 3 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ public Response.Listener getResponseListener()
return new ContinueListener();
}

protected void onContinue(Request request)
protected Runnable onContinue(Request request)
{
return null;
}

protected class ContinueListener extends BufferingResponseListener
Expand All @@ -79,8 +80,10 @@ public void onSuccess(Response response)
{
// All good, continue.
exchange.resetResponse();
exchange.proceed(null);
onContinue(request);
Runnable proceedAction = onContinue(request);
// Pass the proceed action to be executed
// by the sender, not here by the receiver.
exchange.proceed(proceedAction, null);
}
else
{
Expand All @@ -90,7 +93,7 @@ public void onSuccess(Response response)
ResponseListeners listeners = exchange.getResponseListeners();
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getMediaType(), getEncoding());
listeners.emitSuccess(contentResponse);
exchange.proceed(new HttpRequestException("Expectation failed", request));
exchange.proceed(null, new HttpRequestException("Expectation failed", request));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ public void send()

public abstract void release();

public void proceed(HttpExchange exchange, Throwable failure)
public void proceed(HttpExchange exchange, Runnable proceedAction, Throwable failure)
{
getHttpSender().proceed(exchange, failure);
getHttpSender().proceed(exchange, proceedAction, failure);
}

public void abort(HttpExchange exchange, Throwable requestFailure, Throwable responseFailure, Promise<Boolean> promise)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,11 @@ public void resetResponse()
}
}

public void proceed(Throwable failure)
public void proceed(Runnable proceedAction, Throwable failure)
{
HttpChannel channel = getHttpChannel();
if (channel != null)
channel.proceed(this, failure);
channel.proceed(this, proceedAction, failure);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,15 @@ protected void dispose()
{
}

public void proceed(HttpExchange exchange, Throwable failure)
public void proceed(HttpExchange exchange, Runnable proceedAction, Throwable failure)
{
// Received a 100 Continue, although Expect header was not sent.
// Received a 100 Continue, although the Expect header was not sent.
if (!contentSender.expect100)
return;

// Write the fields in this order, since the reader of
// these fields will read them in the opposite order.
contentSender.proceedAction = proceedAction;
contentSender.expect100 = false;
if (failure == null)
{
Expand Down Expand Up @@ -462,32 +465,39 @@ private enum RequestState

private class ContentSender extends IteratingCallback
{
private HttpExchange exchange;
// Fields that are set externally.
private volatile HttpExchange exchange;
private volatile Runnable proceedAction;
private volatile boolean expect100;
// Fields only used internally.
private Content.Chunk chunk;
private ByteBuffer contentBuffer;
private boolean expect100;
private boolean committed;
private boolean success;
private boolean complete;
private Promise<Boolean> abort;
private boolean demanded;

@Override
public boolean reset()
{
exchange = null;
proceedAction = null;
expect100 = false;
chunk = null;
contentBuffer = null;
expect100 = false;
committed = false;
success = false;
complete = false;
abort = null;
demanded = false;
return super.reset();
}

@Override
protected Action process() throws Throwable
{
HttpExchange exchange = this.exchange;
if (complete)
{
if (success)
Expand All @@ -498,15 +508,26 @@ protected Action process() throws Throwable
HttpRequest request = exchange.getRequest();
Content.Source content = request.getBody();

boolean expect100 = this.expect100;
if (expect100)
{
// If the request was sent already, wait for
// the 100 response before sending the content.
if (committed)
return Action.IDLE;
else
chunk = null;
// Do not send any content yet.
chunk = null;
}
else
{
// Run the proceed action first, which likely will provide
// content after having received the 100 Continue response.
Runnable action = proceedAction;
proceedAction = null;
if (action != null)
action.run();

// Read the request content.
chunk = content.read();
}
if (LOG.isDebugEnabled())
Expand All @@ -516,11 +537,14 @@ protected Action process() throws Throwable
{
if (committed)
{
content.demand(this::iterate);
return Action.IDLE;
// No content after the headers, demand.
demanded = true;
content.demand(this::succeeded);
return Action.SCHEDULED;
}
else
{
// Normalize to avoid null checks.
chunk = Content.Chunk.EMPTY;
}
}
Expand All @@ -545,49 +569,50 @@ protected Action process() throws Throwable
@Override
protected void onSuccess()
{
boolean proceed = true;
if (committed)
if (demanded)
{
if (contentBuffer.hasRemaining())
proceed = someToContent(exchange, contentBuffer);
// Content is now available, reset
// the demand and iterate again.
demanded = false;
}
else
{
committed = true;
if (headersToCommit(exchange))
boolean proceed = true;
if (committed)
{
// Was any content sent while committing?
if (contentBuffer.hasRemaining())
proceed = someToContent(exchange, contentBuffer);
}
else
{
proceed = false;
committed = true;
proceed = headersToCommit(exchange);
if (proceed)
{
// Was any content sent while committing?
if (contentBuffer.hasRemaining())
proceed = someToContent(exchange, contentBuffer);
}
}
}

boolean last = chunk.isLast();
chunk.release();
chunk = null;
boolean last = chunk.isLast();
chunk.release();
chunk = null;

if (proceed)
{
if (last)
if (proceed)
{
success = true;
complete = true;
if (last)
{
success = true;
complete = true;
}
}
else if (expect100)
else
{
if (LOG.isDebugEnabled())
LOG.debug("Expecting 100 Continue for {}", exchange.getRequest());
// There was some concurrent error, terminate.
complete = true;
}
}
else
{
// There was some concurrent error, terminate.
complete = true;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,11 @@ protected void onServerToProxyResponseFailure(Request clientToProxyRequest, org.
Response.writeError(clientToProxyRequest, proxyToClientResponse, callback, status);
}

protected void onServerToProxyResponse100Continue(Request clientToProxyRequest, org.eclipse.jetty.client.Request proxyToServerRequest)
protected Runnable onServerToProxyResponse100Continue(Request clientToProxyRequest, org.eclipse.jetty.client.Request proxyToServerRequest)
{
if (LOG.isDebugEnabled())
LOG.debug("{} P2C 100 continue response", requestId(clientToProxyRequest));
Runnable action = (Runnable)proxyToServerRequest.getAttributes().get(PROXY_TO_SERVER_CONTINUE_ATTRIBUTE);
action.run();
return (Runnable)proxyToServerRequest.getAttributes().get(PROXY_TO_SERVER_CONTINUE_ATTRIBUTE);
}

protected void onServerToProxyResponse102Processing(Request clientToProxyRequest, org.eclipse.jetty.client.Request proxyToServerRequest, HttpFields serverToProxyResponseHeaders, Response proxyToClientResponse)
Expand Down Expand Up @@ -776,13 +775,12 @@ public InvocationType getInvocationType()
private class ProxyContinueProtocolHandler extends ContinueProtocolHandler
{
@Override
protected void onContinue(org.eclipse.jetty.client.Request proxyToServerRequest)
protected Runnable onContinue(org.eclipse.jetty.client.Request proxyToServerRequest)
{
super.onContinue(proxyToServerRequest);
var clientToProxyRequest = (Request)proxyToServerRequest.getAttributes().get(CLIENT_TO_PROXY_REQUEST_ATTRIBUTE);
if (LOG.isDebugEnabled())
LOG.debug("{} S2P received 100 Continue", requestId(clientToProxyRequest));
onServerToProxyResponse100Continue(clientToProxyRequest, proxyToServerRequest);
return onServerToProxyResponse100Continue(clientToProxyRequest, proxyToServerRequest);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,10 +763,9 @@ protected void sendProxyResponseError(HttpServletRequest clientRequest, HttpServ
}
}

protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest)
protected Runnable onContinue(HttpServletRequest clientRequest, Request proxyRequest)
{
if (_log.isDebugEnabled())
_log.debug("{} handling 100 Continue", getRequestId(clientRequest));
return null;
}

/**
Expand Down Expand Up @@ -851,10 +850,10 @@ protected String rewriteTarget(HttpServletRequest request)
class ProxyContinueProtocolHandler extends ContinueProtocolHandler
{
@Override
protected void onContinue(Request request)
protected Runnable onContinue(Request request)
{
HttpServletRequest clientRequest = (HttpServletRequest)request.getAttributes().get(CLIENT_REQUEST_ATTRIBUTE);
AbstractProxyServlet.this.onContinue(clientRequest, request);
return AbstractProxyServlet.this.onContinue(clientRequest, request);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ protected ContentTransformer newServerResponseContentTransformer(HttpServletRequ
}

@Override
protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest)
protected Runnable onContinue(HttpServletRequest clientRequest, Request proxyRequest)
{
super.onContinue(clientRequest, proxyRequest);
Runnable action = (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE);
action.run();
if (_log.isDebugEnabled())
_log.debug("{} handling 100 Continue", getRequestId(clientRequest));
return (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE);
}

private void transform(ContentTransformer transformer, ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import jakarta.servlet.AsyncContext;
Expand Down Expand Up @@ -144,12 +143,11 @@ protected void onResponseContent(HttpServletRequest request, HttpServletResponse
}

@Override
protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest)
protected Runnable onContinue(HttpServletRequest clientRequest, Request proxyRequest)
{
super.onContinue(clientRequest, proxyRequest);
Runnable action = (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE);
Executor executor = getHttpClient().getExecutor();
executor.execute(action);
if (_log.isDebugEnabled())
_log.debug("{} handling 100 Continue", getRequestId(clientRequest));
return (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,10 +768,9 @@ protected void sendProxyResponseError(HttpServletRequest clientRequest, HttpServ
}
}

protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest)
protected Runnable onContinue(HttpServletRequest clientRequest, Request proxyRequest)
{
if (_log.isDebugEnabled())
_log.debug("{} handling 100 Continue", getRequestId(clientRequest));
return null;
}

/**
Expand Down Expand Up @@ -856,10 +855,10 @@ protected String rewriteTarget(HttpServletRequest request)
class ProxyContinueProtocolHandler extends ContinueProtocolHandler
{
@Override
protected void onContinue(Request request)
protected Runnable onContinue(Request request)
{
HttpServletRequest clientRequest = (HttpServletRequest)request.getAttributes().get(CLIENT_REQUEST_ATTRIBUTE);
AbstractProxyServlet.this.onContinue(clientRequest, request);
return AbstractProxyServlet.this.onContinue(clientRequest, request);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ protected ContentTransformer newServerResponseContentTransformer(HttpServletRequ
}

@Override
protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest)
protected Runnable onContinue(HttpServletRequest clientRequest, Request proxyRequest)
{
super.onContinue(clientRequest, proxyRequest);
Runnable action = (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE);
action.run();
if (_log.isDebugEnabled())
_log.debug("{} handling 100 Continue", getRequestId(clientRequest));
return (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE);
}

private void transform(ContentTransformer transformer, ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
Expand Down
Loading
Loading