Skip to content

Commit

Permalink
Improve the recycling of Processor objects to make it more robust.
Browse files Browse the repository at this point in the history
  • Loading branch information
markt-asf committed Mar 29, 2022
1 parent f7b9a00 commit 17f177e
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 20 deletions.
32 changes: 17 additions & 15 deletions java/org/apache/coyote/AbstractProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,11 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {

S socket = wrapper.getSocket();

Processor processor = (Processor) wrapper.getCurrentProcessor();
// We take complete ownership of the Processor inside of this method to ensure
// no other thread can release it while we're using it. Whatever processor is
// held by this variable will be associated with the SocketWrapper before this
// method returns.
Processor processor = (Processor) wrapper.takeCurrentProcessor();
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
processor, socket));
Expand Down Expand Up @@ -849,9 +853,6 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {

processor.setSslSupport(wrapper.getSslSupport());

// Associate the processor with the connection
wrapper.setCurrentProcessor(processor);

SocketState state = SocketState.CLOSED;
do {
state = processor.process(wrapper, status);
Expand All @@ -871,8 +872,6 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
release(processor);
// Create the upgrade processor
processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());
// Associate with the processor with the connection
wrapper.setCurrentProcessor(processor);
} else {
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString(
Expand All @@ -892,8 +891,6 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
processor, wrapper));
}
// Associate with the processor with the connection
wrapper.setCurrentProcessor(processor);
// Initialise the upgrade handler (which may trigger
// some IO using the new protocol which is why the lines
// above are necessary)
Expand Down Expand Up @@ -931,8 +928,8 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
} else if (state == SocketState.OPEN) {
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
wrapper.setCurrentProcessor(null);
release(processor);
processor = null;
wrapper.registerReadInterest();
} else if (state == SocketState.SENDFILE) {
// Sendfile in progress. If it fails, the socket will be
Expand All @@ -957,8 +954,7 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
// Connection closed. OK to recycle the processor.
// Processors handling upgrades require additional clean-up
// before release.
wrapper.setCurrentProcessor(null);
if (processor.isUpgrade()) {
if (processor != null && processor.isUpgrade()) {
UpgradeToken upgradeToken = processor.getUpgradeToken();
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
InstanceManager instanceManager = upgradeToken.getInstanceManager();
Expand All @@ -979,7 +975,13 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
}
}
}

release(processor);
processor = null;
}

if (processor != null) {
wrapper.setCurrentProcessor(processor);
}
return state;
} catch(java.net.SocketException e) {
Expand Down Expand Up @@ -1015,7 +1017,6 @@ public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {

// Make sure socket/processor is removed from the list of current
// connections
wrapper.setCurrentProcessor(null);
release(processor);
return SocketState.CLOSED;
}
Expand All @@ -1035,7 +1036,9 @@ protected void longPoll(SocketWrapperBase<?> socket, Processor processor) {

/**
* Expected to be used by the handler once the processor is no longer
* required.
* required. Care must be taken to ensure that this method is only
* called once per processor, after the request processing has
* completed.
*
* @param processor Processor being released (that was associated with
* the socket)
Expand Down Expand Up @@ -1073,8 +1076,7 @@ private void release(Processor processor) {
*/
@Override
public void release(SocketWrapperBase<S> socketWrapper) {
Processor processor = (Processor) socketWrapper.getCurrentProcessor();
socketWrapper.setCurrentProcessor(null);
Processor processor = (Processor) socketWrapper.takeCurrentProcessor();
release(processor);
}

Expand Down
17 changes: 12 additions & 5 deletions java/org/apache/tomcat/util/net/SocketWrapperBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.servlet.ServletConnection;

Expand Down Expand Up @@ -122,10 +123,12 @@ public abstract class SocketWrapperBase<E> {
protected volatile OperationState<?> writeOperation = null;

/**
* The org.apache.coyote.Processor instance currently associated
* with the wrapper.
* The org.apache.coyote.Processor instance currently associated with the
* wrapper. Only populated when required to maintain wrapper<->Processor
* mapping between calls to
* {@link AbstractEndpoint.Handler#process(SocketWrapperBase, SocketEvent)}.
*/
protected Object currentProcessor = null;
private final AtomicReference<Object> currentProcessor = new AtomicReference<>();

public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) {
this.socket = socket;
Expand Down Expand Up @@ -153,11 +156,15 @@ protected AbstractEndpoint<E,?> getEndpoint() {
}

public Object getCurrentProcessor() {
return currentProcessor;
return currentProcessor.get();
}

public void setCurrentProcessor(Object currentProcessor) {
this.currentProcessor = currentProcessor;
this.currentProcessor.set(currentProcessor);
}

public Object takeCurrentProcessor() {
return currentProcessor.getAndSet(null);
}

/**
Expand Down
4 changes: 4 additions & 0 deletions webapps/docs/changelog.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@
with TLS 1.3 but the JSSE TLS 1.3 implementation does not support PHA.
(markt)
</add>
<fix>
Improve the recycling of Processor objects to make it more robust.
(markt)
</fix>
</changelog>
</subsection>
<subsection name="Jasper">
Expand Down

0 comments on commit 17f177e

Please sign in to comment.