Skip to content

Commit

Permalink
Merge pull request #12 from rjl493456442/get-rid-of-timer
Browse files Browse the repository at this point in the history
les: avoid persisting clock
  • Loading branch information
zsfelfoldi authored May 8, 2020
2 parents aa163b5 + 585a445 commit acb4933
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 72 deletions.
1 change: 0 additions & 1 deletion les/lespay/client/queueiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type QueueIterator struct {

ns *nodestate.NodeStateMachine
queue []*enode.Node
selected nodestate.Flags
nextNode *enode.Node
fifo, closed bool
}
Expand Down
126 changes: 55 additions & 71 deletions les/serverpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package les

import (
"encoding/binary"
"errors"
"reflect"
"sync"
Expand Down Expand Up @@ -48,11 +47,8 @@ const (
// serverPool provides a node iterator for dial candidates. The output is a mix of newly discovered
// nodes, a weighted random selection of known (previously valuable) nodes and trusted/paid nodes.
type serverPool struct {
clock mclock.Clock
clockOffset mclock.AbsTime
db ethdb.KeyValueStore
dbClockKey []byte
quit chan struct{}
clock mclock.Clock
db ethdb.KeyValueStore

ns *nodestate.NodeStateMachine
vt *lpc.ValueTracker
Expand Down Expand Up @@ -81,20 +77,16 @@ type nodeHistoryEnc struct {
}

var (
serverPoolSetup = &nodestate.Setup{}

sfHasValue = serverPoolSetup.NewPersistentFlag("hasValue")
sfQueried = serverPoolSetup.NewFlag("queried")
sfCanDial = serverPoolSetup.NewFlag("canDial")
sfDialed = serverPoolSetup.NewFlag("dialed")
sfConnected = serverPoolSetup.NewFlag("connected")
sfRedialWait = serverPoolSetup.NewFlag("redialWait")
sfAlwaysConnect = serverPoolSetup.NewFlag("alwaysConnect")

serverPoolSetup = &nodestate.Setup{}
sfHasValue = serverPoolSetup.NewPersistentFlag("hasValue")
sfQueried = serverPoolSetup.NewFlag("queried")
sfCanDial = serverPoolSetup.NewFlag("canDial")
sfDialed = serverPoolSetup.NewFlag("dialed")
sfConnected = serverPoolSetup.NewFlag("connected")
sfRedialWait = serverPoolSetup.NewFlag("redialWait")
sfAlwaysConnect = serverPoolSetup.NewFlag("alwaysConnect")
sfDisableSelection = nodestate.MergeFlags(sfQueried, sfCanDial, sfDialed, sfConnected, sfRedialWait)

errInvalidField = errors.New("invalid field type")

sfiNodeWeight = serverPoolSetup.NewField("nodeWeight", reflect.TypeOf(uint64(0)))
sfiNodeHistory = serverPoolSetup.NewPersistentField("nodeHistory", reflect.TypeOf(nodeHistory{}),
func(field interface{}) ([]byte, error) {
Expand All @@ -107,7 +99,7 @@ var (
enc, err := rlp.EncodeToBytes(&ne)
return enc, err
} else {
return nil, errInvalidField
return nil, errors.New("invalid field type")
}
},
func(enc []byte) (interface{}, error) {
Expand All @@ -127,14 +119,12 @@ var (
// newServerPool creates a new server pool
func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, discovery enode.Iterator, query lpc.PreNegQuery, clock mclock.Clock, trustedURLs []string, testing bool) *serverPool {
s := &serverPool{
db: db,
dbClockKey: append(dbKey, []byte("persistentClock")...),
clock: clock,
ns: nodestate.NewNodeStateMachine(db, []byte(string(dbKey)+"ns:"), clock, serverPoolSetup),
vt: vt,
quit: make(chan struct{}),
db: db,
clock: clock,
vt: vt,
ns: nodestate.NewNodeStateMachine(db, []byte(string(dbKey)+"ns:"), clock, serverPoolSetup),
}
s.getTimeout()
s.recalTimeout()
var (
validSchemes enr.IdentityScheme
mixerTimeout time.Duration
Expand Down Expand Up @@ -209,33 +199,12 @@ func (s *serverPool) start() {
for _, node := range s.trusted {
s.ns.SetState(node, sfAlwaysConnect, nodestate.Flags{}, 0)
}
clockEnc, _ := s.db.Get(s.dbClockKey)
var clockStart mclock.AbsTime
if len(clockEnc) == 8 {
clockStart = mclock.AbsTime(binary.BigEndian.Uint64(clockEnc))
}
s.clockOffset = clockStart - s.clock.Now()
s.ns.ForEach(sfHasValue, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
s.updateNode(node, false, false, 0) // set weight flag
if n, ok := s.ns.GetField(node, sfiNodeHistory).(nodeHistory); ok && n.waitUntil > clockStart {
s.ns.SetState(node, sfRedialWait, nodestate.Flags{}, time.Duration(n.waitUntil-clockStart))
if n, ok := s.ns.GetField(node, sfiNodeHistory).(nodeHistory); ok && n.waitUntil > s.clock.Now() {
s.ns.SetState(node, sfRedialWait, nodestate.Flags{}, time.Duration(n.waitUntil-s.clock.Now()))
}
})
go func() {
for {
select {
case <-time.After(time.Minute * 5):
s.persistClock()
suggestedTimeoutGauge.Update(int64(s.getTimeout() / time.Millisecond))
s.timeoutLock.RLock()
timeWeights := s.timeWeights
s.timeoutLock.RUnlock()
totalValueGauge.Update(int64(s.vt.RtStats().Value(timeWeights, s.vt.StatsExpFactor())))
case <-s.quit:
return
}
}
}()
}

// stop stops the server pool
Expand All @@ -244,18 +213,9 @@ func (s *serverPool) stop() {
s.ns.ForEach(sfConnected, nodestate.Flags{}, func(n *enode.Node, state nodestate.Flags) {
s.updateNode(n, true, false, 0)
})
close(s.quit)
s.persistClock()
s.ns.Stop()
}

// persistClock stores the persistent absolute time into the database
func (s *serverPool) persistClock() {
var clockEnc [8]byte
binary.BigEndian.PutUint64(clockEnc[:], uint64(s.clock.Now()+s.clockOffset))
s.db.Put(s.dbClockKey, clockEnc[:])
}

// registerPeer implements serverPeerSubscriber
func (s *serverPool) registerPeer(p *serverPeer) {
s.ns.SetState(p.Node(), sfConnected, sfDialed, 0)
Expand All @@ -274,21 +234,29 @@ func (s *serverPool) unregisterPeer(p *serverPeer) {
p.setValueTracker(nil, nil)
}

// getTimeout calculates the current recommended timeout. This value is used by
// recalTimeout calculates the current recommended timeout. This value is used by
// the client as a "soft timeout" value. It also affects the service value calculation
// of individual nodes.
func (s *serverPool) getTimeout() time.Duration {
now := s.clock.Now()
func (s *serverPool) recalTimeout() {
// Use cached result if possible, avoid recalculating too frequently.
s.timeoutLock.RLock()
timeout := s.timeout
refreshed := s.timeoutRefreshed
s.timeoutLock.RUnlock()
now := s.clock.Now()
if refreshed != 0 && time.Duration(now-refreshed) < timeoutRefresh {
return timeout
return
}
// Cached result is stale, recalculate a new one.
rts := s.vt.RtStats()

// Add a fake statistic here. It is an easy way to initialize with some
// conservative values when the database is new. As soon as we have a
// considerable amount of real stats this small value won't matter.
rts.Add(time.Second*2, 10, s.vt.StatsExpFactor())
timeout = minTimeout

// Use either 10% failure rate timeout or twice the median response time
// as the recommended timeout.
timeout := minTimeout
if t := rts.Timeout(0.1); t > timeout {
timeout = t
}
Expand All @@ -298,11 +266,30 @@ func (s *serverPool) getTimeout() time.Duration {
s.timeoutLock.Lock()
if s.timeout != timeout {
s.timeout = timeout
s.timeWeights = lpc.TimeoutWeights(timeout)
s.timeWeights = lpc.TimeoutWeights(s.timeout)

suggestedTimeoutGauge.Update(int64(s.timeout / time.Millisecond))
totalValueGauge.Update(int64(rts.Value(s.timeWeights, s.vt.StatsExpFactor())))
}
s.timeoutRefreshed = now
s.timeoutLock.Unlock()
return timeout
}

// getTimeout returns the recommended request timeout.
func (s *serverPool) getTimeout() time.Duration {
s.recalTimeout()
s.timeoutLock.RLock()
defer s.timeoutLock.RUnlock()
return s.timeout
}

// getTimeoutAndWeight returns the recommended request timeout as well as the
// response time weight which is necessary to calculate service value.
func (s *serverPool) getTimeoutAndWeight() (time.Duration, lpc.ResponseTimeWeights) {
s.recalTimeout()
s.timeoutLock.RLock()
defer s.timeoutLock.RUnlock()
return s.timeout, s.timeWeights
}

// updateNode calculates the selection weight and the proposed redial wait time of the given node
Expand All @@ -313,10 +300,7 @@ func (s *serverPool) updateNode(node *enode.Node, calculateSessionValue, redialW
return
}
currentStats := nvt.RtStats()
s.getTimeout() // updates s.timeWeights
s.timeoutLock.RLock()
timeWeights := s.timeWeights
s.timeoutLock.RUnlock()
_, timeWeights := s.getTimeoutAndWeight()
expFactor := s.vt.StatsExpFactor()

var sessionValue float64
Expand Down Expand Up @@ -353,7 +337,7 @@ func (s *serverPool) updateNode(node *enode.Node, calculateSessionValue, redialW
n.waitFactor = 1
}
wait := time.Duration(float64(minRedialWait) * n.waitFactor)
n.waitUntil = s.clock.Now() + s.clockOffset + mclock.AbsTime(wait)
n.waitUntil = s.clock.Now() + mclock.AbsTime(wait)
s.ns.SetField(node, sfiNodeHistory, n)
s.ns.SetState(node, sfRedialWait, nodestate.Flags{}, wait)
}
Expand Down

0 comments on commit acb4933

Please sign in to comment.