[BUG] EventHub Consumer stops consuming messages until we restart #18070

Closed
2 of 3 tasks
ravikumargarlapati opened this issue Dec 10, 2020 · 24 comments
Labels
Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. Event Hubs question The issue doesn't require a change to the product in order to be resolved. Most issues start as that

@ravikumargarlapati

ravikumargarlapati commented Dec 10, 2020

Describe the bug
The EventHub consumer stops consuming messages until we restart it. I have deployed my consumer in Azure Kubernetes Service (AKS). Initially it consumes a few messages, then it suddenly stops. If we restart the consumer, it works like a charm. Until we restart it, all the messages sit in the Event Hub; even after 2-3 days, none of them are consumed.

Exception or Stack Trace
This is the consumer log just captured before restarting:
consumerlogmessages.txt
Picked up this stack trace from log:

2020-12-03 14:56:02.126 [ERROR] [Loggers$Slf4JLogger:319] Scheduler worker in group main failed with an uncaught exception java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 60000ms in 'takeUntil' (and no fallback has been configured)
	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:289)
	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:274)
	at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:396)

To Reproduce
This has happened very often.

Code Snippet

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.roo.connect.soa.config.IEventHubReceiverConfig;
import com.roo.connect.soa.log.ServiceLogger;
import org.springframework.beans.factory.annotation.Autowired;

public abstract class IEventHubReceiver implements Runnable {

    private EventProcessorClient eventProcessorClient;
    private String consumerGroupName="";
    private static final String IEVENTHUBRECIEVER = "IEventHubReceiver : ";

    @Autowired
    private ServiceLogger log;

    public IEventHubReceiver(IEventHubReceiverConfig config){

        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(config.getStorageConnectionString())
                .containerName(config.getStorageContainerName())
                .buildAsyncClient();

        eventProcessorClient = new EventProcessorClientBuilder()
                .consumerGroup(config.getConsumerGroup())
                .connectionString(config.getConnectionString())
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
                .processEvent(eventContext -> {
                    EventData eventData = eventContext.getEventData();
                    String receivedData = eventData.getBodyAsString();
                    consumerGroupName = config.getConsumerGroup();
                    log.info("IEventHubReceiver - received from : " + consumerGroupName + " : receivedData : " + receivedData);
                    onEvents(eventData);
                    if(isAlive()) {
                        //if onEvent is success, context updates the 'checkpoint'
                        eventContext.updateCheckpoint();
                    }
                })
                .processError(errorContext -> {
                    log.error(IEVENTHUBRECIEVER + consumerGroupName + " - Error occurred while processing events :: " + errorContext.getThrowable().getMessage());
                })
                .buildEventProcessorClient();
    }

    //This method is implemented in child classes with the logic to process the eventData
    public abstract void onEvents(EventData eventData);

    public void stop() {

        eventProcessorClient.stop();
        log.info(IEVENTHUBRECIEVER + consumerGroupName + " - Client stopped...");
    }

    public boolean isAlive() {

        boolean isAlive = eventProcessorClient.isRunning();
        log.info(IEVENTHUBRECIEVER + consumerGroupName + " - isAlive:: " + isAlive);
        return isAlive;
    }

    @Override
    public void run() {

        // This will start the processor. It will start processing events from all partitions.
        eventProcessorClient.start();
        log.info(IEVENTHUBRECIEVER + consumerGroupName + " - Client started...");
    }
}

Expected behavior
I can see a few connection exceptions in the logs, but I am wondering why it stopped consuming messages permanently. Immediately after a restart, messages were consumed again.

Setup (please complete the following information):

  • OS: Ubuntu (AKS)
  • IDE : IntelliJ
  • Version of the Library used
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
      <version>1.3.1</version>
    </dependency>

Information Checklist
Kindly make sure that you have added all the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added
@ghost ghost added needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. customer-reported Issues that are reported by GitHub users external to the Azure organization. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Dec 10, 2020
@alzimmermsft alzimmermsft added Client This issue points to a problem in the data-plane of the library. Event Hubs labels Dec 10, 2020
@ghost ghost removed the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Dec 10, 2020
@puffei

puffei commented Dec 14, 2020

We ran into the same problem. We even set the RetryOptions to make it keep retrying when an error happens, but our log shows the same exception being thrown periodically, and no messages were received for several days. When we restart the application, messages are consumed normally again.

@ravikumargarlapati
Author

Could anyone throw some light here, please?

@conniey
Member

conniey commented Feb 17, 2021

In 5.3.0, we added watchdog functionality that checks whether the connection is alive. If it is not, we return the partition to the pool. That way, another processor can reclaim it and begin processing again.

https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md#530-2020-10-12

@ravikumargarlapati
Author

ravikumargarlapati commented Feb 18, 2021

Thanks for the information.

We are already using version 5.3.1, as shown below, and we still face this issue.

    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
      <version>1.3.1</version>
    </dependency>

@meetamitbhatia

meetamitbhatia commented Feb 19, 2021

We have another customer who has been able to reproduce the same issue with the latest version, 5.5.0. They could reproduce it by disconnecting the network for a couple of minutes and then reconnecting.
We see that the SDK is able to take ownership of the partitions back, but it keeps failing with TimeoutException until the application is restarted:

2021-02-16 18:56:47.462 [reactor-http-kqueue-4] [INFO] com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Starting next iteration of load balancer
2021-02-16 18:56:47.462 [reactor-http-kqueue-4] [INFO] com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Number of ownership records 10, number of partitions 10
2021-02-16 18:56:47.462 [reactor-http-kqueue-4] [INFO] com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Number of active ownership records 10
2021-02-16 18:56:47.462 [reactor-http-kqueue-4] [DEBUG] com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Current partition distribution 570672db-f55a-4bed-ad0b-f50d447a02e0=[0,1,2,3,4,5,6,7,8,9]
2021-02-16 18:56:47.462 [reactor-http-kqueue-4] [INFO] com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Number of active event processors 1
2021-02-16 18:56:47.462 [reactor-http-kqueue-4] [INFO] com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Expected min partitions per event processor = 10, expected number of event processors with additional partition = 0
2021-02-16 18:56:47.462 [reactor-http-kqueue-4] [INFO] com.azure.messaging.eventhubs.PartitionBasedLoadBalancer - Load is balanced with this event processor owning 10 partitions

2021-02-16 18:56:49.233 [single-1] [INFO] com.azure.core.amqp.implementation.ReactorExecutor - Unable to acquire dispose reactor semaphore within timeout.
2021-02-16 18:56:49.233 [single-1] [DEBUG] com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor - Attempted 1 times to get a new AMQP connection
2021-02-16 18:56:49.233 [single-1] [WARN] com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor - Retry #1. Transient error occurred. Retrying after 800 ms.
Operation timed out, errorContext[NAMESPACE: s00199-enscdx-pd-us-account-na1.servicebus.windows.net]
com.azure.core.amqp.exception.AmqpException: Operation timed out, errorContext[NAMESPACE: xxxx-pd-us-account-na1.servicebus.windows.net]

@conniey
Member

conniey commented Feb 20, 2021

Related to #15976, #17568

@gandhirajan

@conniey We see the exact same issue with azure-messaging-eventhubs version 5.3.1. Can we expect a fix for this shortly, or is there a temporary workaround that you can suggest?

@joelw

joelw commented Mar 10, 2021

It's not a very satisfactory workaround, but we track the time when we last received messages on each partition and trigger a restart using a liveness probe if too much time has elapsed since the last message. In Event Hubs with low message volumes this does cause a lot of unnecessary restarts, but this is far better than losing messages altogether.
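
The bookkeeping behind such a workaround could look roughly like the sketch below. It is an illustration only, not code from this thread or from the SDK: `PartitionLivenessTracker`, `recordEvent`, and `isHealthy` are hypothetical names, and wiring `isHealthy` to an HTTP endpoint that a Kubernetes liveness probe polls is left to the application.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers when each partition last delivered an event. */
final class PartitionLivenessTracker {
    private final Map<String, Instant> lastSeen = new ConcurrentHashMap<>();
    private final Duration maxSilence;

    PartitionLivenessTracker(Duration maxSilence) {
        this.maxSilence = maxSilence;
    }

    /**
     * Call from the processEvent handler, for example with
     * eventContext.getPartitionContext().getPartitionId() and Instant.now().
     */
    void recordEvent(String partitionId, Instant now) {
        lastSeen.put(partitionId, now);
    }

    /**
     * Returns false once any partition that has delivered at least one event
     * has been silent for longer than maxSilence. Partitions that never
     * delivered an event are not tracked, so they cannot fail the check.
     */
    boolean isHealthy(Instant now) {
        return lastSeen.values().stream()
                .allMatch(seen -> Duration.between(seen, now).compareTo(maxSilence) <= 0);
    }
}
```

As noted above, a low-volume Event Hub will trip this check without any real fault, so `maxSilence` has to be chosen against the expected traffic.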

@conniey
Member

conniey commented Mar 10, 2021

So this isn't lost in the fray, we dug some more into this and the existing Track 1 library and noticed that our third party dependency doesn't always propagate to its children that the underlying transport is closed. It's possible our consumers believe they are still alive, even though the connection is not. I'm looking into a fix where we can propagate the connection error to all its children (receiver links) and close them.

@gandhirajan

@conniey From the perspective of a consumer client application that continuously listens to the Event Hub to process incoming messages, this is a super critical issue. Any message that is not able to be picked by the consumer client is basically a kind of data loss and processing resumes only after we do an application restart which we can't afford to do in production environment. Can we expect a fix for this issue anytime soon? We have a production release in a week's time and this issue is a blocker for us.

@conniey
Member

conniey commented Mar 11, 2021

@gandhirajan

Any message that is not able to be picked by the consumer client is basically a kind of data loss and processing resumes only after we do an application restart which we can't afford to do in production environment.

Can you explain this a little more? Event Hubs doesn't remove any events from the stream. Events leave the stream when your "Message Retention" policy has elapsed for your Event Hub. That's one of the reasons we have a durable store for checkpointing, so if your application restarts, you'll understand where in the stream you last processed a message from the hub.

@gandhirajan

gandhirajan commented Mar 12, 2021

@conniey Hi, thanks for the response. I agree that messages are retained in the Event Hub until the retention period elapses. But for near-real-time processing of events, if the consumer client stops processing incoming messages until the application is restarted, that is effectively data loss for us.

In our use case, we route all messages arriving at the IoT hubs as events to an external Event Hub. A Spring Boot consumer client application listens to that external Event Hub and processes the incoming events in near real time. The processing is time-bound because each event carries IoT device info for that point in time, so processing these messages later is pointless. We therefore can't afford to have processing of incoming messages stop abruptly. Can we expect an SDK fix for this anytime soon?

@gandhirajan

@conniey Hi, I could see a merged PR - #19585 related to this issue. Can we expect a fix for this issue in the next version?

@laosiaudi

laosiaudi commented Mar 15, 2021

So this isn't lost in the fray, we dug some more into this and the existing Track 1 library and noticed that our third party dependency doesn't always propagate to its children that the underlying transport is closed. It's possible our consumers believe they are still alive, even though the connection is not. I'm looking into a fix where we can propagate the connection error to all its children (receiver links) and close them.

@conniey If the connection is not alive but the consumers still hold it, will the server detect it and reassign the partition to other consumers?

@jyyy-57

jyyy-57 commented Apr 8, 2021

Hey @conniey, could you please share updates on this issue? I believe we're facing a similar issue. We're using BlobContainerAsyncClient and EventProcessorClientBuilder with an IoT Hub built-in endpoint. We need to keep the eventProcessorClient open to receive messages from the built-in endpoint. We notice that load balancing runs regularly, but no new events are picked up until I restart the service. It's hard to reproduce and debug, as it is intermittent and usually happens 10-14 days after we restart the service/client. Also, the log always shows "Load balancing completed successfully", even when it cannot pick up any new messages.
The versions:

    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs</artifactId>
      <version>5.1.1</version>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
      <version>1.1.1</version>
    </dependency>

@conniey
Member

conniey commented Apr 8, 2021

Hey @jyyy-57, we are currently stress testing the PR associated with this issue, #19924, for the next three days. We plan to merge it on Monday if all goes well.

@gandhirajan

@conniey Hi, will the fix take care of automatically reestablishing the connection to the partition returned to the pool, or does the application need to implement any additional logic on top of this fix?

@conniey conniey added this to the [2021] April milestone Apr 9, 2021
@conniey
Member

conniey commented Apr 9, 2021

@conniey Hi, will the fix take care of automatically reestablishing the connection to the partition returned to the pool, or do the application need to implement any additional logic on top of this fix?

Yes. The partition load balancer periodically checks for ownership, and it'll notice that this partition is unclaimed and then reclaim it automatically.
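
To make that reclaim cycle concrete, here is a deliberately simplified, in-memory sketch. It is not the SDK's load balancer: `OwnershipTable`, `claimAvailable`, and the expiry rule are hypothetical, and the real processor keeps ownership records in the checkpoint store and also balances partitions across processors, which this sketch ignores.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for partition ownership records. */
final class OwnershipTable {
    private static final class Claim {
        final String ownerId;
        final Instant lastModified;

        Claim(String ownerId, Instant lastModified) {
            this.ownerId = ownerId;
            this.lastModified = lastModified;
        }
    }

    private final Map<String, Claim> claims = new ConcurrentHashMap<>();
    private final Duration expiry;

    OwnershipTable(Duration expiry) {
        this.expiry = expiry;
    }

    void claim(String partitionId, String ownerId, Instant now) {
        claims.put(partitionId, new Claim(ownerId, now));
    }

    /** Models a partition being returned to the pool after its connection died. */
    void release(String partitionId) {
        claims.remove(partitionId);
    }

    /** Returns the current owner, or null if the partition is unclaimed. */
    String ownerOf(String partitionId) {
        Claim current = claims.get(partitionId);
        return current == null ? null : current.ownerId;
    }

    /** One periodic pass: claim every partition that is unowned or whose claim has expired. */
    List<String> claimAvailable(List<String> allPartitions, String ownerId, Instant now) {
        List<String> claimed = new ArrayList<>();
        for (String partitionId : allPartitions) {
            Claim current = claims.get(partitionId);
            boolean available = current == null
                    || Duration.between(current.lastModified, now).compareTo(expiry) > 0;
            if (available) {
                claim(partitionId, ownerId, now);
                claimed.add(partitionId);
            }
        }
        return claimed;
    }
}
```

In this model, a partition released after a dead connection is picked up on the next pass without any application code being involved, which matches the behavior described in the answer.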

@conniey
Member

conniey commented Apr 13, 2021

We released 5.7.0 this morning. This should recover the consumer after closing when using the EventProcessorClient.

https://repo1.maven.org/maven2/com/azure/azure-messaging-eventhubs/5.7.0/
https://repo1.maven.org/maven2/com/azure/azure-messaging-eventhubs-checkpointstore-blob/1.6.0/

Cheers,
Connie

@jyyy-57

jyyy-57 commented Apr 14, 2021

We released 5.7.0 this morning. This should recover the consumer after closing when using the EventProcessorClient.

https://repo1.maven.org/maven2/com/azure/azure-messaging-eventhubs/5.7.0/
https://repo1.maven.org/maven2/com/azure/azure-messaging-eventhubs-checkpointstore-blob/1.6.0/

Cheers,
Connie

@conniey Hi Connie, I followed the instructions from https://docs.microsoft.com/en-us/java/api/overview/azure/messaging-eventhubs-readme?view=azure-java-stable. When I used azure-messaging-eventhubs 5.7.0, my Spring Boot app failed to start with the errors below. Everything works fine when I use azure-messaging-eventhubs 5.5.0. Could you please give me some suggestions? Thanks!

java.lang.NoClassDefFoundError: reactor/core/publisher/Sinks
	at com.azure.core.amqp.implementation.ReactorConnection.<init>(ReactorConnection.java:51)
	at com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection.<init>(EventHubReactorAmqpConnection.java:64)
	at com.azure.messaging.eventhubs.EventHubClientBuilder.lambda$buildConnectionProcessor$0(EventHubClientBuilder.java:643)
	at reactor.core.publisher.FluxCreate$BaseSink.onRequest(FluxCreate.java:535)
	at reactor.core.publisher.FluxCreate$SerializedSink.onRequest(FluxCreate.java:263)
	at com.azure.messaging.eventhubs.EventHubClientBuilder.lambda$buildConnectionProcessor$1(EventHubClientBuilder.java:624)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:94)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8325)
	at reactor.core.publisher.Flux.subscribeWith(Flux.java:8494)
	at com.azure.messaging.eventhubs.EventHubClientBuilder.buildConnectionProcessor(EventHubClientBuilder.java:651)
	at com.azure.messaging.eventhubs.EventHubClientBuilder.buildAsyncClient(EventHubClientBuilder.java:555)
	at com.azure.messaging.eventhubs.EventProcessorClient.<init>(EventProcessorClient.java:89)
	at com.azure.messaging.eventhubs.EventProcessorClientBuilder.buildEventProcessorClient(EventProcessorClientBuilder.java:525)
	at com.MessageRoutingService.startService(MessageRoutingService.java:74)
	at MessageRoutingApplication.run(MessageRoutingApplication.java:23)
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:786)
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:776)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:322)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
	at MessageRoutingApplication.main(MessageRoutingApplication.java:17)
Caused by: java.lang.ClassNotFoundException: reactor.core.publisher.Sinks
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 21 common frames omitted

@gandhirajan

gandhirajan commented Apr 14, 2021

@jyyy-57 Hi, we had to exclude the 'reactor-core' dependency from the eventhubs dependency and include 'reactor-core 3.4.3' to work around this issue. The changes are as follows:

  <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs</artifactId>
      <version>5.7.0</version>
      <exclusions>
          <exclusion>
              <artifactId>reactor-core</artifactId>
              <groupId>io.projectreactor</groupId>
          </exclusion>
      </exclusions>
  </dependency>
  
  <!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
  <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
      <version>3.4.5</version>
  </dependency>

@conniey, I hope this will be taken care of at the SDK end.

@jyyy-57

jyyy-57 commented Apr 14, 2021

Hi @gandhirajan, thank you so much! Your method works and I can start the application now. Reactor-core might be the key to these issues. However, I got another exception: java.lang.NoSuchMethodError: reactor.core.publisher.Mono.retry(Ljava/util/function/Predicate;)Lreactor/core/publisher/Mono;
So I removed the 'exclusions' from the 5.7.0 dependency and added the following dependencies. It works fine for me, but I'm not sure whether it is a proper solution. @conniey could you please confirm? Thanks!

    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
      <version>3.4.5</version>
    </dependency>
    <dependency>
      <groupId>io.projectreactor.netty</groupId>
      <artifactId>reactor-netty</artifactId>
      <version>0.9.15.RELEASE</version>
    </dependency>

@conniey
Member

conniey commented Apr 14, 2021

@jyyy-57

java.lang.NoClassDefFoundError: reactor/core/publisher/Sinks

A NoClassDefFoundError is usually caused by a dependency mismatch. Spring Boot brings in reactor-core and so does our client library. You can see what the dependency conflicts are via mvn dependency:tree -Dverbose. Our Spring starters consume the Event Hubs library under the covers, and we try to ship on the same cadence.
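
Alongside the Maven dependency tree, a small runtime check can confirm what actually ended up on the classpath. The sketch below is an assumption-laden illustration, not part of the SDK: `ClasspathCheck` and its methods are hypothetical names. The class it would be pointed at, `reactor.core.publisher.Sinks`, was introduced in reactor-core 3.4.0, so its absence suggests an older reactor-core won dependency resolution.

```java
import java.security.CodeSource;

/** Hypothetical startup check for classpath conflicts. */
final class ClasspathCheck {
    private ClasspathCheck() {
    }

    /** Reports whether the named class can be located by this class's class loader. */
    static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Returns the location (typically a jar URL) a class was loaded from, or "unknown". */
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return source == null ? "unknown" : String.valueOf(source.getLocation());
    }
}
```

Calling `ClasspathCheck.isPresent("reactor.core.publisher.Sinks")` at startup and logging `locationOf` for a Reactor class such as `Flux` would show which reactor-core jar the application really loads.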

/cc @saragluna may have more insights for when they will update with 5.7.0

@gandhirajan

@jyyy-57, we are using Spring Boot 2.3.7.RELEASE and we faced issues only with reactor-core, not reactor-netty. As @conniey mentioned, I guess this has to be worked out at the application end by sorting out the jar conflicts.

@conniey conniey closed this as completed Apr 20, 2021
@github-actions github-actions bot locked and limited conversation to collaborators Apr 12, 2023