Skip to content

Commit

Permalink
Fix race in memory topo
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Jul 20, 2023
1 parent 7e58261 commit 731c889
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 1 deletion.
4 changes: 4 additions & 0 deletions go/vt/topo/memorytopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,12 @@ func (mp *cLeaderParticipation) WaitForNewLeader(ctx context.Context) (<-chan st
}

notifications := make(chan string, 8)

nextWatchIndexMu.Lock()
watchIndex := nextWatchIndex
nextWatchIndex++
nextWatchIndexMu.Unlock()

n.watches[watchIndex] = watch{lock: notifications}

if n.lock != nil {
Expand Down
3 changes: 2 additions & 1 deletion go/vt/topo/memorytopo/memorytopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ const (
)

var (
nextWatchIndex = 0
nextWatchIndex = 0
nextWatchIndexMu sync.Mutex
)

// Factory is a memory-based implementation of topo.Factory. It
Expand Down
8 changes: 8 additions & 0 deletions go/vt/topo/memorytopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c
}

notifications := make(chan *topo.WatchData, 100)

nextWatchIndexMu.Lock()
watchIndex := nextWatchIndex
nextWatchIndex++
nextWatchIndexMu.Unlock()

n.watches[watchIndex] = watch{contents: notifications}

go func() {
Expand Down Expand Up @@ -105,8 +109,12 @@ func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Watc
})

notifications := make(chan *topo.WatchDataRecursive, 100)

nextWatchIndexMu.Lock()
watchIndex := nextWatchIndex
nextWatchIndex++
nextWatchIndexMu.Unlock()

n.watches[watchIndex] = watch{recursive: notifications}

go func() {
Expand Down

0 comments on commit 731c889

Please sign in to comment.