Skip to content

Commit

Permalink
Removing inactive/timed out sessions (Azure#32365)
Browse files Browse the repository at this point in the history
* Adding test case for non-retried session.

* Adding test case for non-retried session.

* Removing old session.

* Add changelog entry.
  • Loading branch information
conniey authored Dec 5, 2022
1 parent 69fc168 commit fcf8237
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 23 deletions.
4 changes: 2 additions & 2 deletions sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Removing inactive session when it has timed out, so `ReactorConnection.getSession(String)` does not return the same session.

### Other Changes

## 2.8.0 (2022-11-04)
Expand All @@ -19,8 +21,6 @@

### Other Changes

-

#### Dependency Updates

- Upgraded `azure-core` from `1.33.0` to `1.34.0`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.azure.core.amqp.exception.AmqpErrorCondition.TIMEOUT_ERROR;
import static com.azure.core.amqp.implementation.AmqpLoggingUtils.addShutdownSignal;
import static com.azure.core.amqp.implementation.AmqpLoggingUtils.addSignalTypeAndResult;
import static com.azure.core.amqp.implementation.AmqpLoggingUtils.createContextWithConnectionId;
Expand Down Expand Up @@ -114,7 +115,7 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption
this.connectionOptions = connectionOptions;
this.reactorProvider = reactorProvider;
this.connectionId = connectionId;
this.logger = new ClientLogger(ReactorConnection.class, createContextWithConnectionId(connectionId));
this.logger = new ClientLogger(ReactorConnection.class, createContextWithConnectionId(connectionId));
this.handlerProvider = handlerProvider;
this.tokenManagerProvider = Objects.requireNonNull(tokenManagerProvider,
"'tokenManagerProvider' cannot be null.");
Expand Down Expand Up @@ -276,7 +277,7 @@ public Map<String, Object> getConnectionProperties() {
@Override
public Mono<AmqpSession> createSession(String sessionName) {
return connectionMono.map(connection -> {
final SessionSubscription sessionSubscription = sessionMap.computeIfAbsent(sessionName, key -> {
return sessionMap.computeIfAbsent(sessionName, key -> {
final SessionHandler sessionHandler = handlerProvider.createSessionHandler(connectionId,
getFullyQualifiedNamespace(), key, connectionOptions.getRetry().getTryTimeout());
final Session session = connection.session();
Expand Down Expand Up @@ -309,15 +310,27 @@ public Mono<AmqpSession> createSession(String sessionName) {

return new SessionSubscription(amqpSession, subscription);
});

return sessionSubscription;
}).flatMap(sessionSubscription -> {
final Mono<AmqpEndpointState> activeSession = sessionSubscription.getSession().getEndpointStates()
.filter(state -> state == AmqpEndpointState.ACTIVE)
.next()
.timeout(retryPolicy.getRetryOptions().getTryTimeout(), Mono.error(() -> new AmqpException(true,
String.format("connectionId[%s] sessionName[%s] Timeout waiting for session to be active.",
connectionId, sessionName), handler.getErrorContext())));
TIMEOUT_ERROR, String.format(
"connectionId[%s] sessionName[%s] Timeout waiting for session to be active.", connectionId,
sessionName), handler.getErrorContext())))
.doOnError(error -> {
// Clean up the subscription if there was an error waiting for the session to become active.

if (!(error instanceof AmqpException)) {
return;
}

final AmqpException amqpException = (AmqpException) error;
if (amqpException.getErrorCondition() == TIMEOUT_ERROR) {
final SessionSubscription removed = sessionMap.remove(sessionName);
removed.dispose();
}
});

return activeSession.thenReturn(sessionSubscription.getSession());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
Expand Down Expand Up @@ -99,6 +101,7 @@ class ReactorConnectionTest {
private ReactorConnection connection;
private ConnectionHandler connectionHandler;
private SessionHandler sessionHandler;
private SessionHandler sessionHandler2;
private AutoCloseable mocksCloseable;
private ConnectionOptions connectionOptions;

Expand Down Expand Up @@ -155,8 +158,11 @@ void setup() throws IOException {
.thenReturn(connectionHandler);
sessionHandler = new SessionHandler(CONNECTION_ID, FULLY_QUALIFIED_NAMESPACE, SESSION_NAME, reactorDispatcher,
TEST_DURATION, AmqpMetricsProvider.noop());
sessionHandler2 = new SessionHandler(CONNECTION_ID, FULLY_QUALIFIED_NAMESPACE, SESSION_NAME, reactorDispatcher,
TEST_DURATION, AmqpMetricsProvider.noop());

when(reactorHandlerProvider.createSessionHandler(anyString(), anyString(), anyString(), any(Duration.class)))
.thenReturn(sessionHandler);
.thenReturn(sessionHandler, sessionHandler2, null);

connection = new ReactorConnection(CONNECTION_ID, connectionOptions, reactorProvider, reactorHandlerProvider,
tokenManager, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST);
Expand All @@ -170,6 +176,8 @@ void setup() throws IOException {

@AfterEach
void teardown() throws Exception {
System.err.println("Clean-up");

connectionHandler.close();
sessionHandler.close();

Expand All @@ -191,22 +199,22 @@ void createConnection() {
final Map<String, Object> expectedProperties = new HashMap<>(connectionHandler.getConnectionProperties());

// Assert
Assertions.assertNotNull(connection);
Assertions.assertEquals(CONNECTION_ID, connection.getId());
Assertions.assertEquals(FULLY_QUALIFIED_NAMESPACE, connection.getFullyQualifiedNamespace());
assertNotNull(connection);
assertEquals(CONNECTION_ID, connection.getId());
assertEquals(FULLY_QUALIFIED_NAMESPACE, connection.getFullyQualifiedNamespace());

Assertions.assertEquals(connectionHandler.getMaxFrameSize(), connection.getMaxFrameSize());
assertEquals(connectionHandler.getMaxFrameSize(), connection.getMaxFrameSize());

Assertions.assertNotNull(connection.getConnectionProperties());
Assertions.assertEquals(expectedProperties.size(), connection.getConnectionProperties().size());
assertNotNull(connection.getConnectionProperties());
assertEquals(expectedProperties.size(), connection.getConnectionProperties().size());

expectedProperties.forEach((key, value) -> {
final Object removed = connection.getConnectionProperties().remove(key);
Assertions.assertNotNull(removed);
assertNotNull(removed);

final String expected = String.valueOf(value);
final String actual = String.valueOf(removed);
Assertions.assertEquals(expected, actual);
assertEquals(expected, actual);
});
assertTrue(connection.getConnectionProperties().isEmpty());
}
Expand All @@ -227,26 +235,27 @@ void createSession() {
when(connectionProtonJ.getRemoteState()).thenReturn(EndpointState.ACTIVE);
connectionHandler.onConnectionRemoteOpen(connectionEvent);


sessionHandler.onSessionRemoteOpen(sessionEvent);

// Act & Assert
StepVerifier.create(connection.createSession(SESSION_NAME))
.assertNext(s -> {
Assertions.assertNotNull(s);
Assertions.assertEquals(SESSION_NAME, s.getSessionName());
assertNotNull(s);
assertEquals(SESSION_NAME, s.getSessionName());
assertTrue(s instanceof ReactorSession);
Assertions.assertSame(session, ((ReactorSession) s).session());
assertSame(session, ((ReactorSession) s).session());
})
.expectComplete()
.verify(VERIFY_TIMEOUT);

// Assert that the same instance is obtained and we don't get a new session with the same name.
StepVerifier.create(connection.createSession(SESSION_NAME))
.assertNext(s -> {
Assertions.assertNotNull(s);
Assertions.assertEquals(SESSION_NAME, s.getSessionName());
assertNotNull(s);
assertEquals(SESSION_NAME, s.getSessionName());
assertTrue(s instanceof ReactorSession);
Assertions.assertSame(session, ((ReactorSession) s).session());
assertSame(session, ((ReactorSession) s).session());
})
.expectComplete()
.verify(VERIFY_TIMEOUT);
Expand Down Expand Up @@ -277,6 +286,76 @@ void createSessionWhenConnectionInactive() {
.verify(VERIFY_TIMEOUT);
}

@Test
void createSessionFailureWorksWithRetry() {
when(reactor.process()).then(invocation -> {
TimeUnit.SECONDS.sleep(5);
return true;
});

final AtomicInteger numberOfInvocations = new AtomicInteger();

final Session session2 = mock(Session.class);
final Record record2 = mock(Record.class);

when(session2.attachments()).thenReturn(record2);

when(session2.getRemoteState()).thenAnswer(invocation -> {
if (numberOfInvocations.getAndIncrement() < 1) {
return EndpointState.UNINITIALIZED;
} else {
return EndpointState.ACTIVE;
}
});

when(session.attachments()).thenReturn(record);
when(session.getRemoteState()).thenAnswer(invocation -> {
return EndpointState.UNINITIALIZED;
});

when(reactor.connectionToHost(connectionHandler.getHostname(), connectionHandler.getProtocolPort(),
connectionHandler)).thenReturn(connectionProtonJ);
when(connectionProtonJ.session()).thenReturn(session, session2);

// We only want it to emit a session when it is active.
when(connectionProtonJ.getRemoteState()).thenReturn(EndpointState.ACTIVE);
connectionHandler.onConnectionRemoteOpen(connectionEvent);

final Event sessionEvent2 = mock(Event.class);
when(sessionEvent2.getSession()).thenReturn(session2);


// Act & Assert

// Assert that the first session timed out while being created.
StepVerifier.create(connection.createSession(SESSION_NAME))
.expectErrorSatisfies(error -> {
assertTrue(error instanceof AmqpException);

final AmqpException exception = (AmqpException) error;
assertTrue(exception.isTransient());
})
.verify();

// Assert that the second time, a new session is obtained.
StepVerifier.create(connection.createSession(SESSION_NAME))
.then(() -> {
System.out.println("Pushing new session open downstream.");
sessionHandler2.onSessionRemoteOpen(sessionEvent2);
})
.assertNext(s -> {
assertNotNull(s);
assertEquals(SESSION_NAME, s.getSessionName());
assertTrue(s instanceof ReactorSession);
assertSame(session2, ((ReactorSession) s).session());
})
.expectComplete()
.verify();

verify(record).set(Handler.class, Handler.class, sessionHandler);
verify(record2).set(Handler.class, Handler.class, sessionHandler2);
}

/**
* Creates a session with the given name and set handler.
*/
Expand Down

0 comments on commit fcf8237

Please sign in to comment.