diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 2bd8ef4c18a..cea972e35a7 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -360,7 +360,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur healthy: make(map[KeyspaceShardTabletType][]*TabletHealth), subscribers: make(map[chan *TabletHealth]struct{}), cellAliases: make(map[string]string), - loadTabletsTrigger: make(chan struct{}), + loadTabletsTrigger: make(chan struct{}, 1), } var topoWatchers []*TopologyWatcher cells := strings.Split(cellsToWatch, ",") @@ -539,7 +539,13 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ if prevTarget.TabletType == topodata.TabletType_PRIMARY { if primaries := hc.healthData[oldTargetKey]; len(primaries) == 0 { log.Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet)) - hc.loadTabletsTrigger <- struct{}{} + // We want to trigger a loadTablets call, but if the channel is not empty + // then a trigger is already scheduled, we don't need to trigger another one. + // This also prevents the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994. + select { + case hc.loadTabletsTrigger <- struct{}{}: + default: + } } } } diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 4c8f81c6a3e..cef367c9b74 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -28,6 +28,7 @@ import ( "google.golang.org/protobuf/proto" "vitess.io/vitess/go/test/utils" + querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/logutil" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -709,3 +710,67 @@ func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) { assert.True(t, proto.Equal(tablet1, allTablets[key1])) assert.True(t, proto.Equal(tablet2, allTablets[key2])) } + +// TestDeadlockBetweenTopologyWatcherAndHealthCheck tests the possibility of a deadlock +// between the topology watcher and the health check. +// The issue https://github.com/vitessio/vitess/issues/16994 has more details on the deadlock. +func TestDeadlockBetweenTopologyWatcherAndHealthCheck(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + // create a new memory topo server and an health check instance. + ts, _ := memorytopo.NewServerAndFactory(ctx, "zone-1") + hc := NewHealthCheck(ctx, time.Hour, time.Hour, ts, "zone-1", "", nil) + defer hc.Close() + defer hc.topoWatchers[0].Stop() + + // Add a tablet to the topology. + tablet1 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone-1", + Uid: 100, + }, + Type: topodatapb.TabletType_REPLICA, + Hostname: "host1", + PortMap: map[string]int32{ + "grpc": 123, + }, + Keyspace: "keyspace", + Shard: "shard", + } + err := ts.CreateTablet(ctx, tablet1) + // Run the first loadTablets call to ensure the tablet is present in the topology watcher. + hc.topoWatchers[0].loadTablets() + require.NoError(t, err) + + // We want to run updateHealth with arguments that always + // make it trigger load Tablets. + th := &TabletHealth{ + Tablet: tablet1, + Target: &querypb.Target{ + Keyspace: "keyspace", + Shard: "shard", + TabletType: topodatapb.TabletType_REPLICA, + }, + } + prevTarget := &querypb.Target{ + Keyspace: "keyspace", + Shard: "shard", + TabletType: topodatapb.TabletType_PRIMARY, + } + + // If we run the updateHealth function often enough, then we + // will see the deadlock where the topology watcher is trying to replace + // the tablet in the health check, but health check has the mutex acquired + // already because it is calling updateHealth. + // updateHealth itself will be stuck trying to send on the shared channel. + for i := 0; i < 10; i++ { + // Update the port of the tablet so that when update Health asks topo watcher to + // refresh the tablets, it finds an update and tries to replace it. + _, err = ts.UpdateTabletFields(ctx, tablet1.Alias, func(t *topodatapb.Tablet) error { + t.PortMap["testing_port"] = int32(i + 1) + return nil + }) + require.NoError(t, err) + hc.updateHealth(th, prevTarget, false, false) + } +}