Skip to content

Commit

Permalink
[stream] added stream manager module
Browse files Browse the repository at this point in the history
  • Loading branch information
JackyWYX committed Feb 26, 2021
1 parent c7edd63 commit bb5dc2e
Show file tree
Hide file tree
Showing 7 changed files with 1,037 additions and 0 deletions.
25 changes: 25 additions & 0 deletions p2p/stream/common/streammanager/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package streammanager

import "time"

const (
// checkInterval is the default interval for checking stream number. If the stream
// number is smaller than softLoCap, an active discover through DHT will be triggered.
checkInterval = 30 * time.Second
// discTimeout is the timeout for one batch of discovery
discTimeout = 10 * time.Second
// connectTimeout is the timeout for setting up a stream with a discovered peer
connectTimeout = 60 * time.Second
)

// Config is the config for stream manager
type Config struct {
// HardLoCap is low cap of stream number that immediately trigger discovery
HardLoCap int
// SoftLoCap is low cap of stream number that will trigger discovery during stream check
SoftLoCap int
// HiCap is the high cap of stream number
HiCap int
// DiscBatch is the size of each discovery
DiscBatch int
}
28 changes: 28 additions & 0 deletions p2p/stream/common/streammanager/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package streammanager

import (
"github.com/ethereum/go-ethereum/event"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
)

// EvtStreamAdded is the event of adding a new stream
type (
EvtStreamAdded struct {
Stream sttypes.Stream
}

// EvtStreamRemoved is an event of stream removed
EvtStreamRemoved struct {
ID sttypes.StreamID
}
)

// SubscribeAddStreamEvent subscribe the add stream event
func (sm *streamManager) SubscribeAddStreamEvent(ch chan<- EvtStreamAdded) event.Subscription {
return sm.addStreamFeed.Subscribe(ch)
}

// SubscribeRemoveStreamEvent subscribe the remove stream event
func (sm *streamManager) SubscribeRemoveStreamEvent(ch chan<- EvtStreamRemoved) event.Subscription {
return sm.removeStreamFeed.Subscribe(ch)
}
73 changes: 73 additions & 0 deletions p2p/stream/common/streammanager/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package streammanager

import (
"sync/atomic"
"testing"
"time"
)

func TestStreamManager_SubscribeAddStreamEvent(t *testing.T) {
sm := newTestStreamManager()

addStreamEvtC := make(chan EvtStreamAdded, 1)
sub := sm.SubscribeAddStreamEvent(addStreamEvtC)
defer sub.Unsubscribe()
stopC := make(chan struct{}, 1)

var numStreamAdded uint32
go func() {
for {
select {
case <-addStreamEvtC:
atomic.AddUint32(&numStreamAdded, 1)
case <-stopC:
return
}
}
}()

sm.Start()
time.Sleep(defTestWait)
close(stopC)
sm.Close()

if atomic.LoadUint32(&numStreamAdded) != 16 {
t.Errorf("numStreamAdded unexpected")
}
}

func TestStreamManager_SubscribeRemoveStreamEvent(t *testing.T) {
sm := newTestStreamManager()

rmStreamEvtC := make(chan EvtStreamRemoved, 1)
sub := sm.SubscribeRemoveStreamEvent(rmStreamEvtC)
defer sub.Unsubscribe()
stopC := make(chan struct{}, 1)

var numStreamRemoved uint32
go func() {
for {
select {
case <-rmStreamEvtC:
atomic.AddUint32(&numStreamRemoved, 1)
case <-stopC:
return
}
}
}()

sm.Start()
time.Sleep(defTestWait)

err := sm.RemoveStream(makeStreamID(1))
if err != nil {
t.Fatal(err)
}
time.Sleep(defTestWait)
close(stopC)
sm.Close()

if atomic.LoadUint32(&numStreamRemoved) != 1 {
t.Errorf("numStreamAdded unexpected")
}
}
50 changes: 50 additions & 0 deletions p2p/stream/common/streammanager/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package streammanager

import (
"context"

"github.com/ethereum/go-ethereum/event"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
p2ptypes "github.com/harmony-one/harmony/p2p/types"
"github.com/libp2p/go-libp2p-core/network"
libp2p_peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)

// StreamManager is the interface for streamManager
type StreamManager interface {
p2ptypes.LifeCycle
StreamOperator
Subscriber
StreamReader
}

// StreamOperator handles new stream or remove stream
type StreamOperator interface {
NewStream(stream sttypes.Stream) error
RemoveStream(stID sttypes.StreamID) error
}

// Subscriber is the interface to support stream event subscription
type Subscriber interface {
SubscribeAddStreamEvent(ch chan<- EvtStreamAdded) event.Subscription
SubscribeRemoveStreamEvent(ch chan<- EvtStreamRemoved) event.Subscription
}

// StreamReader is the interface to read stream in stream manager
type StreamReader interface {
GetStreams() []sttypes.Stream
GetStreamByID(id sttypes.StreamID) (sttypes.Stream, bool)
}

// host is the adapter interface of the libp2p host implementation.
// TODO: further adapt the host
type host interface {
ID() libp2p_peer.ID
NewStream(ctx context.Context, p libp2p_peer.ID, pids ...protocol.ID) (network.Stream, error)
}

// peerFinder is the adapter interface of discovery.Discovery
type peerFinder interface {
FindPeers(ctx context.Context, ns string, peerLimit int) (<-chan libp2p_peer.AddrInfo, error)
}
203 changes: 203 additions & 0 deletions p2p/stream/common/streammanager/interface_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package streammanager

import (
"context"
"errors"
"strconv"
"sync"
"sync/atomic"

sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/libp2p/go-libp2p-core/network"
libp2p_peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)

var _ StreamManager = &streamManager{}

var (
myPeerID = makePeerID(0)
testProtoID = sttypes.ProtoID("harmony/sync/unitest/0/1.0.0")
)

const (
defHardLoCap = 16 // discovery trigger immediately when size smaller than this number
defSoftLoCap = 32 // discovery trigger for routine check
defHiCap = 128 // Hard cap of the stream number
defDiscBatch = 16 // batch size for discovery
)

var defConfig = Config{
HardLoCap: defHardLoCap,
SoftLoCap: defSoftLoCap,
HiCap: defHiCap,
DiscBatch: defDiscBatch,
}

func newTestStreamManager() *streamManager {
pid := testProtoID
host := newTestHost()
pf := newTestPeerFinder(makeRemotePeers(100), emptyDelayFunc)

sm := newStreamManager(pid, host, pf, nil, defConfig)
host.sm = sm
return sm
}

type testStream struct {
id sttypes.StreamID
proto sttypes.ProtoID
closed bool
}

func newTestStream(id sttypes.StreamID, proto sttypes.ProtoID) *testStream {
return &testStream{id: id, proto: proto}
}

func (st *testStream) ID() sttypes.StreamID {
return st.id
}

func (st *testStream) ProtoID() sttypes.ProtoID {
return st.proto
}

func (st *testStream) WriteBytes([]byte) error {
return nil
}

func (st *testStream) ReadBytes() ([]byte, error) {
return nil, nil
}

func (st *testStream) Close() error {
if st.closed {
return errors.New("already closed")
}
st.closed = true
return nil
}

func (st *testStream) ResetOnClose() error {
if st.closed {
return errors.New("already closed")
}
st.closed = true
return nil
}

func (st *testStream) ProtoSpec() (sttypes.ProtoSpec, error) {
return sttypes.ProtoIDToProtoSpec(st.ProtoID())
}

type testHost struct {
sm *streamManager
streams map[sttypes.StreamID]*testStream
lock sync.Mutex

errHook streamErrorHook
}

type streamErrorHook func(id sttypes.StreamID, err error)

func newTestHost() *testHost {
return &testHost{
streams: make(map[sttypes.StreamID]*testStream),
}
}

func (h *testHost) ID() libp2p_peer.ID {
return myPeerID
}

// NewStream mock the upper function logic. When stream setup and running protocol, the
// upper code logic will call StreamManager to add new stream
func (h *testHost) NewStream(ctx context.Context, p libp2p_peer.ID, pids ...protocol.ID) (network.Stream, error) {
if len(pids) == 0 {
return nil, errors.New("nil protocol ids")
}
var err error
stid := sttypes.StreamID(p)
defer func() {
if err != nil && h.errHook != nil {
h.errHook(stid, err)
}
}()

st := newTestStream(stid, sttypes.ProtoID(pids[0]))
h.lock.Lock()
h.streams[stid] = st
h.lock.Unlock()

err = h.sm.NewStream(st)
return nil, err
}

func makeStreamID(index int) sttypes.StreamID {
return sttypes.StreamID(strconv.Itoa(index))
}

func makePeerID(index int) libp2p_peer.ID {
return libp2p_peer.ID(strconv.Itoa(index))
}

func makeRemotePeers(size int) []libp2p_peer.ID {
ids := make([]libp2p_peer.ID, 0, size)
for i := 1; i != size+1; i++ {
ids = append(ids, makePeerID(i))
}
return ids
}

type testPeerFinder struct {
peerIDs []libp2p_peer.ID
curIndex int32
fpHook delayFunc
}

type delayFunc func(id libp2p_peer.ID) <-chan struct{}

func emptyDelayFunc(id libp2p_peer.ID) <-chan struct{} {
c := make(chan struct{})
go func() {
c <- struct{}{}
}()
return c
}

func newTestPeerFinder(ids []libp2p_peer.ID, fpHook delayFunc) *testPeerFinder {
return &testPeerFinder{
peerIDs: ids,
curIndex: 0,
fpHook: fpHook,
}
}

func (pf *testPeerFinder) FindPeers(ctx context.Context, ns string, peerLimit int) (<-chan libp2p_peer.AddrInfo, error) {
if peerLimit > len(pf.peerIDs) {
peerLimit = len(pf.peerIDs)
}
resC := make(chan libp2p_peer.AddrInfo)

go func() {
defer close(resC)

for i := 0; i != peerLimit; i++ {
// hack to prevent race
curIndex := atomic.LoadInt32(&pf.curIndex)
pid := pf.peerIDs[curIndex]
select {
case <-ctx.Done():
return
case <-pf.fpHook(pid):
}
resC <- libp2p_peer.AddrInfo{ID: pid}
atomic.AddInt32(&pf.curIndex, 1)
if int(atomic.LoadInt32(&pf.curIndex)) == len(pf.peerIDs) {
pf.curIndex = 0
}
}
}()

return resC, nil
}
Loading

0 comments on commit bb5dc2e

Please sign in to comment.