Skip to content

Commit

Permalink
Fallback to session message disposition to management node
Browse files Browse the repository at this point in the history
  • Loading branch information
anuchandy committed Apr 25, 2024
1 parent 5b416c7 commit f51ef7d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
2 changes: 2 additions & 0 deletions sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Fixes the session message disposition to use management node as fall back. ([#39913](https://github.com/Azure/azure-sdk-for-java/issues/ 39913))

### Other Changes

## 7.16.0 (2024-04-22)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1579,11 +1579,8 @@ private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, Disposit
// V2: The final this.sessionManager is guaranteed to be 'ServiceBusSingleSessionManager'.
updateDispositionOperation = sessionManager.updateDisposition(lockToken, sessionId, dispositionStatus,
propertiesToModify, deadLetterReason, deadLetterErrorDescription, transactionContext)
// Unlike V1, if the lock token cannot be found on the session link (because it is closed), V2 won't
// fall back to 'dispositionViaManagementNode'. Once the session link is closed, the session is lost,
// and disposition cannot be performed on the management node (Same approach in .NET, JS, Go).
.then(Mono.fromRunnable(() -> {
LOGGER.atInfo()
.<Void>then(Mono.fromRunnable(() -> {
LOGGER.atVerbose()
.addKeyValue(LOCK_TOKEN_KEY, lockToken)
.addKeyValue(ENTITY_PATH_KEY, entityPath)
.addKeyValue(DISPOSITION_STATUS_KEY, dispositionStatus)
Expand All @@ -1592,7 +1589,17 @@ private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, Disposit
// The session-lock-renew logic in V2 is localized to ServiceBusReactorReceiver instance that
// ServiceBusSingleSessionManager composes. The logic does not use 'renewalContainer', hence
// unlike V1, no call to renewalContainer.remove(lockToken).
}));
})).onErrorResume(DeliveryNotOnLinkException.class, __ -> {
// If a disposition, e.g., defer, of this session message was done via a previous
// `updateDisposition(..)` call then the delivery would have removed from the session-link map.
// In that case, if the application attempts another disposition by calling
// `updateDisposition(..)` again, e.g., complete, then the session-link map look up will fail.
// However, this disposition can be done on the management node if the broker still has the
// session active.
LOGGER.info("Could not perform disposition on session manger. Performing on management node.");
return dispositionViaManagementNode(message, dispositionStatus, deadLetterReason,
deadLetterErrorDescription, propertiesToModify, transactionContext);
});
} else {
// V1: The final this.sessionManager is guaranteed to be 'ServiceBusSessionManager'.
updateDispositionOperation = sessionManager.updateDisposition(lockToken, sessionId, dispositionStatus,
Expand Down

0 comments on commit f51ef7d

Please sign in to comment.