Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gossip: demonstrate problem with loopback info propagation #34472

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions pkg/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,3 +805,74 @@ func TestGossipPropagation(t *testing.T) {
return nil
})
}

// Test whether propagation of an info that was generated by a prior
// incarnation of a server can correctly be sent back to that originating
// server. Consider the scenario:
//
// n1: decommissioned
// n2: gossip node-liveness:1
// n3: node-liveness range lease acquired (does not gossip node-liveness:1
// record because it is unchanged)
// n2: restarted
// - connects as gossip client to n3
// - sends a batch of gossip records to n3
// - n3 responds without sending node-liveness:1 because it's
// OrigStamp is less than the highwater stamp from n2
func TestGossipLoopbackInfoPropagation(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skipf("#34494")
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())

local := startGossip(1, stopper, t, metric.NewRegistry())
remote := startGossip(2, stopper, t, metric.NewRegistry())
remote.mu.Lock()
rAddr := remote.mu.is.NodeAddr
remote.mu.Unlock()
local.manage()
remote.manage()

// Add a gossip info for "foo" on remote, that was generated by local. This
// simulates what happens if local was to gossip an info, and later restart
// and never gossip that info again.
func() {
local.mu.Lock()
defer local.mu.Unlock()
remote.mu.Lock()
defer remote.mu.Unlock()
// NB: replacing local.mu.is.newInfo with remote.mu.is.newInfo allows "foo"
// to be propagated.
if err := remote.mu.is.addInfo("foo", local.mu.is.newInfo(nil, 0)); err != nil {
t.Fatal(err)
}
}()

// Add an info to local so that it has a highwater timestamp that is newer
// than the info we added to remote. NB: commenting out this line allows
// "foo" to be propagated.
if err := local.AddInfo("bar", nil, 0); err != nil {
t.Fatal(err)
}

// Start a client connection to the remote node.
local.mu.Lock()
local.startClientLocked(&rAddr)
local.mu.Unlock()

getInfo := func(g *Gossip, key string) *Info {
g.mu.RLock()
defer g.mu.RUnlock()
return g.mu.is.Infos[key]
}

testutils.SucceedsSoon(t, func() error {
if getInfo(remote, "bar") == nil {
return fmt.Errorf("bar not propagated")
}
if getInfo(local, "foo") == nil {
return fmt.Errorf("foo not propagated")
}
return nil
})
}
49 changes: 49 additions & 0 deletions pkg/storage/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package storage_test
import (
"context"
"errors"
"fmt"
"reflect"
"testing"

Expand All @@ -29,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// TestStoreRangeLease verifies that regular ranges (not some special ones at
Expand Down Expand Up @@ -257,3 +259,50 @@ func TestGossipSystemConfigOnLeaseChange(t *testing.T) {
return nil
})
}

func TestGossipNodeLivenessOnLeaseChange(t *testing.T) {
defer leaktest.AfterTest(t)()
sc := storage.TestStoreConfig(nil)
sc.TestingKnobs.DisableReplicateQueue = true
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
const numStores = 3
mtc.Start(t, numStores)

rangeID := mtc.stores[0].LookupReplica(roachpb.RKey(keys.NodeLivenessSpan.Key)).RangeID
mtc.replicateRange(rangeID, 1, 2)

// Turn off liveness heartbeats on all nodes to ensure that updates to node
// liveness are not triggering gossiping.
for i := range mtc.nodeLivenesses {
mtc.nodeLivenesses[i].PauseHeartbeat(true)
}

nodeLivenessKey := gossip.MakeNodeLivenessKey(1)

initialStoreIdx := -1
for i := range mtc.stores {
if mtc.stores[i].Gossip().InfoOriginatedHere(nodeLivenessKey) {
initialStoreIdx = i
}
}
if initialStoreIdx == -1 {
t.Fatalf("no store has gossiped %s; gossip contents: %+v",
nodeLivenessKey, mtc.stores[0].Gossip().GetInfoStatus())
}
log.Infof(context.Background(), "%s gossiped from n%d",
nodeLivenessKey, mtc.stores[initialStoreIdx].Ident.NodeID)

newStoreIdx := (initialStoreIdx + 1) % numStores
mtc.transferLease(context.Background(), rangeID, initialStoreIdx, newStoreIdx)

testutils.SucceedsSoon(t, func() error {
if mtc.stores[initialStoreIdx].Gossip().InfoOriginatedHere(nodeLivenessKey) {
return fmt.Errorf("%s still most recently gossiped by original leaseholder", nodeLivenessKey)
}
if !mtc.stores[newStoreIdx].Gossip().InfoOriginatedHere(nodeLivenessKey) {
return fmt.Errorf("%s not most recently gossiped by new leaseholder", nodeLivenessKey)
}
return nil
})
}
2 changes: 1 addition & 1 deletion pkg/storage/replica_gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (r *Replica) MaybeGossipNodeLiveness(ctx context.Context, span roachpb.Span
key := gossip.MakeNodeLivenessKey(kvLiveness.NodeID)
// Look up liveness from gossip; skip gossiping anew if unchanged.
if err := r.store.Gossip().GetInfoProto(key, &gossipLiveness); err == nil {
if gossipLiveness == kvLiveness {
if gossipLiveness == kvLiveness && r.store.Gossip().InfoOriginatedHere(key) {
continue
}
}
Expand Down