Skip to content

Commit

Permalink
[release-16.0] Flaky tests: Fix race in memory topo (#13559) (#13576)
Browse files Browse the repository at this point in the history
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Dirkjan Bussink <[email protected]>
  • Loading branch information
vitess-bot[bot] and dbussink authored Jul 27, 2023
1 parent dff0957 commit b0a3cfe
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 13 deletions.
4 changes: 1 addition & 3 deletions go/vt/topo/memorytopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,7 @@ func (mp *cLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan st
}

notifications := make(chan string, 8)
watchIndex := nextWatchIndex
nextWatchIndex++
n.watches[watchIndex] = watch{lock: notifications}
watchIndex := n.addWatch(watch{lock: notifications})

if n.lock != nil {
notifications <- n.lockContents
Expand Down
18 changes: 14 additions & 4 deletions go/vt/topo/memorytopo/memorytopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ const (
UnreachableServerAddr = "unreachable"
)

var (
nextWatchIndex = 0
)

// Factory is a memory-based implementation of topo.Factory. It
// takes a file-system like approach, with directories at each level
// being an actual directory node. This is meant to be closer to
Expand Down Expand Up @@ -206,6 +202,20 @@ func (n *node) propagateRecursiveWatch(ev *topo.WatchDataRecursive) {
}
}

var (
nextWatchIndex = 0
nextWatchIndexMu sync.Mutex
)

func (n *node) addWatch(w watch) int {
nextWatchIndexMu.Lock()
defer nextWatchIndexMu.Unlock()
watchIndex := nextWatchIndex
nextWatchIndex++
n.watches[watchIndex] = w
return watchIndex
}

// PropagateWatchError propagates the given error to all watches on this node
// and recursively applies to all children
func (n *node) PropagateWatchError(err error) {
Expand Down
8 changes: 2 additions & 6 deletions go/vt/topo/memorytopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c
}

notifications := make(chan *topo.WatchData, 100)
watchIndex := nextWatchIndex
nextWatchIndex++
n.watches[watchIndex] = watch{contents: notifications}
watchIndex := n.addWatch(watch{contents: notifications})

go func() {
<-ctx.Done()
Expand Down Expand Up @@ -105,9 +103,7 @@ func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Watc
})

notifications := make(chan *topo.WatchDataRecursive, 100)
watchIndex := nextWatchIndex
nextWatchIndex++
n.watches[watchIndex] = watch{recursive: notifications}
watchIndex := n.addWatch(watch{recursive: notifications})

go func() {
defer close(notifications)
Expand Down

0 comments on commit b0a3cfe

Please sign in to comment.