Skip to content

Commit

Permalink
Fixes #9778 - Jetty 12 - Remove WriteFlusher.Listener.
Browse files Browse the repository at this point in the history
This listener is not necessary anymore, as the min data rate checks have been moved to a StatisticsHandler.MinimumDataRateHandler.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed May 24, 2024
1 parent b164602 commit fd32ab5
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.Retainable;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle;
Expand All @@ -48,7 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HTTP2Connection extends AbstractConnection implements Parser.Listener, WriteFlusher.Listener, Connection.UpgradeTo
public class HTTP2Connection extends AbstractConnection implements Parser.Listener, Connection.UpgradeTo
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP2Connection.class);

Expand Down Expand Up @@ -302,12 +301,6 @@ public void onConnectionFailure(int error, String reason)
session.onConnectionFailure(error, reason);
}

@Override
public void onFlushed(long bytes) throws IOException
{
session.onFlushed(bytes);
}

protected class HTTP2Producer implements ExecutionStrategy.Producer
{
private final Callback fillableCallback = new FillableCallback();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.Callback;
Expand Down Expand Up @@ -1084,11 +1083,6 @@ private void onStreamDestroyed(int streamId)
streamsState.onStreamDestroyed();
}

public void onFlushed(long bytes) throws IOException
{
flusher.onFlushed(bytes);
}

private void terminate(Throwable cause)
{
flusher.terminate(cause);
Expand Down Expand Up @@ -1263,8 +1257,6 @@ public int getDataBytesRemaining()

public abstract boolean generate(ByteBufferPool.Accumulator accumulator) throws HpackException;

public abstract long onFlushed(long bytes) throws IOException;

boolean hasHighPriority()
{
return false;
Expand Down Expand Up @@ -1355,16 +1347,6 @@ public boolean generate(ByteBufferPool.Accumulator accumulator) throws HpackExce
return true;
}

@Override
public long onFlushed(long bytes)
{
long flushed = Math.min(frameBytes, bytes);
if (LOG.isDebugEnabled())
LOG.debug("Flushed {}/{} frame bytes for {}", flushed, bytes, this);
frameBytes -= flushed;
return bytes - flushed;
}

/**
* <p>Performs actions just before writing the frame to the network.</p>
* <p>Some frame, when sent over the network, causes the receiver
Expand Down Expand Up @@ -1433,7 +1415,6 @@ public void succeeded()
private class DataEntry extends Entry
{
private int frameBytes;
private int frameRemaining;
private int dataBytes;
private int dataRemaining;

Expand Down Expand Up @@ -1477,7 +1458,6 @@ public boolean generate(ByteBufferPool.Accumulator accumulator)
DataFrame dataFrame = (DataFrame)frame;
int frameBytes = generator.data(accumulator, dataFrame, length);
this.frameBytes += frameBytes;
this.frameRemaining += frameBytes;

int dataBytes = frameBytes - Frame.HEADER_LENGTH;
this.dataBytes += dataBytes;
Expand All @@ -1492,27 +1472,11 @@ public boolean generate(ByteBufferPool.Accumulator accumulator)
return true;
}

@Override
public long onFlushed(long bytes) throws IOException
{
long flushed = Math.min(frameRemaining, bytes);
if (LOG.isDebugEnabled())
LOG.debug("Flushed {}/{} frame bytes for {}", flushed, bytes, this);
frameRemaining -= flushed;
// We should only forward data (not frame) bytes,
// but we trade precision for simplicity.
Object channel = stream.getAttachment();
if (channel instanceof WriteFlusher.Listener)
((WriteFlusher.Listener)channel).onFlushed(flushed);
return bytes - flushed;
}

@Override
public void succeeded()
{
bytesWritten.addAndGet(frameBytes);
frameBytes = 0;
frameRemaining = 0;

flowControl.onDataSent(stream, dataBytes);
dataBytes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,6 @@ protected Action process() throws Throwable
return Action.SCHEDULED;
}

public void onFlushed(long bytes) throws IOException
{
// A single EndPoint write may be flushed multiple times (for example with SSL).
for (HTTP2Session.Entry entry : processedEntries)
{
bytes = entry.onFlushed(bytes);
}
}

@Override
public void succeeded()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,8 @@ protected ByteBuffer[] flush(SocketAddress address, ByteBuffer[] buffers) throws
if (written > 0)
{
Connection connection = _endPoint.getConnection();
if (connection instanceof Listener)
((Listener)connection).onFlushed(written);
if (connection instanceof Listener listener)
listener.onFlushed(written);
}

if (flushed)
Expand Down Expand Up @@ -581,7 +581,10 @@ public String toString()
/**
* <p>A listener of {@link WriteFlusher} events.
* If implemented by a Connection class, the {@link #onFlushed(long)} event will be delivered to it.</p>
*
* @deprecated functionality removed, no replacement
*/
@Deprecated(since = "12.0.9", forRemoval = true)
public interface Listener
{
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.server.AbstractMetaDataConnection;
import org.eclipse.jetty.server.ConnectionFactory;
Expand Down Expand Up @@ -81,7 +80,7 @@
/**
* <p>A {@link Connection} that handles the HTTP protocol.</p>
*/
public class HttpConnection extends AbstractMetaDataConnection implements Runnable, WriteFlusher.Listener, Connection.UpgradeFrom, Connection.UpgradeTo, ConnectionMetaData
public class HttpConnection extends AbstractMetaDataConnection implements Runnable, Connection.UpgradeFrom, Connection.UpgradeTo, ConnectionMetaData
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);
private static final HttpField PREAMBLE_UPGRADE_H2C = new HttpField(HttpHeader.UPGRADE, "h2c");
Expand Down Expand Up @@ -336,13 +335,6 @@ public void onUpgradeTo(ByteBuffer buffer)
BufferUtil.append(getRequestBuffer(), buffer);
}

@Override
public void onFlushed(long bytes) throws IOException
{
// TODO is this callback still needed? Couldn't we wrap send callback instead?
// Either way, the dat rate calculations from HttpOutput.onFlushed should be moved to Channel.
}

void releaseRequestBuffer()
{
if (_retainableByteBuffer != null && !_retainableByteBuffer.hasRemaining())
Expand Down

0 comments on commit fd32ab5

Please sign in to comment.