diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/Main.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/Main.java index eb08cb968..b99da13f9 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/Main.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/Main.java @@ -20,6 +20,7 @@ import org.opensearch.migrations.trafficcapture.IConnectionCaptureFactory; import org.opensearch.migrations.trafficcapture.StreamChannelConnectionCaptureSerializer; import org.opensearch.migrations.trafficcapture.kafkaoffloader.KafkaCaptureFactory; +import org.opensearch.migrations.trafficcapture.proxyserver.netty.BacksideConnectionPool; import org.opensearch.migrations.trafficcapture.proxyserver.netty.NettyScanningHttpProxy; import org.opensearch.security.ssl.DefaultSecurityKeyStore; import org.opensearch.security.ssl.util.SSLConfigConstants; @@ -31,6 +32,7 @@ import java.net.URI; import java.nio.file.Files; import java.nio.file.Paths; +import java.time.Duration; import java.util.Optional; import java.util.Properties; import java.util.UUID; @@ -100,6 +102,25 @@ static class Parameters { arity = 1, description = "Exposed port for clients to connect to this proxy.") int frontsidePort = 0; + @Parameter(required = false, + names = {"--numThreads"}, + arity = 1, + description = "How many threads netty should create in its event loop group") + int numThreads = 1; + @Parameter(required = false, + names = {"--destinationConnectionPoolSize"}, + arity = 1, + description = "Number of socket connections that should be maintained to the destination server " + + "to reduce the perceived latency to clients. Each thread will have its own cache, so the " + + "total number of outstanding warm connections will be multiplied by numThreads.") + int destinationConnectionPoolSize = 0; + @Parameter(required = false, + names = {"--destinationConnectionPoolTimeout"}, + arity = 1, + description = "Of the socket connections maintained by the destination connection pool, " + + "how long after connection should the be recycled " + + "(closed with a new connection taking its place)") + String destinationConnectionPoolTimeout = "PT30S"; } public static Parameters parseArgs(String[] args) { @@ -243,10 +264,14 @@ public static void main(String[] args) throws InterruptedException, IOException sksOp.ifPresent(x->x.initHttpSSLConfig()); var proxy = new NettyScanningHttpProxy(params.frontsidePort); - try { - proxy.start(backsideUri, loadBacksideSslContext(backsideUri, params.allowInsecureConnectionsToBackside), - sksOp.map(sks-> (Supplier) () -> { + var pooledConnectionTimeout = params.destinationConnectionPoolSize == 0 ? Duration.ZERO : + Duration.parse(params.destinationConnectionPoolTimeout); + var backsideConnectionPool = new BacksideConnectionPool(backsideUri, + loadBacksideSslContext(backsideUri, params.allowInsecureConnectionsToBackside), + params.destinationConnectionPoolSize, pooledConnectionTimeout); + proxy.start(backsideConnectionPool, params.numThreads, + sksOp.map(sks -> (Supplier) () -> { try { var sslEngine = sks.createHTTPSSLEngine(); return sslEngine; diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideConnectionPool.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideConnectionPool.java new file mode 100644 index 000000000..307ebcc0f --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideConnectionPool.java @@ -0,0 +1,115 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.netty; + +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LogLevel; +import org.slf4j.event.Level; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.FastThreadLocal; +import lombok.extern.slf4j.Slf4j; + +import javax.net.ssl.SSLEngine; +import java.net.URI; +import java.time.Duration; +import java.util.HashMap; +import java.util.StringJoiner; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Slf4j +public class BacksideConnectionPool { + private final URI backsideUri; + private final SslContext backsideSslContext; + private final FastThreadLocal channelClassToConnectionCacheForEachThread; + private final Duration inactivityTimeout; + private final int poolSize; + + public BacksideConnectionPool(URI backsideUri, SslContext backsideSslContext, + int poolSize, Duration inactivityTimeout) { + this.backsideUri = backsideUri; + this.backsideSslContext = backsideSslContext; + this.channelClassToConnectionCacheForEachThread = new FastThreadLocal(); + this.inactivityTimeout = inactivityTimeout; + this.poolSize = poolSize; + } + + public ChannelFuture getOutboundConnectionFuture(EventLoop eventLoop) { + if (poolSize == 0) { + return buildConnectionFuture(eventLoop); + } + return getExpiringWarmChannelPool(eventLoop).getAvailableOrNewItem(); + } + + private ExpiringSubstitutableItemPool + getExpiringWarmChannelPool(EventLoop eventLoop) { + var thisContextsConnectionCache = (ExpiringSubstitutableItemPool) + channelClassToConnectionCacheForEachThread.get(); + if (thisContextsConnectionCache == null) { + thisContextsConnectionCache = + new ExpiringSubstitutableItemPool(inactivityTimeout, + eventLoop, + () -> buildConnectionFuture(eventLoop), + x->x.channel().close(), poolSize, Duration.ZERO); + if (log.isInfoEnabled()) { + final var finalChannelClassToChannelPoolMap = thisContextsConnectionCache; + logProgressAtInterval(Level.INFO, eventLoop, + thisContextsConnectionCache, Duration.ofSeconds(30)); + } + channelClassToConnectionCacheForEachThread.set(thisContextsConnectionCache); + } + + return thisContextsConnectionCache; + } + + private void logProgressAtInterval(Level logLevel, EventLoop eventLoop, + ExpiringSubstitutableItemPool channelPoolMap, + Duration frequency) { + eventLoop.schedule(() -> { + log.atLevel(logLevel).log(channelPoolMap.getStats().toString()); + logProgressAtInterval(logLevel, eventLoop, channelPoolMap, frequency); + }, frequency.toMillis(), TimeUnit.MILLISECONDS); + } + + private ChannelFuture buildConnectionFuture(EventLoop eventLoop) { + // Start the connection attempt. + Bootstrap b = new Bootstrap(); + b.group(eventLoop) + .channel(NioSocketChannel.class) + .handler(new ChannelDuplexHandler()) + .option(ChannelOption.AUTO_READ, false); + var f = b.connect(backsideUri.getHost(), backsideUri.getPort()); + var rval = new DefaultChannelPromise(f.channel()); + f.addListener((ChannelFutureListener) connectFuture -> { + if (connectFuture.isSuccess()) { + // connection complete start to read first data + log.debug("Done setting up backend channel & it was successful (" + connectFuture.channel() + ")"); + if (backsideSslContext != null) { + var pipeline = connectFuture.channel().pipeline(); + SSLEngine sslEngine = backsideSslContext.newEngine(connectFuture.channel().alloc()); + sslEngine.setUseClientMode(true); + var sslHandler = new SslHandler(sslEngine); + pipeline.addFirst("ssl", sslHandler); + sslHandler.handshakeFuture().addListener(handshakeFuture -> { + if (handshakeFuture.isSuccess()) { + rval.setSuccess(); + } else { + rval.setFailure(handshakeFuture.cause()); + } + }); + } else { + rval.setSuccess(); + } + } else { + rval.setFailure(connectFuture.cause()); + } + }); + return rval; + } +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideHandler.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideHandler.java index 6abe5ec09..094e30028 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideHandler.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideHandler.java @@ -33,7 +33,7 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { @Override public void channelInactive(ChannelHandlerContext ctx) { - log.debug("inactive channel - closing"); + log.debug("inactive channel - closing (" + ctx.channel() + ")"); FrontsideHandler.closeAndFlush(writeBackChannel); } diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java new file mode 100644 index 000000000..b5e7ddd19 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java @@ -0,0 +1,306 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.netty; + +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import lombok.Getter; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayDeque; +import java.util.LinkedHashSet; +import java.util.Queue; +import java.util.StringJoiner; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Supplier; + + +/** + * This class maintains N items for S seconds. After S seconds, items are expired as per the + * specified expiration callback. Callers can retrieve items from the cache or built on-demand + * if no items are available within the cache that are ready to go. + * + * This class does not use locking. Instead, it is assumed that one of these will be created for + * each netty event loop. + */ +@Slf4j +public class ExpiringSubstitutableItemPool, U> { + + private static class Entry { + Instant timestamp; + F future; + public Entry(F future) { + timestamp = Instant.now(); + this.future = future; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("Entry{"); + sb.append("timestamp=").append(timestamp); + sb.append(", value=").append(future); + sb.append('}'); + return sb.toString(); + } + } + + public static class PoolClosedException extends RuntimeException { } + + public static class Stats { + @Getter + private long nItemsCreated; + @Getter + private long nItemsExpired; + @Getter + private long nHotGets; // cache hits + @Getter + private long nColdGets; // cache misses + @Getter + Duration totalDurationBuildingItems = Duration.ZERO; + @Getter + Duration totalWaitTimeForCallers = Duration.ZERO; + + @Override + public String toString() { + return new StringJoiner(", ", Stats.class.getSimpleName() + "[", "]") + .add("nItemsCreated=" + nItemsCreated) + .add("nHotGets=" + nHotGets) + .add("nColdGets=" + nColdGets) + .add("nExpiredItems=" + nItemsExpired) + .add("avgDurationBuildingItems=" + averageBuildTime()) + .add("avgWaitTimeForCallers=" + averageWaitTime()) + .toString(); + } + + public long getTotalGets() { + return nHotGets+nColdGets; + } + + public Duration averageWaitTime() { + if (getTotalGets() == 0) { + return Duration.ZERO; + } + return totalWaitTimeForCallers.dividedBy(getTotalGets()); + } + + public Duration averageBuildTime() { + if (totalItemsCreated() == 0) { + return Duration.ZERO; + } + return totalDurationBuildingItems.dividedBy(totalItemsCreated()); + } + + private void itemBuilt(Duration delta) { + totalDurationBuildingItems = totalDurationBuildingItems.plus(delta); + nItemsCreated++; + } + private void addWaitTime(Duration delta) { + totalWaitTimeForCallers = totalWaitTimeForCallers.plus(delta); + } + + private void addHotGet() { + nHotGets++; + } + + private void addColdGet() { + nColdGets++; + } + + private long totalItemsCreated() { + return nItemsCreated; + } + + private void addExpiredItem() { + nItemsExpired++; + } + } + + // Store in-progress futures that were the result of item builds in their "in-order" + // creation time so that if the readyItems is empty, we can return a future that is + // more likely to complete. + private final LinkedHashSet inProgressItems; + private final Queue> readyItems; + private final Supplier itemSupplier; + private final Consumer onExpirationConsumer; + @Getter + private final EventLoop eventLoop; + private Duration inactivityTimeout; + private GenericFutureListener shuffleInProgressToReady; + @Getter + private Stats stats = new Stats(); + private int poolSize; + + public ExpiringSubstitutableItemPool(@NonNull Duration inactivityTimeout, + @NonNull EventLoop eventLoop, + @NonNull Supplier itemSupplier, + @NonNull Consumer onExpirationConsumer, + int numItemsToLoad, @NonNull Duration initialItemLoadInterval) { + this(inactivityTimeout, eventLoop, itemSupplier, onExpirationConsumer); + increaseCapacityWithSchedule(numItemsToLoad, initialItemLoadInterval); + } + + public ExpiringSubstitutableItemPool(@NonNull Duration inactivityTimeout, + @NonNull EventLoop eventLoop, + @NonNull Supplier itemSupplier, + @NonNull Consumer onExpirationConsumer) { + assert inactivityTimeout.multipliedBy(-1).isNegative() : "inactivityTimeout must be > 0"; + this.inProgressItems = new LinkedHashSet<>(); + this.readyItems = new ArrayDeque<>(); + this.eventLoop = eventLoop; + this.inactivityTimeout = inactivityTimeout; + this.onExpirationConsumer = onExpirationConsumer; + this.itemSupplier = () -> { + var startTime = Instant.now(); + var rval = itemSupplier.get(); + rval.addListener(v->{ + stats.itemBuilt(Duration.between(startTime, Instant.now())); + }); + return rval; + }; + // store this as a field so that we can remove the listener once the inProgress item has been + // shifted to the readyItems + this.shuffleInProgressToReady = + f -> { + inProgressItems.remove(f); + if (f.isSuccess()) { + readyItems.add(new Entry(f)); + scheduleNextExpirationSweep(inactivityTimeout); + } else { + // the calling context should track failures too - no reason to log + // TODO - add some backoff here + beginLoadingNewItemIfNecessary(); + } + }; + } + + public int reduceCapacity(int delta) { + assert delta >= 0 : "expected capacity delta to be >= 0"; + poolSize -= delta; + assert poolSize >= 0 : "expected pool size to remain >= 0"; + return poolSize; + } + + public int increaseCapacity(int itemsToLoad) { + return increaseCapacityWithSchedule(itemsToLoad, Duration.ZERO); + } + + public int increaseCapacityWithSchedule(int itemsToLoad, Duration gapBetweenLoads) { + poolSize += itemsToLoad; + scheduleItemLoadsRecurse(itemsToLoad, gapBetweenLoads); + return poolSize; + } + + public F getAvailableOrNewItem() { + if (inactivityTimeout.isZero()) { + throw new PoolClosedException(); + } + var startTime = Instant.now(); + { + log.trace("getAvailableOrNewItem: readyItems.size()="+readyItems.size()); + var item = readyItems.poll(); + log.trace("getAvailableOrNewItem: item="+item + " remaining readyItems.size()="+readyItems.size()); + if (item != null) { + stats.addHotGet(); + beginLoadingNewItemIfNecessary(); + stats.addWaitTime(Duration.between(startTime, Instant.now())); + return item.future; + } + } + + BiFunction durationTrackingDecoratedItem = + (itemsFuture, label) -> (F) itemsFuture.addListener(f->{ + stats.addWaitTime(Duration.between(startTime, Instant.now())); + log.trace(label + "returning value="+ f.get()+" from future " + itemsFuture); + }); + stats.addColdGet(); + var inProgressIt = inProgressItems.iterator(); + + if (inProgressIt.hasNext()) { + var firstItem = inProgressIt.next(); + inProgressIt.remove(); + firstItem.removeListeners(shuffleInProgressToReady); + beginLoadingNewItemIfNecessary(); + return durationTrackingDecoratedItem.apply(firstItem, "IN_PROGRESS: "); + } + return durationTrackingDecoratedItem.apply(itemSupplier.get(), "FRESH: "); + } + + public void close() { + inactivityTimeout = Duration.ZERO; + expireItems(); + } + + private void scheduleItemLoadsRecurse(int itemsToLoad, Duration gapBetweenLoads) { + eventLoop.schedule(()-> { + beginLoadingNewItemIfNecessary(); + if (itemsToLoad >= 0) { + scheduleItemLoadsRecurse(itemsToLoad-1, gapBetweenLoads); + } + }, gapBetweenLoads.toMillis(), TimeUnit.MILLISECONDS); + } + + private void scheduleNextExpirationSweep(Duration d) { + eventLoop.schedule(this::expireItems, d.toMillis(), TimeUnit.MILLISECONDS); + } + + private void expireItems() { + var thresholdTimestamp = Instant.now().minus(this.inactivityTimeout); + log.debug("expiration threshold = " + thresholdTimestamp); + while (!readyItems.isEmpty()) { + var oldestItem = readyItems.peek(); + var gap = Duration.between(thresholdTimestamp, oldestItem.timestamp); + if (!gap.isNegative()) { + log.debug("scheduling next sweep for " + gap); + scheduleNextExpirationSweep(gap); + return; + } else { + stats.addExpiredItem(); + var removedItem = readyItems.poll(); + assert removedItem == oldestItem : "expected the set of readyItems to be ordered chronologically, " + + "so with a fixed item timeout, nothing should ever be able to cut back in time. " + + "Secondly, a concurrent mutation of any sort while in this function " + + "should have been impossible since we're only modifying this object through a shared eventloop"; + log.debug("Removing " + removedItem); + onExpirationConsumer.accept(removedItem.future); + beginLoadingNewItemIfNecessary(); + } + } + } + + private void beginLoadingNewItemIfNecessary() { + if (inactivityTimeout.isZero()) { + throw new PoolClosedException(); + } else if (poolSize > (inProgressItems.size() + readyItems.size())) { + var futureItem = itemSupplier.get(); + inProgressItems.add(futureItem); + futureItem.addListener(shuffleInProgressToReady); + } + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ExpiringSubstitutableItemPool{"); + sb.append("poolSize=").append(poolSize); + if (eventLoop.inEventLoop()) { + // these two lines are dangerous if toString() is run from a concurrent environment + sb.append(", inProgressItems=").append(inProgressItems); + sb.append(", readyItems=").append(readyItems); + } else { + sb.append(", numInProgressItems=").append(inProgressItems.size()); + sb.append(", numReadyItems=").append(readyItems.size()); + } + sb.append(", inProgressItems=").append(inProgressItems); + sb.append(", readyItems=").append(readyItems); + sb.append(", itemSupplier=").append(itemSupplier); + sb.append(", onExpirationConsumer=").append(onExpirationConsumer); + sb.append(", eventLoop=").append(eventLoop); + sb.append(", inactivityTimeout=").append(inactivityTimeout); + sb.append(", stats=").append(stats); + sb.append('}'); + return sb.toString(); + } +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java index 7344af3c2..8f2aa3030 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java @@ -1,77 +1,53 @@ package org.opensearch.migrations.trafficcapture.proxyserver.netty; -import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelOption; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslHandler; +import io.netty.handler.logging.LogLevel; import lombok.extern.slf4j.Slf4j; -import javax.net.ssl.SSLEngine; import java.net.URI; @Slf4j public class FrontsideHandler extends ChannelInboundHandlerAdapter { private Channel outboundChannel; + private BacksideConnectionPool backsideConnectionPool; - private final URI backsideUri; - private final SslContext backsideSslContext; /** - * Create a handler that sets the autoreleases flag - * @param backsideUri - * @param backsideSslContext + * Create a handler that sets the autorelease flag */ - public FrontsideHandler(URI backsideUri, SslContext backsideSslContext) { - this.backsideUri = backsideUri; - this.backsideSslContext = backsideSslContext; + public FrontsideHandler(BacksideConnectionPool backsideConnectionPool) { + this.backsideConnectionPool = backsideConnectionPool; } @Override public void channelActive(ChannelHandlerContext ctx) { final Channel inboundChannel = ctx.channel(); - // Start the connection attempt. - Bootstrap b = new Bootstrap(); - b.group(inboundChannel.eventLoop()) - .channel(ctx.channel().getClass()) - .handler(new BacksideHandler(inboundChannel)) - .option(ChannelOption.AUTO_READ, false); - log.debug("Active - setting up backend connection"); - var f = b.connect(backsideUri.getHost(), backsideUri.getPort()); - outboundChannel = f.channel(); - f.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - if (future.isSuccess()) { - // connection complete start to read first data - log.debug("Done setting up backend channel & it was successful"); - if (backsideSslContext != null) { - var pipeline = future.channel().pipeline(); - SSLEngine sslEngine = backsideSslContext.newEngine(future.channel().alloc()); - sslEngine.setUseClientMode(true); - pipeline.addFirst("ssl", new SslHandler(sslEngine)); - } - inboundChannel.read(); - } else { - // Close the connection if the connection attempt has failed. - log.debug("closing outbound channel because CONNECT future was not successful"); - inboundChannel.close(); - } + var outboundChannelFuture = backsideConnectionPool.getOutboundConnectionFuture(inboundChannel.eventLoop()); + log.debug("Active - setting up backend connection with channel " + outboundChannelFuture.channel()); + outboundChannelFuture.addListener((ChannelFutureListener) (future -> { + if (future.isSuccess()) { + var pipeline = future.channel().pipeline(); + pipeline.addLast(new BacksideHandler(inboundChannel)); + inboundChannel.read(); + } else { + // Close the connection if the connection attempt has failed. + log.debug("closing outbound channel because CONNECT future was not successful"); + inboundChannel.close(); } - }); + })); + outboundChannel = outboundChannelFuture.channel(); } @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) { - log.debug("frontend handler read: "+msg); + log.debug("frontend handler[" + this.outboundChannel + "] read: "+msg); if (outboundChannel.isActive()) { - log.debug("Writing data to backside handler"); + log.debug("Writing data to backside handler " + outboundChannel); outboundChannel.writeAndFlush(msg) .addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { @@ -82,7 +58,10 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) { future.channel().close(); // close the backside } }); - } // if the outbound channel has died, so be it... let this frontside finish with it's caller naturally + outboundChannel.config().setAutoRead(true); + } else { // if the outbound channel has died, so be it... let this frontside finish with it's caller naturally + log.warn("Output channel (" + outboundChannel + ") is NOT active"); + } } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxy.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxy.java index 4f2afde0c..6a7e90133 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxy.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxy.java @@ -6,13 +6,11 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.ssl.SslContext; import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.JdkLoggerFactory; import org.opensearch.migrations.trafficcapture.IConnectionCaptureFactory; import javax.net.ssl.SSLEngine; -import java.net.URI; import java.util.function.Supplier; public class NettyScanningHttpProxy { @@ -29,16 +27,17 @@ public int getProxyPort() { return proxyPort; } - public void start(URI backsideUri, SslContext backsideSslContext, Supplier sslEngineSupplier, - IConnectionCaptureFactory connectionCaptureFactory) throws InterruptedException { - InternalLoggerFactory.setDefaultFactory(JdkLoggerFactory.INSTANCE); + public void start(BacksideConnectionPool backsideConnectionPool, + int numThreads, + Supplier sslEngineSupplier, + IConnectionCaptureFactory connectionCaptureFactory) throws InterruptedException { bossGroup = new NioEventLoopGroup(1); - workerGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(numThreads); ServerBootstrap serverBootstrap = new ServerBootstrap(); try { mainChannel = serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) - .childHandler(new ProxyChannelInitializer(backsideUri, backsideSslContext, sslEngineSupplier, + .childHandler(new ProxyChannelInitializer(backsideConnectionPool, sslEngineSupplier, connectionCaptureFactory)) .childOption(ChannelOption.AUTO_READ, false) .bind(proxyPort).sync().channel(); diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ProxyChannelInitializer.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ProxyChannelInitializer.java index 2687261f2..98e207c5b 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ProxyChannelInitializer.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ProxyChannelInitializer.java @@ -2,35 +2,26 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpObjectDecoder; import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; -import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import org.opensearch.migrations.trafficcapture.IConnectionCaptureFactory; import org.opensearch.migrations.trafficcapture.netty.ConditionallyReliableLoggingHttpRequestHandler; -import org.opensearch.migrations.trafficcapture.netty.LoggingHttpRequestHandler; import org.opensearch.migrations.trafficcapture.netty.LoggingHttpResponseHandler; import javax.net.ssl.SSLEngine; import java.io.IOException; -import java.net.URI; import java.util.function.Supplier; public class ProxyChannelInitializer extends ChannelInitializer { private final IConnectionCaptureFactory connectionCaptureFactory; private final Supplier sslEngineProvider; - private final URI backsideUri; - private final SslContext backsideSslContext; + private final BacksideConnectionPool backsideConnectionPool; - public ProxyChannelInitializer(URI backsideUri, SslContext backsideSslContext, Supplier sslEngineSupplier, + public ProxyChannelInitializer(BacksideConnectionPool backsideConnectionPool, Supplier sslEngineSupplier, IConnectionCaptureFactory connectionCaptureFactory) { - this.backsideUri = backsideUri; - this.backsideSslContext = backsideSslContext; + this.backsideConnectionPool = backsideConnectionPool; this.sslEngineProvider = sslEngineSupplier; this.connectionCaptureFactory = connectionCaptureFactory; } @@ -53,6 +44,6 @@ protected void initChannel(SocketChannel ch) throws IOException { ch.pipeline().addLast(new LoggingHttpResponseHandler(offloader)); ch.pipeline().addLast(new ConditionallyReliableLoggingHttpRequestHandler(offloader, this::shouldGuaranteeMessageOffloading)); - ch.pipeline().addLast(new FrontsideHandler(backsideUri, backsideSslContext)); + ch.pipeline().addLast(new FrontsideHandler(backsideConnectionPool)); } } diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPoolTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPoolTest.java new file mode 100644 index 000000000..f5bd0a74e --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPoolTest.java @@ -0,0 +1,147 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.netty; + +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.Future; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@Slf4j +class ExpiringSubstitutableItemPoolTest { + + public static final int NUM_POOLED_ITEMS = 5; + private static final Duration SUPPLY_WORK_TIME = Duration.ofMillis(100); + private static final Duration TIME_BETWEEN_INITIAL_ITEMS = Duration.ofMillis(10); + public static final Duration INACTIVITY_TIMEOUT = Duration.ofMillis(1000); + public static final int NUM_ITEMS_TO_PULL = 1; + + /** + * This test uses concurrency primitives to govern when fully-built items can be returned. + * That removes timing inconsistencies for the first half or so of this test. However, + * expirations aren't as easy to control. Built items aren't added until AFTER the callback, + * giving the callback an opportunity to drive sequencing. However, expirations are driven + * by netty's clock and internal values of the ExpiringSubstitutablePool, so it's much harder + * to mitigate temporal inconsistencies for the expiration-related checks. + * + * Still, there's been enormous value in finding issues with the assertions in the latter + * part of this test. If there are future issues, putting more conservative duration values + * in place may further mitigate inconsistencies, though I haven't had any tests fail yet + * unless I've stopped threads within the debugger. + */ + @Test + void get() throws Exception { + var firstWaveBuildCountdownLatch = new CountDownLatch(NUM_POOLED_ITEMS); + var expireCountdownLatch = new CountDownLatch(NUM_POOLED_ITEMS-NUM_ITEMS_TO_PULL); + var secondWaveBuildCountdownLatch = new CountDownLatch(NUM_POOLED_ITEMS); + var expirationsAreDoneFuture = new CompletableFuture(); + var builtItemCursor = new AtomicInteger(); + var expiredItems = new ArrayList(); + var eventLoop = new NioEventLoopGroup(1); + var lastCreation = new AtomicReference(); + var pool = new ExpiringSubstitutableItemPool,Integer>( + INACTIVITY_TIMEOUT, eventLoop.next(), + () -> { + var rval = new DefaultPromise(eventLoop.next()); + eventLoop.schedule(() -> { + if (firstWaveBuildCountdownLatch.getCount() <= 0) { + expirationsAreDoneFuture.whenComplete((v,t)-> + rval.setSuccess(getIntegerItem(builtItemCursor, lastCreation, secondWaveBuildCountdownLatch))); + } else { + rval.setSuccess(getIntegerItem(builtItemCursor, lastCreation, firstWaveBuildCountdownLatch)); + } + }, + SUPPLY_WORK_TIME.toMillis(), TimeUnit.MILLISECONDS); + return rval; + }, + item->{ + expireCountdownLatch.countDown(); + log.info("Expiring item: "+item); + try { + expiredItems.add(item.get()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + }); + for (int i = 0; ii.toString()).collect(Collectors.joining(","))); + + expirationsAreDoneFuture.complete(true); + Assertions.assertTrue(pool.getStats().getNItemsCreated() >= 5); + Assertions.assertEquals(0, pool.getStats().getNColdGets()); + Assertions.assertEquals(1, pool.getStats().getNHotGets()); + Assertions.assertTrue(pool.getStats().getNItemsExpired() >= 4); + + for (int i=1; i<=NUM_POOLED_ITEMS*2; ++i) { + log.info("Pool=" + pool); + Assertions.assertEquals(NUM_POOLED_ITEMS+i, getNextItem(pool)); + } + + Assertions.assertEquals(15, pool.getStats().getNItemsCreated()); + Assertions.assertEquals(11, pool.getStats().getNHotGets()+pool.getStats().getNColdGets()); + Assertions.assertEquals(4, pool.getStats().getNItemsExpired()); + + Assertions.assertTrue(pool.getStats().averageBuildTime().toMillis() > 0); + Assertions.assertTrue(pool.getStats().averageWaitTime().toMillis() < + pool.getStats().averageBuildTime().toMillis()); + } + + private static Integer getNextItem(ExpiringSubstitutableItemPool,Integer> pool) + throws InterruptedException, ExecutionException { + return pool.getEventLoop().next().schedule(()->pool.getAvailableOrNewItem(), + 0, TimeUnit.MILLISECONDS).get().get(); + } + + private static Integer getIntegerItem(AtomicInteger builtItemCursor, + AtomicReference lastCreation, + CountDownLatch countdownLatchToUse) { + log.info("Building item (" +builtItemCursor.hashCode() + ") " + (builtItemCursor.get()+1)); + countdownLatchToUse.countDown(); + lastCreation.set(Instant.now()); + return Integer.valueOf(builtItemCursor.incrementAndGet()); + } +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java index d8b8006f4..e47dd745a 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java @@ -20,6 +20,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -197,7 +198,9 @@ private static String makeTestRequestViaClient(SimpleHttpClientForTesting client try { URI testServerUri = new URI("http", null, SimpleHttpServer.LOCALHOST, underlyingPort, null, null, null); - nshp.get().start(testServerUri,null, null, connectionCaptureFactory); + var connectionPool = new BacksideConnectionPool(testServerUri, null, + 10, Duration.ofSeconds(10)); + nshp.get().start(connectionPool,1, null, connectionCaptureFactory); System.out.println("proxy port = "+port.intValue()); } catch (InterruptedException | URISyntaxException e) { throw new RuntimeException(e); diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/resources/log4j2.properties b/TrafficCapture/trafficCaptureProxyServer/src/test/resources/log4j2.properties new file mode 100644 index 000000000..9d01fa124 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/resources/log4j2.properties @@ -0,0 +1,11 @@ +status = error + +appender.console.type = Console +appender.console.name = STDERR +appender.console.target = SYSTEM_ERR +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n + +rootLogger.level = trace +rootLogger.appenderRefs = stderr +rootLogger.appenderRef.stderr.ref = STDERR \ No newline at end of file diff --git a/TrafficCapture/trafficCaptureProxyServerTest/src/main/docker/docker-compose.yml b/TrafficCapture/trafficCaptureProxyServerTest/src/main/docker/docker-compose.yml index 75c194289..ff86ec9bc 100644 --- a/TrafficCapture/trafficCaptureProxyServerTest/src/main/docker/docker-compose.yml +++ b/TrafficCapture/trafficCaptureProxyServerTest/src/main/docker/docker-compose.yml @@ -2,6 +2,8 @@ version: '3.7' services: webserver: image: 'migrations/nginx-perf-test-webserver:latest' + networks: + - testing ports: - "8080:80" @@ -11,7 +13,7 @@ services: - testing ports: - "9201:9201" - command: /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.Main --destinationUri http://webserver:80 --listenPort 9201 --noCapture + command: /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.Main --destinationUri http://webserver:80 --listenPort 9201 --noCapture --destinationConnectionPoolSize 0 --numThreads 1 depends_on: - webserver @@ -19,7 +21,7 @@ services: image: 'migrations/jmeter:latest' networks: - testing - command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.JMeterLoadTest -p 9201 -s captureproxy -r HTTPS; tail -f /dev/null" + command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.JMeterLoadTest -p 9201 -s captureproxy -r HTTP; tail -f /dev/null" depends_on: - captureproxy diff --git a/TrafficCapture/trafficCaptureProxyServerTest/src/main/docker/nginx/Dockerfile b/TrafficCapture/trafficCaptureProxyServerTest/src/main/docker/nginx/Dockerfile index ed8c9bcae..982dfb43f 100644 --- a/TrafficCapture/trafficCaptureProxyServerTest/src/main/docker/nginx/Dockerfile +++ b/TrafficCapture/trafficCaptureProxyServerTest/src/main/docker/nginx/Dockerfile @@ -1,47 +1,4 @@ -ARG NGINX_VERSION=1.25.1 -FROM nginx:${NGINX_VERSION} as build - -RUN apt-get update -RUN apt-get install -y \ - openssh-client \ - git \ - wget \ - libxml2 \ - libxslt1-dev \ - libpcre3 \ - libpcre3-dev \ - zlib1g \ - zlib1g-dev \ - openssl \ - libssl-dev \ - libtool \ - automake \ - gcc \ - g++ \ - make && \ - rm -rf /var/cache/apt - -RUN wget "http://nginx.org/download/nginx-${NGINX_VERSION}.tar.gz" && \ - tar -C /usr/src -xzvf nginx-${NGINX_VERSION}.tar.gz - -#RUN mkdir -p -m 0600 ~/.ssh && \ -# ssh-keyscan github.com >> ~/.ssh/known_hosts - -WORKDIR /src/ngx_devel_kit -#RUN --mount=type=ssh git clone git@github.com:simpl/ngx_devel_kit . -RUN git clone https://github.com/vision5/ngx_devel_kit . - -WORKDIR /src/echo-nginx-module -#RUN --mount=type=ssh git clone git@github.com:openresty/set-misc-nginx-module.git . -RUN git clone https://github.com/openresty/echo-nginx-module.git . - -WORKDIR /usr/src/nginx-${NGINX_VERSION} -RUN NGINX_ARGS=$(nginx -V 2>&1 | sed -n -e 's/^.*arguments: //p') \ - ./configure --with-compat --with-http_ssl_module --add-dynamic-module=/src/ngx_devel_kit --add-dynamic-module=/src/echo-nginx-module ${NGINX_ARGS} && \ - make modules && \ - make install - -FROM nginx:${NGINX_VERSION} +FROM nginx:1.25.1 # Installing VIM for the sake of users who would like to exec shells in the webserver container. RUN apt-get update @@ -51,10 +8,10 @@ RUN /bin/bash -c '\ export HTMLDIR=/usr/share/nginx/html ; \ for i in {1..100}; do echo -n t; done > ${HTMLDIR}/100.txt && \ for i in {1..1000}; do echo -n s; done > ${HTMLDIR}/1K.txt && \ - for i in {1..10000}; do echo -n m; done > ${HTMLDIR}/10K.txt && \ - for i in {1..100000}; do echo -n L; done > ${HTMLDIR}/100K.txt && \ - for i in {1..1000000}; do echo -n X; done > ${HTMLDIR}/1M.txt && \ - for i in {1..10000000}; do echo -n H; done > ${HTMLDIR}/10M.txt' + for i in {1..10}; do cat ${HTMLDIR}/1K.txt | tr s m ; done > ${HTMLDIR}/10K.txt && \ + for i in {1..10}; do cat ${HTMLDIR}/10K.txt | tr m L; done > ${HTMLDIR}/100K.txt && \ + for i in {1..10}; do cat ${HTMLDIR}/100K.txt | tr L X; done > ${HTMLDIR}/1M.txt && \ + for i in {1..10}; do cat ${HTMLDIR}/1M.txt | tr X H; done > ${HTMLDIR}/10M.txt' COPY nginx.conf /etc/nginx/nginx.conf #COPY --from=build /usr/src/nginx-${NGINX_VERSION}/objs/ngx_http_echo_module.so /usr/src/nginx-${NGINX_VERSION}/objs/ndk_http_module.so /usr/lib/nginx/modules/ diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ReplayUtils.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ReplayUtils.java index 5596330f6..4e887de9a 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ReplayUtils.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ReplayUtils.java @@ -1,5 +1,7 @@ package org.opensearch.migrations.replay; +import io.netty.buffer.ByteBuf; + import java.io.ByteArrayInputStream; import java.io.SequenceInputStream; import java.util.Collections; @@ -11,6 +13,15 @@ public class ReplayUtils { public static SequenceInputStream byteArraysToInputStream(List data) { return byteArraysToInputStream(data.stream()); } + + public static SequenceInputStream byteBufsToInputStream(Stream byteBufStream) { + return byteArraysToInputStream(byteBufStream.map(bb->{ + byte[] asBytes = new byte[bb.readableBytes()]; + bb.duplicate().readBytes(asBytes); + return asBytes; + })); + } + public static SequenceInputStream byteArraysToInputStream(Stream data) { return new SequenceInputStream(Collections.enumeration( data.map(b -> new ByteArrayInputStream(b)).collect(Collectors.toList())));