Skip to content

Commit

Permalink
race condition tests for connection pool leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenLindsell committed Oct 29, 2019
1 parent 72dbb77 commit fdeb1b5
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ private Mono<Connection> newConnection(int attempts) {
}
}

private Connection dequeue() {
@VisibleForTesting
Connection dequeue() {
Connection connection = availableConnections.poll();

while (nonNull(connection) && !connection.isConnected()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import com.hotels.styx.client.Connection;
import com.hotels.styx.client.ConnectionSettings;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.AnswersWithDelay;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.reactivestreams.Publisher;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -39,6 +42,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -712,12 +716,96 @@ public void emitsExceptionWhenPendingConnectionTimesOut() {
// And then ensure connection is placed in the active queue:
processor.onNext(mock(Connection.class));

// Could do with a thread.sleep here

assertEquals(pool.stats().availableConnectionCount(), 1);
assertEquals(pool.stats().pendingConnectionCount(), 0);
assertEquals(pool.stats().busyConnectionCount(), 0);
assertEquals(pool.stats().connectionAttempts(), 1);
}

@Test
public void shouldNotHandoutConnectionToCancelledSubscriberWhenCreatingNewConnection() throws Exception {
when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class)))
.thenReturn(Mono.just(connection1));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.pendingConnectionTimeout(100, MILLISECONDS)
.build();

SimpleConnectionPool simpleConnectionPool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = spy(simpleConnectionPool);
when(pool.dequeue())
.thenAnswer(new AnswersWithDelay(200, new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
return invocation.callRealMethod();
}
}));
StepVerifier.create(pool.borrowConnection())
.expectError(MaxPendingConnectionTimeoutException.class)
.verify();

assertEquals(pool.stats().availableConnectionCount(), 1);
assertEquals(pool.stats().pendingConnectionCount(), 0); // Waiting subscribers
assertEquals(pool.stats().busyConnectionCount(), 0); // Borrowed count
}

@Test
public void shouldNotHandoutConnectionToCancelledSubscriberWhenConnectionIsReturned() throws Exception {
EmitterProcessor<Connection> processor = EmitterProcessor.create();
when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class)))
.thenReturn(Mono.from(processor));
ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.pendingConnectionTimeout(100, MILLISECONDS)
.build();

SimpleConnectionPool simpleConnectionPool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = spy(simpleConnectionPool);
when(pool.dequeue())
.thenAnswer(new AnswersWithDelay(200, new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
return invocation.callRealMethod();
}
}));

StepVerifier.create(pool.borrowConnection())
.expectError(MaxPendingConnectionTimeoutException.class)
.verify();

pool.returnConnection(connection1);

assertEquals(pool.stats().availableConnectionCount(), 1);
assertEquals(pool.stats().pendingConnectionCount(), 0); // Waiting subscribers
assertEquals(pool.stats().busyConnectionCount(), -1); // Borrowed count
}

@Test
public void shouldNotHandoutConnectionToCancelledSubscriberWhenConnectionAlreadyInPool() throws Exception {
ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.pendingConnectionTimeout(100, MILLISECONDS)
.build();

SimpleConnectionPool simpleConnectionPool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = spy(simpleConnectionPool);
when(pool.dequeue())
.thenAnswer(new AnswersWithDelay(200, new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
return connection1;
}
}));

StepVerifier.create(pool.borrowConnection())
.expectError(MaxPendingConnectionTimeoutException.class)
.verify();

assertEquals(pool.stats().availableConnectionCount(), 1);
assertEquals(pool.stats().pendingConnectionCount(), 0); // Waiting subscribers
assertEquals(pool.stats().busyConnectionCount(), 0); // Borrowed count
}

@Test
public void registersAsConnectionListener() {
when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class)))
Expand Down

0 comments on commit fdeb1b5

Please sign in to comment.