This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Deadlock when sarama commit an out of range offset #121

Closed
plopik opened this issue Apr 7, 2017 · 6 comments


plopik commented Apr 7, 2017

I'm currently using sarama-cluster, and I have an issue where my consumer does not consume events fast enough and falls behind to the point of trying to commit offsets that have already been deleted by Kafka. The current behavior of sarama-cluster is to close the consumer on the affected partitions and deadlock.

func (bc *brokerConsumer) handleResponses() {
    ...
        switch result {
        ...
        case ErrOffsetOutOfRange:
            // there's no point in retrying this it will just fail the same way again
            // shut it down and force the user to choose what to do
            child.sendError(result)
            Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
            close(child.trigger)
            delete(bc.subscriptions, child)
        case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
            // not an error, but does need redispatching
            Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
                bc.broker.ID(), child.topic, child.partition, result)
            child.trigger <- none{}
            delete(bc.subscriptions, child)
        ...
        }
    }
}

In sarama's handleResponses function, the choice has been made to kill the child partitionConsumer in case of an ErrOffsetOutOfRange error and to send the error upward.

Sarama-cluster hence kills its own partitionConsumer and sends the same error upward.
As of right now, the user only has the information "ErrOffsetOutOfRange", which could come from several places in the code, and the other partitionConsumers keep going as if nothing happened.

It feels like in this case sarama-cluster could either:

  • Retry opening a PartitionConsumer using the configured initial-offset strategy, oldest or newest (as it already does for out-of-range offsets on new consumers); see the sketch after this list
  • Close the whole client down (as offsets are handled by sarama-cluster, this error should not happen under normal conditions)
  • Send another error stating that this particular partitionConsumer has been unexpectedly closed, and let the user decide what to do (close or keep going)
  • Rebalance the partitions (which would effectively reopen the partitionConsumer)
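
Until the library handles this case itself, a user-side workaround in the spirit of the first option might look like the minimal sketch below: watch the error channel and rebuild the whole consumer when an out-of-range error surfaces, so the offset is re-resolved against Offsets.Initial. Broker, group, and topic names are placeholders, and the *sarama.ConsumerError type assertion assumes the error arrives with sarama's usual wrapping.

package main

import (
	"log"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	// When no valid committed offset exists, restart from the oldest
	// retained message instead of dying.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	brokers := []string{"localhost:9092"} // placeholder
	for {
		consumer, err := cluster.NewConsumer(brokers, "my-group", []string{"my-topic"}, config)
		if err != nil {
			log.Fatal(err)
		}
		outOfRange := runUntilFatal(consumer)
		consumer.Close()
		if !outOfRange {
			return
		}
		// Loop around and rebuild the consumer after an out-of-range error.
	}
}

// runUntilFatal consumes messages and reports whether the consumer
// stopped because of an out-of-range offset.
func runUntilFatal(c *cluster.Consumer) bool {
	for {
		select {
		case msg, ok := <-c.Messages():
			if !ok {
				return false
			}
			c.MarkOffset(msg, "") // mark after processing
		case err, ok := <-c.Errors():
			if !ok {
				return false
			}
			// sarama wraps per-partition failures in *sarama.ConsumerError
			// (assumption: sarama-cluster forwards them unwrapped).
			if cErr, ok := err.(*sarama.ConsumerError); ok && cErr.Err == sarama.ErrOffsetOutOfRange {
				log.Printf("out of range on %s/%d, rebuilding consumer", cErr.Topic, cErr.Partition)
				return true
			}
			log.Println(err)
		}
	}
}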

dim commented Apr 10, 2017

Strange, I thought we were covered for that; see https://github.com/bsm/sarama-cluster/blob/master/partitions.go#L24


plopik commented Apr 10, 2017

The same error is actually sent at different times: here we test errors for newPartition, but the error I'm talking about is actually handled here: https://github.com/bsm/sarama-cluster/blob/master/partitions.go#L55 (we log the error, but the partitionConsumer is never closed).


dim commented Apr 11, 2017 via email


dim commented Apr 19, 2017

Hey, sorry, but I cannot recreate the problem. Could you please do me a favour and:

  1. set config.Consumer.Return.Errors = true
  2. iterate over consumer.Errors() in a background goroutine and fmt.Printf("%T %#v\n", err, err) each error

Thanks
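
For reference, that debugging setup might look like this minimal sketch (broker, group, and topic names are placeholders):

package main

import (
	"fmt"
	"log"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true // step 1: forward consumer errors

	consumer, err := cluster.NewConsumer([]string{"localhost:9092"}, "my-group", []string{"my-topic"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// step 2: dump each error's concrete type and contents in the background
	go func() {
		for err := range consumer.Errors() {
			fmt.Printf("%T %#v\n", err, err)
		}
	}()

	for msg := range consumer.Messages() {
		consumer.MarkOffset(msg, "") // keep consuming as usual
	}
}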


dim commented May 31, 2017

I will close this one for now; please re-open if you can find a way to re-create it consistently.

dim closed this as completed May 31, 2017
@aravindvs

I am reopening this one as I saw a similar behavior:
(1) The producer was producing at a much higher rate than the consumer could keep up with.
(2) The consumer was trying to commit an offset that had already been deleted.

I was able to log the error message as well by listening on the consumer.Errors() channel, and got this one:
kafka-consumer : Failed to consume message. Error: kafka: error while consuming topic: kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition.
After this, since the pc.Loop() above doesn't refresh, we will just be stuck, as @plopik mentioned above. I think it would be better to just trigger a rebalance(), which should get us out of this situation, as mentioned earlier.
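
To confirm the scenario, one can compare the broker's oldest retained offset with the group's committed offset; if the committed offset is below the oldest retained one, retention has deleted it and every fetch will come back out of range. A hedged diagnostic sketch using plain sarama (partition 0 and all names are placeholders):

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Oldest offset still retained by the broker for this partition.
	oldest, err := client.GetOffset("my-topic", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatal(err)
	}

	om, err := sarama.NewOffsetManagerFromClient("my-group", client)
	if err != nil {
		log.Fatal(err)
	}
	defer om.Close()

	pom, err := om.ManagePartition("my-topic", 0)
	if err != nil {
		log.Fatal(err)
	}
	defer pom.Close()

	// Next offset the group would consume (falls back to Offsets.Initial
	// when nothing has been committed yet).
	committed, _ := pom.NextOffset()
	fmt.Printf("oldest retained=%d committed=%d out-of-range=%v\n",
		oldest, committed, committed < oldest)
}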
