diff --git a/examples/benchmark/main.go b/examples/benchmark/main.go index 990ee81a..087394d5 100644 --- a/examples/benchmark/main.go +++ b/examples/benchmark/main.go @@ -83,7 +83,10 @@ func main() { aliceToBob.WaitFor(skademlia.SignalAuthenticated) - opcodeBenchmark := bob.Handle(bob.NextAvailableOpcode(), nil) + opcodeBenchmark := bob.Handle(bob.NextAvailableOpcode(), func(ctx noise.Context, buf []byte) ([]byte, error) { + atomic.AddUint64(&recvCount, 1) + return nil, nil + }) // Notifier. go func() { @@ -92,24 +95,6 @@ func main() { } }() - // Receiver. - go func() { - for len(bob.Peers()) != 1 { - continue - } - - bobToAlice := bob.Peers()[0] - - for { - select { - case <-bobToAlice.Ctx().Done(): - return - case <-bobToAlice.Recv(opcodeBenchmark): - atomic.AddUint64(&recvCount, 1) - } - } - }() - var buf [600]byte // Sender. diff --git a/examples/benchmark_concurrent/main.go b/examples/benchmark_concurrent/main.go index 0dd544b7..923dcce4 100644 --- a/examples/benchmark_concurrent/main.go +++ b/examples/benchmark_concurrent/main.go @@ -84,7 +84,10 @@ func main() { aliceToBob.WaitFor(skademlia.SignalAuthenticated) - opcodeBenchmark := bob.Handle(bob.NextAvailableOpcode(), nil) + opcodeBenchmark := bob.Handle(bob.NextAvailableOpcode(), func(ctx noise.Context, buf []byte) ([]byte, error) { + atomic.AddUint64(&recvCount, 1) + return nil, nil + }) // Notifier. go func() { @@ -93,24 +96,6 @@ func main() { } }() - // Receiver. - go func() { - for len(bob.Peers()) != 1 { - continue - } - - bobToAlice := bob.Peers()[0] - - for { - select { - case <-bobToAlice.Ctx().Done(): - return - case <-bobToAlice.Recv(opcodeBenchmark): - atomic.AddUint64(&recvCount, 1) - } - } - }() - var buf [600]byte // Sender. diff --git a/peer.go b/peer.go index 6f8e20e7..5423182f 100644 --- a/peer.go +++ b/peer.go @@ -36,6 +36,7 @@ type Peer struct { pendingRPC map[uint32]*evt pendingRPCLock sync.Mutex + queue chan evtRPC queueRPC chan evtRPC interceptSend, interceptRecv []func([]byte) ([]byte, error) @@ -68,6 +69,7 @@ func (p *Peer) Start() { g = append(g, continuously(p.receiveMessages())) for i := 0; i < runtime.NumCPU(); i++ { + g = append(g, continuously(p.processRecv())) g = append(g, continuously(p.processRPC())) } @@ -324,6 +326,7 @@ func newPeer(n *Node, addr net.Addr, w io.Writer, r io.Reader, c Conn) *Peer { pendingRecv: make(map[byte]chan []byte), pendingRPC: make(map[uint32]*evt), + queue: make(chan evtRPC, 1024), queueRPC: make(chan evtRPC, 1024), signals: make(map[string]chan struct{}), @@ -331,13 +334,13 @@ func newPeer(n *Node, addr net.Addr, w io.Writer, r io.Reader, c Conn) *Peer { recvLockOpcode: math.MaxUint32, } - // The channel buffer size of '3 + runtime.NumCPU()' is selected on purpose. It is the number of + // The channel buffer size of '3 + 2 * runtime.NumCPU()' is selected on purpose. It is the number of // goroutines expected to be spawned per-peer. p.ctx = Context{ n: n, p: p, - result: make(chan error, 3+runtime.NumCPU()), + result: make(chan error, 3+2*runtime.NumCPU()), stop: make(chan struct{}), v: make(map[string]interface{}), vm: new(sync.RWMutex), @@ -595,19 +598,10 @@ func (p *Peer) receiveMessages() func(stop <-chan struct{}) error { p.queueRPC <- evtRPC{nonce: nonce, opcode: opcode, msg: msg, handler: handler} } else if nonce == 0 { - p.pendingRecvLock.Lock() - if _, exists := p.pendingRecv[opcode]; !exists { - p.pendingRecv[opcode] = make(chan []byte, 128) - } - ch := p.pendingRecv[opcode] - p.pendingRecvLock.Unlock() - msg := make([]byte, len(buf.B)) copy(msg, buf.B) - if err := p.queueRecv(ch, msg); err != nil { - return err - } + p.queue <- evtRPC{nonce: nonce, opcode: opcode, msg: msg, handler: handler} } if lockOpcode := atomic.LoadUint32(&p.recvLockOpcode); lockOpcode != math.MaxUint32 { @@ -630,6 +624,45 @@ func (p *Peer) receiveMessages() func(stop <-chan struct{}) error { } } +func (p *Peer) processRecv() func(stop <-chan struct{}) error { + return func(stop <-chan struct{}) error { + var erpc evtRPC + + select { + case erpc = <-p.queue: + default: + select { + case <-stop: + return ErrDisconnect + case erpc = <-p.queue: + } + } + + var err error + + if erpc.handler != nil { + _, err = erpc.handler(p.ctx, erpc.msg) + + if err != nil { + return nil + } + } else { + p.pendingRecvLock.Lock() + if _, exists := p.pendingRecv[erpc.opcode]; !exists { + p.pendingRecv[erpc.opcode] = make(chan []byte, 128) + } + ch := p.pendingRecv[erpc.opcode] + p.pendingRecvLock.Unlock() + + if err := p.queueRecv(ch, erpc.msg); err != nil { + return err + } + } + + return nil + } +} + func (p *Peer) processRPC() func(stop <-chan struct{}) error { return func(stop <-chan struct{}) error { var erpc evtRPC