Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock in messager and health streamer #17230

Merged
merged 5 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 35 additions & 25 deletions go/vt/vttablet/tabletserver/health_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,18 @@ type healthStreamer struct {
degradedThreshold time.Duration
unhealthyThreshold atomic.Int64

mu sync.Mutex
ctx context.Context
cancel context.CancelFunc
clients map[chan *querypb.StreamHealthResponse]struct{}
state *querypb.StreamHealthResponse
// cancelMu is a mutex used to protect the cancel variable
// and for ensuring we don't call setup functions in parallel.
cancelMu sync.Mutex
ctx context.Context
cancel context.CancelFunc

// fieldsMu is used to protect access to the fields below.
// We require two separate mutexes, so that we don't have to acquire the same mutex
// in Close and reload that can lead to a deadlock described in https://github.com/vitessio/vitess/issues/17229#issuecomment-2476136610.
fieldsMu sync.Mutex
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
clients map[chan *querypb.StreamHealthResponse]struct{}
state *querypb.StreamHealthResponse
// isServingPrimary stores if this tablet is currently the serving primary or not.
isServingPrimary bool

Expand Down Expand Up @@ -110,8 +117,8 @@ func (hs *healthStreamer) InitDBConfig(target *querypb.Target) {
}

func (hs *healthStreamer) Open() {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.cancelMu.Lock()
defer hs.cancelMu.Unlock()

if hs.cancel != nil {
return
Expand All @@ -120,8 +127,8 @@ func (hs *healthStreamer) Open() {
}

func (hs *healthStreamer) Close() {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.cancelMu.Lock()
defer hs.cancelMu.Unlock()

if hs.cancel != nil {
hs.se.UnregisterNotifier("healthStreamer")
Expand Down Expand Up @@ -158,13 +165,16 @@ func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.Str
}

func (hs *healthStreamer) register() (chan *querypb.StreamHealthResponse, context.Context) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.cancelMu.Lock()
defer hs.cancelMu.Unlock()

if hs.cancel == nil {
return nil, nil
}

hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()

ch := make(chan *querypb.StreamHealthResponse, streamHealthBufferSize)
hs.clients[ch] = struct{}{}

Expand All @@ -174,15 +184,15 @@ func (hs *healthStreamer) register() (chan *querypb.StreamHealthResponse, contex
}

func (hs *healthStreamer) unregister(ch chan *querypb.StreamHealthResponse) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()

delete(hs.clients, ch)
}

func (hs *healthStreamer) ChangeState(tabletType topodatapb.TabletType, ptsTimestamp time.Time, lag time.Duration, err error, serving bool) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()

hs.state.Target.TabletType = tabletType
if tabletType == topodatapb.TabletType_PRIMARY {
Expand Down Expand Up @@ -236,8 +246,8 @@ func (hs *healthStreamer) broadCastToClients(shr *querypb.StreamHealthResponse)
}

func (hs *healthStreamer) AppendDetails(details []*kv) []*kv {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()
if hs.state.Target.TabletType == topodatapb.TabletType_PRIMARY {
return details
}
Expand Down Expand Up @@ -282,8 +292,8 @@ func (hs *healthStreamer) SetUnhealthyThreshold(v time.Duration) {
// MakePrimary tells the healthstreamer that the current tablet is now the primary,
// so it can read and write to the MySQL instance for schema-tracking.
func (hs *healthStreamer) MakePrimary(serving bool) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()
hs.isServingPrimary = serving
// We register for notifications from the schema Engine only when schema tracking is enabled,
// and we are going to a serving primary state.
Expand All @@ -298,15 +308,15 @@ func (hs *healthStreamer) MakePrimary(serving bool) {

// MakeNonPrimary tells the healthstreamer that the current tablet is now not a primary.
func (hs *healthStreamer) MakeNonPrimary() {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()
hs.isServingPrimary = false
}

// reload reloads the schema from the underlying mysql for the tables that we get the alert on.
func (hs *healthStreamer) reload(created, altered, dropped []*schema.Table, udfsChanged bool) error {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()
// Schema Reload to happen only on primary when it is serving.
// We can be in a state when the primary is not serving after we have run DemotePrimary. In that case,
// we don't want to run any queries in MySQL, so we shouldn't reload anything in the healthStreamer.
Expand Down Expand Up @@ -349,8 +359,8 @@ func (hs *healthStreamer) reload(created, altered, dropped []*schema.Table, udfs

// sendUnresolvedTransactionSignal sends broadcast message about unresolved transactions.
func (hs *healthStreamer) sendUnresolvedTransactionSignal() {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()
// send signal only when primary is serving.
if !hs.isServingPrimary {
return
Expand Down
40 changes: 40 additions & 0 deletions go/vt/vttablet/tabletserver/health_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,3 +572,43 @@ func testStream(hs *healthStreamer) (<-chan *querypb.StreamHealthResponse, conte
func testBlpFunc() (int64, int32) {
return 1, 2
}

// TestDeadlockBwCloseAndReload tests the deadlock observed between Close and Reload
// functions. More details can be found in the issue https://github.com/vitessio/vitess/issues/17229#issuecomment-2476136610.
func TestDeadlockBwCloseAndReload(t *testing.T) {
cfg := newConfig(nil)
env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TestNotServingPrimary")
alias := &topodatapb.TabletAlias{
Cell: "cell",
Uid: 1,
}
se := schema.NewEngineForTests()
// Create a new health streamer and set it to a serving primary state
hs := newHealthStreamer(env, alias, se)
hs.signalWhenSchemaChange = true
hs.Open()
hs.MakePrimary(true)
defer hs.Close()

wg := sync.WaitGroup{}
wg.Add(2)
// Try running Close and reload in parallel multiple times.
// This reproduces the deadlock quite readily.
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
hs.Close()
hs.Open()
}
}()

go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
se.BroadcastForTesting(nil, nil, nil, true)
}
}()

// Wait for wait group to finish.
wg.Wait()
}
30 changes: 19 additions & 11 deletions go/vt/vttablet/tabletserver/messager/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,16 @@ type VStreamer interface {

// Engine is the engine for handling messages.
type Engine struct {
mu sync.Mutex
isOpen bool
managers map[string]*messageManager
// mu is a mutex used to protect the isOpen variable
// and for ensuring we don't call setup functions in parallel.
mu sync.Mutex
isOpen bool

// managersMu is a mutex used to protect the managers field.
// We require two separate mutexes, so that we don't have to acquire the same mutex
// in Close and schemaChanged which can lead to a deadlock described in https://github.com/vitessio/vitess/issues/17229.
managersMu sync.Mutex
managers map[string]*messageManager

tsv TabletService
se *schema.Engine
Expand All @@ -76,15 +83,12 @@ func NewEngine(tsv TabletService, se *schema.Engine, vs VStreamer) *Engine {
// Open starts the Engine service.
func (me *Engine) Open() {
me.mu.Lock()
defer me.mu.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if me.isOpen {
me.mu.Unlock()
return
}
me.isOpen = true
me.mu.Unlock()
log.Info("Messager: opening")
// Unlock before invoking RegisterNotifier because it
// obtains the same lock.
me.se.RegisterNotifier("messages", me.schemaChanged, true)
}

Expand All @@ -102,6 +106,8 @@ func (me *Engine) Close() {
log.Infof("messager Engine - unregistering notifiers")
me.se.UnregisterNotifier("messages")
log.Infof("messager Engine - closing all managers")
me.managersMu.Lock()
defer me.managersMu.Unlock()
Comment on lines +109 to +110
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

for _, mm := range me.managers {
mm.Close()
}
Expand All @@ -110,8 +116,8 @@ func (me *Engine) Close() {
}

func (me *Engine) GetGenerator(name string) (QueryGenerator, error) {
me.mu.Lock()
defer me.mu.Unlock()
me.managersMu.Lock()
defer me.managersMu.Unlock()
mm := me.managers[name]
if mm == nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "message table %s not found in schema", name)
Expand All @@ -132,6 +138,8 @@ func (me *Engine) Subscribe(ctx context.Context, name string, send func(*sqltype
if !me.isOpen {
return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "messager engine is closed, probably because this is not a primary any more")
}
me.managersMu.Lock()
defer me.managersMu.Unlock()
mm := me.managers[name]
if mm == nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "message table %s not found", name)
Expand All @@ -140,8 +148,8 @@ func (me *Engine) Subscribe(ctx context.Context, name string, send func(*sqltype
}

func (me *Engine) schemaChanged(tables map[string]*schema.Table, created, altered, dropped []*schema.Table, _ bool) {
me.mu.Lock()
defer me.mu.Unlock()
me.managersMu.Lock()
defer me.managersMu.Unlock()
for _, table := range append(dropped, altered...) {
name := table.Name.String()
mm := me.managers[name]
Expand Down
33 changes: 32 additions & 1 deletion go/vt/vttablet/tabletserver/messager/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package messager
import (
"context"
"reflect"
"sync"
"testing"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -156,7 +157,7 @@ func newTestEngine() *Engine {
tsv := &fakeTabletServer{
Env: tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "MessagerTest"),
}
se := schema.NewEngine(tsv)
se := schema.NewEngineForTests()
te := NewEngine(tsv, se, newFakeVStreamer())
te.Open()
return te
Expand All @@ -169,3 +170,33 @@ func newEngineReceiver() (f func(qr *sqltypes.Result) error, ch chan *sqltypes.R
return nil
}, ch
}

// TestDeadlockBwCloseAndSchemaChange tests the deadlock observed between Close and schemaChanged
// functions. More details can be found in the issue https://github.com/vitessio/vitess/issues/17229.
func TestDeadlockBwCloseAndSchemaChange(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran this on main in a loop while reviewing the PR. I didn't get any failures:

while true; do
    go test -v -count 1 -timeout 5s -run ^TestDeadlockBwCloseAndSchemaChange$ vitess.io/vitess/go/vt/vttablet/tabletserver/messager
    if [[ $? -ne 0 ]]; then
        say "got one"
        break
    fi
done

But we already know that it's not easy to trigger and we can logically see how it can happen. Just noting that the test doesn't seem to be as useful as one would hope.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tracks. I commented on the issue that we haven't seen this in practice, at least that I'm aware of.
#17229 (comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really!? It fails for me -

while true; do
    go test -v -count 1 -timeout 5s -run ^TestDeadlockBwCloseAndSchemaChange$ vitess.io/vitess/go/vt/vttablet/tabletserver/messager
    if [[ $? -ne 0 ]]; then
        say "got one"
        break
    fi
done
=== RUN   TestDeadlockBwCloseAndSchemaChange
--- PASS: TestDeadlockBwCloseAndSchemaChange (0.00s)
PASS
ok      vitess.io/vitess/go/vt/vttablet/tabletserver/messager   0.579s
=== RUN   TestDeadlockBwCloseAndSchemaChange
panic: test timed out after 5s
        running tests:
                TestDeadlockBwCloseAndSchemaChange (5s)

goroutine 66 [running]:
testing.(*M).startAlarm.func1()
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/testing/testing.go:2373 +0x30c
created by time.goFunc
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/time/sleep.go:215 +0x38

goroutine 1 [chan receive]:
testing.(*T).Run(0x140001be820, {0x1012cb3fe?, 0x1400059fb48?}, 0x101ab0970)
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1751 +0x328
testing.runTests.func1(0x140001be820)
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/testing/testing.go:2168 +0x40
testing.tRunner(0x140001be820, 0x1400059fc68)
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1690 +0xe4
testing.runTests(0x140001c7d58, {0x1024c7e00, 0x19, 0x19}, {0xfc00000000000000?, 0xfcaacd9be1871533?, 0x102a6b640?})
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/testing/testing.go:2166 +0x3ac
testing.(*M).Run(0x14000342f00)
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/testing/testing.go:2034 +0x588
main.main()
        _testmain.go:93 +0x90

goroutine 35 [select]:
github.com/golang/glog.(*fileSink).flushDaemon(0x102a6b6d8)
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/github.com/golang/[email protected]/glog_file.go:352 +0xa4
created by github.com/golang/glog.init.1 in goroutine 1
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/github.com/golang/[email protected]/glog_file.go:167 +0x1c8

goroutine 10 [syscall]:
os/signal.signal_recv()
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/runtime/sigqueue.go:149 +0x2c
os/signal.loop()
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/os/signal/signal_unix.go:23 +0x1c
created by os/signal.Notify.func1.1 in goroutine 1
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/os/signal/signal.go:151 +0x28

goroutine 11 [chan receive]:
vitess.io/vitess/go/vt/dbconfigs.init.0.func1()
        /Users/manangupta/vitess/go/vt/dbconfigs/credentials.go:90 +0x34
created by vitess.io/vitess/go/vt/dbconfigs.init.0 in goroutine 1
        /Users/manangupta/vitess/go/vt/dbconfigs/credentials.go:89 +0x164

goroutine 12 [semacquire]:
sync.runtime_Semacquire(0x14000663340?)
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/runtime/sema.go:71 +0x2c
sync.(*WaitGroup).Wait(0x140003b47d0)
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/sync/waitgroup.go:118 +0x74
vitess.io/vitess/go/vt/vttablet/tabletserver/messager.TestDeadlockBwCloseAndSchemaChange(0x140001be9c0?)
        /Users/manangupta/vitess/go/vt/vttablet/tabletserver/messager/engine_test.go:201 +0x110
testing.tRunner(0x140001be9c0, 0x101ab0970)
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1690 +0xe4
created by testing.(*T).Run in goroutine 1
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1743 +0x314

goroutine 13 [select]:
vitess.io/vitess/go/stats.(*Rates).track(0x14000465100)
        /Users/manangupta/vitess/go/stats/rates.go:110 +0xac
created by vitess.io/vitess/go/stats.NewRates in goroutine 12
        /Users/manangupta/vitess/go/stats/rates.go:97 +0x240

goroutine 14 [select]:
vitess.io/vitess/go/stats.(*Rates).track(0x14000465280)
        /Users/manangupta/vitess/go/stats/rates.go:110 +0xac
created by vitess.io/vitess/go/stats.NewRates in goroutine 12
        /Users/manangupta/vitess/go/stats/rates.go:97 +0x240

goroutine 15 [sync.Mutex.Lock]:
sync.runtime_SemacquireMutex(0x2?, 0x0?, 0x1012d7c7b?)
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/runtime/sema.go:95 +0x28
sync.(*Mutex).lockSlow(0x14000410048)
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/sync/mutex.go:173 +0x174
sync.(*Mutex).Lock(...)
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/sync/mutex.go:92
vitess.io/vitess/go/vt/vttablet/tabletserver/schema.(*Engine).UnregisterNotifier(0x14000410000, {0x10129dacb, 0x8})
        /Users/manangupta/vitess/go/vt/vttablet/tabletserver/schema/engine.go:846 +0xc0
vitess.io/vitess/go/vt/vttablet/tabletserver/messager.(*Engine).Close(0x14000061c20)
        /Users/manangupta/vitess/go/vt/vttablet/tabletserver/messager/engine.go:108 +0x144
vitess.io/vitess/go/vt/vttablet/tabletserver/messager.TestDeadlockBwCloseAndSchemaChange.func1()
        /Users/manangupta/vitess/go/vt/vttablet/tabletserver/messager/engine_test.go:188 +0x5c
created by vitess.io/vitess/go/vt/vttablet/tabletserver/messager.TestDeadlockBwCloseAndSchemaChange in goroutine 12
        /Users/manangupta/vitess/go/vt/vttablet/tabletserver/messager/engine_test.go:185 +0xb8

goroutine 16 [sync.Mutex.Lock]:
sync.runtime_SemacquireMutex(0x1298a9bf0?, 0x50?, 0x1298a9bf0?)
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/runtime/sema.go:95 +0x28
sync.(*Mutex).lockSlow(0x14000061c20)
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/sync/mutex.go:173 +0x174
sync.(*Mutex).Lock(...)
        /Users/manangupta/.gvm/pkgsets/go1.23.0/global/pkg/mod/golang.org/[email protected]/src/sync/mutex.go:92
vitess.io/vitess/go/vt/vttablet/tabletserver/messager.(*Engine).schemaChanged(0x14000061c20, 0x140001b7dc8?, {0x0, 0x0, 0x0}, {0x0, 0x0, 0x1012570f4?}, {0x0, 0x0, ...}, ...)
        /Users/manangupta/vitess/go/vt/vttablet/tabletserver/messager/engine.go:152 +0xa8
vitess.io/vitess/go/vt/vttablet/tabletserver/schema.(*Engine).broadcast(0x14000410000, {0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...)
        /Users/manangupta/vitess/go/vt/vttablet/tabletserver/schema/engine.go:864 +0x264
vitess.io/vitess/go/vt/vttablet/tabletserver/schema.(*Engine).BroadcastForTesting(0x0?, {0x0?, 0x0?, 0x0?}, {0x0?, 0x0?, 0x0?}, {0x0?, 0x0?, 0x0?}, ...)
        /Users/manangupta/vitess/go/vt/vttablet/tabletserver/schema/engine.go:872 +0x114
vitess.io/vitess/go/vt/vttablet/tabletserver/messager.TestDeadlockBwCloseAndSchemaChange.func2()
        /Users/manangupta/vitess/go/vt/vttablet/tabletserver/messager/engine_test.go:196 +0x88
created by vitess.io/vitess/go/vt/vttablet/tabletserver/messager.TestDeadlockBwCloseAndSchemaChange in goroutine 12
        /Users/manangupta/vitess/go/vt/vttablet/tabletserver/messager/engine_test.go:193 +0x108
FAIL    vitess.io/vitess/go/vt/vttablet/tabletserver/messager   5.648s
FAIL

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made sure the test was failing before I added any fixes. After the changes, the test doesn't fail across 1000 or so runs 😄

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can try running this on e525c1a169 commit hash. In this PR I've added the test in one commit, before fixing the issue in the next. So it should be reproducible for you too.

engine := newTestEngine()
defer engine.Close()
se := engine.se

wg := sync.WaitGroup{}
wg.Add(2)
// Try running Close and schemaChanged in parallel multiple times.
// This reproduces the deadlock quite readily.
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
engine.Close()
engine.Open()
}
}()

go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
se.BroadcastForTesting(nil, nil, nil, true)
}
}()

// Wait for wait group to finish.
wg.Wait()
}
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,13 @@ func (se *Engine) broadcast(created, altered, dropped []*Table, udfsChanged bool
}
}

// BroadcastForTesting is meant to be a testing function that triggers a broadcast call.
func (se *Engine) BroadcastForTesting(created, altered, dropped []*Table, udfsChanged bool) {
se.mu.Lock()
defer se.mu.Unlock()
se.broadcast(created, altered, dropped, udfsChanged)
}

// GetTable returns the info for a table.
func (se *Engine) GetTable(tableName sqlparser.IdentifierCS) *Table {
se.mu.Lock()
Expand Down Expand Up @@ -951,6 +958,7 @@ func NewEngineForTests() *Engine {
tables: make(map[string]*Table),
historian: newHistorian(false, 0, nil),
env: tabletenv.NewEnv(vtenv.NewTestEnv(), tabletenv.NewDefaultConfig(), "SchemaEngineForTests"),
notifiers: make(map[string]notifier),
}
return se
}
Expand Down
Loading