"panic: sync: negative WaitGroup counter" in PartitionProcessor.VisitValues #433
Can someone please take a look at this issue? It caused a failure in our production env. Thanks!
+1 I'm also having this issue!
Could you provide some information on how you use the visit functionality? For example, how is it initialized, what's done inside the visit function, and so on?
Hi @frairon, thanks for the quick response. For sure, here is how we have initialized and used it:

```go
func main_runner() error {
	if err := config.Init(); err != nil {
		if errors.Is(err, flag.ErrHelp) {
			return nil
		}
		return fmt.Errorf("loading configuration: %w", err)
	}

	ctx := context.Background()
	ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	if err := database.Init(ctx); err != nil {
		return fmt.Errorf("setting up database: %w", err)
	}

	// g is the *errgroup.Group the service runs in; its initialization is
	// elided from this snippet.
	if err := app.Init(ctx, g); err != nil {
		return fmt.Errorf("in application: %w", err)
	}
	return g.Wait()
}
```

`app.New()` =>

```go
// New creates a new instance of the stream processing application.
func New(brokers []string, opts ...Option) (*App, error) {
	// Default options
	opt := &options{
		groupName:        "servicename",
		inputTopic:       "topicname",
		logger:           zap.NewNop(),
		processCallback:  DefaultProcessCallback,
		snapshotInterval: 10 * time.Second,
	}
	// Update defaults
	opt.apply(opts...)

	app := &App{
		logger:           opt.logger,
		snapshotInterval: opt.snapshotInterval,
	}

	// Define the group graph
	graph := goka.DefineGroup(
		opt.groupName,
		goka.Input(opt.inputTopic, orderbook.DeltaCodec{}, process),
		goka.Visitor("snapshot", app.snapshot),
		goka.Persist(orderbook.Codec{}),
	)

	// Create processor
	proc, err := goka.NewProcessor(brokers, graph, opt.processorOptions...)
	if err != nil {
		return nil, err
	}
	app.processor = proc
	return app, nil
}
```

`app.Run()` =>

```go
func (app *App) Run(ctx context.Context, db database.Interface) error {
	if err := app.setupMetrics(); err != nil {
		return fmt.Errorf("setting up metrics: %w", err)
	}

	g, ctx := errgroup.WithContext(ctx)

	sw := database.NewSafeAsyncBatchSnapshotWriter(db, 1000)
	g.Go(func() error {
		return sw.Run(ctx)
	})
	g.Go(func() error {
		return app.processor.Run(ctx)
	})

	// Snapshotter thread.
	g.Go(func() error {
		app.logger.Info("snapshotter started")
		defer app.logger.Info("snapshotter stopped")
		for {
			select {
			case <-ctx.Done():
				return nil
			default:
			}
			// Trigger the visitor callback, which writes a snapshot to the db
			visited, err := app.processor.VisitAllWithStats(ctx, "snapshot", sw)
			if err != nil {
				app.logger.Error("error while snapshotting", zap.Error(err))
			}
			// Flush the async snapshot writer between rounds of snapshotting.
			if err := sw.Flush(context.TODO()); err != nil {
				app.logger.Error("error flushing snapshot", zap.Error(err))
			}
			app.logger.Info("visit complete", zap.Int64("num_visited", visited))
			// Sleep for a few seconds before taking another round of
			// snapshots; this context-based sleep returns early if ctx
			// is cancelled.
			sleep, cancel := context.WithTimeout(ctx, app.snapshotInterval)
			<-sleep.Done()
			cancel()
		}
	})
	return g.Wait()
}
```

`app.Init()` =>

```go
// Init sets up and runs the stream processing application, including the
// snapshotter thread, which will be restarted each time the underlying Kafka
// consumer group rebalances.
func Init(ctx context.Context, g *errgroup.Group) error {
	cfg := goka.DefaultConfig()
	kafka := config.GetKafkaConfig()
	if kafka.AuthMechanism == config.KafkaAuthIAM {
		cfg.Net.SASL.Enable = true
		cfg.Net.SASL.Mechanism = sarama.SASLTypeAWSMSKIAM
		cfg.Net.SASL.AWSMSKIAM = sarama.AWSMSKIAMConfig{Region: "ignored"}
	}
	if kafka.UseTLS {
		cfg.Net.TLS.Enable = true
	}

	// Set up application
	goka.ReplaceGlobalConfig(cfg)
	meter := otel.Meter(pkgName)
	tracer = otel.Tracer(pkgName)
	messagesProcessed, _ = meter.Int64Counter("input.messages_processed")
	logger := zap.L()
	db := database.GetDatabaseInstance()

	opts := []Option{
		WithLogger(logger),
		WithProcessCallback(process),
		WithProcessorOptions(
			goka.WithConsumerGroupBuilder(consumerGroupBuilder()),
			goka.WithStorageBuilder(storage.MemoryBuilder()),
			goka.WithTopicManagerBuilder(
				topicManagerBuilder(kafka.TableReplicationFactor),
			),
		),
	}
	if kafka.GroupName != "" {
		opts = append(opts, WithGroupName(kafka.GroupName))
	}
	if kafka.InputTopic != "" {
		opts = append(opts, WithInputTopic(kafka.InputTopic))
	}

	app, err := New(kafka.BootstrapServers, opts...)
	if err != nil {
		logger.Error("failed to create application", zap.Error(err))
		return fmt.Errorf("creating application: %w", err)
	}
	g.Go(func() error {
		if err := app.Run(ctx, db); err != nil {
			logger.Error("error in application", zap.Error(err))
			return fmt.Errorf("running application: %w", err)
		}
		return nil
	})
	return nil
}
```

Inside the visit function, we are just writing a snapshot of the results to Postgres. Our processor keeps an incremental record of the data we get and does some ETL into an in-memory store struct. I'm a bit hesitant to give a lot of information on the internals of those processors and visitors since this is business IP, but I think the issue is independent of what is happening inside that function, right? The wait group counter should be resilient to every situation except a segfault. IMO, the error lies in the way wg.Done() is called in the library's partition_processor. Please let me know if you need more information. Appreciate the help on this!
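For context on the panic message itself: `sync.WaitGroup` panics the moment its counter drops below zero, i.e. as soon as `Done()` has been called more often than `Add()`. A minimal, runnable sketch (plain stdlib behavior, not goka code) that reproduces the exact message from the title:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(1)

	// The normal completion path decrements the counter once...
	wg.Done()

	// ...so a second decrement for the same unit of work (say, an
	// error/shutdown path that also "cleans up" an already-finished
	// item) drives the counter negative and panics.
	defer func() {
		fmt.Println("recovered:", recover()) // sync: negative WaitGroup counter
	}()
	wg.Done()
}
```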
Hey @akshatraika-moment, thanks a lot for the detailed information! No need to share more, especially not sensitive information - don't worry :). We found something indeed, which might have caused this behavior under two conditions:
For some background: that also meant that somewhere in the logs of your application there should be an error logged. Or maybe it got swallowed by the panic before it could surface. Anyway, this PR should fix the issue; hopefully it does in your case as well. Would you be able to test it before we release it, or are you dependent on a proper release? Just as a note on the
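For anyone hitting the same panic in their own code: the usual way to make this kind of producer/consumer accounting robust is to count every unit up front and decrement exactly once per unit, on whichever path ends up handling it. A sketch of that pattern (illustrative only, not the actual PR; `visitAll` and its arguments are hypothetical):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// visitAll hands every value to a consumer and blocks until each one has
// been either visited or accounted for by the cancellation path, so Done
// is never called twice for the same value.
func visitAll(ctx context.Context, values []string, visit func(string)) {
	var wg sync.WaitGroup
	wg.Add(len(values)) // count all units up front

	ch := make(chan string)
	go func() {
		defer close(ch)
		for i, v := range values {
			select {
			case ch <- v:
			case <-ctx.Done():
				// Account for this value and everything after it; the
				// consumer only decrements what it actually received.
				wg.Add(-(len(values) - i))
				return
			}
		}
	}()

	go func() {
		for v := range ch {
			visit(v)
			wg.Done() // exactly one decrement per delivered value
		}
	}()

	wg.Wait()
}

func main() {
	visitAll(context.Background(), []string{"a", "b", "c"}, func(v string) {
		fmt.Println("visited", v)
	})
}
```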
Thanks @frairon, I looked at the fix you made, though I am not sure it will fix the problem entirely. I found more on Friday: I think there might be a race condition somewhere in the waitgroups when the CPU is overloaded. My hypothesis is that there is a correlation between the CPU killing goroutines and the wg counter losing track. Here are two graphs: the histogram shows the panic logs, and the other graph is the CPU usage in my service. As you can see, they look very similar. Unfortunately, the only place where we see the error is in production, so we will not be able to test your PR there. But once you merge it in, we can give it a try. Right now, I am trying to move the service over to using views - thanks for suggesting that.
Hmm, I don't think the CPU is killing goroutines; the high CPU usage is probably caused by the iteration itself, because right now there is no rate control when iterating the state. So I'm not sure what the actual cause is. Thanks!
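If the CPU spikes do come from the iteration itself, one workaround is to throttle from inside your own visit callback, since the library does not rate-limit the iteration. A sketch using `golang.org/x/time/rate` (the callback shape here is simplified and hypothetical, not goka's actual signature):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	ctx := context.Background()

	// Allow roughly 100 snapshot writes per second, with small bursts.
	limiter := rate.NewLimiter(rate.Limit(100), 10)

	// Simplified stand-in for the snapshot visit callback; the real one
	// would write the visited value to the database.
	snapshot := func(ctx context.Context, value int) {
		if err := limiter.Wait(ctx); err != nil {
			return // context cancelled mid-visit
		}
		fmt.Println("snapshot", value)
	}

	for v := 0; v < 300; v++ {
		snapshot(ctx, v)
	}
}
```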
@akshatraika-moment does the new version fix your issue?
Yes, it did. Thanks!
Hi, I'm seeing a panic loop at https://github.com/lovoo/goka/blob/master/partition_processor.go#L685, and I am using version 1.1.7.
Here is the stack trace:
There is possibly a race condition in the VisitValues function. Can someone please take a look?
Happy to provide more information if needed, but this seems to be independent of my implementation.