Skip to content

Commit

Permalink
les --> lifecycle decoupled from ethbackend (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
renaynay authored Jul 10, 2020
1 parent 30bd6c7 commit 0ba4d69
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 29 deletions.
25 changes: 1 addition & 24 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ import (
)

type LesServer interface {
Start(srvr *p2p.Server)
Stop()
APIs() []rpc.API
Protocols() []p2p.Protocol
SetBloomBitsIndexer(bbIndexer *core.ChainIndexer)
}

Expand Down Expand Up @@ -98,7 +94,6 @@ type Ethereum struct {
}

func (s *Ethereum) AddLesServer(ls LesServer) {
s.lesServer = ls
ls.SetBloomBitsIndexer(s.bloomIndexer)
}

Expand Down Expand Up @@ -288,19 +283,10 @@ func CreateConsensusEngine(stack *node.Node, chainConfig *params.ChainConfig, co
// NOTE, some of these services probably need to be moved to somewhere else.
func (s *Ethereum) APIs() []rpc.API {
apis := ethapi.GetAPIs(s.APIBackend)

// Append any APIs exposed explicitly by the les server
if s.lesServer != nil {
apis = append(apis, s.lesServer.APIs()...)
}

// Append any APIs exposed explicitly by the consensus engine
apis = append(apis, s.engine.APIs(s.BlockChain())...)

// Append any APIs exposed explicitly by the les server
if s.lesServer != nil {
apis = append(apis, s.lesServer.APIs()...)
}

// Append all the local APIs and return
return append(apis, []rpc.API{
{
Expand Down Expand Up @@ -525,9 +511,6 @@ func (s *Ethereum) Protocols() []p2p.Protocol {
protos[i].Attributes = []enr.Entry{s.currentEthEntry()}
protos[i].DialCandidates = s.dialCandidates
}
if s.lesServer != nil {
protos = append(protos, s.lesServer.Protocols()...)
}
return protos
}

Expand All @@ -549,9 +532,6 @@ func (s *Ethereum) Start() error {
}
// Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(s.p2pServer)
}
return nil
}

Expand All @@ -560,9 +540,6 @@ func (s *Ethereum) Start() error {
func (s *Ethereum) Stop() error {
// Stop all the peer-related stuff first.
s.protocolManager.Stop()
if s.lesServer != nil {
s.lesServer.Stop()
}

// Then stop everything else.
s.bloomIndexer.Close()
Expand Down
22 changes: 17 additions & 5 deletions les/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type LesServer struct {
minCapacity, maxCapacity, freeCapacity uint64
threadsIdle int // Request serving threads count when system is idle.
threadsBusy int // Request serving threads count when system is busy(block insertion).

p2pSrv *p2p.Server
}

func NewLesServer(node *node.Node, e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
Expand Down Expand Up @@ -87,6 +89,7 @@ func NewLesServer(node *node.Node, e *eth.Ethereum, config *eth.Config) (*LesSer
servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100),
threadsBusy: config.LightServ/100 + 1,
threadsIdle: threads,
p2pSrv: node.Server(),
}
srv.handler = newServerHandler(srv, e.BlockChain(), e.ChainDb(), e.TxPool(), e.Synced)
srv.costTracker, srv.minCapacity = newCostTracker(e.ChainDb(), config)
Expand Down Expand Up @@ -119,6 +122,11 @@ func NewLesServer(node *node.Node, e *eth.Ethereum, config *eth.Config) (*LesSer
"chtroot", checkpoint.CHTRoot, "bloomroot", checkpoint.BloomRoot)
}
srv.chtIndexer.Start(e.BlockChain())

node.RegisterProtocols(srv.Protocols())
node.RegisterAPIs(srv.APIs())
node.RegisterLifecycle(srv)

return srv, nil
}

Expand Down Expand Up @@ -160,29 +168,31 @@ func (s *LesServer) Protocols() []p2p.Protocol {
}

// Start starts the LES server
func (s *LesServer) Start(srvr *p2p.Server) {
s.privateKey = srvr.PrivateKey
func (s *LesServer) Start() error {
s.privateKey = s.p2pSrv.PrivateKey
s.handler.start()

s.wg.Add(1)
go s.capacityManagement()

if srvr.DiscV5 != nil {
if s.p2pSrv.DiscV5 != nil {
for _, topic := range s.lesTopics {
topic := topic
go func() {
logger := log.New("topic", topic)
logger.Info("Starting topic registration")
defer logger.Info("Terminated topic registration")

srvr.DiscV5.RegisterTopic(topic, s.closeCh)
s.p2pSrv.DiscV5.RegisterTopic(topic, s.closeCh)
}()
}
}

return nil
}

// Stop stops the LES service
func (s *LesServer) Stop() {
func (s *LesServer) Stop() error {
close(s.closeCh)

// Disconnect existing sessions.
Expand All @@ -201,6 +211,8 @@ func (s *LesServer) Stop() {
s.chtIndexer.Close()
s.wg.Wait()
log.Info("Les server stopped")

return nil
}

func (s *LesServer) SetBloomBitsIndexer(bloomIndexer *core.ChainIndexer) {
Expand Down

0 comments on commit 0ba4d69

Please sign in to comment.