Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-20.0] Fix deadlock in messager and health streamer (#17230) #17234

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 33 additions & 23 deletions go/vt/vttablet/tabletserver/health_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,18 @@ type healthStreamer struct {
degradedThreshold time.Duration
unhealthyThreshold atomic.Int64

mu sync.Mutex
ctx context.Context
cancel context.CancelFunc
clients map[chan *querypb.StreamHealthResponse]struct{}
state *querypb.StreamHealthResponse
// cancelMu is a mutex used to protect the cancel variable
// and for ensuring we don't call setup functions in parallel.
cancelMu sync.Mutex
ctx context.Context
cancel context.CancelFunc

// fieldsMu is used to protect access to the fields below.
// We require two separate mutexes, so that we don't have to acquire the same mutex
// in Close and reload that can lead to a deadlock described in https://github.com/vitessio/vitess/issues/17229#issuecomment-2476136610.
fieldsMu sync.Mutex
clients map[chan *querypb.StreamHealthResponse]struct{}
state *querypb.StreamHealthResponse
// isServingPrimary stores if this tablet is currently the serving primary or not.
isServingPrimary bool

Expand Down Expand Up @@ -126,8 +133,8 @@ func (hs *healthStreamer) InitDBConfig(target *querypb.Target, cp dbconfigs.Conn
}

func (hs *healthStreamer) Open() {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.cancelMu.Lock()
defer hs.cancelMu.Unlock()

if hs.cancel != nil {
return
Expand All @@ -140,8 +147,8 @@ func (hs *healthStreamer) Open() {
}

func (hs *healthStreamer) Close() {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.cancelMu.Lock()
defer hs.cancelMu.Unlock()

if hs.cancel != nil {
hs.se.UnregisterNotifier("healthStreamer")
Expand Down Expand Up @@ -182,13 +189,16 @@ func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.Str
}

func (hs *healthStreamer) register() (chan *querypb.StreamHealthResponse, context.Context) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.cancelMu.Lock()
defer hs.cancelMu.Unlock()

if hs.cancel == nil {
return nil, nil
}

hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()

ch := make(chan *querypb.StreamHealthResponse, streamHealthBufferSize)
hs.clients[ch] = struct{}{}

Expand All @@ -198,15 +208,15 @@ func (hs *healthStreamer) register() (chan *querypb.StreamHealthResponse, contex
}

func (hs *healthStreamer) unregister(ch chan *querypb.StreamHealthResponse) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()

delete(hs.clients, ch)
}

func (hs *healthStreamer) ChangeState(tabletType topodatapb.TabletType, ptsTimestamp time.Time, lag time.Duration, err error, serving bool) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()

hs.state.Target.TabletType = tabletType
if tabletType == topodatapb.TabletType_PRIMARY {
Expand Down Expand Up @@ -260,8 +270,8 @@ func (hs *healthStreamer) broadCastToClients(shr *querypb.StreamHealthResponse)
}

func (hs *healthStreamer) AppendDetails(details []*kv) []*kv {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()
if hs.state.Target.TabletType == topodatapb.TabletType_PRIMARY {
return details
}
Expand Down Expand Up @@ -306,8 +316,8 @@ func (hs *healthStreamer) SetUnhealthyThreshold(v time.Duration) {
// MakePrimary tells the healthstreamer that the current tablet is now the primary,
// so it can read and write to the MySQL instance for schema-tracking.
func (hs *healthStreamer) MakePrimary(serving bool) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()
hs.isServingPrimary = serving
// We register for notifications from the schema Engine only when schema tracking is enabled,
// and we are going to a serving primary state.
Expand All @@ -322,15 +332,15 @@ func (hs *healthStreamer) MakePrimary(serving bool) {

// MakeNonPrimary tells the healthstreamer that the current tablet is now not a primary.
func (hs *healthStreamer) MakeNonPrimary() {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()
hs.isServingPrimary = false
}

// reload reloads the schema from the underlying mysql for the tables that we get the alert on.
func (hs *healthStreamer) reload(created, altered, dropped []*schema.Table, udfsChanged bool) error {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()
// Schema Reload to happen only on primary when it is serving.
// We can be in a state when the primary is not serving after we have run DemotePrimary. In that case,
// we don't want to run any queries in MySQL, so we shouldn't reload anything in the healthStreamer.
Expand Down
43 changes: 43 additions & 0 deletions go/vt/vttablet/tabletserver/health_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,3 +566,46 @@ func testStream(hs *healthStreamer) (<-chan *querypb.StreamHealthResponse, conte
func testBlpFunc() (int64, int32) {
return 1, 2
}

// TestDeadlockBwCloseAndReload tests the deadlock observed between Close and Reload
// functions. More details can be found in the issue https://github.com/vitessio/vitess/issues/17229#issuecomment-2476136610.
func TestDeadlockBwCloseAndReload(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
cfg := newConfig(db)
env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TestNotServingPrimary")
alias := &topodatapb.TabletAlias{
Cell: "cell",
Uid: 1,
}
se := schema.NewEngineForTests()
// Create a new health streamer and set it to a serving primary state
hs := newHealthStreamer(env, alias, se)
hs.signalWhenSchemaChange = true
hs.InitDBConfig(&querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}, cfg.DB.DbaWithDB())
hs.Open()
hs.MakePrimary(true)
defer hs.Close()

wg := sync.WaitGroup{}
wg.Add(2)
// Try running Close and reload in parallel multiple times.
// This reproduces the deadlock quite readily.
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
hs.Close()
hs.Open()
}
}()

go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
se.BroadcastForTesting(nil, nil, nil, true)
}
}()

// Wait for wait group to finish.
wg.Wait()
}
30 changes: 19 additions & 11 deletions go/vt/vttablet/tabletserver/messager/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,16 @@ type VStreamer interface {

// Engine is the engine for handling messages.
type Engine struct {
mu sync.Mutex
isOpen bool
managers map[string]*messageManager
// mu is a mutex used to protect the isOpen variable
// and for ensuring we don't call setup functions in parallel.
mu sync.Mutex
isOpen bool

// managersMu is a mutex used to protect the managers field.
// We require two separate mutexes, so that we don't have to acquire the same mutex
// in Close and schemaChanged which can lead to a deadlock described in https://github.com/vitessio/vitess/issues/17229.
managersMu sync.Mutex
managers map[string]*messageManager

tsv TabletService
se *schema.Engine
Expand All @@ -75,15 +82,12 @@ func NewEngine(tsv TabletService, se *schema.Engine, vs VStreamer) *Engine {
// Open starts the Engine service.
func (me *Engine) Open() {
me.mu.Lock()
defer me.mu.Unlock()
if me.isOpen {
me.mu.Unlock()
return
}
me.isOpen = true
me.mu.Unlock()
log.Info("Messager: opening")
// Unlock before invoking RegisterNotifier because it
// obtains the same lock.
me.se.RegisterNotifier("messages", me.schemaChanged, true)
}

Expand All @@ -101,6 +105,8 @@ func (me *Engine) Close() {
log.Infof("messager Engine - unregistering notifiers")
me.se.UnregisterNotifier("messages")
log.Infof("messager Engine - closing all managers")
me.managersMu.Lock()
defer me.managersMu.Unlock()
for _, mm := range me.managers {
mm.Close()
}
Expand All @@ -109,8 +115,8 @@ func (me *Engine) Close() {
}

func (me *Engine) GetGenerator(name string) (QueryGenerator, error) {
me.mu.Lock()
defer me.mu.Unlock()
me.managersMu.Lock()
defer me.managersMu.Unlock()
mm := me.managers[name]
if mm == nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "message table %s not found in schema", name)
Expand All @@ -131,6 +137,8 @@ func (me *Engine) Subscribe(ctx context.Context, name string, send func(*sqltype
if !me.isOpen {
return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "messager engine is closed, probably because this is not a primary any more")
}
me.managersMu.Lock()
defer me.managersMu.Unlock()
mm := me.managers[name]
if mm == nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "message table %s not found", name)
Expand All @@ -139,8 +147,8 @@ func (me *Engine) Subscribe(ctx context.Context, name string, send func(*sqltype
}

func (me *Engine) schemaChanged(tables map[string]*schema.Table, created, altered, dropped []*schema.Table, _ bool) {
me.mu.Lock()
defer me.mu.Unlock()
me.managersMu.Lock()
defer me.managersMu.Unlock()
for _, table := range append(dropped, altered...) {
name := table.Name.String()
mm := me.managers[name]
Expand Down
33 changes: 32 additions & 1 deletion go/vt/vttablet/tabletserver/messager/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package messager
import (
"context"
"reflect"
"sync"
"testing"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -156,7 +157,7 @@ func newTestEngine() *Engine {
tsv := &fakeTabletServer{
Env: tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "MessagerTest"),
}
se := schema.NewEngine(tsv)
se := schema.NewEngineForTests()
te := NewEngine(tsv, se, newFakeVStreamer())
te.Open()
return te
Expand All @@ -169,3 +170,33 @@ func newEngineReceiver() (f func(qr *sqltypes.Result) error, ch chan *sqltypes.R
return nil
}, ch
}

// TestDeadlockBwCloseAndSchemaChange tests the deadlock observed between Close and schemaChanged
// functions. More details can be found in the issue https://github.com/vitessio/vitess/issues/17229.
func TestDeadlockBwCloseAndSchemaChange(t *testing.T) {
engine := newTestEngine()
defer engine.Close()
se := engine.se

wg := sync.WaitGroup{}
wg.Add(2)
// Try running Close and schemaChanged in parallel multiple times.
// This reproduces the deadlock quite readily.
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
engine.Close()
engine.Open()
}
}()

go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
se.BroadcastForTesting(nil, nil, nil, true)
}
}()

// Wait for wait group to finish.
wg.Wait()
}
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,13 @@ func (se *Engine) broadcast(created, altered, dropped []*Table, udfsChanged bool
}
}

// BroadcastForTesting is meant to be a testing function that triggers a broadcast call.
func (se *Engine) BroadcastForTesting(created, altered, dropped []*Table, udfsChanged bool) {
se.mu.Lock()
defer se.mu.Unlock()
se.broadcast(created, altered, dropped, udfsChanged)
}

// GetTable returns the info for a table.
func (se *Engine) GetTable(tableName sqlparser.IdentifierCS) *Table {
se.mu.Lock()
Expand Down Expand Up @@ -886,6 +893,7 @@ func NewEngineForTests() *Engine {
tables: make(map[string]*Table),
historian: newHistorian(false, 0, nil),
env: tabletenv.NewEnv(vtenv.NewTestEnv(), tabletenv.NewDefaultConfig(), "SchemaEngineForTests"),
notifiers: make(map[string]notifier),
}
return se
}
Expand Down
Loading