Skip to content

Commit

Permalink
peer, examples/benchmark*: allow for a handler to be passed to proces…
Browse files Browse the repository at this point in the history
…s non-rpc messages in a bounded worker pool
  • Loading branch information
iwasaki-kenta committed May 8, 2019
1 parent f11e3e7 commit 71d6ff2
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 50 deletions.
23 changes: 4 additions & 19 deletions examples/benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ func main() {

aliceToBob.WaitFor(skademlia.SignalAuthenticated)

opcodeBenchmark := bob.Handle(bob.NextAvailableOpcode(), nil)
opcodeBenchmark := bob.Handle(bob.NextAvailableOpcode(), func(ctx noise.Context, buf []byte) ([]byte, error) {
atomic.AddUint64(&recvCount, 1)
return nil, nil
})

// Notifier.
go func() {
Expand All @@ -92,24 +95,6 @@ func main() {
}
}()

// Receiver.
go func() {
for len(bob.Peers()) != 1 {
continue
}

bobToAlice := bob.Peers()[0]

for {
select {
case <-bobToAlice.Ctx().Done():
return
case <-bobToAlice.Recv(opcodeBenchmark):
atomic.AddUint64(&recvCount, 1)
}
}
}()

var buf [600]byte

// Sender.
Expand Down
23 changes: 4 additions & 19 deletions examples/benchmark_concurrent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ func main() {

aliceToBob.WaitFor(skademlia.SignalAuthenticated)

opcodeBenchmark := bob.Handle(bob.NextAvailableOpcode(), nil)
opcodeBenchmark := bob.Handle(bob.NextAvailableOpcode(), func(ctx noise.Context, buf []byte) ([]byte, error) {
atomic.AddUint64(&recvCount, 1)
return nil, nil
})

// Notifier.
go func() {
Expand All @@ -93,24 +96,6 @@ func main() {
}
}()

// Receiver.
go func() {
for len(bob.Peers()) != 1 {
continue
}

bobToAlice := bob.Peers()[0]

for {
select {
case <-bobToAlice.Ctx().Done():
return
case <-bobToAlice.Recv(opcodeBenchmark):
atomic.AddUint64(&recvCount, 1)
}
}
}()

var buf [600]byte

// Sender.
Expand Down
57 changes: 45 additions & 12 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Peer struct {
pendingRPC map[uint32]*evt
pendingRPCLock sync.Mutex

queue chan evtRPC
queueRPC chan evtRPC

interceptSend, interceptRecv []func([]byte) ([]byte, error)
Expand Down Expand Up @@ -68,6 +69,7 @@ func (p *Peer) Start() {
g = append(g, continuously(p.receiveMessages()))

for i := 0; i < runtime.NumCPU(); i++ {
g = append(g, continuously(p.processRecv()))
g = append(g, continuously(p.processRPC()))
}

Expand Down Expand Up @@ -324,20 +326,21 @@ func newPeer(n *Node, addr net.Addr, w io.Writer, r io.Reader, c Conn) *Peer {
pendingRecv: make(map[byte]chan []byte),
pendingRPC: make(map[uint32]*evt),

queue: make(chan evtRPC, 1024),
queueRPC: make(chan evtRPC, 1024),

signals: make(map[string]chan struct{}),

recvLockOpcode: math.MaxUint32,
}

// The channel buffer size of '3 + runtime.NumCPU()' is selected on purpose. It is the number of
// The channel buffer size of '3 + 2 * runtime.NumCPU()' is selected on purpose. It is the number of
// goroutines expected to be spawned per-peer.

p.ctx = Context{
n: n,
p: p,
result: make(chan error, 3+runtime.NumCPU()),
result: make(chan error, 3+2*runtime.NumCPU()),
stop: make(chan struct{}),
v: make(map[string]interface{}),
vm: new(sync.RWMutex),
Expand Down Expand Up @@ -595,19 +598,10 @@ func (p *Peer) receiveMessages() func(stop <-chan struct{}) error {

p.queueRPC <- evtRPC{nonce: nonce, opcode: opcode, msg: msg, handler: handler}
} else if nonce == 0 {
p.pendingRecvLock.Lock()
if _, exists := p.pendingRecv[opcode]; !exists {
p.pendingRecv[opcode] = make(chan []byte, 128)
}
ch := p.pendingRecv[opcode]
p.pendingRecvLock.Unlock()

msg := make([]byte, len(buf.B))
copy(msg, buf.B)

if err := p.queueRecv(ch, msg); err != nil {
return err
}
p.queue <- evtRPC{nonce: nonce, opcode: opcode, msg: msg, handler: handler}
}

if lockOpcode := atomic.LoadUint32(&p.recvLockOpcode); lockOpcode != math.MaxUint32 {
Expand All @@ -630,6 +624,45 @@ func (p *Peer) receiveMessages() func(stop <-chan struct{}) error {
}
}

func (p *Peer) processRecv() func(stop <-chan struct{}) error {
return func(stop <-chan struct{}) error {
var erpc evtRPC

select {

This comment has been minimized.

Copy link
@a-urth

a-urth May 9, 2019

why not just

select {
	case erpc = <-p.queue:
	case <-stop:
		return ErrDisconnect
}

This comment has been minimized.

Copy link
@iwasaki-kenta

iwasaki-kenta May 9, 2019

Author Contributor

Select statements with 1 case and a default case are significantly faster. Basically the fast path case is the 1 case, and the default case is the slow path.

This comment has been minimized.

Copy link
@a-urth

a-urth May 9, 2019

really? i never heard about that. do you have any benchmarks?

This comment has been minimized.

Copy link
@iwasaki-kenta

iwasaki-kenta via email May 9, 2019

Author Contributor

This comment has been minimized.

Copy link
@a-urth

a-urth May 9, 2019

cool, thanks!

case erpc = <-p.queue:
default:
select {
case <-stop:
return ErrDisconnect
case erpc = <-p.queue:
}
}

var err error

if erpc.handler != nil {
_, err = erpc.handler(p.ctx, erpc.msg)

if err != nil {
return nil
}
} else {
p.pendingRecvLock.Lock()
if _, exists := p.pendingRecv[erpc.opcode]; !exists {
p.pendingRecv[erpc.opcode] = make(chan []byte, 128)
}
ch := p.pendingRecv[erpc.opcode]
p.pendingRecvLock.Unlock()

if err := p.queueRecv(ch, erpc.msg); err != nil {
return err
}
}

return nil
}
}

func (p *Peer) processRPC() func(stop <-chan struct{}) error {
return func(stop <-chan struct{}) error {
var erpc evtRPC
Expand Down

0 comments on commit 71d6ff2

Please sign in to comment.