Commit structure validation failure: TP != TopicPartition #539

Closed
richardhundt opened this issue Jul 28, 2023 · 13 comments · Fixed by #541
Labels
bug (Something isn't working) · documentation (Improvements or additions to documentation)

Comments

@richardhundt
Collaborator

richardhundt commented Jul 28, 2023

I found a puzzling issue.

Both aiokafka and faust define named tuples for (topic, partition), namely TopicPartition and TP respectively. The faust one doesn't satisfy the isinstance check in aiokafka's commit_structure_validate and so the stack trace below occurs. What I don't understand is how this ever worked. Why do I only see this now? Is this some unusual commit path?

Either way, a faust TP is not an aiokafka TopicPartition according to isinstance, so this must be a bug.
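
For what it's worth, the mismatch can be reproduced in isolation (a minimal sketch, assuming the import paths faust.types.tuples.TP and aiokafka.structs.TopicPartition):

from aiokafka.structs import TopicPartition
from faust.types.tuples import TP

faust_tp = TP(topic="test-events", partition=0)

# The two classes are distinct named tuples, so aiokafka's isinstance-based
# validation rejects faust's TP even though the fields are identical.
print(isinstance(faust_tp, TopicPartition))  # False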

ERROR:faust.transport.drivers.aiokafka:[^--AIOKafkaConsumerThread]: Got exception: ValueError('Key should be TopicPartition instance')
Current assignment: {TP(topic='test-uploads', partition=0), TP(topic='test-dgtal-cases-documents-list-changelog', partition=0), TP(topic='test-dgtal.worker.agents.health_agent', partition=0), TP(topic='test-transactions', partition=0), TP(topic='dgtal-reply-to', partition=0), TP(topic='test-actions', partition=0), TP(topic='test-dgtal-documents-changelog', partition=0), TP(topic='test-__assignor-__leader', partition=0), TP(topic='test-documents', partition=0), TP(topic='test-events', partition=0), TP(topic='test-cases', partition=0), TP(topic='test-dgtal-transactions-changelog', partition=0), TP(topic='test-dgtal-cases-changelog', partition=0), TP(topic='test-cases-index', partition=0), TP(topic='test-dgtal-cases-transactions-list-changelog', partition=0)}, detail: {TP(topic='test-events', partition=0): OffsetAndMetadata(offset=3, metadata='')}
Traceback (most recent call last):
  File "/home/richard/Work/dgtal/.venv/lib/python3.10/site-packages/faust/transport/drivers/aiokafka.py", line 721, in _commit
    await consumer.commit(aiokafka_offsets)
  File "/home/richard/Work/dgtal/.venv/lib/python3.10/site-packages/aiokafka/consumer/consumer.py", line 562, in commit
    offsets = commit_structure_validate(offsets)
  File "/home/richard/Work/dgtal/.venv/lib/python3.10/site-packages/aiokafka/util.py", line 63, in commit_structure_validate
    raise ValueError("Key should be TopicPartition instance")
ValueError: Key should be TopicPartition instance
ERROR:faust.transport.drivers.aiokafka:[^--AIOKafkaConsumerThread]: Crashed reason=ValueError('Key should be TopicPartition instance')
Traceback (most recent call last):
  File "/home/richard/Work/dgtal/.venv/lib/python3.10/site-packages/faust/transport/drivers/aiokafka.py", line 721, in _commit
    await consumer.commit(aiokafka_offsets)
  File "/home/richard/Work/dgtal/.venv/lib/python3.10/site-packages/aiokafka/consumer/consumer.py", line 562, in commit
    offsets = commit_structure_validate(offsets)
  File "/home/richard/Work/dgtal/.venv/lib/python3.10/site-packages/aiokafka/util.py", line 63, in commit_structure_validate
    raise ValueError("Key should be TopicPartition instance")
ValueError: Key should be TopicPartition instance
@richardhundt
Collaborator Author

richardhundt commented Jul 28, 2023

If I change this in _commit (faust/transport/drivers/aiokafka.py) from:

            aiokafka_offsets = {
                tp: OffsetAndMetadata(offset, "")
                for tp, offset in offsets.items()
                if tp in self.assignment()
            }
            self.tp_last_committed_at.update({tp: now for tp in aiokafka_offsets})
            await consumer.commit(aiokafka_offsets)

to this:

            aiokafka_offsets = {
                TopicPartition(tp.topic, tp.partition): OffsetAndMetadata(offset, "")
                for tp, offset in offsets.items()
                if tp in self.assignment()
            }
            self.tp_last_committed_at.update({TP(tp.topic, tp.partition): now for tp in aiokafka_offsets})
            await consumer.commit(aiokafka_offsets)

it works.

@wbarnha added the bug (Something isn't working) label Aug 1, 2023
@wbarnha
Member

wbarnha commented Aug 1, 2023

Interesting, let's get a PR opened with these changes and talk about this further there. I've never seen this issue before, so I'm curious to know what code you're running that triggered this.

@dada-engineer
Collaborator

@richardhundt As far as I can see, the consumer has a _new_topicpartition method which should avoid exactly this for the aiokafka driver:

def _new_topicpartition(self, topic: str, partition: int) -> TP:
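
Presumably (a guess, not verified against the driver source) it just wraps aiokafka's TopicPartition and casts it to the faust type, roughly:

from typing import cast

from aiokafka.structs import TopicPartition
from faust.types.tuples import TP

def _new_topicpartition(self, topic: str, partition: int) -> TP:
    # Build a real aiokafka TopicPartition but type it as faust's TP,
    # so the rest of faust stays typed against TP.
    return cast(TP, TopicPartition(topic, partition))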

But it doesn't seem to be used anywhere 🤔

@richardhundt
Collaborator Author

richardhundt commented Aug 10, 2023

@dabdada Also, when it fetches the assignment, it makes sure the topic partitions are not the aiokafka ones:

def ensure_TPset(tps: Iterable[Any]) -> Set[TP]:

@dada-engineer
Collaborator

I suspect this is simply for internal type usage.

Can you provide a minimal example of code that raises the error? Like @wbarnha, I haven't experienced anything like this and now wonder why 😁

@richardhundt
Collaborator Author

richardhundt commented Aug 12, 2023

@dabdada Are you saying that _commit is only for internal use?

async def _commit(self, offsets: Mapping[TP, int]) -> bool:

Because you can just inspect it: it calls assignment, assignment calls ensure_TPset, which gives you a set of faust's TP types, and there's definitely an isinstance assertion in aiokafka, so you simply cannot pass anything other than aiokafka's TopicPartition types.
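
For reference, judging from the traceback, the check in aiokafka/util.py is essentially an isinstance gate over the mapping keys; a rough sketch (paraphrased, not the exact aiokafka code):

from aiokafka.structs import TopicPartition

def commit_structure_validate(offsets):
    # Every key must be an aiokafka TopicPartition, otherwise the commit
    # is rejected before anything is sent to the broker.
    for tp in offsets:
        if not isinstance(tp, TopicPartition):
            raise ValueError("Key should be TopicPartition instance")
    return offsets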

Surely seeing that _commit is broken doesn't require anything other than looking at it.

The real question is why has this ever worked? Does _commit not get called unless we're in some strange code path, because we're usually relying on Kafka's autocommit?

EDIT: also the stack trace proves my point ;) It actually logs the dictionary which causes the error:

Current assignment: {TP(topic='test-uploads', partition=0), TP(topic='test-dgtal-cases-documents-list-changelog', partition=0), TP(topic='test-dgtal.worker.agents.health_agent', partition=0), TP(topic='test-transactions', partition=0), TP(topic='dgtal

Those are TP instances, clearly not TopicPartition instances, so you'd expect passing them to aiokafka to fail.

@richardhundt
Collaborator Author

richardhundt commented Aug 12, 2023

I've added comments inline to show what's going on:

# we're about to build a `Dict[TP, OffsetAndMetadata]`...
aiokafka_offsets = {
    tp: OffsetAndMetadata(offset, "")
    for tp, offset in offsets.items() # offsets is `Mapping[TP, int]`
    # if `assignment` returns `TP` instances (which it does)
    # then this builds a `Dict[TP, OffsetAndMetadata]`
    if tp in self.assignment()
}

# the following is okay, we can work with `TP` instances
self.tp_last_committed_at.update({tp: now for tp in aiokafka_offsets})

# however the following calls into `aiokafka` and passes the *same* `Dict[TP, OffsetAndMetadata]`.
# It cannot possibly be correct because aiokafka wants `TopicPartition` instances!
await consumer.commit(aiokafka_offsets) # BOOM!

EDIT: I'm now thinking that there are cases where the offsets: Mapping[TP, int] parameter is a Mapping[TopicPartition, int], but that doesn't explain why the if tp in self.assignment() filter works because equality should fail if the tps are different types.

@dada-engineer
Collaborator

dada-engineer commented Aug 12, 2023

I think I finally figured it out after digging deeper into faust-streaming.

First of all, yes, _commit is an internal function by Python convention (prefixed with an underscore). You should be careful when using it, although in this case its documentation is not optimal and the type hints suggest that faust's TP is used here.
Secondly, to answer why the in self.assignment() filter works: faust.types.tuples.TP and kafka.structs.TopicPartition are NamedTuples with the same attributes, so their equality magic method compares them attribute by attribute, and those attributes are the same.
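
That can be seen with two stand-in named tuples defined locally (a sketch; the real classes live in faust.types.tuples and kafka.structs):

from typing import NamedTuple

class TP(NamedTuple):
    topic: str
    partition: int

class TopicPartition(NamedTuple):
    topic: str
    partition: int

# NamedTuples compare like plain tuples, field by field, regardless of class,
# so the `in self.assignment()` filter matches across the two types...
print(TP("test-events", 0) == TopicPartition("test-events", 0))  # True
# ...while isinstance still tells them apart, which is what
# commit_structure_validate checks.
print(isinstance(TP("test-events", 0), TopicPartition))  # False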

Now, for the initial question of how this all worked anyway:

  1. The aiokafka consumer started by faust naturally delivers messages with kafka.structs.TopicPartition info.
  2. The Conductor defines an on_message consumer callback that is called on new messages and receives the message from the aiokafka consumer (probably somewhere here: https://github.com/faust-streaming/faust/blob/87a80a968f73220d5ac6190fb7df70b85427bdae/faust/transport/conductor.py#L274C43-L274C43)
  3. This message then gets assigned to an event and delivered to the agent.
  4. The agent acks the event, and the event's message.tp is appended to the acked TPs in the consumer.
  5. When the consumer's commit background task runs (periodically or after x messages; see the configuration reference), it gets the TPs from the _acked attribute and calls the consumer's commit method.
  6. The commit method hands those to aiokafka, and we have a complete lifecycle of the event/message.

This means the TopicPartition is never really changed into a faust-internal TP when using the aiokafka driver; it's simply typed as one.

Why do you want to call commit directly anyway? Can't you reduce the commit interval if you want more frequent commits?
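
For example, a sketch of tuning the commit cadence on the app (assuming the standard broker_commit_interval / broker_commit_every settings):

import faust

# Commit more frequently instead of calling commit()/_commit() by hand.
app = faust.App(
    "myapp",
    broker="kafka://localhost:9092",
    broker_commit_interval=1.0,  # seconds between background commits
    broker_commit_every=100,  # or commit after this many acked messages
)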

Hope this explanation helps you understand the inner workings and resolves your question. This is actually not a bug, just somewhat misleading type hints and weak enforcement of the correct classes being passed to aiokafka. Plus, there are no docs about this.

Edit 1: Actually, that's bad wording; it's not badly typed, it's the only way we get coherent typing internally.

@wbarnha added the documentation (Improvements or additions to documentation) label and removed the bug (Something isn't working) label Aug 13, 2023
@richardhundt
Collaborator Author

richardhundt commented Aug 13, 2023

That's the thing, I'm not calling commit or _commit manually. Some internal machinery is doing it.

That means that something is passing offsets of the type actually shown in the method signature.

@richardhundt added the bug (Something isn't working) label Aug 13, 2023
@dada-engineer
Collaborator

dada-engineer commented Aug 13, 2023

Ah okay, sorry for the misunderstanding. It looks like if you start in client mode, the on_message callback provides faust TPs. You still haven't shared the config under which this happens, so it's hard to say, really.

https://github.com/faust-streaming/faust/blob/master/faust/transport/conductor.py#L268

@dada-engineer
Collaborator

Either way, it might be low-hanging fruit to ensure that aiokafka TopicPartitions are what gets passed into the aiokafka lib.

@richardhundt
Collaborator Author

I'm trying to create a minimal example, but I'm running a 32-hour processing job at the moment, so I don't want to interrupt it. I'll see what I can dig up.

Aren't the commits a no-op in client-only mode because the consumer doesn't have a consumer group?

@dada-engineer
Collaborator

dada-engineer commented Aug 13, 2023

I guess client-only mode would not reach this commit method, as it is intended as a dev mode that does not require Kafka but has a simple reply_consumer.

Edit: so yes, what you said.
