Skip to content

Commit

Permalink
fetch/peers: make latency based selection biased towards last requests (
Browse files Browse the repository at this point in the history
#5688)

in the flaky system test it took 4 minutes to stop sending requests to a node that was actively dropping requests.
in this pr i switched peer latency estimator to be biased towards latency observed in last requests, i looked up if/how lotus node handles similar issues.

beside that there will be an INFO log with top peers stats, and global average latency in nanoseconds and total number of peers. by default node will emit log every 30m and it can be tuned by adding log-peer-stats-interval in fetch section of the config.

```json
"fetch": {
        "log-peer-stats-interval": "1m"
}
```
  • Loading branch information
dshulyak committed Mar 12, 2024
1 parent 82e08df commit b6c5ac3
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 21 deletions.
2 changes: 1 addition & 1 deletion config/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func DefaultLoggingConfig() LoggerConfig {
TrtlLoggerLevel: defaultLoggingLevel.String(),
MeshLoggerLevel: defaultLoggingLevel.String(),
SyncLoggerLevel: defaultLoggingLevel.String(),
FetcherLoggerLevel: zapcore.ErrorLevel.String(),
FetcherLoggerLevel: defaultLoggingLevel.String(),
HareOracleLoggerLevel: defaultLoggingLevel.String(),
HareLoggerLevel: defaultLoggingLevel.String(),
NipostBuilderLoggerLevel: defaultLoggingLevel.String(),
Expand Down
23 changes: 17 additions & 6 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ type Config struct {
RequestHardTimeout time.Duration `mapstructure:"request-hard-timeout"`
EnableServerMetrics bool `mapstructure:"servers-metrics"`
ServersConfig map[string]ServerConfig `mapstructure:"servers"`
PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"`
// The maximum number of concurrent requests to get ATXs.
GetAtxsConcurrency int64 `mapstructure:"getatxsconcurrency"`
DecayingTag server.DecayingTagSpec `mapstructure:"decaying-tag"`
GetAtxsConcurrency int64 `mapstructure:"getatxsconcurrency"`
DecayingTag server.DecayingTagSpec `mapstructure:"decaying-tag"`
LogPeerStatsInterval time.Duration `mapstructure:"log-peer-stats-interval"`
}

func (c Config) getServerConfig(protocol string) ServerConfig {
Expand Down Expand Up @@ -151,14 +151,14 @@ func DefaultConfig() Config {
// 64 bytes
OpnProtocol: {Queue: 10000, Requests: 1000, Interval: time.Second},
},
PeersRateThreshold: 0.02,
GetAtxsConcurrency: 100,
DecayingTag: server.DecayingTagSpec{
Interval: time.Minute,
Inc: 1000,
Dec: 1000,
Cap: 10000,
},
LogPeerStatsInterval: 20 * time.Minute,
}
}

Expand Down Expand Up @@ -366,6 +366,17 @@ func (f *Fetch) Start() error {
return srv.Run(f.shutdownCtx)
})
}
f.eg.Go(func() error {
for {
select {
case <-f.shutdownCtx.Done():
return nil
case <-time.After(f.cfg.LogPeerStatsInterval):
stats := f.peers.Stats()
f.logger.With().Info("peer stats", log.Inline(&stats))
}
}
})
})
return nil
}
Expand Down Expand Up @@ -588,7 +599,7 @@ func (f *Fetch) send(requests []RequestMessage) {
go func() {
data, err := f.sendBatch(peer, batch)
if err != nil {
f.logger.With().Warning(
f.logger.With().Debug(
"failed to send batch request",
log.Stringer("batch", batch.ID),
log.Stringer("peer", peer),
Expand Down Expand Up @@ -690,7 +701,7 @@ func (f *Fetch) handleHashError(batch *batchInfo, err error) {
continue
}
f.logger.WithContext(req.ctx).With().
Warning("hash request failed", log.Stringer("hash", req.hash), log.Err(err))
Debug("hash request failed", log.Stringer("hash", req.hash), log.Err(err))
req.promise.err = err
peerErrors.WithLabelValues(string(req.hint)).Inc()
close(req.promise.completed)
Expand Down
76 changes: 68 additions & 8 deletions fetch/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap/zapcore"

"github.com/spacemeshos/go-spacemesh/p2p"
)
Expand All @@ -18,13 +19,14 @@ type data struct {
}

func (d *data) latency(global float64) float64 {
if d.success+d.failures == 0 {
return 0.8 * global // to prioritize trying out new peer
switch {
case d.success+d.failures == 0:
return 0.9 * global // to prioritize trying out new peer
case d.success == 0:
return 1.1 * global
default:
return d.averageLatency + d.failRate*global
}
if d.success == 0 {
return global + d.failRate*global
}
return d.averageLatency + d.failRate*global
}

func (p *data) less(other *data, global float64) bool {
Expand Down Expand Up @@ -101,12 +103,14 @@ func (p *Peers) OnLatency(id peer.ID, size int, latency time.Duration) {
peer.success++
peer.failRate = float64(peer.failures) / float64(peer.success+peer.failures)
if peer.averageLatency != 0 {
peer.averageLatency = 0.8*peer.averageLatency + 0.2*float64(latency)
delta := (float64(latency) - float64(peer.averageLatency)) / 10 // 86% of the value is the last 19
peer.averageLatency += delta
} else {
peer.averageLatency = float64(latency)
}
if p.globalLatency != 0 {
p.globalLatency = 0.8*p.globalLatency + 0.2*float64(latency)
delta := (float64(latency) - float64(p.globalLatency)) / 25 // 86% of the value is the last 49
p.globalLatency += delta
} else {
p.globalLatency = float64(latency)
}
Expand Down Expand Up @@ -171,3 +175,59 @@ func (p *Peers) Total() int {
defer p.mu.Unlock()
return len(p.peers)
}

func (p *Peers) Stats() Stats {
best := p.SelectBest(5)
p.mu.Lock()
defer p.mu.Unlock()
stats := Stats{
Total: len(p.peers),
GlobalAverageLatency: p.globalLatency,
}
for _, peer := range best {
peerData, exist := p.peers[peer]
if !exist {
continue
}
stats.BestPeers = append(stats.BestPeers, PeerStats{
ID: peerData.id,
Success: peerData.success,
Failures: peerData.failures,
Latency: peerData.averageLatency,
})
}
return stats
}

type Stats struct {
Total int
GlobalAverageLatency float64
BestPeers []PeerStats
}

func (s *Stats) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddInt("total", s.Total)
enc.AddFloat64("global average latency", s.GlobalAverageLatency)
enc.AddArray("best peers", zapcore.ArrayMarshalerFunc(func(arrEnc zapcore.ArrayEncoder) error {
for _, peer := range s.BestPeers {
arrEnc.AppendObject(&peer)
}
return nil
}))
return nil
}

type PeerStats struct {
ID peer.ID
Success int
Failures int
Latency float64
}

func (p *PeerStats) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("id", p.ID.String())
enc.AddInt("success", p.Success)
enc.AddInt("failures", p.Failures)
enc.AddFloat64("latency per 1024 bytes", p.Latency)
return nil
}
4 changes: 2 additions & 2 deletions fetch/peers/peers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ func TestSelect(t *testing.T) {
best peer.ID
}{
{
desc: "latency adjusted with moving average",
desc: "latency adjusted with more requests",
events: []event{
{id: "a", success: 1, latency: 8, add: true},
{id: "b", success: 1, latency: 9, add: true},
{id: "a", success: 1, latency: 14, add: true},
{id: "a", success: 3, latency: 14, add: true},
},
n: 5,
expect: []peer.ID{"b", "a"},
Expand Down
12 changes: 9 additions & 3 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,10 +473,16 @@ func (s *Syncer) syncAtx(ctx context.Context) error {
s.backgroundSync.cancel = cancel
s.backgroundSync.eg.Go(func() error {
err := s.fetchATXsForEpoch(ctx, publish, true)
if err != nil {
s.logger.With().Warning("background atx sync failed", log.Context(ctx), publish.Field(), log.Err(err))
s.backgroundSync.epoch.Store(0)
if err == nil {
return nil
}
if !errors.Is(err, context.Canceled) {
s.logger.With().
Warning("background atx sync failed", log.Context(ctx), publish.Field(), log.Err(err))
} else {
s.logger.With().Debug("background atx sync stopped", log.Context(ctx), publish.Field())
}
s.backgroundSync.epoch.Store(0)
return err
})
}
Expand Down
3 changes: 2 additions & 1 deletion systest/parameters/fastnet/smesher.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"grpc-post-listener": "0.0.0.0:9094"
},
"fetch": {
"servers-metrics": true
"servers-metrics": true,
"log-peer-stats-interval": "1m"
},
"smeshing": {
"smeshing-verifying-opts": {
Expand Down

0 comments on commit b6c5ac3

Please sign in to comment.