[BUG] Service Bus Session Processor stop receiving messages, because if the pending attach link is forced to close without any errors, it cannot be recovered. #33313

Closed
liukun-msft opened this issue Feb 3, 2023 · 2 comments · Fixed by #33386

liukun-msft commented Feb 3, 2023

Describe the bug

Users report that their session processor stops receiving messages after some time.

The log shows that after 08:12:45, following the connection shutdown, the session processor client no longer receives any messages:

  • 08:11:47 - 08:12:41: the receive link is waiting for the attach response because there are no messages in the session queue.
  • 08:12:41: the receive link was forced to close because the remote link was closed, but the errorCondition is null.
  • 08:12:45: the connection was shut down and did not recover.
08:11:47 - [reactor-executor-1] - ReceiveLinkHandler - {"az.sdk.message":"onLinkLocalOpen","connectionId":"xxx","entityPath":"xxx","linkName":"xxx","localSource":"xxx"}
...
08:12:41 - [reactor-executor-1] - ReceiveLinkHandler - {"az.sdk.message":"onLinkRemoteOpen","connectionId":"xxx","entityPath":"xxx","linkName":"xxx","action":"waitingForError"}
08:12:41 - [reactor-executor-1] - ReceiveLinkHandler - {"az.sdk.message":"onLinkRemoteClose","connectionId":"xxx","errorCondition":null,"errorDescription":null,"linkName":"session-xxx","entityPath":"xxx"}
…
08:12:45 - [reactor-executor-1] - ReactorConnection - {"az.sdk.message":"onConnectionShutdown. Shutting dow","connectionId":"xxx","isTransient":false,"isInitiatedByClient":false,"shutdownMessage":"xxx.","namespace":"xxx"}

To Reproduce

  1. Hardcode the error condition used by onLinkRemoteClose() to be empty instead of reading it from the event.
private void handleRemoteLinkClosed(final String eventName, final Event event) {
    final Link link = event.getLink();
    final ErrorCondition condition = new ErrorCondition();  // always use an empty error condition
    ...
}
  2. Create a session processor:
AmqpRetryOptions options = new AmqpRetryOptions();
// Use a small try timeout; the resulting timeout error is what triggers the link-not-recovering issue.
options.setTryTimeout(Duration.ofSeconds(10));

ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
        .retryOptions(options)
        .connectionString(Credentials.serviceBusConnectionString)
        .sessionProcessor()
        .queueName(Credentials.serviceBusSessionQueue)
        .processMessage(processMessage)
        .processError(processError)
        .maxConcurrentSessions(1)
        .disableAutoComplete()
        .maxAutoLockRenewDuration(Duration.ofSeconds(0))
        .buildProcessorClient();
  3. Create a session queue with no messages.
  4. Start the session processor.
  5. Wait for the attach process to time out -> the link is closed and does not recover.
  6. Wait for the connection to become inactive -> the connection shuts down and does not recover.

Questions

  1. Why was the link forced to close without receiving any error message?
    The Service Bus role instance from which the link was awaiting the attach response was restarted for an OS update at around 8:12. In this case, the session receive link was detached with no error condition.

  2. Why does the link not recover?
    This is what we have to analyze and address in this issue.

  3. Why does the connection not recover?
    After the connection is closed, the ServiceBusConnectionProcessor requests one more connection.
    But the new connection is UNINITIALIZED, and there is no receiveMessages() call to make that connection active.

08:12:41 [reactor-executor-1] - c.a.m.s.i.ServiceBusConnectionProcessor - {"az.sdk.message":"Connection not requested, yet. Requesting one.","entityPath":"N/A"}
08:12:41  [reactor-executor-1] - c.a.m.s.i.ServiceBusConnectionProcessor - {"az.sdk.message":"Setting next AMQP channel.","entityPath":"N/A"}
08:12:41 [reactor-executor-1]  - c.a.c.a.i.ReactorConnection - {"az.sdk.message":"State UNINITIALIZED","connectionId":"xxx"}
  4. Why does the client not restart?
    The client was not restarted because our old connection check is wrong: the ServiceBusConnectionProcessor is not disposed and there is an UNINITIALIZED connection. It can be fixed by ([Re-Design Amqp Connection Recovery] 'ReactorConnectionCache' replacement for 'AmqpChannelProcessor' #33224).
@liukun-msft liukun-msft added this to the Backlog milestone Feb 3, 2023
@liukun-msft liukun-msft self-assigned this Feb 3, 2023

liukun-msft commented Feb 3, 2023

Reason why link cannot be recovered

Log Analysis

When we try to receive messages from an empty session queue, the link stays in the UNINITIALIZED state, waiting for a response from the service.

//Repro log when link is open and waiting for attach response
21:01:28.976 [reactor-executor-1] INFO  com.azure.core.amqp.implementation.ReactorSession - {"az.sdk.message":"Creating a new receiver link.","connectionId":"MF_d94f27_1675429285720","sessionName":"sessionqueue","linkName":"session-_dc47bf_1675429285804"}
21:01:28.979 [reactor-executor-1] DEBUG com.azure.core.amqp.implementation.ReactorReceiver - {"az.sdk.message":"State UNINITIALIZED","connectionId":"MF_d94f27_1675429285720","entityPath":"sessionqueue","linkName":"session-_dc47bf_1675429285804"}
21:01:28.980 [reactor-executor-1] DEBUG com.azure.core.amqp.implementation.ReactorReceiver - {"az.sdk.message":"Token refreshed.","connectionId":"MF_d94f27_1675429285720","entityPath":"sessionqueue","linkName":"session-_dc47bf_1675429285804","response":"ACCEPTED"}
21:01:28.986 [reactor-executor-1] DEBUG com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkLocalOpen","connectionId":"MF_d94f27_1675429285720","entityPath":"sessionqueue","linkName":"session-_dc47bf_1675429285804","localSource":"Source{address='sessionqueue', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter={com.microsoft:session-filter=null}, defaultOutcome=null, outcomes=null, capabilities=null}"}
[1674059459:0] -> Attach{name='session-_dc47bf_1675429285804', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=SECOND, source=Source{address='sessionqueue', durable=NONE, expiryPolic...}

In the repro, we wait until the timeout elapses and the service responds with a DETACH frame. Because of the timeout error, the link is forced to CLOSED. Because we hardcoded the error condition to be empty in the code, the link cannot recover once it is CLOSED.

//Repro log when the link is force-closed by the remote
[1674059459:0] <- Attach{name='session-_dc47bf_1675429285804', handle=0, role=SENDER, sndSettleMode=MIXED, rcvSettleMode=FIRST, source=null, target=null, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1674059459:0] <- Detach{handle=0, closed=true, error=Error{condition=com.microsoft:timeout, description='The operation did not complete within the allotted timeout of 00:00:29. The time allotted to this operation may have been a portion of a longer timeout. For more information on exception types and proper exception handling...}
21:01:58.200 [reactor-executor-1] INFO  com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkRemoteOpen","connectionId":"MF_d94f27_1675429285720","entityPath":"sessionqueue","linkName":"session-_dc47bf_1675429285804","action":"waitingForError"}
21:01:58.201 [reactor-executor-1] INFO  com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkRemoteClose","connectionId":"MF_d94f27_1675429285720","errorCondition":null,"errorDescription":null,"linkName":"session-_dc47bf_1675429285804","entityPath":"sessionqueue"}
...
[1674059459:0] -> Detach{handle=0, closed=true, error=null}
21:01:58.207 [reactor-executor-1] INFO  com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkFinal","connectionId":"MF_d94f27_1675429285720","linkName":"session-_dc47bf_1675429285804","entityPath":"sessionqueue"}

Because the link cannot recover, after 5 minutes the connection is shut down for inactivity and cannot recover either. (The earlier comment explains why the connection cannot recover.)

//Repro log when connection shutdown
[1674059459:0] <- Close{error=Error{condition=amqp:connection:forced, description='The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:a5f41ab242264a22930891867ea98edb_G26, SystemTracker:gateway7, Timestamp:2023-02-03T13:06:29', info=null}}
21:06:29.224 [reactor-executor-1] INFO  com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionRemoteClose","connectionId":"MF_d94f27_1675429285720","errorCondition":null,"errorDescription":null,"sessionName":"sessionqueue"}
...
21:06:33.268 [reactor-executor-1] INFO  com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"onConnectionShutdown. Shutting down.","connectionId":"MF_d94f27_1675429285720","isTransient":false,"isInitiatedByClient":false,"shutdownMessage":"Finished processing pending tasks.","namespace":"xxx"}
//no more logs after here...

Code Analysis

On the code side, when the link is closed by the remote (in the real scenario, the remote closed because of an OS upgrade), onLinkRemoteClose() is called.

//LinkHandler.class
public void onLinkRemoteClose(Event event) {
    handleRemoteLinkClosed("onLinkRemoteClose", event);
}   

It then calls the private method handleRemoteLinkClosed(). To mock a response whose error condition is null, we hardcode the condition to an empty instance. Because condition.getCondition() is then null, the logic goes to super.close() rather than onError(exception).

//LinkHandler.class
private void handleRemoteLinkClosed(final String eventName, final Event event) {
    final Link link = event.getLink();
    // For the repro, use an empty error condition instead of the one from the event
    final ErrorCondition condition = new ErrorCondition();
    ...
    if (condition != null && condition.getCondition() != null) {
        ...
        onError(exception);
    } else {
        // Taken in the repro: the empty condition has no condition symbol
        super.close();
    }
}
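
The branch above can be modeled without the SDK. The following is a minimal sketch, assuming a hypothetical `SketchCondition` class in place of the real `ErrorCondition` and a hypothetical `Outcome` enum in place of the calls to `onError(exception)` and `super.close()`. It only illustrates that both a null condition and an empty condition end up on the close path.

```java
/** Hypothetical stand-in for an AMQP error condition; not the real ErrorCondition class. */
final class SketchCondition {
    private final String symbol; // null models "the remote sent no condition"

    SketchCondition(String symbol) {
        this.symbol = symbol;
    }

    String getCondition() {
        return symbol;
    }
}

/** Illustrative model of the if/else in handleRemoteLinkClosed(); names are assumptions. */
final class RemoteCloseSketch {
    enum Outcome { ERROR, CLOSE }

    static Outcome onRemoteClose(SketchCondition condition) {
        if (condition != null && condition.getCondition() != null) {
            return Outcome.ERROR; // stands for onError(exception)
        }
        return Outcome.CLOSE;     // stands for super.close()
    }
}
```

Under this model, `onRemoteClose(new SketchCondition(null))` and `onRemoteClose(null)` both yield `CLOSE`, while a condition such as `com.microsoft:timeout` yields `ERROR`.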

The superclass is Handler. Inside its close() method, endpointStates emits CLOSED as a next signal, followed by a complete signal.

//Handler.class
public void close() {
    ...
    endpointStates.emitNext(EndpointState.CLOSED, (signalType, emitResult) -> {
        ...
    });
    endpointStates.emitComplete((signalType, emitResult) -> {
        ...
    });
}

Once the CLOSED next signal is emitted, the state of the created link changes from UNINITIALIZED to CLOSED. CLOSED does not satisfy .takeUntil(e -> e == AmqpEndpointState.ACTIVE), so the sequence does not stop at that signal.

But once the complete signal is emitted, it completes the .takeUntil() and .timeout() operators. The logic then moves on to .then(Mono.just(link)), which passes down the CLOSED link. Because the link is closed, the downstream subscriber cannot receive any messages from it.

//ServiceBusSessionManager.class
Mono<ServiceBusReceiveLink> getActiveLink() {
    ...
    return Mono.defer(() -> createSessionReceiveLink()
        .flatMap(link -> link.getEndpointStates()
            .takeUntil(e -> e == AmqpEndpointState.ACTIVE)
            .timeout(operationTimeout)
            .then(Mono.just(link))))
        .retryWhen(Retry.from(retrySignals -> retrySignals.flatMap(signal -> {
            ...
            if (isDisposed.get()) {
                ...
            } else if (failure instanceof TimeoutException) {
                return Mono.delay(SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION);
            } else if (failure instanceof AmqpException
                && ((AmqpException) failure).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR) {
                return Mono.delay(SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION);
            } else {
                return Mono.<Long>error(failure);
            }
        })));
}

The problem here is that our retryWhen() is skipped because no error occurred. We need to handle this special case, where the link state changes directly from UNINITIALIZED to CLOSED.
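
To make the failure mode concrete, here is a minimal sketch in plain Java rather than Reactor. It assumes a hypothetical `Link` class with a `disposed` flag that is set when CLOSED is observed, and it models the state stream as a finite list that ends with a completion. It is a simplified model of `takeUntil(ACTIVE)` followed by `then(Mono.just(link))`, not the SDK's code.

```java
import java.util.List;

/** Illustrative model only; all names here are assumptions, not SDK types. */
final class ActiveLinkSketch {
    enum State { UNINITIALIZED, ACTIVE, CLOSED }

    /** Hypothetical link: a name plus a disposed flag. */
    static final class Link {
        final String name;
        boolean disposed;

        Link(String name) {
            this.name = name;
        }
    }

    /**
     * Consumes state signals until ACTIVE is seen or the list ends (completion),
     * ignores the elements, and then returns the link. No error is raised when
     * the list ends without ACTIVE, so a retry would never be triggered.
     */
    static Link awaitThenReturn(Link link, List<State> statesUntilComplete) {
        for (State state : statesUntilComplete) {
            if (state == State.CLOSED) {
                link.disposed = true; // assumption: closing disposes the link
            }
            if (state == State.ACTIVE) {
                break; // takeUntil stops at the first ACTIVE signal
            }
        }
        return link; // returned even if the link never became ACTIVE
    }
}
```

With the signals `UNINITIALIZED, CLOSED` followed by completion, the method returns a link whose `disposed` flag is true, which is the situation the downstream subscriber ends up in.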

[Image: session-processor-link-error-graph]

Fixes

At present, the fix is to make sure we do not pass down the CLOSED link. We can check whether the link is disposed and, if so, replace Mono.just(link) with Mono.error() to trigger the retryWhen().

Temp Solution 1:

    ...
    .then(Mono.fromCallable(() -> link.isDisposed() ? null : link))
    .switchIfEmpty(Mono.error(() -> 
                new TimeoutException("Mock Timeout exception to trigger Mono.delay() condition in retry")))
    //we receive the link on reactor-executor thread, we need to publish on a non-blocking thread for retry
    .publishOn(Schedulers.boundedElastic()) 
    ...

Temp Solution 2: Add a filter to drop the disposed link, then use switchIfEmpty to turn the empty result into an error

    ...
    .then(Mono.just(link))
    .filter(receiveLink -> !receiveLink.isDisposed())
    .switchIfEmpty(Mono.error(() -> 
                new TimeoutException("Mock Timeout exception to trigger Mono.delay() condition in retry")))
    .publishOn(Schedulers.boundedElastic()) 
    ...

Temp Solution 3:

    ...
    .then(Mono.just(link))
    .flatMap(receiveLink -> {
        if (receiveLink.isDisposed()) {
            return Mono.error(() -> 
                new TimeoutException("Mock Timeout exception to trigger Mono.delay() condition in retry"));
        } else {
            return Mono.just(receiveLink);
        }
    })
    .publishOn(Schedulers.boundedElastic())
    ...

Temp Solution 4:

    ...
    .then(Mono.fromCallable(() -> {
        if (link.isDisposed()) {
            throw new TimeoutException("Mock Timeout exception to trigger Mono.delay() condition in retry");
        } else {
            return link;
        }
    }))
    .publishOn(Schedulers.boundedElastic())
    ...
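
All four variants share one idea: turn a disposed link into a TimeoutException so that the existing retry branch runs. The following is a minimal sketch of that idea in plain Java, assuming a hypothetical `Link` class and a bounded retry loop. The real `retryWhen()` delays between attempts and is not bounded this way; the bound and the `IllegalStateException` exist only to keep the sketch finite.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Illustrative model only; names and the attempt bound are assumptions. */
final class DisposedLinkRetrySketch {
    /** Hypothetical link: an id plus a disposed flag. */
    static final class Link {
        final int id;
        final boolean disposed;

        Link(int id, boolean disposed) {
            this.id = id;
            this.disposed = disposed;
        }
    }

    /** Mirrors the isDisposed() check: a disposed link becomes a TimeoutException. */
    static Link requireUsable(Link link) throws TimeoutException {
        if (link.disposed) {
            throw new TimeoutException("Link " + link.id + " was closed before becoming active");
        }
        return link;
    }

    /** Creates links until one is usable, up to maxAttempts (delay between attempts omitted). */
    static Link getActiveLink(Supplier<Link> createLink, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        TimeoutException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return requireUsable(createLink.get());
            } catch (TimeoutException e) {
                last = e; // retry with a newly created link
            }
        }
        throw new IllegalStateException("No usable link after " + maxAttempts + " attempts", last);
    }
}
```

If the first two links created are disposed and the third is not, `getActiveLink` returns the third link instead of handing a closed one to the caller.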

Pending questions

  1. In the real-case log, the connection shuts down directly, but in our repro log the connection shuts down only after becoming inactive. This is a small difference. It seems that the PR for connection refactoring already solves this problem, as the connection will also be closed.
  2. Review the design of ServiceBusSessionManager; it seems to have too much functionality in one class:
    • maintains one session receive link and uses it to receive messages
    • requests a new active link (like a ServiceBusReceiveLinkProcessor)
    • rolls sessions when maxConcurrentSessions is set

@liukun-msft

Question: Does the processor have this problem when the link is closed without any error?

No. ServiceBusProcessor uses ServiceBusReceiveLinkProcessor to maintain the receive link. It can always request a new link when the link changes from any state to CLOSED:

[Image: serivebus-link-processor-recovery-graph]
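
The contrast with the session path can be sketched as follows. This is a simplified model, assuming a hypothetical `LinkProcessorSketch` class and `Link` type. It is not the implementation of ServiceBusReceiveLinkProcessor and only illustrates the behavior described above: termination with or without an error leads to a request for a replacement link.

```java
import java.util.function.Supplier;

/** Illustrative model only; not the SDK's ServiceBusReceiveLinkProcessor. */
final class LinkProcessorSketch {
    /** Hypothetical link handle identified by a sequence number. */
    static final class Link {
        final int id;

        Link(int id) {
            this.id = id;
        }
    }

    private final Supplier<Link> linkFactory;
    private Link current;

    LinkProcessorSketch(Supplier<Link> linkFactory) {
        this.linkFactory = linkFactory;
        this.current = linkFactory.get();
    }

    Link current() {
        return current;
    }

    /** Called when the current link terminates; errorOrNull is null for a clean close. */
    void onLinkTerminated(Throwable errorOrNull) {
        // Both the error path and the clean-close path request a replacement link.
        current = linkFactory.get();
    }
}
```

In this model a clean close, `onLinkTerminated(null)`, replaces the link just as an error does, so there is no state in which a closed link is left in place.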
