Skip to content

Commit

Permalink
Improve the recycling of Processor objects to make it more robust.
Browse files Browse the repository at this point in the history
  • Loading branch information
markt-asf committed Mar 29, 2022
1 parent 688dba2 commit 9651b83
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 20 deletions.
32 changes: 17 additions & 15 deletions java/org/apache/coyote/AbstractProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,11 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {

S socket = wrapper.getSocket();

Processor processor = (Processor) wrapper.getCurrentProcessor();
// We take complete ownership of the Processor inside of this method to ensure
// no other thread can release it while we're using it. Whatever processor is
// held by this variable will be associated with the SocketWrapper before this
// method returns.
Processor processor = (Processor) wrapper.takeCurrentProcessor();
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
processor, socket));
Expand Down Expand Up @@ -858,9 +862,6 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
processor.setSslSupport(
wrapper.getSslSupport(getProtocol().getClientCertProvider()));

// Associate the processor with the connection
wrapper.setCurrentProcessor(processor);

SocketState state = SocketState.CLOSED;
do {
state = processor.process(wrapper, status);
Expand All @@ -880,8 +881,6 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
release(processor);
// Create the upgrade processor
processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());
// Associate with the processor with the connection
wrapper.setCurrentProcessor(processor);
} else {
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString(
Expand All @@ -901,8 +900,6 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
processor, wrapper));
}
// Associate with the processor with the connection
wrapper.setCurrentProcessor(processor);
// Initialise the upgrade handler (which may trigger
// some IO using the new protocol which is why the lines
// above are necessary)
Expand Down Expand Up @@ -940,8 +937,8 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
} else if (state == SocketState.OPEN) {
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
wrapper.setCurrentProcessor(null);
release(processor);
processor = null;
wrapper.registerReadInterest();
} else if (state == SocketState.SENDFILE) {
// Sendfile in progress. If it fails, the socket will be
Expand All @@ -966,8 +963,7 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
// Connection closed. OK to recycle the processor.
// Processors handling upgrades require additional clean-up
// before release.
wrapper.setCurrentProcessor(null);
if (processor.isUpgrade()) {
if (processor != null && processor.isUpgrade()) {
UpgradeToken upgradeToken = processor.getUpgradeToken();
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
InstanceManager instanceManager = upgradeToken.getInstanceManager();
Expand All @@ -988,7 +984,13 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
}
}
}

release(processor);
processor = null;
}

if (processor != null) {
wrapper.setCurrentProcessor(processor);
}
return state;
} catch(java.net.SocketException e) {
Expand Down Expand Up @@ -1024,7 +1026,6 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {

// Make sure socket/processor is removed from the list of current
// connections
wrapper.setCurrentProcessor(null);
release(processor);
return SocketState.CLOSED;
}
Expand All @@ -1044,7 +1045,9 @@ protected void longPoll(SocketWrapperBase<?> socket, Processor processor) {

/**
* Expected to be used by the handler once the processor is no longer
* required.
* required. Care must be taken to ensure that this method is only
* called once per processor, after the request processing has
* completed.
*
* @param processor Processor being released (that was associated with
* the socket)
Expand Down Expand Up @@ -1082,8 +1085,7 @@ private void release(Processor processor) {
*/
@Override
public void release(SocketWrapperBase<S> socketWrapper) {
Processor processor = (Processor) socketWrapper.getCurrentProcessor();
socketWrapper.setCurrentProcessor(null);
Processor processor = (Processor) socketWrapper.takeCurrentProcessor();
release(processor);
}

Expand Down
17 changes: 12 additions & 5 deletions java/org/apache/tomcat/util/net/SocketWrapperBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
Expand Down Expand Up @@ -104,10 +105,12 @@ public abstract class SocketWrapperBase<E> {
protected volatile OperationState<?> writeOperation = null;

/**
* The org.apache.coyote.Processor instance currently associated
* with the wrapper.
* The org.apache.coyote.Processor instance currently associated with the
* wrapper. Only populated when required to maintain wrapper<->Processor
* mapping between calls to
* {@link AbstractEndpoint.Handler#process(SocketWrapperBase, SocketEvent)}.
*/
protected Object currentProcessor = null;
private final AtomicReference<Object> currentProcessor = new AtomicReference<>();

public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) {
this.socket = socket;
Expand All @@ -134,11 +137,15 @@ protected AbstractEndpoint<E,?> getEndpoint() {
}

public Object getCurrentProcessor() {
return currentProcessor;
return currentProcessor.get();
}

public void setCurrentProcessor(Object currentProcessor) {
this.currentProcessor = currentProcessor;
this.currentProcessor.set(currentProcessor);
}

public Object takeCurrentProcessor() {
return currentProcessor.getAndSet(null);
}

/**
Expand Down
4 changes: 4 additions & 0 deletions webapps/docs/changelog.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@
with TLS 1.3 but the JSSE TLS 1.3 implementation does not support PHA.
(markt)
</add>
<fix>
Improve the recycling of Processor objects to make it more robust.
(markt)
</fix>
</changelog>
</subsection>
<subsection name="Jasper">
Expand Down

0 comments on commit 9651b83

Please sign in to comment.