Skip to content

Commit

Permalink
les: move server pool to les/vflux/client (ethereum#22377)
Browse files Browse the repository at this point in the history
* les: move serverPool to les/vflux/client

* les: add metrics

* les: moved ValueTracker inside ServerPool

* les: protect against node registration before server pool is started

* les/vflux/client: fixed tests

* les: make peer registration safe
  • Loading branch information
zsfelfoldi authored Feb 25, 2021
1 parent de9465f commit dc109cc
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 198 deletions.
14 changes: 7 additions & 7 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var Modules = map[string]string{
"swarmfs": SwarmfsJs,
"txpool": TxpoolJs,
"les": LESJs,
"lespay": LESPayJs,
"vflux": VfluxJs,
}

const ChequebookJs = `
Expand Down Expand Up @@ -877,32 +877,32 @@ web3._extend({
});
`

const LESPayJs = `
const VfluxJs = `
web3._extend({
property: 'lespay',
property: 'vflux',
methods:
[
new web3._extend.Method({
name: 'distribution',
call: 'lespay_distribution',
call: 'vflux_distribution',
params: 2
}),
new web3._extend.Method({
name: 'timeout',
call: 'lespay_timeout',
call: 'vflux_timeout',
params: 2
}),
new web3._extend.Method({
name: 'value',
call: 'lespay_value',
call: 'vflux_value',
params: 2
}),
],
properties:
[
new web3._extend.Property({
name: 'requestStats',
getter: 'lespay_requestStats'
getter: 'vflux_requestStats'
}),
]
});
Expand Down
38 changes: 8 additions & 30 deletions les/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ type LightEthereum struct {
handler *clientHandler
txPool *light.TxPool
blockchain *light.LightChain
serverPool *serverPool
valueTracker *vfc.ValueTracker
serverPool *vfc.ServerPool
dialCandidates enode.Iterator
pruner *pruner

Expand Down Expand Up @@ -109,17 +108,14 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
engine: ethconfig.CreateConsensusEngine(stack, chainConfig, &config.Ethash, nil, false, chainDb),
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
valueTracker: vfc.NewValueTracker(lesDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
p2pServer: stack.Server(),
p2pConfig: &stack.Config().P2P,
}
peers.subscribe((*vtSubscription)(leth.valueTracker))

leth.serverPool = newServerPool(lesDb, []byte("serverpool:"), leth.valueTracker, time.Second, nil, &mclock.System{}, config.UltraLightServers)
peers.subscribe(leth.serverPool)
leth.dialCandidates = leth.serverPool.dialIterator
leth.serverPool, leth.dialCandidates = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, nil, &mclock.System{}, config.UltraLightServers, requestList)
leth.serverPool.AddMetrics(suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge, sessionValueMeter, serverDialedMeter)

leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.getTimeout)
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.GetTimeout)
leth.relay = newLesTxRelay(peers, leth.retriever)

leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.peers, leth.retriever)
Expand Down Expand Up @@ -193,23 +189,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
return leth, nil
}

// vtSubscription implements serverPeerSubscriber
type vtSubscription vfc.ValueTracker

// registerPeer implements serverPeerSubscriber
func (v *vtSubscription) registerPeer(p *serverPeer) {
vt := (*vfc.ValueTracker)(v)
p.setValueTracker(vt, vt.Register(p.ID()))
p.updateVtParams()
}

// unregisterPeer implements serverPeerSubscriber
func (v *vtSubscription) unregisterPeer(p *serverPeer) {
vt := (*vfc.ValueTracker)(v)
vt.Unregister(p.ID())
p.setValueTracker(nil, nil)
}

type LightDummyAPI struct{}

// Etherbase is the address that mining rewards will be send to
Expand Down Expand Up @@ -266,7 +245,7 @@ func (s *LightEthereum) APIs() []rpc.API {
}, {
Namespace: "vflux",
Version: "1.0",
Service: vfc.NewPrivateClientAPI(s.valueTracker),
Service: s.serverPool.API(),
Public: false,
},
}...)
Expand Down Expand Up @@ -302,8 +281,8 @@ func (s *LightEthereum) Start() error {
if err != nil {
return err
}
s.serverPool.addSource(discovery)
s.serverPool.start()
s.serverPool.AddSource(discovery)
s.serverPool.Start()
// Start bloom request workers.
s.wg.Add(bloomServiceThreads)
s.startBloomHandlers(params.BloomBitsBlocksClient)
Expand All @@ -316,8 +295,7 @@ func (s *LightEthereum) Start() error {
// Ethereum protocol.
func (s *LightEthereum) Stop() error {
close(s.closeCh)
s.serverPool.stop()
s.valueTracker.Stop()
s.serverPool.Stop()
s.peers.close()
s.reqDist.close()
s.odr.Stop()
Expand Down
14 changes: 14 additions & 0 deletions les/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,25 @@ func (h *clientHandler) handle(p *serverPeer) error {
p.Log().Debug("Light Ethereum handshake failed", "err", err)
return err
}
// Register peer with the server pool
if h.backend.serverPool != nil {
if nvt, err := h.backend.serverPool.RegisterNode(p.Node()); err == nil {
p.setValueTracker(nvt)
p.updateVtParams()
defer func() {
p.setValueTracker(nil)
h.backend.serverPool.UnregisterNode(p.Node())
}()
} else {
return err
}
}
// Register the peer locally
if err := h.backend.peers.register(p); err != nil {
p.Log().Error("Light Ethereum peer registration failed", "err", err)
return err
}

serverConnectionGauge.Update(int64(h.backend.peers.len()))

connectedAt := mclock.Now()
Expand Down
9 changes: 3 additions & 6 deletions les/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,6 @@ type serverPeer struct {

fcServer *flowcontrol.ServerNode // Client side mirror token bucket.
vtLock sync.Mutex
valueTracker *vfc.ValueTracker
nodeValueTracker *vfc.NodeValueTracker
sentReqs map[uint64]sentReqEntry

Expand Down Expand Up @@ -676,9 +675,8 @@ func (p *serverPeer) Handshake(genesis common.Hash, forkid forkid.ID, forkFilter

// setValueTracker sets the value tracker references for connected servers. Note that the
// references should be removed upon disconnection by setValueTracker(nil, nil).
func (p *serverPeer) setValueTracker(vt *vfc.ValueTracker, nvt *vfc.NodeValueTracker) {
func (p *serverPeer) setValueTracker(nvt *vfc.NodeValueTracker) {
p.vtLock.Lock()
p.valueTracker = vt
p.nodeValueTracker = nvt
if nvt != nil {
p.sentReqs = make(map[uint64]sentReqEntry)
Expand All @@ -705,7 +703,7 @@ func (p *serverPeer) updateVtParams() {
}
}
}
p.valueTracker.UpdateCosts(p.nodeValueTracker, reqCosts)
p.nodeValueTracker.UpdateCosts(reqCosts)
}

// sentReqEntry remembers sent requests and their sending times
Expand All @@ -732,7 +730,6 @@ func (p *serverPeer) answeredRequest(id uint64) {
}
e, ok := p.sentReqs[id]
delete(p.sentReqs, id)
vt := p.valueTracker
nvt := p.nodeValueTracker
p.vtLock.Unlock()
if !ok {
Expand All @@ -752,7 +749,7 @@ func (p *serverPeer) answeredRequest(id uint64) {
vtReqs[1] = vfc.ServedRequest{ReqType: uint32(m.rest), Amount: e.amount - 1}
}
dt := time.Duration(mclock.Now() - e.at)
vt.Served(nvt, vtReqs[:reqCount], dt)
nvt.Served(vtReqs[:reqCount], dt)
}

// clientPeer represents each node to which the les server is connected.
Expand Down
11 changes: 0 additions & 11 deletions les/vflux/client/queueiterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/nodestate"
)

func testNodeID(i int) enode.ID {
return enode.ID{42, byte(i % 256), byte(i / 256)}
}

func testNodeIndex(id enode.ID) int {
if id[0] != 42 {
return -1
}
return int(id[1]) + int(id[2])*256
}

func testNode(i int) *enode.Node {
return enode.SignNull(new(enr.Record), testNodeID(i))
}
Expand Down
Loading

0 comments on commit dc109cc

Please sign in to comment.