Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve cleanup of deflater/inflater pools for PerMessageDeflateExtension #8134

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import java.io.Closeable;

import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;

public abstract class CompressionPool<T> extends AbstractLifeCycle
@ManagedObject
public abstract class CompressionPool<T> extends ContainerLifeCycle
{
public static final int DEFAULT_CAPACITY = 1024;

Expand Down Expand Up @@ -51,6 +53,11 @@ public void setCapacity(int capacity)
_capacity = capacity;
}

public Pool<Entry> getPool()
{
return _pool;
}

protected abstract T newPooled();

protected abstract void end(T object);
Expand Down Expand Up @@ -85,7 +92,10 @@ public void release(Entry entry)
protected void doStart() throws Exception
{
if (_capacity > 0)
{
_pool = new Pool<>(Pool.StrategyType.RANDOM, _capacity, true);
addBean(_pool);
}
super.doStart();
}

Expand All @@ -95,6 +105,7 @@ public void doStop() throws Exception
if (_pool != null)
{
_pool.close();
removeBean(_pool);
_pool = null;
}
super.doStop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,25 @@

package org.eclipse.jetty.websocket.core;

import java.io.Closeable;

/**
* Interface for WebSocket Extensions.
* <p>
* That {@link Frame}s are passed through the Extension via the {@link IncomingFrames} and {@link OutgoingFrames} interfaces
*/
public interface Extension extends IncomingFrames, OutgoingFrames
public interface Extension extends IncomingFrames, OutgoingFrames, Closeable
{

void init(ExtensionConfig config, WebSocketComponents components);

/**
* Used to clean up any resources after connection close.
*/
default void close()
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
{
}

/**
* The active configuration for this extension.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ public ExtensionStack(WebSocketComponents components, Behavior behavior)
this.behavior = behavior;
}

public void close()
{
for (Extension ext : extensions)
{
try
{
ext.close();
}
catch (Throwable t)
{
if (LOG.isDebugEnabled())
LOG.debug("Extension Error During Close", t);
}
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
}
}

@ManagedAttribute(name = "Extension List", readonly = true)
public List<Extension> getExtensions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class FrameFlusher extends IteratingCallback
{
public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY);
private static final Logger LOG = LoggerFactory.getLogger(FrameFlusher.class);
private static final Throwable CLOSED_CHANNEL = new ClosedChannelException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lachlan-roberts as discussed on chat.... I think we can keep this static sentinael and just use the constructor that avoids suppressed exceptions and a stacktrace.


private final AutoLock lock = new AutoLock();
private final LongAdder messagesOut = new LongAdder();
Expand Down Expand Up @@ -185,7 +184,15 @@ public void onClose(Throwable cause)
{
try (AutoLock l = lock.lock())
{
closedCause = cause == null ? CLOSED_CHANNEL : cause;
// TODO: find a way to not create exception if cause is null.
closedCause = cause == null ? new ClosedChannelException()
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
{
@Override
public Throwable fillInStackTrace()
{
return this;
}
} : cause;
}
iterate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.jetty.websocket.core.internal;

import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -146,6 +147,24 @@ public void init(final ExtensionConfig config, WebSocketComponents components)
super.init(configNegotiated, components);
}

@Override
public void close()
{
// TODO: use IteratingCallback.close() instead of creating exception with failFlusher methods.
ClosedChannelException exception = new ClosedChannelException()
{
@Override
public Throwable fillInStackTrace()
{
return this;
}
};
incomingFlusher.failFlusher(exception);
outgoingFlusher.failFlusher(exception);
releaseInflater();
releaseDeflater();
}

private static String toDetail(Inflater inflater)
{
return String.format("Inflater[finished=%b,read=%d,written=%d,remaining=%d,in=%d,out=%d]", inflater.finished(), inflater.getBytesRead(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,34 @@ public final void sendFrame(Frame frame, Callback callback, boolean batch)
notifyCallbackFailure(callback, failure);
}

/**
* Used to fail this flusher possibly from an external event such as a callback.
* @param t the failure.
*/
public void failFlusher(Throwable t)
{
// TODO: find a way to close the flusher in non error case without exception.
boolean failed = false;
try (AutoLock l = lock.lock())
{
if (failure == null)
{
failure = t;
failed = true;
}
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
else
{
failure.addSuppressed(t);
}
}

if (failed)
{
flusher.failed(t);
flusher.iterate();
}
}

private void onFailure(Throwable t)
{
try (AutoLock l = lock.lock())
Expand All @@ -103,8 +131,14 @@ private class Flusher extends IteratingCallback implements Callback
private FrameEntry current;

@Override
protected Action process()
protected Action process() throws Throwable
{
try (AutoLock l = lock.lock())
{
if (failure != null)
throw failure;
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
}

if (finished)
{
if (current != null)
Expand Down Expand Up @@ -134,8 +168,11 @@ protected void onCompleteFailure(Throwable t)
if (log.isDebugEnabled())
log.debug("onCompleteFailure {}", t.toString());

notifyCallbackFailure(current.callback, t);
current = null;
if (current != null)
{
notifyCallbackFailure(current.callback, t);
current = null;
}
onFailure(t);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,13 @@ public void onEof()
closeConnection(sessionState.getCloseStatus(), Callback.NOOP);
}

public void closeConnection(CloseStatus closeStatus, Callback callback)
private void closeConnection(CloseStatus closeStatus, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("closeConnection() {} {}", closeStatus, this);

abort();
extensionStack.close();

// Forward Errors to Local WebSocket EndPoint
if (closeStatus.isAbnormal() && closeStatus.getCause() != null)
Expand Down
Loading