Handling "poison" messages in event hubs #18850

Closed
remoba opened this issue Feb 18, 2021 · 12 comments

remoba commented Feb 18, 2021

Query/Question
I am trying to figure out how the Event Hubs SDK responds to unhandled exceptions. My goal is to STOP processing any more events from a given partition if I fail to process a single event in it. I know Event Hubs is probably not really suited for this, but I cannot tolerate any data loss in this data pipeline.

I've tried running the sample receiver in a local environment, and I've created a storage account and an event hub with 2 partitions. Then I sent 3 messages to that event hub's first partition (Partition 0).

My full code is below (with the resource identifiers and secrets removed):

using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;

namespace EventHubPlayground
{
    class Program
    {
        private const string ehubNamespaceConnectionString = "EH_CONNECTION_STRING";
        private const string eventHubName = "EH_NAME";
        private const string blobStorageConnectionString = "BLOB_CONNECTION_STRING";
        private const string blobContainerName = "BLOB_CONTAINER_NAME";

        public static async Task Main(string[] args)
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

            // Create a blob container client that the event processor will use 
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);

            // Create an event processor client to process events in the event hub
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);

            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;

            // Start the processing
            await processor.StartProcessingAsync();

            // Wait for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(1000));

            // Stop the processing
            await processor.StopProcessingAsync();
        }

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Write the body of the event to the console window
            Console.WriteLine(
                "\tReceived event: {0}, Offset: {1}, SequenceNumber: {2}, PartitionId: {3}",
                Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()),
                eventArgs.Data.Offset,
                eventArgs.Data.SequenceNumber,
                eventArgs.Partition.PartitionId);

            if (eventArgs.Data.SequenceNumber == 1)
            {
                throw new Exception("Something bad happened");
            }

            if (eventArgs.Data.SequenceNumber == 2)
            {
                // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
                await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
            }
        }

        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }
    }
}

I am trying to simulate a case where message #2 (SequenceNumber == 1) fails during processing, and throws an unhandled exception. I also only update the checkpoint after message #3 (SequenceNumber == 2) has finished processing.

The behavior I see when I run this sample locally is:

If I remove the checkpoint update entirely, I instead see the following behavior:

My problem here is that message #3 is being received after I've already thrown an exception for message #2, so I can't simply update the checkpoint without risking data loss; but if I don't, I get message #3 again and again even though I've already processed it.

If my understanding of the code is correct, this is happening because the EventProcessorClient class is receiving batches but not exposing them. Instead, the events trigger my function one by one, and any unhandled exceptions are only acknowledged after the whole batch has been processed, which in my case is too late.

Is there a different class I can use that will let me process an entire batch at a time instead of a single message? That way I could decide per batch that I do not want to update the checkpoint, and I would keep receiving the same batch over and over again.
I saw there's a generic EventProcessor under the Primitives namespace, but since everything is internal I can't really implement it myself.

Environment:

  • Name and version of the Library package used: Azure.Messaging.EventHubs 5.2.0
  • Hosting platform or OS and .NET runtime version (dotnet --info output for .NET Core projects): Windows 10 .NET Core 3.1
  • IDE and version : Visual Studio 16.8.5
Mohit-Chakraborty (Contributor) commented:

Thank you for your feedback. Tagging and routing to the team best able to assist.

jsquire (Member) commented Feb 18, 2021

Hi @remoba. Thank you for reaching out and we regret that you're experiencing difficulties. With respect to dealing with exceptions within your handlers, our guidance is documented here. The most relevant part is:

It is extremely important that you always guard against exceptions in your handler code; it is strongly recommended to wrap your entire handler in a try/catch block and ensure that you do not re-throw exceptions. The processor does not have enough understanding of your handler code to determine the correct action to take in the face of an exception nor to understand whether it is safe to assume that processing has not been corrupted. Any exceptions thrown from your handler will not be caught by the processor and will NOT be redirected to the error handler. This will typically cause processing for the partition to abort and be restarted, but may also crash your application process.

The behavior that you're describing is expected. When your exception is thrown it goes unhandled and, in this case, crashes the task processing that partition. When the processor restarts the partition processing task, it reads the latest checkpoint and begins from the next event after it. If you want to treat a message as poison, you'll need to be sure to create a checkpoint based on it, so that it is skipped. I would strongly advise not letting that exception bubble, as the behavior is undefined and will vary by your host process, runtime version, and other environmental factors. Handle it in whatever way makes sense for your application, and let the processor continue on.

There is a base class, EventProcessor<TPartition>, that you can extend to process in batches, but that will not alter the behavior that you're seeing nor the guidance. The only difference is that when using EventProcessor<TPartition>, your event handler method will receive a batch of events rather than a single one. If it throws, the same pattern that I described above will occur. You'd still want to be sure to catch any exception, checkpoint the poison event, and let the processor continue on.
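
To give a rough idea of its shape, a batch-oriented processor derived from it looks something like the sketch below. This is only a sketch: SampleBatchProcessor is a hypothetical name, the member signatures are based on my reading of the Primitives API and should be verified against the current documentation, and the class is deliberately declared abstract so the storage-related members for ownership and checkpoint persistence can be left out here.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Primitives;

public abstract class SampleBatchProcessor : EventProcessor<EventProcessorPartition>
{
    protected SampleBatchProcessor(int eventBatchMaximumCount, string consumerGroup, string connectionString, string eventHubName)
        : base(eventBatchMaximumCount, consumerGroup, connectionString, eventHubName)
    {
    }

    protected override Task OnProcessingEventBatchAsync(IEnumerable<EventData> events, EventProcessorPartition partition, CancellationToken cancellationToken)
    {
        // Process the batch as a unit.  Guard with try/catch and do not re-throw;
        // the same "catch, dead-letter, checkpoint" guidance applies here.
        foreach (EventData eventData in events)
        {
            // ... business logic ...
        }

        return Task.CompletedTask;
    }

    protected override Task OnProcessingErrorAsync(Exception exception, EventProcessorPartition partition, string operationDescription, CancellationToken cancellationToken)
    {
        // Surface infrastructure-level errors observed by the processor (logging, telemetry, etc.).
        return Task.CompletedTask;
    }
}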

This blog article walks through creating a custom processor that is similar to EventProcessorClient but works in batches. Our Event Hubs samples area offers additional context and discussion around patterns and behavior of the EventProcessorClient that may be helpful.

remoba (Author) commented Feb 19, 2021

Hey @jsquire, thanks for the quick response.

If I am reading this right then, I am basically never supposed to throw an exception. If my goal here is to not let any message go unprocessed, would you say I should just keep retrying the single message processing forever?
Just to confirm, were unhandled exceptions always undefined behavior, even in the older version of the SDK?

jsquire (Member) commented Feb 19, 2021

If I am reading this right then, I am basically never supposed to throw an exception.

Correct; none of the code used in your event handlers should throw unless you're confident in your host environment's behavior and are comfortable trusting it.

If my goal here is to not let any message go unprocessed, would you say I should just keep retrying the single message processing forever?

That kind of depends on your definition of "any message" and the sensitivity to ordering in your application. My assumption was that the poison message was something that you could not process and that you wanted to do some form of dead-lettering and then move on to the next message. In my head, the barebones form looks something like:

private static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        // If there was no event, there's nothing to do; only
        // applicable if a max wait time was specified.

        if (!eventArgs.HasEvent)
        {
            return;
        }

        // Run your business logic, including any retries.  If you
        // identify the message as poison, throw a specific exception to
        // denote it.

        await ApplyBusinessLogicAsync(eventArgs.Data);
    }
    catch (PoisonMessageException)
    {
        // Push the event to some form of storage to dead letter it.

        await SendToDeadLetterAsync(eventArgs.Data);

        // Checkpoint this event so that we don't see it again.

        await eventArgs.UpdateCheckpointAsync();
    }
    catch (Exception ex)
    {
        // Decide here if the unexpected exception was safe to continue from or
        // take some action to fail fast.  In this case, I'll tear down the process and
        // assume an orchestrator is monitoring to restart.

        if (!TryHandleProcessingException(eventArgs, ex))
        {
            Environment.Exit(-1);
        }
    }
}

If you're looking to stop the world on a poison message, it gets a bit more involved, since you can't directly call StopProcessing or StopProcessingAsync in the event handler without causing a deadlock. The common approach for this is to signal a cancellation token that is honored by the application host. In my head, the barebones version looks something like:

public class AppHost
{
    private volatile bool allowProcessingEvents = true;

    public async Task Run()
    {
        // This token is used by the host to control when to stop processing.

        using var hostCancellationSource = new CancellationTokenSource();

        async Task processEventHandler(ProcessEventArgs eventArgs)
        {
            try
            {
                if ((eventArgs.CancellationToken.IsCancellationRequested)
                    || (!allowProcessingEvents)
                    || (!eventArgs.HasEvent))
                {
                    return;
                }

                // I am deliberately not honoring the cancellation token from
                // the args when applying logic here.  Because timing is
                // non-deterministic, you may see events be dispatched
                // while cancellation is being performed when the processor
                // is stopped.

                await ApplyBusinessLogicAsync(eventArgs.Data);
            }
            catch (Exception ex)
            {
                // You may or may not want to checkpoint, depending on whether
                // you want to skip this event in the future.

                allowProcessingEvents = false;

                // Cancelling the host source will trigger a request to stop
                // all processing.  Additional events may be dispatched during this
                // time.

                hostCancellationSource.Cancel();

                // I'm assuming that you'd want to do some form of
                // exception handling to log.

                HandleProcessingException(eventArgs, ex);
            }
        }

        try
        {
            // The processor is assumed to be an EventProcessorClient created
            // elsewhere.  The error handler is not relevant for this sample; for
            // illustration, using a fake stub.

            processor.ProcessEventAsync += processEventHandler;
            processor.ProcessErrorAsync += someFakeErrorHandler;

            try
            {
                await processor.StartProcessingAsync(hostCancellationSource.Token);
                await Task.Delay(Timeout.Infinite, hostCancellationSource.Token);
            }
            catch (TaskCanceledException)
            {
                // This is expected.
            }
            finally
            {
                await processor.StopProcessingAsync();
            }
        }
        catch
        {
            // If this block is invoked, then something external to the
            // processor was the source of the exception.
        }
        finally
        {
            processor.ProcessEventAsync -= processEventHandler;
            processor.ProcessErrorAsync -= someFakeErrorHandler;
        }
    }
}

Just to confirm, were unhandled exceptions always undefined behavior, even in the older version of the SDK?

Not in the same manner. The legacy SDK passed exceptions in developer-provided code to the error handler in the IEventProcessor instance and allowed processing to continue. However, event processing was often stateful in the IEventProcessor instance, and that state could be left inconsistent or corrupted after an exception occurred.

When we were in the design phase for the new library, we found that it was not common for developers to implement the optional error handler and perform a state consistency check as part of that handler. That led to exceptions going unobserved and to unexpected behavior during processing that potentially caused data corruption while the processor continued on.

That led to the decision that, because the processor lacks enough knowledge to understand whether it is safe to continue in the face of an error in developer-provided code, it was better to fail fast and allow the application host environment to control exception behavior. For most host environments, the task responsible for processing the partition faults; partition processing crashes and is restarted.

To be transparent, this was a difficult decision with a fair amount of contention. My personal preference would have been to provide more determinism and guarantee the "just crash the partition task" behavior, but there were other viewpoints advocating for allowing container-based hosts to crash the process and let the orchestrator maintain control.

remoba (Author) commented Feb 21, 2021

@jsquire Thank you very much for the detailed response.

Going with the "retry-forever" approach, is there any risk of having my checkpoint blob lease expire during my retries? Because I would essentially not be updating the checkpoint for a very long time, right?

So if I retry such a message for 5 minutes (because my dead letter queue mechanism is experiencing an outage), when I eventually do update the checkpoint, can it fail?

jsquire (Member) commented Feb 21, 2021

The TLDR version is that you're safe to retry forever. Until your handler returns, no other events from that partition will be dispatched for processing and no other processor will attempt to steal that partition.

An important callout is that your handler will still be called concurrently for events in other partitions, but only one event is ever being processed from any partition at a time.
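
As an illustrative sketch only (ApplyBusinessLogicAsync is the same hypothetical placeholder used in the earlier samples, and whether you checkpoint every event or less frequently is up to your application), a retry-forever handler could look roughly like:

private static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
    if (!eventArgs.HasEvent)
    {
        return;
    }

    // No other event from this partition will be dispatched until this handler
    // returns, so it is safe to keep retrying here.
    while (!eventArgs.CancellationToken.IsCancellationRequested)
    {
        try
        {
            await ApplyBusinessLogicAsync(eventArgs.Data);
            await eventArgs.UpdateCheckpointAsync();
            return;
        }
        catch (Exception)
        {
            // Log the failure and back off before the next attempt.
            await Task.Delay(TimeSpan.FromSeconds(5));
        }
    }
}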

One of the things that we did in the new version of the SDK was to split the concepts of checkpointing and ownership. Partition ownership is managed by the processor as part of its load balancing cycles, and it will continue to refresh ownership timestamps regularly regardless of whether you're writing checkpoints. The load balancing algorithm has also changed quite a bit to prevent the "partition bouncing" that occurred when processors stole from one another. In the new approach, there are only two scenarios where you'll see a partition stolen:

  • When you're scaling up processors and partitions are redistributed to ensure that each processor has its fair share of the work.

  • If a processor has crashed or cannot talk to storage and the ownership record for every partition that it owns expires. This causes the processor to appear dead to the other processors and its partitions to be considered abandoned, so they'll redistribute the work. So long as the processor is seen to own any single partition, it will continue to own the partitions that it had claimed unless it willfully relinquishes the claim (which we currently don't do).

remoba (Author) commented Feb 22, 2021

Thanks @jsquire, that clarifies things quite a bit.
Hopefully last questions:

  1. You mentioned that a partition might get taken by another instance when scaling, which makes perfect sense. Just wondering what the expected behavior is here if the original owner of the partition was still processing a message at the time. When the original processor instance tries to update the checkpoint, will it receive an exception? Or does partition redistribution somehow wait for a checkpoint to occur before taking a partition?
  2. Can I assume this ownership update will not be blocked by my "running forever" threads? Does it happen in another thread on a timer?

jsquire (Member) commented Feb 22, 2021

  1. When a partition is stolen, the new owner will begin reading from the last recorded checkpoint and will inform the Event Hubs service that it would like exclusive access to that partition. The old owner will not understand that ownership has changed until it attempts to read the next set of events from the Event Hubs service.

    The old owner will continue processing any event that is active in the handler. It will also dispatch any events that were previously read and are held in its memory buffer. As a result, you will see some duplicate events when partitions are stolen, the number of which will differ depending on the configuration of your processor and your checkpointing strategy.

    (The CacheEventCount option is what controls the size of this memory cache; see the sketch after this list.)

  2. Correct; load balancing runs as a dedicated background task, as does processing for each individual partition that is owned. The net result is that anything your handler is doing impacts only that partition; other partitions and the core operations of the processor are isolated.
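
For illustration, the cache size can be tuned when the processor is constructed. This is a rough sketch only; it assumes the CacheEventCount option mentioned above is exposed on EventProcessorClientOptions in the version you're using, and it reuses the storageClient, consumerGroup, and connection values from the sample at the top of this issue.

// A smaller cache means fewer buffered events are replayed by a new owner when a
// partition is stolen, at the cost of some read-ahead throughput.
var options = new EventProcessorClientOptions
{
    CacheEventCount = 10
};

var processor = new EventProcessorClient(
    storageClient,
    consumerGroup,
    ehubNamespaceConnectionString,
    eventHubName,
    options);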

remoba (Author) commented Feb 23, 2021

@jsquire Regarding #1, so is this flow a possibility? Just to verify I understood correctly.

  1. Instance A is handling Partition X, the checkpoint is at sequence number 10 and it is currently handling a batch
  2. Instance B launches and decides to take Partition X from Instance A. It starts at sequence number 10 and gets a batch containing some of the same messages that Instance A is working on.
  3. Instance A finishes and successfully updates the checkpoint.
  4. Instance B finishes and successfully updates the checkpoint.
  5. Instance A tries to read the next batch and fails, then forfeits ownership of Partition X.
  6. Instance B continues processing Partition X.

jsquire (Member) commented Feb 23, 2021

Hi @remoba. You're correct; that flow is entirely possible.

The big call-out for me is what you've numbered 3 and 4 since the ordering could be transposed and could "rewind" the checkpoint to an earlier point in time. That would potentially lead to further duplication if additional scaling or a processor crash happens at that moment in time. This is an unfortunate side-effect of trading off a more complicated and robust load balancing mechanism for one that is simpler, lighter weight, and more performant.

This is the part where I make a gentle reminder that it is highly recommended that you ensure that your processing is resilient to event duplication in whatever way is appropriate for your application scenarios. Because Event Hubs itself offers an at-least-once delivery guarantee, this is important even in cases where you're not using the processor or have extended it with a more robust load balancing mechanism.
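
As a minimal illustration only (a real application would persist this state durably rather than keeping it in memory, and ApplyBusinessLogicAsync remains the same hypothetical placeholder as above), per-partition duplicate detection could be sketched as:

// Requires: using System.Collections.Concurrent;
//
// Track the highest sequence number processed for each partition and skip
// anything at or below it.  This is in-memory only, so it resets on restart.
private static readonly ConcurrentDictionary<string, long> lastProcessedSequence = new ConcurrentDictionary<string, long>();

private static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
    if (!eventArgs.HasEvent)
    {
        return;
    }

    string partitionId = eventArgs.Partition.PartitionId;
    long sequenceNumber = eventArgs.Data.SequenceNumber;

    // Duplicate delivery: this event was already handled, so ignore it.
    if (lastProcessedSequence.TryGetValue(partitionId, out long last) && sequenceNumber <= last)
    {
        return;
    }

    await ApplyBusinessLogicAsync(eventArgs.Data);
    lastProcessedSequence[partitionId] = sequenceNumber;
}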

remoba (Author) commented Feb 23, 2021

@jsquire Gotcha, to clarify - I wasn't raising this as a problem, just wanted to verify that the checkpoint will indeed successfully update for both instances in this race.

Thank you very much for all of your time and answers, I will be closing this thread now.

remoba closed this as completed Feb 23, 2021
jsquire (Member) commented Feb 23, 2021

My pleasure; please feel free to reach out in the future if you run into anything further.
