
"Commit failed: Local: No offset stored" + messages commited anyway #71

Closed
fillest opened this issue Nov 14, 2016 · 11 comments
@fillest

fillest commented Nov 14, 2016

confluent-kafka==0.9.2, librdkafka 0.9.2, Python 2.7.6, kafka 0.10.1.0
It's me again...
I had a clean new topic 'test', into which I produced some messages.
I have a consumer configured with 'enable.auto.commit': False, 'enable.auto.offset.store': True, 'default.topic.config': {'auto.offset.reset': 'smallest'}. I'm manually assigning a partition via assign(). I consume messages until _PARTITION_EOF, then call .commit(async = False) and finally .close().
On the first run I see the received messages printed and the following:

consuming
assigning
getting position
[TopicPartition{topic=test,partition=0,offset=-1001,error=None}] -1001
polling
Received message: sup
...
_PARTITION_EOF
commiting
closing
**on_commit None [TopicPartition{topic=test,partition=0,offset=27,error=None}]
**on_commit KafkaError{code=_NO_OFFSET,val=-168,str="(null)"} [TopicPartition{topic=test,partition=0,offset=-1001,error=None}]
closed
Traceback (most recent call last):
  File "ktest.py", line 118, in <module>
    c.commit(async = False)
cimpl.KafkaException: KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset stored"}

Moreover, I run the script again:

consuming
assigning
getting position
[TopicPartition{topic=test,partition=0,offset=-1001,error=None}] -1001
polling
_PARTITION_EOF
commiting
closing
**on_commit KafkaError{code=_NO_OFFSET,val=-168,str="(null)"} [TopicPartition{topic=test,partition=0,offset=-1001,error=None}]
**on_commit KafkaError{code=_NO_OFFSET,val=-168,str="(null)"} [TopicPartition{topic=test,partition=0,offset=-1001,error=None}]
closed
Traceback (most recent call last):
  File "ktest.py", line 118, in <module>
    c.commit(async = False)
cimpl.KafkaException: KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset stored"}

There were no messages, which means the offset was actually committed. Also, why was on_commit called twice?

@edenhill
Contributor

edenhill commented Nov 14, 2016

Can you provide your complete consumer conf dict?

@edenhill
Contributor

_NO_OFFSET is a "soft error": it means there was nothing to commit because the current offset had already been committed. That might make sense in this case, since the first on_commit indicates offset 27 was successfully committed and the subsequent commit had no offsets to commit because no new messages were consumed.
Still, if you only called commit() once you shouldn't see two on_commits.
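To illustrate the "soft error" reading, here is a sketch of an on_commit callback that treats _NO_OFFSET as "nothing new to commit" rather than a failure. The -168 constant matches the val=-168 shown in the logs above; in the real callback, err is a KafkaError or None:

```python
NO_OFFSET = -168  # numeric value of KafkaError._NO_OFFSET (val=-168 in the logs)

def on_commit(err, partitions):
    """Commit callback: classify the result instead of treating every
    non-None err as a fatal failure."""
    if err is None:
        return 'committed'        # offsets were committed (e.g. offset 27 above)
    if err.code() == NO_OFFSET:
        # Soft error: the current offsets had already been committed.
        return 'nothing-to-commit'
    return 'failed'               # a real commit failure
```

With this reading, the second on_commit in the first run is benign: offset 27 had already been committed, so there was nothing left to do.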

@fillest
Author

fillest commented Nov 14, 2016

c = Consumer(**{
    'client.id': 'test-client',
    'bootstrap.servers': 'localhost',
    'group.id': 'test-group',
    'enable.auto.commit': False,
    'enable.auto.offset.store': True,
    'offset.store.method': 'broker',
    'enable.partition.eof': True,
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'on_commit': on_commit,
    'error_cb': error_cb,
})

@edenhill
Contributor

Thanks.
And you're sure you only call commit once?

@fillest
Author

fillest commented Nov 14, 2016

commit is being called only once, yes.

        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print '_PARTITION_EOF'
            # print c.position([p])
            print 'commiting'
            c.commit(async = False)
            print 'commited'
            break

@edenhill
Contributor

Okay, will try to reproduce.

@edenhill
Contributor

I can't reproduce this, can you provide a small script to reproduce the issue?

@fillest
Author

fillest commented Nov 14, 2016

from confluent_kafka import Consumer, KafkaError, TopicPartition

def on_commit (err, partitions):
    print '**on_commit', err, partitions
def error_cb (err):
    print '***error_cb', err.name(), err.str()
print 'consuming'
c = Consumer(**{
    'client.id': 'test-client',
    'bootstrap.servers': 'localhost',
    'group.id': 'test-group',
    'enable.auto.commit': False,
    'enable.auto.offset.store': True,
    'offset.store.method': 'broker',
    'enable.partition.eof': True,
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'on_commit': on_commit,
    'error_cb': error_cb,
})

print 'assigning'
p = TopicPartition('test', 0)
c.assign([p])
print 'getting position'
pos = c.position([p])
print pos, pos[0].offset

print 'polling'
try:
    while True:
        msg = c.poll(1)
        if msg is None:
            continue

        if not msg.error():
            print 'Received message: %s' % msg.value().decode('utf-8')
        elif msg.error().code() != KafkaError._PARTITION_EOF:
            print '***error', msg.error()
            break
        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print '_PARTITION_EOF'
            print 'commiting'
            c.commit(async = False)
            print 'commited'
            break
finally:
    print 'closing'
    c.close()
    print 'closed'

first run:

consuming
assigning
getting position
[TopicPartition{topic=test,partition=0,offset=-1001,error=None}] -1001
polling
Received message: sup
...more messages...
_PARTITION_EOF
commiting
closing
**on_commit None [TopicPartition{topic=test,partition=0,offset=27,error=None}]
**on_commit KafkaError{code=_NO_OFFSET,val=-168,str="(null)"} [TopicPartition{topic=test,partition=0,offset=-1001,error=None}]
closed
Traceback (most recent call last):
  File "ktest.py", line 120, in <module>
    c.commit(async = False)
cimpl.KafkaException: KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset stored"}

second run:

consuming
assigning
getting position
[TopicPartition{topic=test,partition=0,offset=-1001,error=None}] -1001
polling
_PARTITION_EOF
commiting
closing
**on_commit KafkaError{code=_NO_OFFSET,val=-168,str="(null)"} [TopicPartition{topic=test,partition=0,offset=-1001,error=None}]
**on_commit KafkaError{code=_NO_OFFSET,val=-168,str="(null)"} [TopicPartition{topic=test,partition=0,offset=-1001,error=None}]
closed
Traceback (most recent call last):
  File "ktest.py", line 120, in <module>
    c.commit(async = False)
cimpl.KafkaException: KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset stored"}

@edenhill
Contributor

Awesome! This reproduces it reliably.

@edenhill
Contributor

There's a stray commit() left in the C code for the case where no msg or partition list argument is given to commit():
https://github.com/confluentinc/confluent-kafka-python/blob/master/confluent_kafka/src/Consumer.c#L282

That's why you get double commits.
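Until that stray call is removed, a workaround is to pass explicit offsets to commit() so the no-argument code path is never taken. A minimal sketch of building that offsets list (the TopicPartition here is a namedtuple stand-in for this sketch; the real class comes from confluent_kafka, and commit() accepts the list via its offsets parameter):

```python
from collections import namedtuple

# Stand-in for confluent_kafka.TopicPartition, just for this sketch.
TopicPartition = namedtuple('TopicPartition', 'topic partition offset')

def offsets_to_commit(last_offsets):
    """Build an explicit offsets list for commit(offsets=...).

    Kafka expects the offset of the *next* message to consume, so we
    commit last consumed offset + 1 for each (topic, partition).
    """
    return [
        TopicPartition(topic, partition, offset + 1)
        for (topic, partition), offset in sorted(last_offsets.items())
    ]
```

With the real client one would track the offset of each consumed message and call c.commit(offsets=offsets_to_commit(last_offsets), async=False) instead of the bare c.commit(async=False). In the log above, consuming up to offset 26 would commit offset 27.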

@drew-vz

drew-vz commented Oct 4, 2024

I recently started seeing this error in my consumer code on 2.5.0. I tried upgrading to 2.5.3, where this issue was supposedly fixed by this commit, but I still saw the error. I ended up having to downgrade to 2.0.2 to fix it. Here is the consumer code...


def consume(self):
    """
    Returns
    -------
    None
    """
    self.consumer.subscribe(self.topics)
    running = True
    while running:
        try:
            start = time.process_time()
            valid_messages = 0
            kafka_messages = self.consumer.consume(
                num_messages=NUM_MESSAGES, timeout=TOPIC_CONSUME_TIMEOUT_IN_SECONDS
            )
            for kafka_message in kafka_messages:
                if kafka_message is None:
                    continue
                error = kafka_message.error()
                if not error:
                    value = self.deserializer(kafka_message.value(), None)
                    key = self.deserializer(kafka_message.key(), None)
                    # PROCESS DATA HERE
                    valid_messages += 1
                else:
                    if error.code() == KafkaError._PARTITION_EOF:
                        logging.info(
                            f"No more messages found, finished reading topic, EOF message offset={kafka_message.offset()}, topic={kafka_message.topic()}"
                        )
                    else:
                        logging.exception(
                            f"Message processing failed for message, message error={kafka_message.error()}"
                        )
            if valid_messages > 0:
                self.consumer.commit(asynchronous=False)
            logging.info(
                f"Consumed {valid_messages} messages from Kafka in {time.process_time() - start} seconds"
            )
        except SerializerError:
            logging.exception("could not serialize message")
            self.slack_bot.send_slack_exception(
                "message de-serialization failed for topics, exiting"
            )
            running = False
        except KafkaException:
            logging.exception("kafka error occurred while processing message")
            self.slack_bot.send_slack_exception(traceback.format_exc())
            running = False
        except Exception:
            logging.exception("some error occurred while running consumer")
            self.slack_bot.send_slack_exception(traceback.format_exc())
            running = False
The config looks like...

self.consumer = Consumer(
    {
        "security.protocol": config.security_protocol,
        "sasl.mechanism": config.sasl_mechanism,
        "oauth_cb": oauth_cb,
        "client.id": socket.gethostname(),
        "bootstrap.servers": config.bootstrap_servers,
        "group.id": config.consumer_group_id,
        "enable.auto.commit": "true" if auto_commit else "false",
        "auto.offset.reset": "earliest",
        "enable.partition.eof": "true",
        "max.poll.interval.ms": "1200000",  # send poll() from the main thread every 20 mins (> max iteration time)
        "session.timeout.ms": "120000",  # send heartbeat in a separate thread every 2 mins
        "partition.assignment.strategy": "roundrobin",
    }
)
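One way to keep a loop like this alive is a commit wrapper that swallows only the "Local: No offset stored" case, which can occur when a batch contained nothing but _PARTITION_EOF events. A sketch, where the exception class is a hypothetical stand-in; with confluent-kafka one would catch KafkaException and compare err.code() against KafkaError._NO_OFFSET:

```python
class NoOffsetStored(Exception):
    """Hypothetical stand-in for a KafkaException carrying _NO_OFFSET."""

def commit_tolerant(commit_fn):
    """Run a synchronous commit; treat 'Local: No offset stored' as
    'nothing new to commit' instead of a fatal error."""
    try:
        commit_fn()
        return 'committed'
    except NoOffsetStored:
        # Benign: the stored offsets were already committed.
        return 'nothing-to-commit'
```

In the loop above, self.consumer.commit(asynchronous=False) would be wrapped this way so a no-op commit does not set running = False via the KafkaException handler.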
