Skip to content

Commit

Permalink
les: UDP pre-negotiation of available server capacity (#22183)
Browse files Browse the repository at this point in the history
This PR implements the first one of the "lespay" UDP queries which
is already useful in itself: the capacity query. The server pool is making
use of this query by doing a cheap UDP query to determine whether it is
worth starting the more expensive TCP connection process.
  • Loading branch information
zsfelfoldi authored Mar 1, 2021
1 parent 498458b commit d968704
Show file tree
Hide file tree
Showing 18 changed files with 915 additions and 89 deletions.
5 changes: 2 additions & 3 deletions common/prque/lazyqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type LazyQueue struct {
}

type (
PriorityCallback func(data interface{}, now mclock.AbsTime) int64 // actual priority callback
PriorityCallback func(data interface{}) int64 // actual priority callback
MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback
)

Expand Down Expand Up @@ -139,11 +139,10 @@ func (q *LazyQueue) peekIndex() int {
// Pop multiple times. Popped items are passed to the callback. MultiPop returns
// when the callback returns false or there are no more items to pop.
func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) {
now := q.clock.Now()
nextIndex := q.peekIndex()
for nextIndex != -1 {
data := heap.Pop(q.queue[nextIndex]).(*item).value
heap.Push(q.popQueue, &item{data, q.priority(data, now)})
heap.Push(q.popQueue, &item{data, q.priority(data)})
nextIndex = q.peekIndex()
for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) {
i := heap.Pop(q.popQueue).(*item)
Expand Down
2 changes: 1 addition & 1 deletion common/prque/lazyqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type lazyItem struct {
index int
}

func testPriority(a interface{}, now mclock.AbsTime) int64 {
func testPriority(a interface{}) int64 {
return a.(*lazyItem).p
}

Expand Down
85 changes: 72 additions & 13 deletions les/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,33 @@ import (
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/les/vflux"
vfc "github.com/ethereum/go-ethereum/les/vflux/client"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
)

type LightEthereum struct {
lesCommons

peers *serverPeerSet
reqDist *requestDistributor
retriever *retrieveManager
odr *LesOdr
relay *lesTxRelay
handler *clientHandler
txPool *light.TxPool
blockchain *light.LightChain
serverPool *vfc.ServerPool
dialCandidates enode.Iterator
pruner *pruner
peers *serverPeerSet
reqDist *requestDistributor
retriever *retrieveManager
odr *LesOdr
relay *lesTxRelay
handler *clientHandler
txPool *light.TxPool
blockchain *light.LightChain
serverPool *vfc.ServerPool
serverPoolIterator enode.Iterator
pruner *pruner

bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
Expand Down Expand Up @@ -112,7 +115,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
p2pConfig: &stack.Config().P2P,
}

leth.serverPool, leth.dialCandidates = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, nil, &mclock.System{}, config.UltraLightServers, requestList)
leth.serverPool, leth.serverPoolIterator = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, leth.prenegQuery, &mclock.System{}, config.UltraLightServers, requestList)
leth.serverPool.AddMetrics(suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge, sessionValueMeter, serverDialedMeter)

leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.GetTimeout)
Expand Down Expand Up @@ -189,6 +192,62 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
return leth, nil
}

// VfluxRequest sends a batch of requests to the given node through discv5 UDP TalkRequest and returns the responses
func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.Replies {
reqsEnc, _ := rlp.EncodeToBytes(&reqs)
repliesEnc, _ := s.p2pServer.DiscV5.TalkRequest(s.serverPool.DialNode(n), "vfx", reqsEnc)
var replies vflux.Replies
if len(repliesEnc) == 0 || rlp.DecodeBytes(repliesEnc, &replies) != nil {
return nil
}
return replies
}

// vfxVersion returns the version number of the "les" service subdomain of the vflux UDP
// service, as advertised in the ENR record
func (s *LightEthereum) vfxVersion(n *enode.Node) uint {
if n.Seq() == 0 {
var err error
if n, err = s.p2pServer.DiscV5.RequestENR(n); n != nil && err == nil && n.Seq() != 0 {
s.serverPool.Persist(n)
} else {
return 0
}
}

var les []rlp.RawValue
if err := n.Load(enr.WithEntry("les", &les)); err != nil || len(les) < 1 {
return 0
}
var version uint
rlp.DecodeBytes(les[0], &version) // Ignore additional fields (for forward compatibility).
return version
}

// prenegQuery sends a capacity query to the given server node to determine whether
// a connection slot is immediately available
func (s *LightEthereum) prenegQuery(n *enode.Node) int {
if s.vfxVersion(n) < 1 {
// UDP query not supported, always try TCP connection
return 1
}

var requests vflux.Requests
requests.Add("les", vflux.CapacityQueryName, vflux.CapacityQueryReq{
Bias: 180,
AddTokens: []vflux.IntOrInf{{}},
})
replies := s.VfluxRequest(n, requests)
var cqr vflux.CapacityQueryReply
if replies.Get(0, &cqr) != nil || len(cqr) != 1 { // Note: Get returns an error if replies is nil
return -1
}
if cqr[0] > 0 {
return 1
}
return 0
}

type LightDummyAPI struct{}

// Etherbase is the address that mining rewards will be send to
Expand Down Expand Up @@ -269,7 +328,7 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
return p.Info()
}
return nil
}, s.dialCandidates)
}, s.serverPoolIterator)
}

// Start implements node.Lifecycle, starting all internal goroutines needed by the
Expand Down
55 changes: 55 additions & 0 deletions les/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/les/vflux"
vfs "github.com/ethereum/go-ethereum/les/vflux/server"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/nodestate"
"github.com/ethereum/go-ethereum/rlp"
)

const (
Expand Down Expand Up @@ -382,3 +384,56 @@ func (f *clientPool) forClients(ids []enode.ID, cb func(client *clientInfo)) {
}
}
}

// serveCapQuery serves a vflux capacity query. It receives multiple token amount values
// and a bias time value. For each given token amount it calculates the maximum achievable
// capacity in case the amount is added to the balance.
func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []byte {
var req vflux.CapacityQueryReq
if rlp.DecodeBytes(data, &req) != nil {
return nil
}
if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen {
return nil
}
node := f.ns.GetNode(id)
if node == nil {
node = enode.SignNull(&enr.Record{}, id)
}
c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo)
if c == nil {
c = &clientInfo{node: node}
f.ns.SetField(node, clientInfoField, c)
f.ns.SetField(node, connAddressField, freeID)
defer func() {
f.ns.SetField(node, connAddressField, nil)
f.ns.SetField(node, clientInfoField, nil)
}()
if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*vfs.NodeBalance); c.balance == nil {
log.Error("BalanceField is missing", "node", node.ID())
return nil
}
}
// use vfs.CapacityCurve to answer request for multiple newly bought token amounts
curve := f.pp.GetCapacityCurve().Exclude(id)
result := make(vflux.CapacityQueryReply, len(req.AddTokens))
bias := time.Second * time.Duration(req.Bias)
if f.connectedBias > bias {
bias = f.connectedBias
}
pb, _ := c.balance.GetBalance()
for i, addTokens := range req.AddTokens {
add := addTokens.Int64()
result[i] = curve.MaxCapacity(func(capacity uint64) int64 {
return c.balance.EstimatePriority(capacity, add, 0, bias, false) / int64(capacity)
})
if add <= 0 && uint64(-add) >= pb && result[i] > f.minCap {
result[i] = f.minCap
}
if result[i] < f.minCap {
result[i] = 0
}
}
reply, _ := rlp.EncodeToBytes(&result)
return reply
}
6 changes: 4 additions & 2 deletions les/clientpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,10 @@ func TestNegativeBalanceCalculation(t *testing.T) {
for i := 0; i < 10; i++ {
pool.disconnect(newPoolTestPeer(i, nil))
_, nb := getBalance(pool, newPoolTestPeer(i, nil))
if checkDiff(nb, uint64(time.Minute)/1000) {
t.Fatalf("Negative balance mismatch, want %v, got %v", uint64(time.Minute)/1000, nb)
exp := uint64(time.Minute) / 1000
exp -= exp / 120 // correct for negative balance expiration
if checkDiff(nb, exp) {
t.Fatalf("Negative balance mismatch, want %v, got %v", exp, nb)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion les/enr_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (
// lesEntry is the "les" ENR entry. This is set for LES servers only.
type lesEntry struct {
// Ignore additional fields (for forward compatibility).
_ []rlp.RawValue `rlp:"tail"`
VfxVersion uint
Rest []rlp.RawValue `rlp:"tail"`
}

func (lesEntry) ENRKey() string { return "les" }
Expand Down
29 changes: 26 additions & 3 deletions les/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/les/vflux"
vfs "github.com/ethereum/go-ethereum/les/vflux/server"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -68,6 +69,7 @@ type LesServer struct {
archiveMode bool // Flag whether the ethereum node runs in archive mode.
handler *serverHandler
broadcaster *broadcaster
vfluxServer *vfs.Server
privateKey *ecdsa.PrivateKey

// Flow control and capacity management
Expand Down Expand Up @@ -112,12 +114,14 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
ns: ns,
archiveMode: e.ArchiveMode(),
broadcaster: newBroadcaster(ns),
vfluxServer: vfs.NewServer(time.Millisecond * 10),
fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}),
servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100),
threadsBusy: config.LightServ/100 + 1,
threadsIdle: threads,
p2pSrv: node.Server(),
}
srv.vfluxServer.Register(srv)
issync := e.Synced
if config.LightNoSyncServe {
issync = func() bool { return true }
Expand Down Expand Up @@ -201,7 +205,9 @@ func (s *LesServer) Protocols() []p2p.Protocol {
}, nil)
// Add "les" ENR entries.
for i := range ps {
ps[i].Attributes = []enr.Entry{&lesEntry{}}
ps[i].Attributes = []enr.Entry{&lesEntry{
VfxVersion: 1,
}}
}
return ps
}
Expand All @@ -211,10 +217,11 @@ func (s *LesServer) Start() error {
s.privateKey = s.p2pSrv.PrivateKey
s.broadcaster.setSignerKey(s.privateKey)
s.handler.start()

s.wg.Add(1)
go s.capacityManagement()

if s.p2pSrv.DiscV5 != nil {
s.p2pSrv.DiscV5.RegisterTalkHandler("vfx", s.vfluxServer.ServeEncoded)
}
return nil
}

Expand All @@ -228,6 +235,7 @@ func (s *LesServer) Stop() error {
s.costTracker.stop()
s.handler.stop()
s.servingQueue.stop()
s.vfluxServer.Stop()

// Note, bloom trie indexer is closed by parent bloombits indexer.
s.chtIndexer.Close()
Expand Down Expand Up @@ -311,3 +319,18 @@ func (s *LesServer) dropClient(id enode.ID) {
p.Peer.Disconnect(p2p.DiscRequested)
}
}

// ServiceInfo implements vfs.Service
func (s *LesServer) ServiceInfo() (string, string) {
return "les", "Ethereum light client service"
}

// Handle implements vfs.Service
func (s *LesServer) Handle(id enode.ID, address string, name string, data []byte) []byte {
switch name {
case vflux.CapacityQueryName:
return s.clientPool.serveCapQuery(id, address, data)
default:
return nil
}
}
Loading

0 comments on commit d968704

Please sign in to comment.