Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Bitswap Refactor #3: Extract sessions to package #30

Merged
merged 2 commits into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 11 additions & 22 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

decision "github.com/ipfs/go-bitswap/decision"
bsgetter "github.com/ipfs/go-bitswap/getter"
bsmsg "github.com/ipfs/go-bitswap/message"
bsmq "github.com/ipfs/go-bitswap/messagequeue"
bsnet "github.com/ipfs/go-bitswap/network"
Expand Down Expand Up @@ -100,6 +101,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return bsmq.New(p, network)
}

wm := bswm.New(ctx)
bs := &Bitswap{
blockstore: bstore,
notifications: notif,
Expand All @@ -109,9 +111,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: bswm.New(ctx),
wm: wm,
pm: bspm.New(ctx, peerQueueFactory),
sm: bssm.New(),
sm: bssm.New(ctx, wm, network),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
Expand Down Expand Up @@ -202,7 +204,7 @@ type blockRequest struct {
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
return getBlock(parent, k, bs.GetBlocks)
return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
}

func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
Expand Down Expand Up @@ -307,7 +309,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
return out, nil
}

// CancelWant removes a given key from the wantlist.
// CancelWants removes a given key from the wantlist.
func (bs *Bitswap) CancelWants(cids []cid.Cid, ses uint64) {
if len(cids) == 0 {
return
Expand Down Expand Up @@ -345,12 +347,7 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
// it now as it requires more thought and isnt causing immediate problems.
bs.notifications.Publish(blk)

k := blk.Cid()
ks := []cid.Cid{k}
for _, s := range bs.SessionsForBlock(k) {
s.receiveBlockFrom(from, blk)
bs.CancelWants(ks, s.id)
}
bs.sm.ReceiveBlockFrom(from, blk)

bs.engine.AddBlock(blk)

Expand All @@ -363,18 +360,6 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
return nil
}

// SessionsForBlock returns a slice of all sessions that may be interested in the given cid.
func (bs *Bitswap) SessionsForBlock(c cid.Cid) []*Session {
var out []*Session
bs.sm.IterateSessions(func(session exchange.Fetcher) {
s := session.(*Session)
if s.interestedIn(c) {
out = append(out, s)
}
})
return out
}

func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
atomic.AddUint64(&bs.counters.messagesRecvd, 1)

Expand Down Expand Up @@ -477,3 +462,7 @@ func (bs *Bitswap) GetWantlist() []cid.Cid {
func (bs *Bitswap) IsOnline() bool {
return true
}

func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
return bs.sm.NewSession(ctx)
}
5 changes: 3 additions & 2 deletions session_test.go → bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

bssession "github.com/ipfs/go-bitswap/session"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
Expand Down Expand Up @@ -132,8 +133,8 @@ func TestSessionSplitFetch(t *testing.T) {
cids = append(cids, blk.Cid())
}

ses := inst[10].Exchange.NewSession(ctx).(*Session)
ses.baseTickDelay = time.Millisecond * 10
ses := inst[10].Exchange.NewSession(ctx).(*bssession.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)

for i := 0; i < 10; i++ {
ch, err := ses.GetBlocks(ctx, cids[i*10:(i+1)*10])
Expand Down
5 changes: 3 additions & 2 deletions dup_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

tn "github.com/ipfs/go-bitswap/testnet"

bssession "github.com/ipfs/go-bitswap/session"
"github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
Expand Down Expand Up @@ -248,14 +249,14 @@ func onePeerPerBlock(b *testing.B, provs []Instance, blks []blocks.Block) {
}

func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background()).(*Session)
ses := bs.NewSession(context.Background()).(*bssession.Session)
for _, c := range ks {
_, err := ses.GetBlock(context.Background(), c)
if err != nil {
b.Fatal(err)
}
}
b.Logf("Session fetch latency: %s", ses.latTotal/time.Duration(ses.fetchcnt))
b.Logf("Session fetch latency: %s", ses.GetAverageLatency())
}

// fetch data in batches, 10 at a time
Expand Down
22 changes: 17 additions & 5 deletions get.go → getter/getter.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
package bitswap
package getter

import (
"context"
"errors"

notifications "github.com/ipfs/go-bitswap/notifications"
logging "github.com/ipfs/go-log"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)

type getBlocksFunc func(context.Context, []cid.Cid) (<-chan blocks.Block, error)
var log = logging.Logger("bitswap")

func getBlock(p context.Context, k cid.Cid, gb getBlocksFunc) (blocks.Block, error) {
// GetBlocksFunc is any function that can take an array of CIDs and return a
// channel of incoming blocks.
type GetBlocksFunc func(context.Context, []cid.Cid) (<-chan blocks.Block, error)

// SyncGetBlock takes a block cid and an async function for getting several
// blocks that returns a channel, and uses that function to return the
// block syncronously.
func SyncGetBlock(p context.Context, k cid.Cid, gb GetBlocksFunc) (blocks.Block, error) {
if !k.Defined() {
log.Error("undefined cid in GetBlock")
return nil, blockstore.ErrNotFound
Expand Down Expand Up @@ -49,9 +57,13 @@ func getBlock(p context.Context, k cid.Cid, gb getBlocksFunc) (blocks.Block, err
}
}

type wantFunc func(context.Context, []cid.Cid)
// WantFunc is any function that can express a want for set of blocks.
type WantFunc func(context.Context, []cid.Cid)

func getBlocksImpl(ctx context.Context, keys []cid.Cid, notif notifications.PubSub, want wantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {
// AsyncGetBlocks take a set of block cids, a pubsub channel for incoming
// blocks, a want function, and a close function,
// and returns a channel of incoming blocks.
func AsyncGetBlocks(ctx context.Context, keys []cid.Cid, notif notifications.PubSub, want WantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {
if len(keys) == 0 {
out := make(chan blocks.Block)
close(out)
Expand Down
Loading