Merge pull request #117 from ipfs/feat/improve-docs
Add missing godoc comments, refactor to avoid confusion
Stebalien authored May 10, 2019
2 parents f14ce2b + 7af3e0a commit 5ec87a8
Showing 20 changed files with 265 additions and 164 deletions.
46 changes: 24 additions & 22 deletions benchmarks_test.go
@@ -1,4 +1,4 @@
- package bitswap
+ package bitswap_test

import (
"context"
@@ -10,19 +10,21 @@ import (
"time"

"github.com/ipfs/go-bitswap/testutil"
blocks "github.com/ipfs/go-block-format"

bitswap "github.com/ipfs/go-bitswap"
bssession "github.com/ipfs/go-bitswap/session"
testinstance "github.com/ipfs/go-bitswap/testinstance"
tn "github.com/ipfs/go-bitswap/testnet"
"github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)

- type fetchFunc func(b *testing.B, bs *Bitswap, ks []cid.Cid)
+ type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid)

- type distFunc func(b *testing.B, provs []Instance, blocks []blocks.Block)
+ type distFunc func(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block)

type runStats struct {
Dups uint64
@@ -146,12 +148,12 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, d
start := time.Now()
net := tn.VirtualNetwork(mockrouting.NewServer(), d)

- sg := NewTestSessionGenerator(net)
- defer sg.Close()
+ ig := testinstance.NewTestInstanceGenerator(net)
+ defer ig.Close()

bg := blocksutil.NewBlockGenerator()

- instances := sg.Instances(numnodes)
+ instances := ig.Instances(numnodes)
blocks := bg.Blocks(numblks)
runDistribution(b, instances, blocks, df, ff, start)
}
@@ -160,16 +162,16 @@ func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d
start := time.Now()
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)

- sg := NewTestSessionGenerator(net)
- defer sg.Close()
+ ig := testinstance.NewTestInstanceGenerator(net)
+ defer ig.Close()

- instances := sg.Instances(numnodes)
+ instances := ig.Instances(numnodes)
blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)

runDistribution(b, instances, blocks, df, ff, start)
}

- func runDistribution(b *testing.B, instances []Instance, blocks []blocks.Block, df distFunc, ff fetchFunc, start time.Time) {
+ func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []blocks.Block, df distFunc, ff fetchFunc, start time.Time) {

numnodes := len(instances)

@@ -189,7 +191,7 @@ func runDistribution(b *testing.B, instances []Instance, blocks []blocks.Block,
b.Fatal(err)
}

- nst := fetcher.Exchange.network.Stats()
+ nst := fetcher.Adapter.Stats()
stats := runStats{
Time: time.Now().Sub(start),
MsgRecd: nst.MessagesRecvd,
@@ -204,7 +206,7 @@ }
}
}

- func allToAll(b *testing.B, provs []Instance, blocks []blocks.Block) {
+ func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
for _, p := range provs {
if err := p.Blockstore().PutMany(blocks); err != nil {
b.Fatal(err)
@@ -214,7 +216,7 @@ func allToAll(b *testing.B, provs []Instance, blocks []blocks.Block) {

// overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks
// to the second peer. This means both peers have the middle 50 blocks
- func overlap1(b *testing.B, provs []Instance, blks []blocks.Block) {
+ func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap1 only works with 2 provs")
}
@@ -231,7 +233,7 @@ func overlap1(b *testing.B, provs []Instance, blks []blocks.Block) {

// overlap2 gives every even numbered block to the first peer, odd numbered
// blocks to the second. It also gives every third block to both peers
- func overlap2(b *testing.B, provs []Instance, blks []blocks.Block) {
+ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap2 only works with 2 provs")
}
@@ -252,7 +254,7 @@ func overlap2(b *testing.B, provs []Instance, blks []blocks.Block) {
}
}

- func overlap3(b *testing.B, provs []Instance, blks []blocks.Block) {
+ func overlap3(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap3 only works with 2 provs")
}
@@ -277,13 +279,13 @@ func overlap3(b *testing.B, provs []Instance, blks []blocks.Block) {
// onePeerPerBlock picks a random peer to hold each block
// with this layout, we shouldn't actually ever see any duplicate blocks
// but we're mostly just testing performance of the sync algorithm
- func onePeerPerBlock(b *testing.B, provs []Instance, blks []blocks.Block) {
+ func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
for _, blk := range blks {
provs[rand.Intn(len(provs))].Blockstore().Put(blk)
}
}

- func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) {
+ func oneAtATime(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background()).(*bssession.Session)
for _, c := range ks {
_, err := ses.GetBlock(context.Background(), c)
@@ -295,7 +297,7 @@ func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) {
}

// fetch data in batches, 10 at a time
- func batchFetchBy10(b *testing.B, bs *Bitswap, ks []cid.Cid) {
+ func batchFetchBy10(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
for i := 0; i < len(ks); i += 10 {
out, err := ses.GetBlocks(context.Background(), ks[i:i+10])
@@ -308,7 +310,7 @@ func batchFetchBy10(b *testing.B, bs *Bitswap, ks []cid.Cid) {
}

// fetch each block at the same time concurrently
- func fetchAllConcurrent(b *testing.B, bs *Bitswap, ks []cid.Cid) {
+ func fetchAllConcurrent(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())

var wg sync.WaitGroup
@@ -325,7 +327,7 @@ func fetchAllConcurrent(b *testing.B, bs *Bitswap, ks []cid.Cid) {
wg.Wait()
}

- func batchFetchAll(b *testing.B, bs *Bitswap, ks []cid.Cid) {
+ func batchFetchAll(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
out, err := ses.GetBlocks(context.Background(), ks)
if err != nil {
@@ -336,7 +338,7 @@ func batchFetchAll(b *testing.B, bs *Bitswap, ks []cid.Cid) {
}

// simulates the fetch pattern of trying to sync a unixfs file graph as fast as possible
- func unixfsFileFetch(b *testing.B, bs *Bitswap, ks []cid.Cid) {
+ func unixfsFileFetch(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
_, err := ses.GetBlock(context.Background(), ks[0])
if err != nil {
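For orientation: every benchmark in this file pairs a distFunc (how blocks are seeded across the provider instances) with a fetchFunc (how the fetching node pulls them back) and hands both to subtestDistributeAndFetch. A minimal sketch of that wiring, with a hypothetical benchmark name and parameters that are not taken from this diff:

func BenchmarkAllToAllOneAtATime(b *testing.B) {
	// Hypothetical wiring: 3 nodes and 100 blocks, every provider holds
	// every block (allToAll), and the fetcher requests them one CID at a
	// time (oneAtATime) over a virtual network with a fixed 10ms delay.
	subtestDistributeAndFetch(b, 3, 100, delay.Fixed(10*time.Millisecond), allToAll, oneAtATime)
}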
41 changes: 30 additions & 11 deletions bitswap.go
@@ -1,4 +1,4 @@
- // package bitswap implements the IPFS exchange interface with the BitSwap
+ // Package bitswap implements the IPFS exchange interface with the BitSwap
// bilateral exchange protocol.
package bitswap

@@ -24,7 +24,6 @@ import (
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
delay "github.com/ipfs/go-ipfs-delay"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
logging "github.com/ipfs/go-log"
metrics "github.com/ipfs/go-metrics-interface"
@@ -43,8 +42,14 @@ const (
)

var (
+ // ProvideEnabled is a variable that tells Bitswap whether or not
+ // to handle providing blocks (see experimental provider system)
ProvideEnabled = true

+ // HasBlockBufferSize is the buffer size of the channel for new blocks
+ // that need to be provided. They should get pulled over by the
+ // provideCollector even before they are actually provided.
+ // TODO: Does this need to be this large given that?
HasBlockBufferSize = 256
provideKeysBufferSize = 2048
provideWorkerMax = 6
@@ -53,12 +58,9 @@ var (
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)
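ProvideEnabled and HasBlockBufferSize are plain package variables, so callers tune them before constructing an instance. A minimal sketch under that assumption; the helper name is hypothetical and the network/blockstore wiring is outside this diff:

// newTunedBitswap is a hypothetical helper: set package-level knobs before
// calling New, because the worker goroutines read them at startup.
func newTunedBitswap(ctx context.Context, net bsnet.BitSwapNetwork, bstore blockstore.Blockstore) exchange.Interface {
	bitswap.ProvideEnabled = false  // skip announcing blocks via the provider system
	bitswap.HasBlockBufferSize = 64 // smaller provide buffer for a constrained node
	return bitswap.New(ctx, net, bstore)
}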

- var rebroadcastDelay = delay.Fixed(time.Minute)
-
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
- // delegate.
- // Runs until context is cancelled.
+ // delegate. Runs until context is cancelled or bitswap.Close is called.
func New(parent context.Context, network bsnet.BitSwapNetwork,
bstore blockstore.Blockstore) exchange.Interface {

@@ -121,7 +123,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
network.SetDelegate(bs)

// Start up bitswaps async worker routines
- bs.startWorkers(px, ctx)
+ bs.startWorkers(ctx, px)

// bind the context and process.
// do it over here to avoid closing before all setup is done.
@@ -190,6 +192,8 @@ func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, er
return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
}

+ // WantlistForPeer returns the currently understood list of blocks requested by a
+ // given peer.
func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
var out []cid.Cid
for _, e := range bs.engine.WantlistForPeer(p) {
@@ -198,6 +202,8 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
return out
}

+ // LedgerForPeer returns aggregated data about blocks swapped and communication
+ // with a given peer.
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
return bs.engine.LedgerForPeer(p)
}
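Together, these two accessors expose the decision engine's per-peer view. A minimal sketch of using them from outside the package; the Receipt field names (Sent, Recv, Value) are assumptions about the decision package, not spelled out in this diff:

// dumpPeerState is a hypothetical debugging helper: print what a peer has
// asked us for and how the ledger with that peer currently stands.
func dumpPeerState(bs *bitswap.Bitswap, p peer.ID) {
	for _, c := range bs.WantlistForPeer(p) {
		fmt.Printf("peer %s wants %s\n", p, c)
	}
	r := bs.LedgerForPeer(p)
	fmt.Printf("peer %s: sent=%d recv=%d value=%f\n", p, r.Sent, r.Recv, r.Value)
}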
@@ -258,6 +264,8 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
return nil
}

+ // ReceiveMessage is called by the network interface when a new message is
+ // received.
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
bs.counterLk.Lock()
bs.counters.messagesRecvd++
@@ -300,8 +308,6 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
wg.Wait()
}

- var ErrAlreadyHaveBlock = errors.New("already have block")
-
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
blkLen := len(b.RawData())
has, err := bs.blockstore.Has(b.Cid())
@@ -327,28 +333,34 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
}
}

- // Connected/Disconnected warns bitswap about peer connections.
+ // PeerConnected is called by the network interface
+ // when a peer initiates a new connection to bitswap.
func (bs *Bitswap) PeerConnected(p peer.ID) {
bs.wm.Connected(p)
bs.engine.PeerConnected(p)
}

- // Connected/Disconnected warns bitswap about peer connections.
+ // PeerDisconnected is called by the network interface when a peer
+ // closes a connection.
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.wm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}

+ // ReceiveError is called by the network interface when an error happens
+ // at the network layer. Currently just logs error.
func (bs *Bitswap) ReceiveError(err error) {
log.Infof("Bitswap ReceiveError: %s", err)
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
}

+ // Close is called to shutdown Bitswap
func (bs *Bitswap) Close() error {
return bs.process.Close()
}

+ // GetWantlist returns the current local wantlist.
func (bs *Bitswap) GetWantlist() []cid.Cid {
entries := bs.wm.CurrentWants()
out := make([]cid.Cid, 0, len(entries))
@@ -358,10 +370,17 @@ func (bs *Bitswap) GetWantlist() []cid.Cid {
return out
}

+ // IsOnline is needed to match go-ipfs-exchange-interface
func (bs *Bitswap) IsOnline() bool {
return true
}

+ // NewSession generates a new Bitswap session. You should use this, rather
+ // than calling Bitswap.GetBlocks, any time you intend to do several related
+ // block requests in a row. The session returned will have its own GetBlocks
+ // method, but the session will use the fact that the requests are related to
+ // be more efficient in its requests to peers. If you are using a session
+ // from go-blockservice, it will create a bitswap session automatically.
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
return bs.sm.NewSession(ctx)
}
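A minimal sketch of the session pattern that comment describes, assuming bs is an existing *bitswap.Bitswap and keys holds related CIDs (for example, the blocks of one file's DAG); the helper name is illustrative:

// fetchRelated is a hypothetical caller-side helper: fetch a batch of
// related blocks through one session rather than separate GetBlocks calls.
func fetchRelated(ctx context.Context, bs *bitswap.Bitswap, keys []cid.Cid) error {
	ses := bs.NewSession(ctx) // an exchange.Fetcher scoped to these requests
	out, err := ses.GetBlocks(ctx, keys)
	if err != nil {
		return err
	}
	for blk := range out { // the channel closes once all blocks arrive or ctx is done
		fmt.Printf("got %s (%d bytes)\n", blk.Cid(), len(blk.RawData()))
	}
	return nil
}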