Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling Panic in ctx.Lookup Caused by ctx.Fail in Goka Library #467

Open
kevin-you2 opened this issue Oct 21, 2024 · 3 comments
Open

Handling Panic in ctx.Lookup Caused by ctx.Fail in Goka Library #467

kevin-you2 opened this issue Oct 21, 2024 · 3 comments

Comments

@kevin-you2
Copy link

Hello,

I’m encountering an issue with the Goka library where a panic occurs when using ctx.Lookup, which leads to the processor shutting down. The panic seems to be triggered by calls to ctx.Fail within the Lookup method. Here is the relevant code snippet from the Goka library:

func (ctx *cbContext) Lookup(topic Table, key string) interface{} {
    if ctx.views == nil {
        ctx.Fail(fmt.Errorf("topic %s not subscribed", topic))
    }
    v, ok := ctx.views[string(topic)]
    if !ok {
        ctx.Fail(fmt.Errorf("topic %s not subscribed", topic))
    }
    val, err := v.Get(key)
    if err != nil {
        ctx.Fail(fmt.Errorf("error getting key %s of table %s: %v", key, topic, err))
    }
    return val
}

When ctx.Fail is called, it results in a panic with the following error message:

error processing message (partition=0): panic in callback: error getting key 867604054647616 of table bicycle.avro: no partitions found

I understand that ctx.Fail is intended to initiate an immediate shutdown of the processor to maintain data integrity, and the documentation advises against recovering from this panic to avoid potential deadlocks.

However, in my application, I need to handle this exception gracefully without causing the entire processor to shut down. Is there a recommended way to handle errors within ctx.Lookup to prevent the panic from occurring? Alternatively, is there a way to catch this panic safely without risking a deadlock, so that I can log the error and continue processing?

Any guidance or suggestions on how to handle this situation would be greatly appreciated.

Thank you!

@norbertklawikowski
Copy link
Contributor

Hey @kevin-you2,

the error is caused by the underlying view for the topic having no partitions created. I am not sure what caused this, but it seems the topic is not correctly configured.

You could maybe beforehand check if there are partitions configured using the topic managers Partitions method: https://github.com/lovoo/goka/blob/master/topic_manager.go#L111

@kevin-you2
Copy link
Author

kevin-you2 commented Oct 22, 2024

Thank you for your response, @norbertklawikowski .

Based on your suggestions, I wanted to provide some additional information about my setup.

I’m using the following consumer group definition:

group := goka.DefineGroup(goka.Group(topicConfig.ConsumerGroupName),
    goka.Input(goka.Stream(topicConfig.StatusTopicName), statusCodec, consumerController.Process),
    goka.Lookup(goka.Table(topicConfig.RegionViewTopicName), rCodec),
    goka.Lookup(goka.Table(topicConfig.BicycleViewTopicName), bCodec),
    goka.Output(goka.Stream(topicConfig.RichStatusTopicName), outputCodec),
)

I noticed that I’m not using Persist in this definition. Could this be related to the issue with the topic manager?

The panic error occurs at the following code snippet:

bMsg := ctx.Lookup(goka.Table(r.bicycleTopic), value.IotCode)
if bMsg == nil {
    return
}

Looking at the Goka source code here, I see that ctx.Fail is called when an error occurs:

val, err := v.Get(key)
if err != nil {
    ctx.Fail(fmt.Errorf("error getting key %s of table %s: %v", key, topic, err))
}

The error message I receive matches this format:

error getting key 867604054647616 of table bicycle.avro: no partitions found

Is it possible that the absence of Persist is causing this issue? Could this be related to the topic manager not properly managing the table?

@norbertklawikowski
Copy link
Contributor

Hey @kevin-you2,

this is not necessary, as its done by goka when creating the processor:

https://github.com/lovoo/goka/blob/master/processor.go#L119

Here for every goka.Lookup edge a new view is created, which is backed internally by a local key-value store (a Persist would create a local key-value store for the group table).

🤔 I think the problem is caused somewhere else. In example 3 there are functions which make sure a topic exists with the correct number of partitions, could you try to run some ensure-function for your topics before creating the processor?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants