Skip to content

Commit

Permalink
Dump request/resposte header and body in a file #262
Browse files Browse the repository at this point in the history
  • Loading branch information
Paolo Venturi committed Mar 12, 2021
1 parent 8b33a96 commit 86882fc
Show file tree
Hide file tree
Showing 8 changed files with 610 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.commons.pool2.impl.DefaultPooledObjectInfo;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.carapaceproxy.server.FullHttpMessageLogger;
import org.carapaceproxy.utils.CarapaceLogger;
import org.carapaceproxy.utils.PrometheusUtils;

Expand All @@ -83,6 +84,8 @@ public class ConnectionsManagerImpl implements ConnectionsManager, AutoCloseable
final BackendHealthManager backendHealthManager;
final ScheduledExecutorService scheduler;

private final FullHttpMessageLogger fullHttpMessageLogger;

private static final Gauge PENDING_REQUESTS_GAUGE = PrometheusUtils.createGauge("backends", "pending_requests",
"pending requests").register();
private static final Counter STUCK_REQUESTS_COUNTER = PrometheusUtils.createCounter("backends",
Expand Down Expand Up @@ -226,7 +229,8 @@ public final void applyNewConfiguration(RuntimeServerConfiguration configuration

}

public ConnectionsManagerImpl(RuntimeServerConfiguration configuration, BackendHealthManager backendHealthManager) {
public ConnectionsManagerImpl(RuntimeServerConfiguration configuration,
BackendHealthManager backendHealthManager, FullHttpMessageLogger fullHttpMessageLogger) {
this.scheduler = Executors.newSingleThreadScheduledExecutor();
LOG.log(Level.INFO, "Reading carapace.connectionsmanager.returnconnectionthreadpool.size={0}", CONNECTIONS_MANAGER_RETURN_CONNECTION_THREAD_POOL_SIZE);
this.returnConnectionThreadPool = CONNECTIONS_MANAGER_RETURN_CONNECTION_THREAD_POOL_SIZE > 0
Expand All @@ -243,6 +247,7 @@ public ConnectionsManagerImpl(RuntimeServerConfiguration configuration, BackendH
eventLoopForOutboundConnections = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
connections = new GenericKeyedObjectPool<>(new ConnectionsFactory(), config);
this.backendHealthManager = backendHealthManager;
this.fullHttpMessageLogger = fullHttpMessageLogger;
applyNewConfiguration(configuration);
}

Expand Down Expand Up @@ -362,4 +367,8 @@ public void setRequestsHeaderDebugEnabled(boolean requestsHeaderDebugEnabled) {
this.requestsHeaderDebugEnabled = requestsHeaderDebugEnabled;
}

public FullHttpMessageLogger getFullHttpMessageLogger() {
return fullHttpMessageLogger;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
Expand Down Expand Up @@ -85,7 +86,7 @@ public class EndpointConnectionImpl implements EndpointConnection {

private AtomicReference<ConnectionState> state = new AtomicReference<>(ConnectionState.IDLE);
private volatile boolean forcedInvalid = false;
private volatile RequestHandler clientSidePeerHandler;
private volatile AtomicReference<RequestHandler> clientSidePeerHandler = new AtomicReference<>();

// stats
private static final Summary CONNECTION_STATS_SUMMARY = PrometheusUtils.createSummary("backends", "connection_time_ns",
Expand All @@ -104,6 +105,7 @@ public class EndpointConnectionImpl implements EndpointConnection {
private boolean forceErrorOnRequest = false;
final AtomicBoolean returningToPool = new AtomicBoolean();
private boolean requestsHeaderDebugEnabled = false;
private volatile boolean requestFinished = false;

private static enum ConnectionState {
IDLE,
Expand Down Expand Up @@ -151,6 +153,7 @@ public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("writeTimeoutHandler", new WriteTimeoutHandler(parent.getIdleTimeout() / 2));
ch.pipeline().addLast("client-codec", new HttpClientCodec());
ch.pipeline().addLast(new ReadEndpointResponseHandler());
parent.getFullHttpMessageLogger().attachHandler(ch, clientSidePeerHandler);
}
});
final ChannelFuture connectFuture = b.connect(key.getHost(), key.getPort());
Expand Down Expand Up @@ -223,7 +226,8 @@ public void sendRequest(HttpRequest request, RequestHandler clientSidePeerHandle
}

// these have to be set before calling clientSidePeerHandler.errorSendingRequest which will perform a release
this.clientSidePeerHandler = clientSidePeerHandler;
this.clientSidePeerHandler.set(clientSidePeerHandler);
requestFinished = false;
endpointstats.getActiveConnections().incrementAndGet();
activeConnectionsStats.inc();
endpointstats.getTotalRequests().incrementAndGet();
Expand Down Expand Up @@ -434,8 +438,10 @@ private void connectionDeactivated() {
if (active.compareAndSet(true, false)) {
endpointstats.getActiveConnections().decrementAndGet();
activeConnectionsStats.dec();
parent.unregisterPendingRequest(clientSidePeerHandler);
clientSidePeerHandler = null;
if (!requestFinished) {
parent.unregisterPendingRequest(clientSidePeerHandler.get());
requestFinished = true;
}
} else {
LOG.log(Level.SEVERE, "connectionDeactivated on a non active connection! {0}", this);
}
Expand All @@ -446,8 +452,8 @@ private class ReadEndpointResponseHandler extends SimpleChannelInboundHandler<Ht

@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
RequestHandler _clientSidePeerHandler = clientSidePeerHandler;
if (_clientSidePeerHandler == null) {
RequestHandler _clientSidePeerHandler = clientSidePeerHandler.get();
if (_clientSidePeerHandler == null || requestFinished) {
LOG.log(Level.INFO, "swallow content {0}: {1}, disconnected client. connection: {2}", new Object[]{msg.getClass(), msg, EndpointConnectionImpl.this});
return;
}
Expand All @@ -464,12 +470,13 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
LOG.log(Level.SEVERE, "unknown message type {0}: {1}. connection: {2}", new Object[]{msg.getClass(), msg, EndpointConnectionImpl.this});
}

ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
RequestHandler _clientSidePeerHandler = clientSidePeerHandler;
if (_clientSidePeerHandler != null) {
RequestHandler _clientSidePeerHandler = clientSidePeerHandler.get();
if (_clientSidePeerHandler != null && !requestFinished) {
logConnectionInfo("channelReadComplete, open: " + ctx.channel().isOpen());
_clientSidePeerHandler.readCompletedFromRemote();
// server said no more data will be sent to this channel, we must close it before netty does
Expand All @@ -481,9 +488,9 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.log(Level.SEVERE, "I/O error on endpoint " + key, cause);
parent.backendHealthManager.reportBackendUnreachable(key.getHostPort(), System.currentTimeMillis(), "I/O error: " + cause);
RequestHandler _clientSidePeerHandler = clientSidePeerHandler;
RequestHandler _clientSidePeerHandler = clientSidePeerHandler.get();

if (_clientSidePeerHandler != null) {
if (_clientSidePeerHandler != null && !requestFinished) {
_clientSidePeerHandler.badErrorOnRemote(cause);
}
invalidate();
Expand All @@ -499,7 +506,7 @@ public String toString() {
}

private void checkHandler(RequestHandler handler) throws IllegalStateException {
if (this.clientSidePeerHandler != handler) {
if (this.clientSidePeerHandler.get() != handler || requestFinished) {
throw new IllegalStateException("connection is bound to " + this.clientSidePeerHandler + " cannot be managed by " + handler);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.carapaceproxy.EndpointMapper;
Expand Down Expand Up @@ -70,7 +71,8 @@ public class ClientConnectionHandler extends SimpleChannelInboundHandler<Object>
final long connectionStartsTs;
volatile boolean keepAlive = true;
volatile boolean refuseOtherRequests;
private final List<RequestHandler> pendingRequests = new CopyOnWriteArrayList<>();
private final AtomicReference<RequestHandler> pendingRequest = new AtomicReference<>();
private volatile boolean requestFinished = false;
final Runnable onClientDisconnected;
private final String listenerHost;
private final int listenerPort;
Expand Down Expand Up @@ -172,30 +174,31 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
totalRequests.inc();
RequestHandler currentRequest = new RequestHandler(requestIdGenerator.incrementAndGet(),
request, filters, this, ctx, () -> RUNNING_REQUESTS_GAUGE.dec(), backendHealthManager, requestsLogger);
addPendingRequest(currentRequest);
pendingRequest.set(currentRequest);
requestFinished = false;
currentRequest.start();
} else if (msg instanceof LastHttpContent) {
LastHttpContent trailer = (LastHttpContent) msg;
try {
RequestHandler currentRequest = pendingRequests.get(0);
currentRequest.clientRequestFinished(trailer);
} catch (java.lang.ArrayIndexOutOfBoundsException noMorePendingRequests) {
if (requestFinished) {
LOG.log(Level.INFO, "{0} swallow {1}, no more pending requests", new Object[]{this, msg});
refuseOtherRequests = true;
ctx.close();
} else {
LastHttpContent trailer = (LastHttpContent) msg;
pendingRequest.get().clientRequestFinished(trailer);
}
} else if (msg instanceof HttpContent) {
// for example chunks from client
HttpContent httpContent = (HttpContent) msg;
try {
RequestHandler currentRequest = pendingRequests.get(0);
currentRequest.continueClientRequest(httpContent);
} catch (java.lang.ArrayIndexOutOfBoundsException noMorePendingRequests) {
if (requestFinished) {
LOG.log(Level.INFO, "{0} swallow {1}, no more pending requests", new Object[]{this, msg});
refuseOtherRequests = true;
ctx.close();
} else {
// for example chunks from client
HttpContent httpContent = (HttpContent) msg;
pendingRequest.get().continueClientRequest(httpContent);
}
}

ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
}

private void detectSSLProperties(ChannelHandlerContext ctx) {
Expand Down Expand Up @@ -247,21 +250,21 @@ public int getTotalRequestsCount() {
}

public void errorSendingRequest(RequestHandler request, EndpointConnectionImpl endpointConnection, ChannelHandlerContext peerChannel, Throwable error) {
pendingRequests.remove(request);
requestFinished = true;
mapper.endpointFailed(endpointConnection.getKey(), error);
LOG.log(Level.INFO, error, () -> this + " errorSendingRequest " + endpointConnection);
}

public void lastHttpContentSent(RequestHandler requestHandler) {
pendingRequests.remove(requestHandler);
requestFinished = true;
}

@Override
public String toString() {
return "ClientConnectionHandler{chid=" + id + ",ka=" + keepAlive + '}';
}

void addPendingRequest(RequestHandler request) {
pendingRequests.add(request);
public AtomicReference<RequestHandler> getPendingRequest() {
return pendingRequest;
}
}
Loading

0 comments on commit 86882fc

Please sign in to comment.