Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMQP: ReactorExecutor stops due to IllegalStateException after application runs for multiple days #15926

Closed
mkemmerz opened this issue Oct 5, 2020 · 8 comments
Assignees
Labels
Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. Event Hubs needs-author-feedback Workflow: More information is needed from author to address the issue. pillar-reliability The issue is related to reliability, one of our core engineering pillars. (includes stress testing) question The issue doesn't require a change to the product in order to be resolved. Most issues start as that

Comments

@mkemmerz
Copy link

mkemmerz commented Oct 5, 2020

We have an application that consumes Events from an Iot Hub. The application works fine for some days and then stops consuming incoming events. The application enters a state where it simply does nothing anymore, because it doesn't fetch events from the IotHub.

The moment the application stops to work can be tracked down to an IllegalStateException that is catched in com.azure.core.amqp.implementation.ReactorExecutor. We are not able to identify the cause of this though.

Question: What are reasons this exception may occur and how can I battle them or at least continue to fetch Events from Iot Hub?

Setup :

  • OS: Windows 10
  • IDE : IntelliJ
  • azure-core-amqp-1.3.0

We consume the events like the following:

EventHubConsumerAsyncClient
.receive(false)
.subscribe(
partitionEvent -> {
// execute logic
},
ex -> log.error("Error during receiving events: " + ex.getMessage()),
() -> log.info(INFO_COMPLETED_RECEIVING_EVENTS));

The following exception appears after the application runs for multiple days:

scheduleCompletePendingTasks - exception occurred while processing events. java.lang.IllegalStateException org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:112) org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) com.azure.core.amqp.implementation.ReactorExecutor.lambda$scheduleCompletePendingTasks$0(ReactorExecutor.java:156) reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:50) reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:27) java.base/java.util.concurrent.FutureTask.run(Unknown Source) java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) java.base/java.lang.Thread.run(Unknown Source)Cause: null org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54) org.apache.qpid.proton.engine.impl.TransportImpl.unbind(TransportImpl.java:315) com.azure.core.amqp.implementation.handler.ConnectionHandler.onTransportError(ConnectionHandler.java:159) org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:191) org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) com.azure.core.amqp.implementation.ReactorExecutor.lambda$scheduleCompletePendingTasks$0(ReactorExecutor.java:156) reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:50) reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:27) java.base/java.util.concurrent.FutureTask.run(Unknown Source) java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) java.base/java.lang.Thread.run(Unknown Source)]

Followed by:

Stopping the reactor because thread was interrupted or the reactor has no more events to process.

@ghost ghost added needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. customer-reported Issues that are reported by GitHub users external to the Azure organization. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Oct 5, 2020
@joshfree joshfree added Client This issue points to a problem in the data-plane of the library. Event Hubs pillar-reliability The issue is related to reliability, one of our core engineering pillars. (includes stress testing) labels Oct 5, 2020
@ghost ghost removed the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Oct 5, 2020
@joshfree
Copy link
Member

joshfree commented Oct 5, 2020

Thanks for reporting this reliability issue in Event Hubs AMQP layer. @conniey can you please investigate?

@conniey
Copy link
Member

conniey commented Oct 5, 2020

  1. Do you have the full output logs from the library?
    1. I'm trying to understand what the contents of that IllegalStateException are.
    2. It shows that there is a transport error in the underlying library: com.azure.core.amqp.implementation.handler.ConnectionHandler.onTransportError(ConnectionHandler.java:159), so it would be nice to understand what that error is.
  2. Does it ever recover? We have logic that when the connection closes, we will try to re-instantiate it.

Stopping the reactor because thread was interrupted or the reactor has no more events to process.

Is a normal part of closing a proton-j reactor connection. I'll make note to not change that to informational.

@mkemmerz
Copy link
Author

mkemmerz commented Oct 6, 2020

Thanks for the fast replies!

The application does not recover from this exception. It seems to do a retry but it doesn't work somehow.

We are also using EventHubConsumerAsyncClient 's subscribe() method which says that it shouldn't be used for production systems so probably we should switch to EventProcessorClient like the documentation suggests.

More log:

onLinkRemoteClose connectionId[MF_f6aa5b_1601880891022], linkName[mgmt:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05]","logger_name":"com.azure.core.amqp.implementation.handler.SendLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
processOnClose connectionId[MF_f6aa5b_1601880891022], linkName[mgmt:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5,Timestamp:2020-10-05T13:44:05]","logger_name":"com.azure.core.amqp.implementation.handler.SendLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
Retry #1. Transient error occurred. Retrying after 4511 ms.\nThe connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05, errorContext[NAMESPACE: iothub-ns-ssf-iothub-3802138-ca4b082f0a.servicebus.windows.net, PATH: $management, REFERENCE_ID: mgmt:sender, LINK_CREDIT: 99]","logger_name":"class com.azure.core.amqp.implementation.RequestResponseChannel","thread_name":"single-1","level":"WARN","level_value":30000}
Exception occurred:\nThe connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05, errorContext[NAMESPACE: iothub-ns-ssf-iothub-3802138-ca4b082f0a.servicebus.windows.net, PATH: $management, REFERENCE_ID: mgmt:sender, LINK_CREDIT: 99]","logger_name":"com.azure.messaging.eventhubs.implementation.ManagementChannel","thread_name":"single-1","level":"ERROR","level_value":40000}
Upstream connection publisher was completed. Terminating processor.","logger_name":"class com.azure.core.amqp.implementation.RequestResponseChannel","thread_name":"single-1","level":"INFO","level_value":20000}
namespace[MF_f6aa5b_1601880891022] entityPath[$management]: AMQP channel processor completed. Notifying 0 subscribers.","logger_name":"class com.azure.core.amqp.implementation.RequestResponseChannel","thread_name":"single-1","level":"INFO","level_value":20000}
Exception in RequestResponse links. Disposing and clearing unconfirmed sends.\nThe connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05, errorContext[NAMESPACE:iothub-ns-ssf-iothub-3802138-ca4b082f0a.servicebus.windows.net, PATH: $management, REFERENCE_ID: mgmt:sender, LINK_CREDIT: 99]","logger_name":"class com.azure.core.amqp.implementation.RequestResponseChannel","thread_name":"single-1","level":"ERROR","level_value":40000}
onLinkLocalClose connectionId[MF_f6aa5b_1601880891022], linkName[mgmt:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05]","logger_name":"com.azure.core.amqp.implementation.handler.SendLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onLinkLocalClose connectionId[MF_f6aa5b_1601880891022], linkName[mgmt:receiver], errorCondition[null], errorDescription[null]","logger_name":"com.azure.core.amqp.implementation.handler.ReceiveLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onLinkRemoteClose connectionId[MF_f6aa5b_1601880891022], linkName[mgmt:receiver], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05]","logger_name":"com.azure.core.amqp.implementation.handler.ReceiveLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
processOnClose connectionId[MF_f6aa5b_1601880891022], linkName[mgmt:receiver], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05]","logger_name":"com.azure.core.amqp.implementation.handler.ReceiveLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionRemoteClose connectionId[mgmt-session], entityName[MF_f6aa5b_1601880891022], condition[Error{condition=null, description='null', info=null}]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionRemoteClose closing a local session for connectionId[MF_f6aa5b_1601880891022], entityName[mgmt-session], condition[null], description[null]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
connectionId[MF_f6aa5b_1601880891022] sessionName[mgmt-session]: Complete. Removing and disposing session.","logger_name":"com.azure.core.amqp.implementation.ReactorConnection","thread_name":"single-1","level":"INFO","level_value":20000}
sessionId[mgmt-session]: Disposing of session.","logger_name":"com.azure.core.amqp.implementation.ReactorSession","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionRemoteClose connectionId[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/0], entityName[MF_f6aa5b_1601880891022], condition[Error{condition=null, description='null', info=null}]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionRemoteClose closing a local session for connectionId[MF_f6aa5b_1601880891022], entityName[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/0], condition[null], description[null]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
connectionId[MF_f6aa5b_1601880891022] sessionName[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/0]: Complete. Removing and disposing session.","logger_name":"com.azure.core.amqp.implementation.ReactorConnection","thread_name":"single-1","level":"INFO","level_value":20000}
sessionId[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/0]: Disposing of session.","logger_name":"com.azure.core.amqp.implementation.ReactorSession","thread_name":"single-1","level":"INFO","level_value":20000}
onLinkRemoteClose connectionId[MF_f6aa5b_1601880891022], linkName[cbs:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05]","logger_name":"com.azure.core.amqp.implementation.handler.SendLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
processOnClose connectionId[MF_f6aa5b_1601880891022], linkName[cbs:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05]","logger_name":"com.azure.core.amqp.implementation.handler.SendLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
Retry #1. Transient error occurred. Retrying after 4511 ms.\nThe connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05, errorContext[NAMESPACE: iothub-ns-ssf-iothub-3802138-ca4b082f0a.servicebus.windows.net, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 66]","logger_name":"class com.azure.core.amqp.implementation.RequestResponseChannel","thread_name":"single-1","level":"WARN","level_value":30000}
Exception in RequestResponse links. Disposing and clearing unconfirmed sends.\nThe connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05, errorContext[NAMESPACE:iothub-ns-ssf-iothub-3802138-ca4b082f0a.servicebus.windows.net, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 66]","logger_name":"class com.azure.core.amqp.implementation.RequestResponseChannel","thread_name":"single-1","level":"ERROR","level_value":40000}
onLinkRemoteClose connectionId[MF_f6aa5b_1601880891022], linkName[cbs:receiver], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05]","logger_name":"com.azure.core.amqp.implementation.handler.ReceiveLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
processOnClose connectionId[MF_f6aa5b_1601880891022], linkName[cbs:receiver], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05]","logger_name":"com.azure.core.amqp.implementation.handler.ReceiveLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionRemoteClose connectionId[cbs-session], entityName[MF_f6aa5b_1601880891022], condition[Error{condition=null, description='null', info=null}]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionRemoteClose closing a local session for connectionId[MF_f6aa5b_1601880891022], entityName[cbs-session], condition[null], description[null]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
connectionId[MF_f6aa5b_1601880891022] sessionName[cbs-session]: Complete. Removing and disposing session.","logger_name":"com.azure.core.amqp.implementation.ReactorConnection","thread_name":"single-1","level":"INFO","level_value":20000}
sessionId[cbs-session]: Disposing of session.","logger_name":"com.azure.core.amqp.implementation.ReactorSession","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionRemoteClose connectionId[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/1], entityName[MF_f6aa5b_1601880891022], condition[Error{condition=null, description='null', info=null}]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionRemoteClose closing a local session for connectionId[MF_f6aa5b_1601880891022], entityName[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/1], condition[null], description[null]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
connectionId[MF_f6aa5b_1601880891022] sessionName[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/1]: Complete. Removing and disposing session.","logger_name":"com.azure.core.amqp.implementation.ReactorConnection","thread_name":"single-1","level":"INFO","level_value":20000}
sessionId[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/1]: Disposing of session.","logger_name":"com.azure.core.amqp.implementation.ReactorSession","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionRemoteClose connectionId[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/2], entityName[MF_f6aa5b_1601880891022], condition[Error{condition=null, description='null', info=null}]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionRemoteClose closing a local session for connectionId[MF_f6aa5b_1601880891022], entityName[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/2], condition[null], description[null]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
connectionId[MF_f6aa5b_1601880891022] sessionName[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/2]: Complete. Removing and disposing session.","logger_name":"com.azure.core.amqp.implementation.ReactorConnection","thread_name":"single-1","level":"INFO","level_value":20000}
sessionId[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/2]: Disposing of session.","logger_name":"com.azure.core.amqp.implementation.ReactorSession","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionRemoteClose connectionId[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/3], entityName[MF_f6aa5b_1601880891022], condition[Error{condition=null, description='null', info=null}]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionRemoteClose closing a local session for connectionId[MF_f6aa5b_1601880891022], entityName[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/3], condition[null], description[null]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
connectionId[MF_f6aa5b_1601880891022] sessionName[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/3]: Complete. Removing and disposing session.","logger_name":"com.azure.core.amqp.implementation.ReactorConnection","thread_name":"single-1","level":"INFO","level_value":20000}
sessionId[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/3]: Disposing of session.","logger_name":"com.azure.core.amqp.implementation.ReactorSession","thread_name":"single-1","level":"INFO","level_value":20000}
onConnectionRemoteClose hostname[iothub-ns-ssf-iothub-3802138-ca4b082f0a.servicebus.windows.net:443], connectionId[MF_f6aa5b_1601880891022], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05]","logger_name":"com.azure.core.amqp.implementation.handler.ConnectionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
namespace[iothub-ns-ssf-iothub-3802138-ca4b082f0a.servicebus.windows.net] entityPath[ssf-iothub-dev]: Channel is closed.","logger_name":"com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor","thread_name":"single-1","level":"INFO","level_value":20000}
connectionId[MF_f6aa5b_1601880891022]: Disposing of connection.","logger_name":"com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection","thread_name":"single-1","level":"INFO","level_value":20000}
connectionId[MF_f6aa5b_1601880891022]: Disposing of ReactorConnection.","logger_name":"com.azure.core.amqp.implementation.ReactorConnection","thread_name":"single-1","level":"INFO","level_value":20000}
Shutdown received: ReactorExecutor.close() was called., isTransient[false], initiatedByClient[true]","logger_name":"com.azure.core.amqp.implementation.AmqpExceptionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
Retry #1. Requesting from upstream.","logger_name":"class com.azure.core.amqp.implementation.RequestResponseChannel","thread_name":"parallel-1","level":"INFO","level_value":20000}
namespace[MF_f6aa5b_1601880891022] entityPath[$cbs]: Connection not requested, yet. Requesting one.","logger_name":"class com.azure.core.amqp.implementation.RequestResponseChannel","thread_name":"parallel-1","level":"INFO","level_value":20000}
connectionId[MF_f6aa5b_1601880891022]: Connection is disposed. Cannot get CBS node.","logger_name":"com.azure.core.amqp.implementation.ReactorConnection","thread_name":"parallel-1","level":"ERROR","level_value":40000}
Emitting new response channel. connectionId: MF_f6aa5b_1601880891022. entityPath: $cbs. linkName: cbs.","logger_name":"com.azure.core.amqp.implementation.ReactorConnection","thread_name":"parallel-1","level":"INFO","level_value":20000}
namespace[MF_f6aa5b_1601880891022] entityPath[$cbs]: Setting next AMQP channel.","logger_name":"class com.azure.core.amqp.implementation.RequestResponseChannel","thread_name":"parallel-1","level":"INFO","level_value":20000}
namespace[MF_f6aa5b_1601880891022] entityPath[$cbs]: Next AMQP channel received, updating 0 current subscribers","logger_name":"class com.azure.core.amqp.implementation.RequestResponseChannel","thread_name":"parallel-1","level":"INFO","level_value":20000}
Unable to acquire dispose reactor semaphore within timeout.","logger_name":"com.azure.core.amqp.implementation.ReactorExecutor","thread_name":"single-1","level":"INFO","level_value":20000}
onLinkLocalClose connectionId[MF_f6aa5b_1601880891022], linkName[cbs:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:549ad803b097405ba863949c809695bd_G20, SystemTracker:gateway5, Timestamp:2020-10-05T13:44:05]","logger_name":"com.azure.core.amqp.implementation.handler.SendLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onLinkLocalClose connectionId[MF_f6aa5b_1601880891022], linkName[cbs:receiver], errorCondition[null], errorDescription[null]","logger_name":"com.azure.core.amqp.implementation.handler.ReceiveLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onConnectionLocalClose hostname[iothub-ns-ssf-iothub-3802138-ca4b082f0a.servicebus.windows.net:443], connectionId[MF_f6aa5b_1601880891022], errorCondition[null], errorDescription[null]","logger_name":"com.azure.core.amqp.implementation.handler.ConnectionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onConnectionUnbound hostname[iothub-ns-ssf-iothub-3802138-ca4b082f0a.servicebus.windows.net:443], connectionId[MF_f6aa5b_1601880891022], state[CLOSED], remoteState[CLOSED]","logger_name":"com.azure.core.amqp.implementation.handler.ConnectionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onLinkFinal connectionId[MF_f6aa5b_1601880891022], linkName[mgmt:sender]","logger_name":"com.azure.core.amqp.implementation.handler.SendLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onLinkFinal connectionId[MF_f6aa5b_1601880891022], linkName[mgmt:receiver]","logger_name":"com.azure.core.amqp.implementation.handler.ReceiveLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionFinal connectionId[MF_f6aa5b_1601880891022], entityName[mgmt-session], condition[null], description[null]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionFinal connectionId[MF_f6aa5b_1601880891022], entityName[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/0], condition[null], description[null]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onLinkFinal connectionId[MF_f6aa5b_1601880891022], linkName[cbs:sender]","logger_name":"com.azure.core.amqp.implementation.handler.SendLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onLinkFinal connectionId[MF_f6aa5b_1601880891022], linkName[cbs:receiver]","logger_name":"com.azure.core.amqp.implementation.handler.ReceiveLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionFinal connectionId[MF_f6aa5b_1601880891022], entityName[cbs-session], condition[null], description[null]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionFinal connectionId[MF_f6aa5b_1601880891022], entityName[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/1], condition[null], description[null]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionFinal connectionId[MF_f6aa5b_1601880891022], entityName[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/2], condition[null], description[null]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionFinal connectionId[MF_f6aa5b_1601880891022], entityName[ssf-iothub-dev/ConsumerGroups/$Default/Partitions/3], condition[null], description[null]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onLinkFinal connectionId[MF_f6aa5b_1601880891022], linkName[cbs:sender]","logger_name":"com.azure.core.amqp.implementation.handler.SendLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onLinkFinal connectionId[MF_f6aa5b_1601880891022], linkName[cbs:receiver]","logger_name":"com.azure.core.amqp.implementation.handler.ReceiveLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onSessionFinal connectionId[MF_f6aa5b_1601880891022], entityName[cbs-session], condition[null], description[null]","logger_name":"com.azure.core.amqp.implementation.handler.SessionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
connectionId[MF_f6aa5b_1601880891022] sessionName[cbs-session]: Complete. Removing and disposing session.","logger_name":"com.azure.core.amqp.implementation.ReactorConnection","thread_name":"single-1","level":"INFO","level_value":20000}
sessionId[cbs-session]: Disposing of session.","logger_name":"com.azure.core.amqp.implementation.ReactorSession","thread_name":"single-1","level":"INFO","level_value":20000}
onConnectionFinal hostname[iothub-ns-ssf-iothub-3802138-ca4b082f0a.servicebus.windows.net:443], connectionId[MF_f6aa5b_1601880891022], errorCondition[null], errorDescription[null]","logger_name":"com.azure.core.amqp.implementation.handler.ConnectionHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onLinkLocalClose connectionId[MF_f6aa5b_1601880891022], linkName[cbs:sender], errorCondition[null], errorDescription[null]","logger_name":"com.azure.core.amqp.implementation.handler.SendLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
onLinkLocalClose connectionId[MF_f6aa5b_1601880891022], linkName[cbs:receiver], errorCondition[null], errorDescription[null]","logger_name":"com.azure.core.amqp.implementation.handler.ReceiveLinkHandler","thread_name":"single-1","level":"INFO","level_value":20000}
connectionId[MF_f6aa5b_1601880891022], message[Processing all pending tasks and closing old reactor.]","logger_name":"com.azure.core.amqp.implementation.ReactorExecutor","thread_name":"single-1","level":"INFO","level_value":20000}
onTransportError hostname[iothub-ns-ssf-iothub-3802138-ca4b082f0a.servicebus.windows.net:443], connectionId[MF_f6aa5b_1601880891022], error[Broken pipe]","logger_name":"com.azure.core.amqp.implementation.handler.ConnectionHandler","thread_name":"single-1","level":"WARN","level_value":30000}
connectionId[MF_f6aa5b_1601880891022], message[scheduleCompletePendingTasks - exception occurred while processing events.\njava.lang.IllegalStateException\norg.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:112)\norg.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)\norg.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)\ncom.azure.core.amqp.implementation.ReactorExecutor.lambda$scheduleCompletePendingTasks$0(ReactorExecutor.java:156)\nreactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\nreactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\njava.base/java.util.concurrent.FutureTask.run(Unknown Source)\njava.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\njava.base/java.lang.Thread.run(Unknown Source)Cause: null\norg.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54)\norg.apache.qpid.proton.engine.impl.TransportImpl.unbind(TransportImpl.java:315)\ncom.azure.core.amqp.implementation.handler.ConnectionHandler.onTransportError(ConnectionHandler.java:162)\norg.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:191)\norg.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)\norg.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)\norg.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)\ncom.azure.core.amqp.implementation.ReactorExecutor.lambda$scheduleCompletePendingTasks$0(ReactorExecutor.java:156)\nreactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\nreactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\njava.base/java.util.concurrent.FutureTask.run(Unknown Source)\njava.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\njava.base/java.lang.Thread.run(Unknown Source)]","logger_name":"com.azure.core.amqp.implementation.ReactorExecutor","thread_name":"single-1","level":"WARN","level_value":30000}
connectionId[MF_f6aa5b_1601880891022], message[Stopping the reactor because thread was interrupted or the reactor has no more events to process.]","logger_name":"com.azure.core.amqp.implementation.ReactorExecutor","thread_name":"single-1","level":"INFO","level_value":20000}

@conniey
Copy link
Member

conniey commented Oct 6, 2020

This looks like an issue that @srnagar and @YijunXieMS have resolved which should be fixed in our October release. Can you confirm @YijunXieMS ?

@conniey
Copy link
Member

conniey commented Oct 6, 2020

  • Never mind, it seems like a difference in root causes. I'll look into this. Are you using EventConsumerAsyncClient.receiveAllPartitions() ?
  • Yes. EventProcessorClient is the production level one you should be using. It offers connection recovery and load balancing.
  • Is your processing logic for each event long running? (ie. how long does it execute for?)

@mkemmerz
Copy link
Author

mkemmerz commented Oct 7, 2020

  • We are using EventHubConsumerAsyncClient.receive(false).subscribe()
  • I switched to EventProcessorClient and the application runs without issues for now. It recovers from issues.
  • The logic for each event runs between 700ms and 1100ms

@srnagar
Copy link
Member

srnagar commented Oct 20, 2020

Looks like switching to EventProcessorClient resolved the issue. Closing this issue.

@srnagar srnagar closed this as completed Oct 20, 2020
@ramya-rao-a ramya-rao-a added the needs-author-feedback Workflow: More information is needed from author to address the issue. label Feb 22, 2021
@lahmXu
Copy link

lahmXu commented Apr 19, 2021

Usage of EventProcessorClient, you can refer to the following links:

  1. create EventProcessorClient
  2. SampleCheckpointStore.java

openapi-sdkautomation bot pushed a commit to AzureSDKAutomation/azure-sdk-for-java that referenced this issue Sep 23, 2021
Add new property for MetricSpecification (Azure#15926)
@github-actions github-actions bot locked and limited conversation to collaborators Apr 12, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. Event Hubs needs-author-feedback Workflow: More information is needed from author to address the issue. pillar-reliability The issue is related to reliability, one of our core engineering pillars. (includes stress testing) question The issue doesn't require a change to the product in order to be resolved. Most issues start as that
Projects
None yet
Development

No branches or pull requests

6 participants