Skip to content

Commit

Permalink
perf: withstand connect storms
Browse files Browse the repository at this point in the history
Use a background worker thread pool to create new connections. This
reduces the probability that the client will timeout while waiting for a
connection, when a large number of connections are being requested at
the same time.

Fixes #1377
  • Loading branch information
olavloite committed Feb 15, 2024
1 parent 0a78cf6 commit 56e762c
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,9 @@ && getServer().getOptions().getSslMode().isSslEnabled()
() ->
String.format(
"Exception on connection handler with ID %s for client %s: %s",
getName(), socket.getInetAddress().getHostAddress(), e));
getName(),
socket == null ? "(none)" : socket.getInetAddress().getHostAddress(),
e));
} finally {
if (result != RunConnectionState.RESTART_WITH_SSL) {
logger.log(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
Expand Down Expand Up @@ -81,6 +84,9 @@ public class ProxyServer extends AbstractApiService {
private final ConcurrentLinkedQueue<WireMessage> debugMessages = new ConcurrentLinkedQueue<>();
private final AtomicInteger debugMessageCount = new AtomicInteger();

private final ExecutorService createConnectionHandlerExecutor =
new ThreadPoolExecutor(1, Integer.MAX_VALUE, 20L, TimeUnit.SECONDS, new SynchronousQueue<>());

/**
* Instantiates the ProxyServer from CLI-gathered metadata.
*
Expand Down Expand Up @@ -220,6 +226,7 @@ protected void doStop() {
SpannerPool.closeSpannerPool();
} catch (Throwable ignore) {
}
createConnectionHandlerExecutor.shutdown();
notifyStopped();
}

Expand Down Expand Up @@ -296,7 +303,24 @@ void runServer(
awaitRunning();
try {
while (isRunning()) {
createConnectionHandler(serverSocket.accept());
Socket socket = serverSocket.accept();
// Hand off the creation of the connection handler to a worker thread to ensure that we
// continue to listen for new incoming connection requests as quickly as possible.
// This prevents connection timeouts if there is a large 'connect storm' (i.e. a client
// sends a large number of connection requests at the same time).
createConnectionHandlerExecutor.submit(
() -> {
try {
createConnectionHandler(socket);
} catch (SocketException socketException) {
logger.log(
Level.WARNING,
() ->
String.format(
"Failed to create connection on socket %s: %s.",
socket, socketException));
}
});
}
} catch (SocketException e) {
// This is a normal exception, as this will occur when Server#stopServer() is called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.pgadapter.metadata.OptionsMetadata;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ListValue;
import com.google.protobuf.Value;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlRequest;
Expand All @@ -44,8 +48,11 @@
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.junit.BeforeClass;
import org.junit.Ignore;
Expand Down Expand Up @@ -775,4 +782,26 @@ public void testImplicitBatchOfClientSideStatements() throws SQLException {
}
}
}

@Test
public void testConnectStorm() throws Exception {
int numThreads = 1000;
ListeningExecutorService service =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads));
List<ListenableFuture<Void>> futures = new ArrayList<>(numThreads);
for (int n = 0; n < numThreads; n++) {
futures.add(service.submit(new ConnectCallable()));
}
assertEquals(numThreads, Futures.allAsList(futures).get().size());
}

class ConnectCallable implements Callable<Void> {
@Override
public Void call() throws Exception {
try (Connection ignore = DriverManager.getConnection(createUrl())) {
// Just connect and disconnect.
return null;
}
}
}
}

0 comments on commit 56e762c

Please sign in to comment.