// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
/*
Each node attempts to contact peer nodes to gather all Infos in
the system with minimal total hops. The algorithm is as follows:

 0 Node starts up gossip server to accept incoming gossip requests.
   Continue to step #1 to join the gossip network.

 1 Node selects random peer from bootstrap list, excluding its own
   address for its first outgoing connection. Node starts client and
   continues to step #2.

 2 Node requests gossip from peer. Gossip requests (and responses)
   contain a map from node ID to info about other nodes in the
   network. Each node maintains its own map as well as the maps of
   each of its peers. The info for each node includes the most recent
   timestamp of any Info originating at that node, as well as the min
   number of hops to reach that node. Requesting node times out at
   checkInterval. On timeout, client is closed and GC'd. If node has
   no outgoing connections, goto #1.

   a. When gossip is received, infostore is augmented. If new Info was
      received, the client in question is credited. If node has no
      outgoing connections, goto #1.

   b. If any gossip was received at > maxHops and num connected peers
      < maxPeers(), choose random peer from those originating Info >
      maxHops, start it, and goto #2.

   c. If sentinel gossip keyed by KeySentinel is missing or expired,
      node is considered partitioned; goto #1.

 3 On connect, if node has too many connected clients, gossip requests
   are returned immediately with an alternate address set to a random
   selection from amongst already-connected clients.
*/
package gossip
import (
"bytes"
"context"
"fmt"
"math"
"math/rand"
"net"
"sort"
"strconv"
"strings"
"sync"
"time"
"google.golang.org/grpc"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
circuit "github.com/rubyist/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip/resolver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
const (
// maxHops is the maximum number of hops which any gossip info
// should require to transit between any two nodes in a gossip
// network.
maxHops = 5
// minPeers is the minimum number of peers which the maxPeers()
// function will return. This is set higher than one to prevent
// excessive tightening of the network.
minPeers = 3
// defaultStallInterval is the default interval for checking whether
// the incoming and outgoing connections to the gossip network are
// insufficient to keep the network connected.
defaultStallInterval = 2 * time.Second
// defaultBootstrapInterval is the minimum time between successive
// bootstrapping attempts to avoid busy-looping trying to find the
// sentinel gossip info.
defaultBootstrapInterval = 1 * time.Second
// defaultCullInterval is the default interval for culling the least
// "useful" outgoing gossip connection to free up space for a more
// efficiently targeted connection to the most distant node.
defaultCullInterval = 60 * time.Second
// defaultClientsInterval is the default interval for updating the gossip
// clients key which allows every node in the cluster to create a map of
// gossip connectivity. This value is intentionally small as we want to
// detect gossip partitions faster than the node liveness timeout (9s).
defaultClientsInterval = 2 * time.Second
// NodeDescriptorInterval is the interval for gossiping the node descriptor.
// Note that increasing this duration may increase the likelihood of gossip
// thrashing, since node descriptors are used to determine the number of gossip
// hops between nodes (see #9819 for context).
NodeDescriptorInterval = 1 * time.Hour
// NodeDescriptorTTL is time-to-live for node ID -> descriptor.
NodeDescriptorTTL = 2 * NodeDescriptorInterval
// StoresInterval is the default interval for gossiping store descriptors.
StoresInterval = 60 * time.Second
// StoreTTL is time-to-live for store-related info.
StoreTTL = 2 * StoresInterval
unknownNodeID roachpb.NodeID = 0
)
// Gossip metrics counter names.
var (
MetaConnectionsIncomingGauge = metric.Metadata{
Name: "gossip.connections.incoming",
Help: "Number of active incoming gossip connections",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
MetaConnectionsOutgoingGauge = metric.Metadata{
Name: "gossip.connections.outgoing",
Help: "Number of active outgoing gossip connections",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
MetaConnectionsRefused = metric.Metadata{
Name: "gossip.connections.refused",
Help: "Number of refused incoming gossip connections",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
MetaInfosSent = metric.Metadata{
Name: "gossip.infos.sent",
Help: "Number of sent gossip Info objects",
Measurement: "Infos",
Unit: metric.Unit_COUNT,
}
MetaInfosReceived = metric.Metadata{
Name: "gossip.infos.received",
Help: "Number of received gossip Info objects",
Measurement: "Infos",
Unit: metric.Unit_COUNT,
}
MetaBytesSent = metric.Metadata{
Name: "gossip.bytes.sent",
Help: "Number of sent gossip bytes",
Measurement: "Gossip Bytes",
Unit: metric.Unit_BYTES,
}
MetaBytesReceived = metric.Metadata{
Name: "gossip.bytes.received",
Help: "Number of received gossip bytes",
Measurement: "Gossip Bytes",
Unit: metric.Unit_BYTES,
}
)
// KeyNotPresentError is returned by gossip when queried for a key that doesn't
// exist or has expired.
type KeyNotPresentError struct {
key string
}
// Error implements the error interface.
func (err KeyNotPresentError) Error() string {
return fmt.Sprintf("KeyNotPresentError: gossip key %q does not exist or has expired", err.key)
}
// NewKeyNotPresentError creates a new KeyNotPresentError.
func NewKeyNotPresentError(key string) error {
return KeyNotPresentError{key: key}
}
// AddressResolver is a thin wrapper around gossip's GetNodeIDAddress
// that allows it to be used as a nodedialer.AddressResolver.
func AddressResolver(gossip *Gossip) nodedialer.AddressResolver {
return func(nodeID roachpb.NodeID) (net.Addr, error) {
return gossip.GetNodeIDAddress(nodeID)
}
}
// Storage is an interface which allows the gossip instance
// to read and write bootstrapping data to persistent storage
// between instantiations.
type Storage interface {
// ReadBootstrapInfo fetches the bootstrap data from the persistent
// store into the provided bootstrap protobuf. Returns nil or an
// error on failure.
ReadBootstrapInfo(*BootstrapInfo) error
// WriteBootstrapInfo stores the provided bootstrap data to the
// persistent store. Returns nil or an error on failure.
WriteBootstrapInfo(*BootstrapInfo) error
}
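/*
A sketch of what an in-memory implementation of the Storage interface could
look like, for example in a test. Everything here is an assumption made for
illustration: BootstrapInfo is reduced to a slice of strings instead of the
real protobuf, and memStorage is a hypothetical type that does not exist in
this package.

```go
package main

import (
	"fmt"
	"sync"
)

// BootstrapInfo is a simplified stand-in for the protobuf message.
type BootstrapInfo struct {
	Addresses []string
}

// Storage mirrors the interface above.
type Storage interface {
	ReadBootstrapInfo(*BootstrapInfo) error
	WriteBootstrapInfo(*BootstrapInfo) error
}

// memStorage is a hypothetical in-memory Storage.
type memStorage struct {
	mu   sync.Mutex
	info BootstrapInfo
}

// ReadBootstrapInfo copies the stored addresses into out.
func (s *memStorage) ReadBootstrapInfo(out *BootstrapInfo) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Copy so the caller cannot mutate the stored slice.
	out.Addresses = append([]string(nil), s.info.Addresses...)
	return nil
}

// WriteBootstrapInfo replaces the stored addresses with a copy of in.
func (s *memStorage) WriteBootstrapInfo(in *BootstrapInfo) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.info.Addresses = append([]string(nil), in.Addresses...)
	return nil
}

func main() {
	var s Storage = &memStorage{}
	_ = s.WriteBootstrapInfo(&BootstrapInfo{Addresses: []string{"10.0.0.1:26257"}})
	var got BootstrapInfo
	_ = s.ReadBootstrapInfo(&got)
	fmt.Println(got.Addresses)
}
```
*/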
// Gossip is an instance of a gossip node. It embeds a gossip server.
// During bootstrapping, the bootstrap list contains candidates for
// entry to the gossip network.
type Gossip struct {
*server // Embedded gossip RPC server
Connected chan struct{} // Closed upon initial connection
hasConnected bool // Set first time network is connected
rpcContext *rpc.Context // The context required for RPC
outgoing nodeSet // Set of outgoing client node IDs
storage Storage // Persistent storage interface
bootstrapInfo BootstrapInfo // BootstrapInfo proto for persistent storage
bootstrapping map[string]struct{} // Set of active bootstrap clients
hasCleanedBS bool
// Note that access to each client's internal state is serialized by the
// embedded server's mutex. This is surprising!
clientsMu struct {
syncutil.Mutex
clients []*client
// One breaker per client for the life of the process.
breakers map[string]*circuit.Breaker
}
disconnected chan *client // Channel of disconnected clients
stalled bool // True if gossip is stalled (i.e. host doesn't have sentinel)
stalledCh chan struct{} // Channel to wake up stalled bootstrap
stallInterval time.Duration
bootstrapInterval time.Duration
cullInterval time.Duration
// The system config is treated unlike other info objects.
// It is used so often that we keep an unmarshaled version of it
// here and its own set of callbacks.
// We do not use the infostore to avoid unmarshalling under the
// main gossip lock.
systemConfig config.SystemConfig
systemConfigSet bool
systemConfigMu syncutil.RWMutex
systemConfigChannels []chan<- struct{}
// resolvers is a list of resolvers used to determine
// bootstrap hosts for connecting to the gossip network.
resolverIdx int
resolvers []resolver.Resolver
resolversTried map[int]struct{} // Set of attempted resolver indexes
nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor
// storeMap maps store IDs to node IDs.
storeMap map[roachpb.StoreID]roachpb.NodeID
// Membership sets for resolvers and bootstrap addresses.
// bootstrapAddrs also tracks which address is associated with which
// node ID to enable faster node lookup by address.
resolverAddrs map[util.UnresolvedAddr]resolver.Resolver
bootstrapAddrs map[util.UnresolvedAddr]roachpb.NodeID
localityTierMap map[string]struct{}
logCh chan struct{}
lastConnectivity string
}
// New creates an instance of a gossip node.
// The higher level manages the ClusterIDContainer and NodeIDContainer instances
// (which can be shared by various server components). The ambient context is
// expected to already contain the node ID.
//
// grpcServer: The server on which the new Gossip instance will register its RPC
// service. Can be nil, in which case the Gossip will not register the
// service.
// rpcContext: The context used to connect to other nodes. Can be nil for tests
// that also specify a nil grpcServer and that plan on using the Gossip in a
// restricted way by populating it with data manually.
func New(
ambient log.AmbientContext,
clusterID *base.ClusterIDContainer,
nodeID *base.NodeIDContainer,
rpcContext *rpc.Context,
grpcServer *grpc.Server,
stopper *stop.Stopper,
registry *metric.Registry,
locality roachpb.Locality,
) *Gossip {
ambient.SetEventLog("gossip", "gossip")
g := &Gossip{
server: newServer(ambient, clusterID, nodeID, stopper, registry),
Connected: make(chan struct{}),
rpcContext: rpcContext,
outgoing: makeNodeSet(minPeers, metric.NewGauge(MetaConnectionsOutgoingGauge)),
bootstrapping: map[string]struct{}{},
disconnected: make(chan *client, 10),
stalledCh: make(chan struct{}, 1),
stallInterval: defaultStallInterval,
bootstrapInterval: defaultBootstrapInterval,
cullInterval: defaultCullInterval,
resolversTried: map[int]struct{}{},
nodeDescs: map[roachpb.NodeID]*roachpb.NodeDescriptor{},
storeMap: make(map[roachpb.StoreID]roachpb.NodeID),
resolverAddrs: map[util.UnresolvedAddr]resolver.Resolver{},
bootstrapAddrs: map[util.UnresolvedAddr]roachpb.NodeID{},
localityTierMap: map[string]struct{}{},
logCh: make(chan struct{}, 1),
}
for _, loc := range locality.Tiers {
g.localityTierMap[loc.String()] = struct{}{}
}
stopper.AddCloser(stop.CloserFn(g.server.AmbientContext.FinishEventLog))
registry.AddMetric(g.outgoing.gauge)
g.clientsMu.breakers = map[string]*circuit.Breaker{}
g.mu.Lock()
// Add ourselves as a SystemConfig watcher.
g.mu.is.registerCallback(KeySystemConfig, g.updateSystemConfig)
// Add ourselves as a node descriptor watcher.
g.mu.is.registerCallback(MakePrefixPattern(KeyNodeIDPrefix), g.updateNodeAddress)
g.mu.is.registerCallback(MakePrefixPattern(KeyStorePrefix), g.updateStoreMap)
// Log gossip connectivity whenever we receive an update.
g.mu.is.registerCallback(MakePrefixPattern(KeyGossipClientsPrefix),
func(_ string, _ roachpb.Value) {
// Rather than logging here directly, we signal logCh which "debounces"
// frequent updates. This approach is used rather than something like
// log.Every because gossip connectivity is critical for correct
// operation and we want to make sure the most recent update is logged.
select {
case g.logCh <- struct{}{}:
default:
}
})
g.mu.Unlock()
if grpcServer != nil {
RegisterGossipServer(grpcServer, g.server)
}
return g
}
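/*
The logCh callback registered in New relies on a non-blocking send into a
channel with a buffer of one, so a burst of updates leaves at most one pending
wake-up. The following is a minimal, standalone sketch of that pattern; the
helper name signal is an assumption and does not exist in this package.

```go
package main

import "fmt"

// signal performs a non-blocking send and reports whether a new
// notification was queued. If one is already pending, it is a no-op.
func signal(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	logCh := make(chan struct{}, 1)
	// Five rapid updates collapse into a single pending notification.
	for i := 0; i < 5; i++ {
		signal(logCh)
	}
	fmt.Println(len(logCh))
}
```

The consumer drains the channel and then reads the latest state, so the most
recent update is always observed even though intermediate signals are dropped.
*/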
// NewTest is a simplified wrapper around New that creates the
// ClusterIDContainer and NodeIDContainer internally. Used for testing.
//
// grpcServer: The server on which the new Gossip instance will register its RPC
// service. Can be nil, in which case the Gossip will not register the
// service.
// rpcContext: The context used to connect to other nodes. Can be nil for tests
// that also specify a nil grpcServer and that plan on using the Gossip in a
// restricted way by populating it with data manually.
func NewTest(
nodeID roachpb.NodeID,
rpcContext *rpc.Context,
grpcServer *grpc.Server,
stopper *stop.Stopper,
registry *metric.Registry,
) *Gossip {
return NewTestWithLocality(nodeID, rpcContext, grpcServer, stopper, registry, roachpb.Locality{})
}
// NewTestWithLocality is like NewTest, but takes an explicit locality value.
func NewTestWithLocality(
nodeID roachpb.NodeID,
rpcContext *rpc.Context,
grpcServer *grpc.Server,
stopper *stop.Stopper,
registry *metric.Registry,
locality roachpb.Locality,
) *Gossip {
c := &base.ClusterIDContainer{}
n := &base.NodeIDContainer{}
var ac log.AmbientContext
ac.AddLogTag("n", n)
gossip := New(ac, c, n, rpcContext, grpcServer, stopper, registry, locality)
if nodeID != 0 {
n.Set(context.TODO(), nodeID)
}
return gossip
}
// GetNodeMetrics returns the gossip node metrics.
func (g *Gossip) GetNodeMetrics() *Metrics {
return g.server.GetNodeMetrics()
}
// SetNodeDescriptor adds the node descriptor to the gossip network.
func (g *Gossip) SetNodeDescriptor(desc *roachpb.NodeDescriptor) error {
ctx := g.AnnotateCtx(context.TODO())
log.Infof(ctx, "NodeDescriptor set to %+v", desc)
if err := g.AddInfoProto(MakeNodeIDKey(desc.NodeID), desc, NodeDescriptorTTL); err != nil {
return errors.Errorf("node %d: couldn't gossip descriptor: %v", desc.NodeID, err)
}
g.updateClients()
return nil
}
// SetStallInterval sets the interval between successive checks
// to determine whether this host is not connected to the gossip
// network, or else is connected to a partition which doesn't
// include the host which gossips the sentinel info.
func (g *Gossip) SetStallInterval(interval time.Duration) {
g.mu.Lock()
defer g.mu.Unlock()
g.stallInterval = interval
}
// SetBootstrapInterval sets a minimum interval between successive
// attempts to connect to new hosts in order to join the gossip
// network.
func (g *Gossip) SetBootstrapInterval(interval time.Duration) {
g.mu.Lock()
defer g.mu.Unlock()
g.bootstrapInterval = interval
}
// SetCullInterval sets the interval between periodic shutdown of
// outgoing gossip client connections in an effort to improve the
// fitness of the network.
func (g *Gossip) SetCullInterval(interval time.Duration) {
g.mu.Lock()
defer g.mu.Unlock()
g.cullInterval = interval
}
// SetStorage provides an instance of the Storage interface
// for reading and writing gossip bootstrap data from persistent
// storage. This should be invoked as early in the lifecycle of a
// gossip instance as possible, but can be called at any time.
func (g *Gossip) SetStorage(storage Storage) error {
ctx := g.AnnotateCtx(context.TODO())
// Maintain lock ordering.
var storedBI BootstrapInfo
if err := storage.ReadBootstrapInfo(&storedBI); err != nil {
log.Warningf(ctx, "failed to read gossip bootstrap info: %s", err)
}
g.mu.Lock()
defer g.mu.Unlock()
g.storage = storage
// Merge the stored bootstrap info addresses with any we've become
// aware of through gossip.
existing := map[string]struct{}{}
makeKey := func(a util.UnresolvedAddr) string { return fmt.Sprintf("%s,%s", a.Network(), a.String()) }
for _, addr := range g.bootstrapInfo.Addresses {
existing[makeKey(addr)] = struct{}{}
}
for _, addr := range storedBI.Addresses {
// If the address is new, and isn't our own address, add it.
if _, ok := existing[makeKey(addr)]; !ok && addr != g.mu.is.NodeAddr {
g.maybeAddBootstrapAddressLocked(addr, unknownNodeID)
}
}
// Persist merged addresses.
if numAddrs := len(g.bootstrapInfo.Addresses); numAddrs > len(storedBI.Addresses) {
if err := g.storage.WriteBootstrapInfo(&g.bootstrapInfo); err != nil {
log.Error(ctx, err)
}
}
// Cycle through all persisted bootstrap hosts and add resolvers for
// any which haven't already been added.
newResolverFound := false
for _, addr := range g.bootstrapInfo.Addresses {
if !g.maybeAddResolverLocked(addr) {
continue
}
// If we find a new resolver, reset the resolver index so that the
// next resolver we try is the first of the new resolvers.
if !newResolverFound {
newResolverFound = true
g.resolverIdx = len(g.resolvers) - 1
}
}
// If a new resolver was found, immediately signal bootstrap.
if newResolverFound {
if log.V(1) {
log.Infof(ctx, "found new resolvers from storage; signaling bootstrap")
}
g.signalStalledLocked()
}
return nil
}
// setResolvers initializes the set of gossip resolvers used to find
// nodes to bootstrap the gossip network.
func (g *Gossip) setResolvers(resolvers []resolver.Resolver) {
if resolvers == nil {
return
}
g.mu.Lock()
defer g.mu.Unlock()
// Start index at end because get next address loop logic increments as first step.
g.resolverIdx = len(resolvers) - 1
g.resolvers = resolvers
g.resolversTried = map[int]struct{}{}
// Start new bootstrapping immediately instead of waiting for next bootstrap interval.
g.maybeSignalStatusChangeLocked()
}
// GetResolvers returns a copy of the resolvers slice.
func (g *Gossip) GetResolvers() []resolver.Resolver {
g.mu.RLock()
defer g.mu.RUnlock()
return append([]resolver.Resolver(nil), g.resolvers...)
}
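/*
GetResolvers returns a copy so that callers cannot modify the slice guarded by
the gossip mutex. A small standalone sketch of the same append-to-nil idiom,
using strings in place of resolver.Resolver (copyStrings is a hypothetical
name used only here):

```go
package main

import "fmt"

// copyStrings returns a new slice with the same elements as src.
// Appending to a nil slice always allocates a fresh backing array.
func copyStrings(src []string) []string {
	return append([]string(nil), src...)
}

func main() {
	orig := []string{"r1", "r2"}
	cp := copyStrings(orig)
	cp[0] = "changed"
	// orig is unaffected by the write to cp.
	fmt.Println(orig, cp)
}
```
*/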
// GetNodeIDAddress looks up the address of the node by ID.
func (g *Gossip) GetNodeIDAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) {
g.mu.RLock()
defer g.mu.RUnlock()
return g.getNodeIDAddressLocked(nodeID)
}
// GetNodeIDForStoreID looks up the NodeID by StoreID.
func (g *Gossip) GetNodeIDForStoreID(storeID roachpb.StoreID) (roachpb.NodeID, error) {
g.mu.RLock()
defer g.mu.RUnlock()
return g.getNodeIDForStoreIDLocked(storeID)
}
// GetNodeDescriptor looks up the descriptor of the node by ID.
func (g *Gossip) GetNodeDescriptor(nodeID roachpb.NodeID) (*roachpb.NodeDescriptor, error) {
g.mu.RLock()
defer g.mu.RUnlock()
return g.getNodeDescriptorLocked(nodeID)
}
// LogStatus logs the current status of gossip such as the incoming and
// outgoing connections.
func (g *Gossip) LogStatus() {
g.mu.RLock()
n := len(g.nodeDescs)
status := "ok"
if g.mu.is.getInfo(KeySentinel) == nil {
status = "stalled"
}
g.mu.RUnlock()
ctx := g.AnnotateCtx(context.TODO())
log.Infof(
ctx, "gossip status (%s, %d node%s)\n%s%s%s", status, n, util.Pluralize(int64(n)),
g.clientStatus(), g.server.status(), g.Connectivity(),
)
}
func (g *Gossip) clientStatus() ClientStatus {
g.mu.RLock()
defer g.mu.RUnlock()
g.clientsMu.Lock()
defer g.clientsMu.Unlock()
var status ClientStatus
status.MaxConns = int32(g.outgoing.maxSize)
status.ConnStatus = make([]OutgoingConnStatus, 0, len(g.clientsMu.clients))
for _, c := range g.clientsMu.clients {
status.ConnStatus = append(status.ConnStatus, OutgoingConnStatus{
ConnStatus: ConnStatus{
NodeID: c.peerID,
Address: c.addr.String(),
AgeNanos: timeutil.Since(c.createdAt).Nanoseconds(),
},
MetricSnap: c.clientMetrics.Snapshot(),
})
}
return status
}
// Connectivity returns the current view of the gossip network as seen by this
// node.
func (g *Gossip) Connectivity() Connectivity {
ctx := g.AnnotateCtx(context.TODO())
var c Connectivity
g.mu.RLock()
if i := g.mu.is.getInfo(KeySentinel); i != nil {
c.SentinelNodeID = i.NodeID
}
for nodeID := range g.nodeDescs {
i := g.mu.is.getInfo(MakeGossipClientsKey(nodeID))
if i == nil {
continue
}
v, err := i.Value.GetBytes()
if err != nil {
log.Errorf(ctx, "unable to retrieve gossip value for %s: %v",
MakeGossipClientsKey(nodeID), err)
continue
}
if len(v) == 0 {
continue
}
for _, part := range strings.Split(string(v), ",") {
id, err := strconv.ParseInt(part, 10 /* base */, 64 /* bitSize */)
if err != nil {
log.Errorf(ctx, "unable to parse node ID: %v", err)
// Skip the malformed entry rather than recording a connection to node 0.
continue
}
c.ClientConns = append(c.ClientConns, Connectivity_Conn{
SourceID: nodeID,
TargetID: roachpb.NodeID(id),
})
}
}
g.mu.RUnlock()
sort.Slice(c.ClientConns, func(i, j int) bool {
a, b := &c.ClientConns[i], &c.ClientConns[j]
if a.SourceID < b.SourceID {
return true
}
if a.SourceID > b.SourceID {
return false
}
return a.TargetID < b.TargetID
})
return c
}
// EnableSimulationCycler is for TESTING PURPOSES ONLY. It sets a
// condition variable which is signaled at each cycle of the
// simulation via SimulationCycle(). The gossip server makes each
// connecting client wait for the cycler to signal before responding.
func (g *Gossip) EnableSimulationCycler(enable bool) {
g.mu.Lock()
defer g.mu.Unlock()
if enable {
g.simulationCycler = sync.NewCond(&g.mu)
} else {
// TODO(spencer): remove this nil check when gossip/simulation is no
// longer used in kv tests.
if g.simulationCycler != nil {
g.simulationCycler.Broadcast()
g.simulationCycler = nil
}
}
}
// SimulationCycle cycles this gossip node's server by allowing all
// connected clients to proceed one step.
func (g *Gossip) SimulationCycle() {
g.mu.Lock()
defer g.mu.Unlock()
if g.simulationCycler != nil {
g.simulationCycler.Broadcast()
}
}
// maybeAddResolverLocked creates and adds a resolver for the specified
// address if one does not already exist. Returns whether a new
// resolver was added. The caller must hold the gossip mutex.
func (g *Gossip) maybeAddResolverLocked(addr util.UnresolvedAddr) bool {
if _, ok := g.resolverAddrs[addr]; ok {
return false
}
ctx := g.AnnotateCtx(context.TODO())
r, err := resolver.NewResolverFromUnresolvedAddr(addr)
if err != nil {
log.Warningf(ctx, "bad address %s: %s", addr, err)
return false
}
g.resolvers = append(g.resolvers, r)
g.resolverAddrs[addr] = r
log.Eventf(ctx, "add resolver %s", r)
return true
}
// maybeAddBootstrapAddressLocked adds the specified address to the list
// of bootstrap addresses if not already present. Returns whether a new
// bootstrap address was added. The caller must hold the gossip mutex.
func (g *Gossip) maybeAddBootstrapAddressLocked(
addr util.UnresolvedAddr, nodeID roachpb.NodeID,
) bool {
if existingNodeID, ok := g.bootstrapAddrs[addr]; ok {
if existingNodeID == unknownNodeID || existingNodeID != nodeID {
g.bootstrapAddrs[addr] = nodeID
}
return false
}
g.bootstrapInfo.Addresses = append(g.bootstrapInfo.Addresses, addr)
g.bootstrapAddrs[addr] = nodeID
ctx := g.AnnotateCtx(context.TODO())
log.Eventf(ctx, "add bootstrap %s", addr)
return true
}
// maybeCleanupBootstrapAddressesLocked cleans up the stored bootstrap addresses to
// include only those currently available via gossip. The gossip mutex must
// be held by the caller.
func (g *Gossip) maybeCleanupBootstrapAddressesLocked() {
if g.storage == nil || g.hasCleanedBS {
return
}
defer func() { g.hasCleanedBS = true }()
ctx := g.AnnotateCtx(context.TODO())
log.Event(ctx, "cleaning up bootstrap addresses")
g.resolvers = g.resolvers[:0]
g.resolverIdx = 0
g.bootstrapInfo.Addresses = g.bootstrapInfo.Addresses[:0]
g.bootstrapAddrs = map[util.UnresolvedAddr]roachpb.NodeID{}
g.resolverAddrs = map[util.UnresolvedAddr]resolver.Resolver{}
g.resolversTried = map[int]struct{}{}
var desc roachpb.NodeDescriptor
if err := g.mu.is.visitInfos(func(key string, i *Info) error {
if strings.HasPrefix(key, KeyNodeIDPrefix) {
if err := i.Value.GetProto(&desc); err != nil {
return err
}
if desc.Address.IsEmpty() || desc.Address == g.mu.is.NodeAddr {
return nil
}
g.maybeAddResolverLocked(desc.Address)
g.maybeAddBootstrapAddressLocked(desc.Address, desc.NodeID)
}
return nil
}, true /* deleteExpired */); err != nil {
log.Error(ctx, err)
return
}
if err := g.storage.WriteBootstrapInfo(&g.bootstrapInfo); err != nil {
log.Error(ctx, err)
}
}
// maxPeers returns the maximum number of peers each gossip node
// may connect to. This is based on maxHops, which is a preset
// maximum for number of hops allowed before the gossip network
// will seek to "tighten" by creating new connections to distant
// nodes.
func maxPeers(nodeCount int) int {
// This formula uses maxHops-2, instead of maxHops, to provide a
// "fudge" factor for max connected peers, to account for the
// arbitrary, decentralized way in which gossip networks are created.
// This will return the following maxPeers for the given number of nodes:
//   <= 27 nodes  -> 3 peers
//   <= 64 nodes  -> 4 peers
//   <= 125 nodes -> 5 peers
//   <= n^3 nodes -> n peers
//
// Quick derivation of the formula for posterity (without the fudge factor):
//   maxPeers^maxHops > nodeCount
//   maxHops * log(maxPeers) > log(nodeCount)
//   log(maxPeers) > log(nodeCount) / maxHops
//   maxPeers > e^(log(nodeCount) / maxHops)
//   hence maxPeers = ceil(e^(log(nodeCount) / maxHops)) should work
maxPeers := int(math.Ceil(math.Exp(math.Log(float64(nodeCount)) / float64(maxHops-2))))
if maxPeers < minPeers {
return minPeers
}
return maxPeers
}
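/*
A worked example of the maxPeers formula, copied into a standalone program
with the same constants (maxHops = 5, minPeers = 3). With the fudge factor the
exponent is 1/(maxHops-2) = 1/3, so the result is the ceiling of the cube root
of the node count, floored at minPeers. The sample node counts are chosen
arbitrarily and deliberately avoid exact cubes, where floating-point rounding
could tip the ceiling either way.

```go
package main

import (
	"fmt"
	"math"
)

const (
	maxHops  = 5
	minPeers = 3
)

// maxPeers reproduces the computation above for illustration.
func maxPeers(nodeCount int) int {
	n := int(math.Ceil(math.Exp(math.Log(float64(nodeCount)) / float64(maxHops-2))))
	if n < minPeers {
		return minPeers
	}
	return n
}

func main() {
	for _, n := range []int{1, 10, 50, 100, 200} {
		fmt.Printf("%d nodes -> %d peers\n", n, maxPeers(n))
	}
}
```

For instance, the cube root of 50 is about 3.68, which rounds up to 4 peers,
while a single node computes 1 and is raised to the minPeers floor of 3.
*/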
// updateNodeAddress is a gossip callback which fires with each
// update to a node descriptor. This allows us to compute the
// total size of the gossip network (for determining max peers
// each gossip node is allowed to have), as well as to create
// new resolvers for each encountered host and to write the
// set of gossip node addresses to persistent storage when it
// changes.
func (g *Gossip) updateNodeAddress(key string, content roachpb.Value) {
ctx := g.AnnotateCtx(context.TODO())
var desc roachpb.NodeDescriptor
if err := content.GetProto(&desc); err != nil {
log.Error(ctx, err)
return
}
if log.V(1) {
log.Infof(ctx, "updateNodeAddress called on %q with desc %+v", key, desc)
}
g.mu.Lock()
defer g.mu.Unlock()
// If desc is the empty descriptor, that indicates that the node has been
// removed from the cluster. If that's the case, remove it from our map of
// nodes to prevent other parts of the system from trying to talk to it.
// We can't directly compare the node against the empty descriptor because
// the proto has a repeated field and thus isn't comparable.
if desc.NodeID == 0 && desc.Address.IsEmpty() {
nodeID, err := NodeIDFromKey(key, KeyNodeIDPrefix)
if err != nil {
log.Errorf(ctx, "unable to update node address for removed node: %s", err)
return
}
log.Infof(ctx, "removed node %d from gossip", nodeID)
g.removeNodeDescriptorLocked(nodeID)
return
}
existingDesc, ok := g.nodeDescs[desc.NodeID]
if !ok || !proto.Equal(existingDesc, &desc) {
g.nodeDescs[desc.NodeID] = &desc
}
// Skip all remaining logic if the address hasn't changed, since that's all
// the logic cares about.
if ok && existingDesc.Address == desc.Address {
return
}
g.recomputeMaxPeersLocked()
// Skip if it's our own address.
if desc.Address == g.mu.is.NodeAddr {
return
}
// Add this new node address (if it's not already there) to our list
// of resolvers so we can keep connecting to gossip if the original
// resolvers go offline.
g.maybeAddResolverLocked(desc.Address)
// We ignore empty addresses for the sake of not breaking the many tests
// that don't bother specifying addresses.
if desc.Address.IsEmpty() {
return
}
// If the new node's address conflicts with another node's address, then it
// must be the case that the new node has replaced the previous one. Remove
// it from our set of tracked descriptors to ensure we don't attempt to
// connect to its previous identity (as came up in issue #10266).
oldNodeID, ok := g.bootstrapAddrs[desc.Address]
if ok && oldNodeID != unknownNodeID && oldNodeID != desc.NodeID {
log.Infof(ctx, "removing node %d which was at same address (%s) as new node %v",
oldNodeID, desc.Address, desc)
g.removeNodeDescriptorLocked(oldNodeID)
// Deleting the local copy isn't enough to remove the node from the gossip
// network. We also have to clear it out in the infoStore by overwriting
// it with an empty descriptor, which can be represented as just an empty
// byte array due to how protocol buffers are serialized.
// Calling addInfoLocked here is somewhat recursive, since
// updateNodeAddress is typically called in response to the infoStore
// being updated, but it won't lead to deadlock because updateNodeAddress
// is called asynchronously.
key := MakeNodeIDKey(oldNodeID)
var emptyProto []byte
if err := g.addInfoLocked(key, emptyProto, NodeDescriptorTTL); err != nil {
log.Errorf(ctx, "failed to empty node descriptor for node %d: %s", oldNodeID, err)
}
}
// Add new address (if it's not already there) to bootstrap info and
// persist if possible.
added := g.maybeAddBootstrapAddressLocked(desc.Address, desc.NodeID)
if added && g.storage != nil {
if err := g.storage.WriteBootstrapInfo(&g.bootstrapInfo); err != nil {
log.Error(ctx, err)
}
}
}
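// removeNodeDescriptorLocked deletes the descriptor for the given node from
// the local map of node descriptors and recomputes the max peer limits. The
// mutex is assumed held by the caller.
func (g *Gossip) removeNodeDescriptorLocked(nodeID roachpb.NodeID) {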
delete(g.nodeDescs, nodeID)
g.recomputeMaxPeersLocked()
}
// updateStoreMap is a gossip callback which is used to update storeMap.
func (g *Gossip) updateStoreMap(key string, content roachpb.Value) {
ctx := g.AnnotateCtx(context.TODO())
var desc roachpb.StoreDescriptor
if err := content.GetProto(&desc); err != nil {
log.Error(ctx, err)
return
}
if log.V(1) {
log.Infof(ctx, "updateStoreMap called on %q with desc %+v", key, desc)
}
g.mu.Lock()
defer g.mu.Unlock()
g.storeMap[desc.StoreID] = desc.Node.NodeID
}
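// getNodeIDForStoreIDLocked looks up the node ID recorded in storeMap for the
// given store ID. The mutex is assumed held by the caller.
func (g *Gossip) getNodeIDForStoreIDLocked(storeID roachpb.StoreID) (roachpb.NodeID, error) {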
if nodeID, ok := g.storeMap[storeID]; ok {
return nodeID, nil
}
return 0, errors.Errorf("unable to look up Node ID for store %d", storeID)
}
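// updateClients gossips a comma-separated list of the peer node IDs of this
// node's gossip clients under the node's gossip clients key. It is a no-op
// until the node ID has been set.
func (g *Gossip) updateClients() {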
nodeID := g.NodeID.Get()
if nodeID == 0 {
return
}
var buf bytes.Buffer
var sep string
g.mu.RLock()
g.clientsMu.Lock()
for _, c := range g.clientsMu.clients {
if c.peerID != 0 {
fmt.Fprintf(&buf, "%s%d", sep, c.peerID)
sep = ","
}
}
g.clientsMu.Unlock()
g.mu.RUnlock()
if err := g.AddInfo(MakeGossipClientsKey(nodeID), buf.Bytes(), 2*defaultClientsInterval); err != nil {
log.Error(g.AnnotateCtx(context.Background()), err)
}
}
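// logConnectivity logs the current gossip connectivity, but only when it
// differs from the connectivity that was last logged.
func (g *Gossip) logConnectivity() {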
s := g.Connectivity().String()
if g.lastConnectivity != s {
g.lastConnectivity = s
log.Infof(g.AnnotateCtx(context.Background()), "%s", s)
}
}
// recomputeMaxPeersLocked recomputes max peers based on the size of the
// network and sets the max sizes for the incoming and outgoing node sets.
//
// Note: if we notice issues with never-ending connection refused errors
// in real deployments, consider allowing more incoming connections than
// outgoing connections. As of now, the cluster's steady state is to have
// all nodes fill up, which can make rebalancing of connections tough.
// I'm not making this change now since it tends to lead to less balanced
// networks and I'm not sure what all the consequences of that might be.
func (g *Gossip) recomputeMaxPeersLocked() {
maxPeers := maxPeers(len(g.nodeDescs))
g.mu.incoming.setMaxSize(maxPeers)
g.outgoing.setMaxSize(maxPeers)
}
// getNodeDescriptorLocked looks up the descriptor of the node by ID. The mutex
// is assumed held by the caller. This method is called externally via
// GetNodeDescriptor and internally by getNodeIDAddressLocked.
func (g *Gossip) getNodeDescriptorLocked(nodeID roachpb.NodeID) (*roachpb.NodeDescriptor, error) {
if desc, ok := g.nodeDescs[nodeID]; ok {
return desc, nil
}
// Fall back to retrieving the node info and unmarshalling the node
// descriptor. This path occurs in tests which add a node descriptor to
// gossip and then immediately try to retrieve it.
nodeIDKey := MakeNodeIDKey(nodeID)
// We can't use GetInfoProto here because that method grabs the lock.
if i := g.mu.is.getInfo(nodeIDKey); i != nil {
if err := i.Value.Verify([]byte(nodeIDKey)); err != nil {
return nil, err
}
nodeDescriptor := &roachpb.NodeDescriptor{}
if err := i.Value.GetProto(nodeDescriptor); err != nil {
return nil, err
}
// Don't return node descriptors that are empty, because that's meant to
// indicate that the node has been removed from the cluster.
if nodeDescriptor.NodeID == 0 || nodeDescriptor.Address.IsEmpty() {
return nil, errors.Errorf("node %d has been removed from the cluster", nodeID)
}
return nodeDescriptor, nil
}
return nil, errors.Errorf("unable to look up descriptor for node %d", nodeID)
}
// getNodeIDAddressLocked looks up the address of the node by ID. The mutex is
// assumed held by the caller. This method is called externally via
// GetNodeIDAddress or internally when looking up a "distant" node address to
// connect directly to.
func (g *Gossip) getNodeIDAddressLocked(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) {
nd, err := g.getNodeDescriptorLocked(nodeID)
if err != nil {