Skip to content

Commit

Permalink
runtime/op/combine: fix panic in Proc.propagateDone (#4335)
Browse files Browse the repository at this point in the history
Proc.propagateDone can trigger a panic: if one of its goroutines exits
because Proc.ctx is canceled, it calls Proc.unwait without all parents
being blocked.  Fix this by replacing the sync.WaitGroup with an
errgroup.Group and using that to check for goroutines exiting due to
cancelation.

Closes #4318.
  • Loading branch information
nwt authored Jan 26, 2023
1 parent 58606a0 commit 3ca4b8a
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions runtime/op/combine/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/brimdata/zed/runtime/op"
"github.com/brimdata/zed/zbuf"
"golang.org/x/sync/errgroup"
)

type Proc struct {
Expand Down Expand Up @@ -109,18 +110,16 @@ func (p *Proc) block(parent *puller) {

func (p *Proc) propagateDone() error {
var mu sync.Mutex
var wg sync.WaitGroup
var group errgroup.Group
for _, parent := range p.parents {
if parent.blocked {
continue
}
parent := parent
wg.Add(1)
// We use a goroutine here because sending to parents[i].doneCh
// can block until we've sent to parents[i+1].doneCh, as with
// "fork (=> count() => pass) | head".
go func() {
defer wg.Done()
group.Go(func() error {
again:
select {
case <-p.queue:
Expand All @@ -134,11 +133,15 @@ func (p *Proc) propagateDone() error {
mu.Lock()
p.block(parent)
mu.Unlock()
return nil
case <-p.ctx.Done():
return p.ctx.Err()
}
}()
})
}
if err := group.Wait(); err != nil {
return err
}
wg.Wait()
// Make sure all the dones that canceled pending queue entries
// are clear. Otherwise, this will block the queue on the next
// platoon.
Expand Down

0 comments on commit 3ca4b8a

Please sign in to comment.