diff --git a/carapace-server/src/main/java/org/carapaceproxy/client/impl/ConnectionsManagerImpl.java b/carapace-server/src/main/java/org/carapaceproxy/client/impl/ConnectionsManagerImpl.java index f68828a27..1448fbd55 100644 --- a/carapace-server/src/main/java/org/carapaceproxy/client/impl/ConnectionsManagerImpl.java +++ b/carapace-server/src/main/java/org/carapaceproxy/client/impl/ConnectionsManagerImpl.java @@ -57,6 +57,7 @@ import org.apache.commons.pool2.impl.DefaultPooledObjectInfo; import org.apache.commons.pool2.impl.GenericKeyedObjectPool; import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; +import org.carapaceproxy.server.FullHttpMessageLogger; import org.carapaceproxy.utils.CarapaceLogger; import org.carapaceproxy.utils.PrometheusUtils; @@ -83,6 +84,8 @@ public class ConnectionsManagerImpl implements ConnectionsManager, AutoCloseable final BackendHealthManager backendHealthManager; final ScheduledExecutorService scheduler; + private final FullHttpMessageLogger fullHttpMessageLogger; + private static final Gauge PENDING_REQUESTS_GAUGE = PrometheusUtils.createGauge("backends", "pending_requests", "pending requests").register(); private static final Counter STUCK_REQUESTS_COUNTER = PrometheusUtils.createCounter("backends", @@ -226,7 +229,8 @@ public final void applyNewConfiguration(RuntimeServerConfiguration configuration } - public ConnectionsManagerImpl(RuntimeServerConfiguration configuration, BackendHealthManager backendHealthManager) { + public ConnectionsManagerImpl(RuntimeServerConfiguration configuration, + BackendHealthManager backendHealthManager, FullHttpMessageLogger fullHttpMessageLogger) { this.scheduler = Executors.newSingleThreadScheduledExecutor(); LOG.log(Level.INFO, "Reading carapace.connectionsmanager.returnconnectionthreadpool.size={0}", CONNECTIONS_MANAGER_RETURN_CONNECTION_THREAD_POOL_SIZE); this.returnConnectionThreadPool = CONNECTIONS_MANAGER_RETURN_CONNECTION_THREAD_POOL_SIZE > 0 @@ -243,6 +247,7 @@ public ConnectionsManagerImpl(RuntimeServerConfiguration configuration, BackendH eventLoopForOutboundConnections = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup(); connections = new GenericKeyedObjectPool<>(new ConnectionsFactory(), config); this.backendHealthManager = backendHealthManager; + this.fullHttpMessageLogger = fullHttpMessageLogger; applyNewConfiguration(configuration); } @@ -362,4 +367,8 @@ public void setRequestsHeaderDebugEnabled(boolean requestsHeaderDebugEnabled) { this.requestsHeaderDebugEnabled = requestsHeaderDebugEnabled; } + public FullHttpMessageLogger getFullHttpMessageLogger() { + return fullHttpMessageLogger; + } + } diff --git a/carapace-server/src/main/java/org/carapaceproxy/client/impl/EndpointConnectionImpl.java b/carapace-server/src/main/java/org/carapaceproxy/client/impl/EndpointConnectionImpl.java index d7bfb838a..3f4cd3d9f 100644 --- a/carapace-server/src/main/java/org/carapaceproxy/client/impl/EndpointConnectionImpl.java +++ b/carapace-server/src/main/java/org/carapaceproxy/client/impl/EndpointConnectionImpl.java @@ -85,7 +85,7 @@ public class EndpointConnectionImpl implements EndpointConnection { private AtomicReference state = new AtomicReference<>(ConnectionState.IDLE); private volatile boolean forcedInvalid = false; - private volatile RequestHandler clientSidePeerHandler; + private volatile AtomicReference clientSidePeerHandler = new AtomicReference<>(); // stats private static final Summary CONNECTION_STATS_SUMMARY = PrometheusUtils.createSummary("backends", "connection_time_ns", @@ -151,6 +151,7 @@ public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("writeTimeoutHandler", new WriteTimeoutHandler(parent.getIdleTimeout() / 2)); ch.pipeline().addLast("client-codec", new HttpClientCodec()); ch.pipeline().addLast(new ReadEndpointResponseHandler()); + //parent.getFullHttpMessageLogger().attachHandler(ch, clientSidePeerHandler); } }); final ChannelFuture connectFuture = b.connect(key.getHost(), key.getPort()); @@ -223,7 +224,7 @@ public void sendRequest(HttpRequest request, RequestHandler clientSidePeerHandle } // these have to be set before calling clientSidePeerHandler.errorSendingRequest which will perform a release - this.clientSidePeerHandler = clientSidePeerHandler; + this.clientSidePeerHandler.set(clientSidePeerHandler); endpointstats.getActiveConnections().incrementAndGet(); activeConnectionsStats.inc(); endpointstats.getTotalRequests().incrementAndGet(); @@ -434,8 +435,8 @@ private void connectionDeactivated() { if (active.compareAndSet(true, false)) { endpointstats.getActiveConnections().decrementAndGet(); activeConnectionsStats.dec(); - parent.unregisterPendingRequest(clientSidePeerHandler); - clientSidePeerHandler = null; + parent.unregisterPendingRequest(clientSidePeerHandler.get()); + clientSidePeerHandler.set(null); } else { LOG.log(Level.SEVERE, "connectionDeactivated on a non active connection! {0}", this); } @@ -446,7 +447,7 @@ private class ReadEndpointResponseHandler extends SimpleChannelInboundHandler final long connectionStartsTs; volatile boolean keepAlive = true; volatile boolean refuseOtherRequests; - private final List pendingRequests = new CopyOnWriteArrayList<>(); + private final AtomicReference pendingRequest = new AtomicReference<>(); + private volatile boolean requestFinished = false; final Runnable onClientDisconnected; private final String listenerHost; private final int listenerPort; @@ -172,30 +173,31 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) { totalRequests.inc(); RequestHandler currentRequest = new RequestHandler(requestIdGenerator.incrementAndGet(), request, filters, this, ctx, () -> RUNNING_REQUESTS_GAUGE.dec(), backendHealthManager, requestsLogger); - addPendingRequest(currentRequest); + pendingRequest.set(currentRequest); + requestFinished = false; currentRequest.start(); } else if (msg instanceof LastHttpContent) { - LastHttpContent trailer = (LastHttpContent) msg; - try { - RequestHandler currentRequest = pendingRequests.get(0); - currentRequest.clientRequestFinished(trailer); - } catch (java.lang.ArrayIndexOutOfBoundsException noMorePendingRequests) { + if (requestFinished) { LOG.log(Level.INFO, "{0} swallow {1}, no more pending requests", new Object[]{this, msg}); refuseOtherRequests = true; ctx.close(); + } else { + LastHttpContent trailer = (LastHttpContent) msg; + pendingRequest.get().clientRequestFinished(trailer); } } else if (msg instanceof HttpContent) { - // for example chunks from client - HttpContent httpContent = (HttpContent) msg; - try { - RequestHandler currentRequest = pendingRequests.get(0); - currentRequest.continueClientRequest(httpContent); - } catch (java.lang.ArrayIndexOutOfBoundsException noMorePendingRequests) { + if (requestFinished) { LOG.log(Level.INFO, "{0} swallow {1}, no more pending requests", new Object[]{this, msg}); refuseOtherRequests = true; ctx.close(); + } else { + // for example chunks from client + HttpContent httpContent = (HttpContent) msg; + pendingRequest.get().continueClientRequest(httpContent); } } + + ctx.fireChannelRead(msg); } private void detectSSLProperties(ChannelHandlerContext ctx) { @@ -247,13 +249,13 @@ public int getTotalRequestsCount() { } public void errorSendingRequest(RequestHandler request, EndpointConnectionImpl endpointConnection, ChannelHandlerContext peerChannel, Throwable error) { - pendingRequests.remove(request); + requestFinished = true; mapper.endpointFailed(endpointConnection.getKey(), error); LOG.log(Level.INFO, error, () -> this + " errorSendingRequest " + endpointConnection); } public void lastHttpContentSent(RequestHandler requestHandler) { - pendingRequests.remove(requestHandler); + requestFinished = true; } @Override @@ -261,7 +263,7 @@ public String toString() { return "ClientConnectionHandler{chid=" + id + ",ka=" + keepAlive + '}'; } - void addPendingRequest(RequestHandler request) { - pendingRequests.add(request); + public AtomicReference getPendingRequest() { + return pendingRequest; } } diff --git a/carapace-server/src/main/java/org/carapaceproxy/server/FullHttpMessageLogger.java b/carapace-server/src/main/java/org/carapaceproxy/server/FullHttpMessageLogger.java new file mode 100644 index 000000000..128d550a6 --- /dev/null +++ b/carapace-server/src/main/java/org/carapaceproxy/server/FullHttpMessageLogger.java @@ -0,0 +1,378 @@ +/* + * Licensed to Diennea S.r.l. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Diennea S.r.l. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.carapaceproxy.server; + +import static org.stringtemplate.v4.STGroup.verbose; +import com.google.common.annotations.VisibleForTesting; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.DecoderResult; +import io.netty.handler.codec.http.FullHttpMessage; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpObjectAggregator; +import java.io.BufferedWriter; +import java.io.Closeable; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * + * Logger for both http requests and reponses. + * + * @author paolo.venturi + */ +public class FullHttpMessageLogger implements Runnable, Closeable { + + private static final Logger LOG = Logger.getLogger(FullHttpMessageLogger.class.getName()); + private static final String FULL_ACCESS_LOG_PATH_SUFFIX = "full"; + + private final BlockingQueue queue; + + private volatile RuntimeServerConfiguration currentConfiguration; + private volatile RuntimeServerConfiguration newConfiguration = null; + private volatile boolean closeRequested = false; + private volatile boolean closed = false; + + private boolean started = false; + private final Thread thread; + + private OutputStream os = null; + private OutputStreamWriter osw = null; + private BufferedWriter bw = null; + private PrintWriter pw = null; + + public long lastFlush = 0; + + public FullHttpMessageLogger(RuntimeServerConfiguration currentConfiguration) { + this.currentConfiguration = currentConfiguration; + this.queue = new ArrayBlockingQueue<>(this.currentConfiguration.getAccessLogMaxQueueCapacity()); + this.thread = new Thread(this); + } + + private void ensureAccessLogFileOpened() throws IOException { + if (os != null) { + return; + } + + LOG.log(Level.INFO, "Opening file: {0}", getFullAccessLogPath()); + os = new FileOutputStream(getFullAccessLogPath(), true); + osw = new OutputStreamWriter(os, StandardCharsets.UTF_8); + bw = new BufferedWriter(osw); + pw = new PrintWriter(bw); + } + + private String getFullAccessLogPath() { + return currentConfiguration.getAccessLogPath() + "." + FULL_ACCESS_LOG_PATH_SUFFIX; + } + + private void closeAccessLogFile() throws IOException { + LOG.log(Level.INFO, "Closing file"); + if (pw != null) { + pw = null; + } + if (bw != null) { + bw.close(); + bw = null; + } + if (osw != null) { + osw.close(); + osw = null; + } + if (os != null) { + os.close(); + os = null; + } + } + + public void reloadConfiguration(RuntimeServerConfiguration newConfiguration) { + this.newConfiguration = newConfiguration; + } + + private void _reloadConfiguration() throws IOException { + if (newConfiguration == null) { + return; + } + + LOG.log(Level.INFO, "Reloading conf"); + String oldAccessLogPath = this.currentConfiguration.getAccessLogPath(); + if (newConfiguration.getAccessLogMaxQueueCapacity() != currentConfiguration.getAccessLogMaxQueueCapacity()) { + LOG.log(Level.SEVERE, "accesslog.queue.maxcapacity hot reload is not currently supported"); + } + this.currentConfiguration = newConfiguration; + if (!oldAccessLogPath.equals(newConfiguration.getAccessLogPath())) { + closeAccessLogFile(); + // File opening will be retried at next cycle start + } + newConfiguration = null; + } + + @Override + public void close() { + closeRequested = true; + } + + public void start() { + if (started) { + throw new IllegalStateException("Already started"); + } + thread.start(); + started = true; + } + + public void stop() { + close(); + try { + thread.join(60_000); + } catch (InterruptedException ex) { + LOG.log(Level.SEVERE, "Interrupted while stopping"); + } + } + + @VisibleForTesting + void flushAccessLogFile() throws IOException { + if (verbose) { + LOG.log(Level.INFO, "Flushed"); + } + if (bw != null) { + bw.flush(); + } + if (osw != null) { + osw.flush(); + } + if (os != null) { + os.flush(); + } + lastFlush = System.currentTimeMillis(); + } + + @Override + public void run() { + if (lastFlush == 0) { + lastFlush = System.currentTimeMillis(); + } + + FullHttpMessageLogEntry currentEntry = null; + while (!closed) { + try { + _reloadConfiguration(); + + try { + ensureAccessLogFileOpened(); + } catch (IOException ex) { + LOG.log(Level.SEVERE, "Exception while trying to open access log file"); + LOG.log(Level.SEVERE, null, ex); + Thread.sleep(currentConfiguration.getAccessLogWaitBetweenFailures()); + lastFlush = System.currentTimeMillis(); + continue; + } + + long waitTime = !closeRequested + ? currentConfiguration.getAccessLogFlushInterval() - (System.currentTimeMillis() - lastFlush) + : 0L; + waitTime = Math.max(waitTime, 0L); + + if (currentEntry == null) { + currentEntry = queue.poll(waitTime, TimeUnit.MILLISECONDS); + } + + if (currentEntry != null) { + if (verbose) { + LOG.log(Level.INFO, "writing entry: {0}", currentEntry); + } + currentEntry.write(); + currentEntry = null; + } else { + if (closeRequested) { + closeAccessLogFile(); + closed = true; + } + } + + if (System.currentTimeMillis() - lastFlush >= currentConfiguration.getAccessLogFlushInterval()) { + flushAccessLogFile(); + } + } catch (InterruptedException ex) { + LOG.log(Level.SEVERE, "Interrupt received"); + try { + closeAccessLogFile(); + } catch (IOException ex1) { + LOG.log(Level.SEVERE, null, ex1); + } + closed = true; + + } catch (IOException ex) { + LOG.log(Level.SEVERE, "Exception while writing on access log file"); + LOG.log(Level.SEVERE, null, ex); + try { + closeAccessLogFile(); + } catch (IOException ex1) { + LOG.log(Level.SEVERE, "Exception while trying to close access log file"); + LOG.log(Level.SEVERE, null, ex1); + } + // File opening will be retried at next cycle start + + try { + Thread.sleep(currentConfiguration.getAccessLogFlushInterval()); + lastFlush = System.currentTimeMillis(); + } catch (InterruptedException ex1) { + closed = true; + } + } + } + } + + public void attachHandler(SocketChannel channel, AtomicReference reqHandler) { + channel.pipeline().addLast(new HttpObjectAggregator(Integer.MAX_VALUE)); + channel.pipeline().addLast(new FullHttpMessageLoggerHandler(reqHandler)); + } + + public class FullHttpMessageLoggerHandler extends SimpleChannelInboundHandler { + + private final AtomicReference reqHandler; + + public FullHttpMessageLoggerHandler(AtomicReference reqHandler) { + this.reqHandler = reqHandler; + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, FullHttpMessage msg) throws Exception { + // Check for invalid http data + if (msg.decoderResult() != DecoderResult.SUCCESS) { + LOG.log(Level.SEVERE, "Message {0} cannot be logged due to failed decoding.", msg); + return; + } + + FullHttpMessageLogEntry entry = new FullHttpMessageLogEntry(reqHandler.get().getId(), + msg, currentConfiguration.getAccessLogTimestampFormat() + ); + + logMessageEntry(entry); + } + } + + public void logMessageEntry(FullHttpMessageLogEntry entry) { + if (closeRequested) { + LOG.log(Level.SEVERE, "Request {0} not logged to access log because RequestsLogger is closed", entry); + return; + } + + // If configuration reloads already created entries will keep a possibile old format, but it doesn't really matter + boolean ret = queue.offer(entry); + + if (!ret) { + LOG.log(Level.SEVERE, "Request {0} not logged to access log because queue is full", entry); + } + } + + private class FullHttpMessageLogEntry { + + private final long requestId; + private final String time; + private boolean request; + private String method; + private String uri; + private final String protocolVersion; + private final String headers; + private final String trailingHeaders; + private final String data; + + FullHttpMessageLogEntry(long requestId, FullHttpMessage msg, String timestampFormat) { + + this.requestId = requestId; + this.time = new SimpleDateFormat(timestampFormat).format(new Date()); + if (msg instanceof FullHttpRequest) { + this.request = true; + FullHttpRequest request = (FullHttpRequest) msg; + method = request.method().toString(); + uri = request.uri(); + } + + protocolVersion = msg.protocolVersion().toString(); + headers = formatHeaders(msg.headers()); + trailingHeaders = formatHeaders(msg.trailingHeaders()); + ByteBuf data = msg.content(); + this.data = data.toString(StandardCharsets.UTF_8); + } + + void write() throws IOException { + String opening = ">>>>>>>>>>>>>>>>>>>>[ SERVER RESPONSE (requestId: " + requestId + ") ]>>>>>>>>>>>>>>>>>>>>"; + String closing = ">>>>>>>>>>>>>>>>>>>>[ SERVER RESPONSE END (requestId: " + requestId + ") ]>>>>>>>>>>>>>>>>>>>>"; + if (request) { + opening = "<<<<<<<<<<<<<<<<<<<<[ CLIENT REQUEST (requestId: " + requestId + ") ]<<<<<<<<<<<<<<<<<<<<"; + closing = "<<<<<<<<<<<<<<<<<<<<[ CLIENT REQUEST END (requestId: " + requestId + ") ]<<<<<<<<<<<<<<<<<<<<"; + } + pw.println(opening); + pw.println("Time: " + time); + pw.println("HTTP Version: " + protocolVersion); + if (request) { + pw.println("HTTP Method: " + method); + pw.println("URI: " + uri); + } + pw.println("--------------------[ HEADERS ]--------------------"); + pw.println(headers); + if (!trailingHeaders.isEmpty()) { + pw.println(trailingHeaders); + } + if (data != null && !data.isEmpty()) { + pw.println("--------------------[ DATA ]--------------------"); + pw.println(data); + } + pw.println(closing); + pw.println("\n"); + pw.flush(); + } + + private String formatHeaders(HttpHeaders headers) { + if (headers.isEmpty()) { + return ""; + } + + StringBuilder sBuilder = new StringBuilder(); + headers.forEach(h -> { + sBuilder.append(h.getKey()); + sBuilder.append(": "); + sBuilder.append(h.getValue()); + sBuilder.append("\n"); + }); + String res = sBuilder.toString(); + return res.substring(0, res.length() - 1); + } + + @Override + public String toString() { + return "FullHttpMessageLogEntry{" + "requestId=" + requestId + ", time=" + time + ", request=" + request + ", method=" + method + ", uri=" + uri + '}'; + } + } +} diff --git a/carapace-server/src/main/java/org/carapaceproxy/server/HttpProxyServer.java b/carapace-server/src/main/java/org/carapaceproxy/server/HttpProxyServer.java index d77386c21..07415c217 100644 --- a/carapace-server/src/main/java/org/carapaceproxy/server/HttpProxyServer.java +++ b/carapace-server/src/main/java/org/carapaceproxy/server/HttpProxyServer.java @@ -99,6 +99,7 @@ public class HttpProxyServer implements AutoCloseable { private final PrometheusMetricsProvider statsProvider; private final PropertiesConfiguration statsProviderConfig = new PropertiesConfiguration(); private final RequestsLogger requestsLogger; + private final FullHttpMessageLogger fullHttpMessageLogger; private String peerId = "localhost"; private String zkAddress; @@ -151,7 +152,8 @@ public HttpProxyServer(EndpointMapper mapper, File basePath) throws Exception { this.listeners = new Listeners(this); this.cache = new ContentsCache(currentConfiguration); this.requestsLogger = new RequestsLogger(currentConfiguration); - this.connectionsManager = new ConnectionsManagerImpl(currentConfiguration, backendHealthManager); + this.fullHttpMessageLogger = new FullHttpMessageLogger(currentConfiguration); + this.connectionsManager = new ConnectionsManagerImpl(currentConfiguration, backendHealthManager, fullHttpMessageLogger); this.dynamicCertificatesManager = new DynamicCertificatesManager(this); if (mapper != null) { mapper.setDynamicCertificateManager(dynamicCertificatesManager); @@ -289,6 +291,7 @@ public void start() throws InterruptedException, ConfigurationNotValidException connectionsManager.start(); cache.start(); requestsLogger.start(); + fullHttpMessageLogger.start(); listeners.start(); backendHealthManager.start(); dynamicCertificatesManager.attachGroupMembershipHandler(groupMembershipHandler); @@ -348,6 +351,9 @@ public void close() { if (requestsLogger != null) { requestsLogger.stop(); } + if (fullHttpMessageLogger != null) { + fullHttpMessageLogger.stop(); + } if (cache != null) { cache.close(); } @@ -375,6 +381,10 @@ public RequestsLogger getRequestsLogger() { return requestsLogger; } + public FullHttpMessageLogger getFullHttpMessageLogger() { + return fullHttpMessageLogger; + } + private static EndpointMapper buildMapper(String className, ConfigurationStore properties) throws ConfigurationNotValidException { try { EndpointMapper res = (EndpointMapper) Class.forName(className).getConstructor().newInstance(); @@ -644,6 +654,7 @@ private void applyDynamicConfiguration(ConfigurationStore newConfigurationStore, this.listeners.reloadConfiguration(newConfiguration); this.cache.reloadConfiguration(newConfiguration); this.requestsLogger.reloadConfiguration(newConfiguration); + this.fullHttpMessageLogger.reloadConfiguration(newConfiguration); this.connectionsManager.applyNewConfiguration(newConfiguration); this.currentConfiguration = newConfiguration; diff --git a/carapace-server/src/main/java/org/carapaceproxy/server/Listeners.java b/carapace-server/src/main/java/org/carapaceproxy/server/Listeners.java index ab19a0f04..a1d64d731 100644 --- a/carapace-server/src/main/java/org/carapaceproxy/server/Listeners.java +++ b/carapace-server/src/main/java/org/carapaceproxy/server/Listeners.java @@ -236,6 +236,7 @@ protected SslHandler newSslHandler(SslContext context, ByteBufAllocator allocato listener.isSsl() ); channel.pipeline().addLast(connHandler); + parent.getFullHttpMessageLogger().attachHandler(channel, connHandler.getPendingRequest()); listenersHandlers.put(key, connHandler); } diff --git a/carapace-server/src/test/java/org/carapaceproxy/server/FullHttpMessageLoggerTest.java b/carapace-server/src/test/java/org/carapaceproxy/server/FullHttpMessageLoggerTest.java new file mode 100644 index 000000000..2b222ecb3 --- /dev/null +++ b/carapace-server/src/test/java/org/carapaceproxy/server/FullHttpMessageLoggerTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to Diennea S.r.l. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Diennea S.r.l. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.carapaceproxy.server; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.carapaceproxy.client.EndpointKey; +import org.carapaceproxy.utils.RawHttpClient; +import org.carapaceproxy.utils.TestEndpointMapper; +import static org.junit.Assert.assertTrue; +import com.github.tomakehurst.wiremock.client.WireMock; +import java.nio.file.Files; +import org.carapaceproxy.utils.CarapaceLogger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * + * @author paolo.venturi + */ +public class FullHttpMessageLoggerTest { + + private static final boolean DEBUG = true; + + private String accessLogFilePath; + + @Rule + public WireMockRule wireMockRule = new WireMockRule(0); + + @Rule + public TemporaryFolder tmpDir = new TemporaryFolder(); + + @Before + public void before() { + accessLogFilePath = tmpDir.getRoot().getAbsolutePath() + "/access.log"; + } + + @Test + public void testGetMessageLog() throws Exception { + stubFor(get(urlEqualTo("/index.html")) + .willReturn(aResponse() + .withStatus(200) + .withHeader("Content-Type", "text/html") + .withHeader("Content-Length", "it works !!".length() + "") + .withBody("it works !!"))); + + TestEndpointMapper mapper = new TestEndpointMapper("localhost", wireMockRule.port(), true); + EndpointKey key = new EndpointKey("localhost", wireMockRule.port()); + + CarapaceLogger.setLoggingDebugEnabled(true); + + try (HttpProxyServer server = HttpProxyServer.buildForTests("localhost", 0, mapper, tmpDir.newFolder());) { + server.getCurrentConfiguration().setAccessLogPath(tmpDir.getRoot().getAbsolutePath() + "/access.log"); + Path path = Paths.get(server.getCurrentConfiguration().getAccessLogPath() + ".full"); + server.start(); + int port = server.getLocalPort(); + try (RawHttpClient client = new RawHttpClient("localhost", port)) { + RawHttpClient.HttpResponse resp = client.executeRequest("GET /index.html HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"); + String s = resp.toString(); + assertTrue(s.endsWith("it works !!")); + } + try (RawHttpClient client = new RawHttpClient("localhost", port)) { + { + RawHttpClient.HttpResponse resp = client.executeRequest("GET /index.html HTTP/1.1\r\nHost: localhost\r\n\r\n"); + String s = resp.toString(); + assertTrue(s.endsWith("it works !!")); + } + { + RawHttpClient.HttpResponse resp = client.executeRequest("GET /index.html HTTP/1.1\r\nHost: localhost\r\n\r\n"); + String s = resp.toString(); + assertTrue(s.endsWith("it works !!")); + } + } + Thread.sleep(3000); + File[] f = new File(tmpDir.getRoot().getAbsolutePath()).listFiles((dir, name) -> name.startsWith("access") && name.endsWith(".full")); + assertTrue(f.length == 1); + Files.readAllLines(f[0].toPath()).forEach(l -> System.out.println(l)); + } + } + + @Test + public void testPostMessageLog() throws Exception { + String responseJson = "{\"property\" : \"value\"}"; + stubFor(post(urlEqualTo("/index.html")) + .willReturn(WireMock.okJson(responseJson))); + + TestEndpointMapper mapper = new TestEndpointMapper("localhost", wireMockRule.port(), true); + EndpointKey key = new EndpointKey("localhost", wireMockRule.port()); + + CarapaceLogger.setLoggingDebugEnabled(true); + + try (HttpProxyServer server = HttpProxyServer.buildForTests("localhost", 0, mapper, tmpDir.newFolder());) { + server.getCurrentConfiguration().setAccessLogPath(tmpDir.getRoot().getAbsolutePath() + "/access.log"); + Path path = Paths.get(server.getCurrentConfiguration().getAccessLogPath() + ".full"); + server.start(); + int port = server.getLocalPort(); + try (RawHttpClient client = new RawHttpClient("localhost", port)) { + String body = "{\"values\" : {\"p\" : \"v\"}, \"options\" : {\"o\" : 1}}"; + RawHttpClient.HttpResponse res = + client.executeRequest("POST /index.html HTTP/1.1" + + "\r\nHost: localhost" + + "\r\nConnection: keep-alive" + + "\r\nContent-Type: application/json" + + "\r\nContent-Length: " + body.length() + + "\r\n\r\n" + + body + ); + String resp = res.getBodyString(); + } + } + Thread.sleep(3000); + File[] f = new File(tmpDir.getRoot().getAbsolutePath()).listFiles((dir, name) -> name.startsWith("access") && name.endsWith(".full")); + assertTrue(f.length == 1); + Files.readAllLines(f[0].toPath()).forEach(l -> System.out.println(l)); + } +} diff --git a/carapace-server/src/test/java/org/carapaceproxy/server/RequestsLoggerTest.java b/carapace-server/src/test/java/org/carapaceproxy/server/RequestsLoggerTest.java index f41eabef9..4ed164d12 100644 --- a/carapace-server/src/test/java/org/carapaceproxy/server/RequestsLoggerTest.java +++ b/carapace-server/src/test/java/org/carapaceproxy/server/RequestsLoggerTest.java @@ -1,21 +1,21 @@ /* - Licensed to Diennea S.r.l. under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. Diennea S.r.l. licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - + * Licensed to Diennea S.r.l. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Diennea S.r.l. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * */ package org.carapaceproxy.server; @@ -37,9 +37,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.text.SimpleDateFormat; -import java.util.ArrayList; import java.util.List; -import static junit.framework.Assert.assertEquals; import org.carapaceproxy.EndpointStats; import org.carapaceproxy.MapResult; import org.carapaceproxy.client.ConnectionsManagerStats; @@ -480,19 +478,19 @@ public void testWithServer() throws Exception { EndpointKey key = new EndpointKey("localhost", wireMockRule.port()); ConnectionsManagerStats stats; - try ( HttpProxyServer server = HttpProxyServer.buildForTests("localhost", 0, mapper, tmpDir.newFolder());) { + try (HttpProxyServer server = HttpProxyServer.buildForTests("localhost", 0, mapper, tmpDir.newFolder());) { server.getCurrentConfiguration().setAccessLogPath(tmpDir.getRoot().getAbsolutePath() + "/access.log"); server.start(); int port = server.getLocalPort(); - try ( RawHttpClient client = new RawHttpClient("localhost", port)) { + try (RawHttpClient client = new RawHttpClient("localhost", port)) { RawHttpClient.HttpResponse resp = client.executeRequest("GET /index.html HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"); String s = resp.toString(); assertTrue(s.endsWith("it works !!")); assertFalse(resp.getHeaderLines().stream().anyMatch(h -> h.contains("X-Cached"))); } - try ( RawHttpClient client = new RawHttpClient("localhost", port)) { + try (RawHttpClient client = new RawHttpClient("localhost", port)) { { RawHttpClient.HttpResponse resp = client.executeRequest("GET /index.html HTTP/1.1\r\nHost: localhost\r\n\r\n"); String s = resp.toString(); @@ -537,7 +535,7 @@ public void testAccessLogRotation() throws Exception { TestEndpointMapper mapper = new TestEndpointMapper("localhost", wireMockRule.port(), true); EndpointKey key = new EndpointKey("localhost", wireMockRule.port()); - try ( HttpProxyServer server = HttpProxyServer.buildForTests("localhost", 0, mapper, tmpDir.newFolder());) { + try (HttpProxyServer server = HttpProxyServer.buildForTests("localhost", 0, mapper, tmpDir.newFolder());) { server.getCurrentConfiguration().setAccessLogPath(tmpDir.getRoot().getAbsolutePath() + "/access.log"); server.getCurrentConfiguration().setAccessLogMaxSize(1024); Path currentAccessLogPath = Paths.get(server.getCurrentConfiguration().getAccessLogPath()); @@ -547,13 +545,13 @@ public void testAccessLogRotation() throws Exception { FileChannel logFileChannel = FileChannel.open(currentAccessLogPath); while (logFileChannel.size() < 1024) { - try ( RawHttpClient client = new RawHttpClient("localhost", port)) { + try (RawHttpClient client = new RawHttpClient("localhost", port)) { RawHttpClient.HttpResponse resp = client.executeRequest("GET /index.html HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"); String s = resp.toString(); assertTrue(s.endsWith("it works !!")); } - try ( RawHttpClient client = new RawHttpClient("localhost", port)) { + try (RawHttpClient client = new RawHttpClient("localhost", port)) { { RawHttpClient.HttpResponse resp = client.executeRequest("GET /index.html HTTP/1.1\r\nHost: localhost\r\n\r\n"); String s = resp.toString(); @@ -571,7 +569,7 @@ public void testAccessLogRotation() throws Exception { //check if gzip file exist File[] f = new File(tmpDir.getRoot().getAbsolutePath()).listFiles((dir, name) -> name.startsWith("access") && name.endsWith(".gzip")); assertTrue(f.length == 1); - try ( RawHttpClient client = new RawHttpClient("localhost", port)) { + try (RawHttpClient client = new RawHttpClient("localhost", port)) { RawHttpClient.HttpResponse resp = client.executeRequest("GET /index.html HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"); String s = resp.toString(); assertTrue(s.endsWith("it works !!")); @@ -579,4 +577,5 @@ public void testAccessLogRotation() throws Exception { assertTrue(logFileChannel.size() > 0); } } + }