Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
refactor(sessions): extract sessions to package
Browse files Browse the repository at this point in the history
- moved sessions out of main bitswap package
- modified session manager to manage all sessions
- moved get functions to their own package so sessions can directly

BREAKING CHANGE: SessionsForBlock, while not used outside of Bitswap, has been removed, and was an
exported function
  • Loading branch information
hannahhoward committed Dec 4, 2018
1 parent 03f4615 commit 604ae5e
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 199 deletions.
33 changes: 11 additions & 22 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

decision "github.com/ipfs/go-bitswap/decision"
bsgetter "github.com/ipfs/go-bitswap/getter"
bsmsg "github.com/ipfs/go-bitswap/message"
bsmq "github.com/ipfs/go-bitswap/messagequeue"
bsnet "github.com/ipfs/go-bitswap/network"
Expand Down Expand Up @@ -100,6 +101,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return bsmq.New(p, network)
}

wm := bswm.New(ctx)
bs := &Bitswap{
blockstore: bstore,
notifications: notif,
Expand All @@ -109,9 +111,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: bswm.New(ctx),
wm: wm,
pm: bspm.New(ctx, peerQueueFactory),
sm: bssm.New(),
sm: bssm.New(ctx, wm, network),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
Expand Down Expand Up @@ -202,7 +204,7 @@ type blockRequest struct {
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
return getBlock(parent, k, bs.GetBlocks)
return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
}

func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
Expand Down Expand Up @@ -307,7 +309,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
return out, nil
}

// CancelWant removes a given key from the wantlist
// CancelWants removes a given key from the wantlist
func (bs *Bitswap) CancelWants(cids []cid.Cid, ses uint64) {
if len(cids) == 0 {
return
Expand Down Expand Up @@ -345,12 +347,7 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
// it now as it requires more thought and isnt causing immediate problems.
bs.notifications.Publish(blk)

k := blk.Cid()
ks := []cid.Cid{k}
for _, s := range bs.SessionsForBlock(k) {
s.receiveBlockFrom(from, blk)
bs.CancelWants(ks, s.id)
}
bs.sm.ReceiveBlockFrom(from, blk)

bs.engine.AddBlock(blk)

Expand All @@ -363,18 +360,6 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
return nil
}

// SessionsForBlock returns a slice of all sessions that may be interested in the given cid
func (bs *Bitswap) SessionsForBlock(c cid.Cid) []*Session {
var out []*Session
bs.sm.IterateSessions(func(session exchange.Fetcher) {
s := session.(*Session)
if s.interestedIn(c) {
out = append(out, s)
}
})
return out
}

func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
atomic.AddUint64(&bs.counters.messagesRecvd, 1)

Expand Down Expand Up @@ -477,3 +462,7 @@ func (bs *Bitswap) GetWantlist() []cid.Cid {
func (bs *Bitswap) IsOnline() bool {
return true
}

func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
return bs.sm.NewSession(ctx)
}
5 changes: 3 additions & 2 deletions session_test.go → bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

bssession "github.com/ipfs/go-bitswap/session"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
Expand Down Expand Up @@ -132,8 +133,8 @@ func TestSessionSplitFetch(t *testing.T) {
cids = append(cids, blk.Cid())
}

ses := inst[10].Exchange.NewSession(ctx).(*Session)
ses.baseTickDelay = time.Millisecond * 10
ses := inst[10].Exchange.NewSession(ctx).(*bssession.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)

for i := 0; i < 10; i++ {
ch, err := ses.GetBlocks(ctx, cids[i*10:(i+1)*10])
Expand Down
5 changes: 3 additions & 2 deletions dup_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

tn "github.com/ipfs/go-bitswap/testnet"

bssession "github.com/ipfs/go-bitswap/session"
"github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
Expand Down Expand Up @@ -248,14 +249,14 @@ func onePeerPerBlock(b *testing.B, provs []Instance, blks []blocks.Block) {
}

func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background()).(*Session)
ses := bs.NewSession(context.Background()).(*bssession.Session)
for _, c := range ks {
_, err := ses.GetBlock(context.Background(), c)
if err != nil {
b.Fatal(err)
}
}
b.Logf("Session fetch latency: %s", ses.latTotal/time.Duration(ses.fetchcnt))
b.Logf("Session fetch latency: %s", ses.GetAverageLatency())
}

// fetch data in batches, 10 at a time
Expand Down
22 changes: 17 additions & 5 deletions get.go → getter/getter.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
package bitswap
package getter

import (
"context"
"errors"

notifications "github.com/ipfs/go-bitswap/notifications"
logging "github.com/ipfs/go-log"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)

type getBlocksFunc func(context.Context, []cid.Cid) (<-chan blocks.Block, error)
var log = logging.Logger("bitswap")

func getBlock(p context.Context, k cid.Cid, gb getBlocksFunc) (blocks.Block, error) {
// GetBlocksFunc is any function that can take an array of CIDs and return a
// channel of incoming blocks
type GetBlocksFunc func(context.Context, []cid.Cid) (<-chan blocks.Block, error)

// SyncGetBlock takes a block cid and an async function for getting several
// blocks that returns a channel, and uses that function to return the
// block syncronously
func SyncGetBlock(p context.Context, k cid.Cid, gb GetBlocksFunc) (blocks.Block, error) {
if !k.Defined() {
log.Error("undefined cid in GetBlock")
return nil, blockstore.ErrNotFound
Expand Down Expand Up @@ -49,9 +57,13 @@ func getBlock(p context.Context, k cid.Cid, gb getBlocksFunc) (blocks.Block, err
}
}

type wantFunc func(context.Context, []cid.Cid)
// WantFunc is any function that can express a want for set of blocks
type WantFunc func(context.Context, []cid.Cid)

func getBlocksImpl(ctx context.Context, keys []cid.Cid, notif notifications.PubSub, want wantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {
// AsyncGetBlocks take a set of block cids, a pubsub channel for incoming
// blocks, a want function, and a close function,
// and returns a channel of incoming blocks
func AsyncGetBlocks(ctx context.Context, keys []cid.Cid, notif notifications.PubSub, want WantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {
if len(keys) == 0 {
out := make(chan blocks.Block)
close(out)
Expand Down
Loading

0 comments on commit 604ae5e

Please sign in to comment.