diff --git a/benchmarks_test.go b/benchmarks_test.go index b8c90d97..dbe05889 100644 --- a/benchmarks_test.go +++ b/benchmarks_test.go @@ -1,4 +1,4 @@ -package bitswap +package bitswap_test import ( "context" @@ -10,19 +10,21 @@ import ( "time" "github.com/ipfs/go-bitswap/testutil" + blocks "github.com/ipfs/go-block-format" + bitswap "github.com/ipfs/go-bitswap" bssession "github.com/ipfs/go-bitswap/session" + testinstance "github.com/ipfs/go-bitswap/testinstance" tn "github.com/ipfs/go-bitswap/testnet" - "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" blocksutil "github.com/ipfs/go-ipfs-blocksutil" delay "github.com/ipfs/go-ipfs-delay" mockrouting "github.com/ipfs/go-ipfs-routing/mock" ) -type fetchFunc func(b *testing.B, bs *Bitswap, ks []cid.Cid) +type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) -type distFunc func(b *testing.B, provs []Instance, blocks []blocks.Block) +type distFunc func(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) type runStats struct { Dups uint64 @@ -146,12 +148,12 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, d start := time.Now() net := tn.VirtualNetwork(mockrouting.NewServer(), d) - sg := NewTestSessionGenerator(net) - defer sg.Close() + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() bg := blocksutil.NewBlockGenerator() - instances := sg.Instances(numnodes) + instances := ig.Instances(numnodes) blocks := bg.Blocks(numblks) runDistribution(b, instances, blocks, df, ff, start) } @@ -160,16 +162,16 @@ func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d start := time.Now() net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator) - sg := NewTestSessionGenerator(net) - defer sg.Close() + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() - instances := sg.Instances(numnodes) + instances := ig.Instances(numnodes) blocks := testutil.GenerateBlocksOfSize(numblks, blockSize) runDistribution(b, instances, blocks, df, ff, start) } -func runDistribution(b *testing.B, instances []Instance, blocks []blocks.Block, df distFunc, ff fetchFunc, start time.Time) { +func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []blocks.Block, df distFunc, ff fetchFunc, start time.Time) { numnodes := len(instances) @@ -189,7 +191,7 @@ func runDistribution(b *testing.B, instances []Instance, blocks []blocks.Block, b.Fatal(err) } - nst := fetcher.Exchange.network.Stats() + nst := fetcher.Adapter.Stats() stats := runStats{ Time: time.Now().Sub(start), MsgRecd: nst.MessagesRecvd, @@ -204,7 +206,7 @@ func runDistribution(b *testing.B, instances []Instance, blocks []blocks.Block, } } -func allToAll(b *testing.B, provs []Instance, blocks []blocks.Block) { +func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) { for _, p := range provs { if err := p.Blockstore().PutMany(blocks); err != nil { b.Fatal(err) @@ -214,7 +216,7 @@ func allToAll(b *testing.B, provs []Instance, blocks []blocks.Block) { // overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks // to the second peer. This means both peers have the middle 50 blocks -func overlap1(b *testing.B, provs []Instance, blks []blocks.Block) { +func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) { if len(provs) != 2 { b.Fatal("overlap1 only works with 2 provs") } @@ -231,7 +233,7 @@ func overlap1(b *testing.B, provs []Instance, blks []blocks.Block) { // overlap2 gives every even numbered block to the first peer, odd numbered // blocks to the second. it also gives every third block to both peers -func overlap2(b *testing.B, provs []Instance, blks []blocks.Block) { +func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) { if len(provs) != 2 { b.Fatal("overlap2 only works with 2 provs") } @@ -252,7 +254,7 @@ func overlap2(b *testing.B, provs []Instance, blks []blocks.Block) { } } -func overlap3(b *testing.B, provs []Instance, blks []blocks.Block) { +func overlap3(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) { if len(provs) != 2 { b.Fatal("overlap3 only works with 2 provs") } @@ -277,13 +279,13 @@ func overlap3(b *testing.B, provs []Instance, blks []blocks.Block) { // onePeerPerBlock picks a random peer to hold each block // with this layout, we shouldnt actually ever see any duplicate blocks // but we're mostly just testing performance of the sync algorithm -func onePeerPerBlock(b *testing.B, provs []Instance, blks []blocks.Block) { +func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) { for _, blk := range blks { provs[rand.Intn(len(provs))].Blockstore().Put(blk) } } -func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) { +func oneAtATime(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) { ses := bs.NewSession(context.Background()).(*bssession.Session) for _, c := range ks { _, err := ses.GetBlock(context.Background(), c) @@ -295,7 +297,7 @@ func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) { } // fetch data in batches, 10 at a time -func batchFetchBy10(b *testing.B, bs *Bitswap, ks []cid.Cid) { +func batchFetchBy10(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) { ses := bs.NewSession(context.Background()) for i := 0; i < len(ks); i += 10 { out, err := ses.GetBlocks(context.Background(), ks[i:i+10]) @@ -308,7 +310,7 @@ func batchFetchBy10(b *testing.B, bs *Bitswap, ks []cid.Cid) { } // fetch each block at the same time concurrently -func fetchAllConcurrent(b *testing.B, bs *Bitswap, ks []cid.Cid) { +func fetchAllConcurrent(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) { ses := bs.NewSession(context.Background()) var wg sync.WaitGroup @@ -325,7 +327,7 @@ func fetchAllConcurrent(b *testing.B, bs *Bitswap, ks []cid.Cid) { wg.Wait() } -func batchFetchAll(b *testing.B, bs *Bitswap, ks []cid.Cid) { +func batchFetchAll(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) { ses := bs.NewSession(context.Background()) out, err := ses.GetBlocks(context.Background(), ks) if err != nil { @@ -336,7 +338,7 @@ func batchFetchAll(b *testing.B, bs *Bitswap, ks []cid.Cid) { } // simulates the fetch pattern of trying to sync a unixfs file graph as fast as possible -func unixfsFileFetch(b *testing.B, bs *Bitswap, ks []cid.Cid) { +func unixfsFileFetch(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) { ses := bs.NewSession(context.Background()) _, err := ses.GetBlock(context.Background(), ks[0]) if err != nil { diff --git a/bitswap.go b/bitswap.go index e6f90fe7..4a407feb 100644 --- a/bitswap.go +++ b/bitswap.go @@ -1,4 +1,4 @@ -// package bitswap implements the IPFS exchange interface with the BitSwap +// Package bitswap implements the IPFS exchange interface with the BitSwap // bilateral exchange protocol. package bitswap @@ -24,7 +24,6 @@ import ( blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" - delay "github.com/ipfs/go-ipfs-delay" exchange "github.com/ipfs/go-ipfs-exchange-interface" logging "github.com/ipfs/go-log" metrics "github.com/ipfs/go-metrics-interface" @@ -43,8 +42,14 @@ const ( ) var ( + // ProvideEnabled is a variable that tells Bitswap whether or not + // to handle providing blocks (see experimental provider system) ProvideEnabled = true + // HasBlockBufferSize is the buffer size of the channel for new blocks + // that need to be provided. They should get pulled over by the + // provideCollector even before they are actually provided. + // TODO: Does this need to be this large givent that? HasBlockBufferSize = 256 provideKeysBufferSize = 2048 provideWorkerMax = 6 @@ -53,12 +58,9 @@ var ( metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} ) -var rebroadcastDelay = delay.Fixed(time.Minute) - // New initializes a BitSwap instance that communicates over the provided // BitSwapNetwork. This function registers the returned instance as the network -// delegate. -// Runs until context is cancelled. +// delegate. Runs until context is cancelled or bitswap.Close is called. func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore) exchange.Interface { @@ -121,7 +123,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, network.SetDelegate(bs) // Start up bitswaps async worker routines - bs.startWorkers(px, ctx) + bs.startWorkers(ctx, px) // bind the context and process. // do it over here to avoid closing before all setup is done. @@ -190,6 +192,8 @@ func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, er return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks) } +// WantlistForPeer returns the currently understood list of blocks requested by a +// given peer. func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid { var out []cid.Cid for _, e := range bs.engine.WantlistForPeer(p) { @@ -198,6 +202,8 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid { return out } +// LedgerForPeer returns aggregated data about blocks swapped and communication +// with a given peer. func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt { return bs.engine.LedgerForPeer(p) } @@ -258,6 +264,8 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error { return nil } +// ReceiveMessage is called by the network interface when a new message is +// received. func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { bs.counterLk.Lock() bs.counters.messagesRecvd++ @@ -300,8 +308,6 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg wg.Wait() } -var ErrAlreadyHaveBlock = errors.New("already have block") - func (bs *Bitswap) updateReceiveCounters(b blocks.Block) { blkLen := len(b.RawData()) has, err := bs.blockstore.Has(b.Cid()) @@ -327,28 +333,34 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) { } } -// Connected/Disconnected warns bitswap about peer connections. +// PeerConnected is called by the network interface +// when a peer initiates a new connection to bitswap. func (bs *Bitswap) PeerConnected(p peer.ID) { bs.wm.Connected(p) bs.engine.PeerConnected(p) } -// Connected/Disconnected warns bitswap about peer connections. +// PeerDisconnected is called by the network interface when a peer +// closes a connection func (bs *Bitswap) PeerDisconnected(p peer.ID) { bs.wm.Disconnected(p) bs.engine.PeerDisconnected(p) } +// ReceiveError is called by the network interface when an error happens +// at the network layer. Currently just logs error. func (bs *Bitswap) ReceiveError(err error) { log.Infof("Bitswap ReceiveError: %s", err) // TODO log the network error // TODO bubble the network error up to the parent context/error logger } +// Close is called to shutdown Bitswap func (bs *Bitswap) Close() error { return bs.process.Close() } +// GetWantlist returns the current local wantlist. func (bs *Bitswap) GetWantlist() []cid.Cid { entries := bs.wm.CurrentWants() out := make([]cid.Cid, 0, len(entries)) @@ -358,10 +370,17 @@ func (bs *Bitswap) GetWantlist() []cid.Cid { return out } +// IsOnline is needed to match go-ipfs-exchange-interface func (bs *Bitswap) IsOnline() bool { return true } +// NewSession generates a new Bitswap session. You should use this, rather +// that calling Bitswap.GetBlocks, any time you intend to do several related +// block requests in a row. The session returned will have it's own GetBlocks +// method, but the session will use the fact that the requests are related to +// be more efficient in its requests to peers. If you are using a session +// from go-blockservice, it will create a bitswap session automatically. func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher { return bs.sm.NewSession(ctx) } diff --git a/bitswap_test.go b/bitswap_test.go index 127ac0dc..c1d059b4 100644 --- a/bitswap_test.go +++ b/bitswap_test.go @@ -1,4 +1,4 @@ -package bitswap +package bitswap_test import ( "bytes" @@ -8,11 +8,12 @@ import ( "testing" "time" + bitswap "github.com/ipfs/go-bitswap" decision "github.com/ipfs/go-bitswap/decision" "github.com/ipfs/go-bitswap/message" bssession "github.com/ipfs/go-bitswap/session" + testinstance "github.com/ipfs/go-bitswap/testinstance" tn "github.com/ipfs/go-bitswap/testnet" - blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" detectrace "github.com/ipfs/go-detect-race" @@ -35,12 +36,12 @@ func getVirtualNetwork() tn.Network { func TestClose(t *testing.T) { vnet := getVirtualNetwork() - sesgen := NewTestSessionGenerator(vnet) - defer sesgen.Close() + ig := testinstance.NewTestInstanceGenerator(vnet) + defer ig.Close() bgen := blocksutil.NewBlockGenerator() block := bgen.Next() - bitswap := sesgen.Next() + bitswap := ig.Next() bitswap.Exchange.Close() bitswap.Exchange.GetBlock(context.Background(), block.Cid()) @@ -50,14 +51,14 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this rs := mockrouting.NewServer() net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay)) - g := NewTestSessionGenerator(net) - defer g.Close() + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() block := blocks.NewBlock([]byte("block")) pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t) rs.Client(pinfo).Provide(context.Background(), block.Cid(), true) // but not on network - solo := g.Next() + solo := ig.Next() defer solo.Exchange.Close() ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) @@ -73,10 +74,10 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - g := NewTestSessionGenerator(net) - defer g.Close() + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() - peers := g.Instances(2) + peers := ig.Instances(2) hasBlock := peers[0] defer hasBlock.Exchange.Close() @@ -101,15 +102,15 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { } func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { - ProvideEnabled = false - defer func() { ProvideEnabled = true }() + bitswap.ProvideEnabled = false + defer func() { bitswap.ProvideEnabled = true }() net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - g := NewTestSessionGenerator(net) - defer g.Close() + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() - hasBlock := g.Next() + hasBlock := ig.Next() defer hasBlock.Exchange.Close() if err := hasBlock.Exchange.HasBlock(block); err != nil { @@ -119,7 +120,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - wantsBlock := g.Next() + wantsBlock := ig.Next() defer wantsBlock.Exchange.Close() ns := wantsBlock.Exchange.NewSession(ctx).(*bssession.Session) @@ -143,10 +144,10 @@ func TestUnwantedBlockNotAdded(t *testing.T) { bsMessage := message.New(true) bsMessage.AddBlock(block) - g := NewTestSessionGenerator(net) - defer g.Close() + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() - peers := g.Instances(2) + peers := ig.Instances(2) hasBlock := peers[0] defer hasBlock.Exchange.Close() @@ -162,7 +163,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) { doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Peer, bsMessage) - blockInStore, err := doesNotWantBlock.blockstore.Has(block.Cid()) + blockInStore, err := doesNotWantBlock.Blockstore().Has(block.Cid()) if err != nil || blockInStore { t.Fatal("Unwanted block added to block store") } @@ -200,18 +201,6 @@ func TestLargeFile(t *testing.T) { PerformDistributionTest(t, numInstances, numBlocks) } -func TestLargeFileNoRebroadcast(t *testing.T) { - rbd := rebroadcastDelay.Get() - rebroadcastDelay.Set(time.Hour * 24 * 365 * 10) // ten years should be long enough - if testing.Short() { - t.SkipNow() - } - numInstances := 10 - numBlocks := 100 - PerformDistributionTest(t, numInstances, numBlocks) - rebroadcastDelay.Set(rbd) -} - func TestLargeFileTwoPeers(t *testing.T) { if testing.Short() { t.SkipNow() @@ -227,11 +216,11 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { t.SkipNow() } net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sg := NewTestSessionGenerator(net) - defer sg.Close() + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() bg := blocksutil.NewBlockGenerator() - instances := sg.Instances(numInstances) + instances := ig.Instances(numInstances) blocks := bg.Blocks(numBlocks) t.Log("Give the blocks to the first instance") @@ -250,7 +239,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { for _, inst := range instances[1:] { wg.Add(1) - go func(inst Instance) { + go func(inst testinstance.Instance) { defer wg.Done() outch, err := inst.Exchange.GetBlocks(ctx, blkeys) if err != nil { @@ -290,14 +279,11 @@ func TestSendToWantingPeer(t *testing.T) { } net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sg := NewTestSessionGenerator(net) - defer sg.Close() + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() bg := blocksutil.NewBlockGenerator() - prev := rebroadcastDelay.Set(time.Second / 2) - defer func() { rebroadcastDelay.Set(prev) }() - - peers := sg.Instances(2) + peers := ig.Instances(2) peerA := peers[0] peerB := peers[1] @@ -335,9 +321,9 @@ func TestSendToWantingPeer(t *testing.T) { func TestEmptyKey(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sg := NewTestSessionGenerator(net) - defer sg.Close() - bs := sg.Instances(1)[0].Exchange + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() + bs := ig.Instances(1)[0].Exchange ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() @@ -348,7 +334,7 @@ func TestEmptyKey(t *testing.T) { } } -func assertStat(t *testing.T, st *Stat, sblks, rblks, sdata, rdata uint64) { +func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint64) { if sblks != st.BlocksSent { t.Errorf("mismatch in blocks sent: %d vs %d", sblks, st.BlocksSent) } @@ -368,13 +354,13 @@ func assertStat(t *testing.T, st *Stat, sblks, rblks, sdata, rdata uint64) { func TestBasicBitswap(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sg := NewTestSessionGenerator(net) - defer sg.Close() + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() bg := blocksutil.NewBlockGenerator() t.Log("Test a one node trying to get one block from another") - instances := sg.Instances(3) + instances := ig.Instances(3) blocks := bg.Blocks(1) err := instances[0].Exchange.HasBlock(blocks[0]) if err != nil { @@ -437,13 +423,13 @@ func TestBasicBitswap(t *testing.T) { func TestDoubleGet(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sg := NewTestSessionGenerator(net) - defer sg.Close() + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() bg := blocksutil.NewBlockGenerator() t.Log("Test a one node trying to get one block from another") - instances := sg.Instances(2) + instances := ig.Instances(2) blocks := bg.Blocks(1) // NOTE: A race condition can happen here where these GetBlocks requests go @@ -505,11 +491,11 @@ func TestDoubleGet(t *testing.T) { func TestWantlistCleanup(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sg := NewTestSessionGenerator(net) - defer sg.Close() + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() bg := blocksutil.NewBlockGenerator() - instances := sg.Instances(1)[0] + instances := ig.Instances(1)[0] bswap := instances.Exchange blocks := bg.Blocks(20) @@ -616,13 +602,13 @@ func newReceipt(sent, recv, exchanged uint64) *decision.Receipt { func TestBitswapLedgerOneWay(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sg := NewTestSessionGenerator(net) - defer sg.Close() + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() bg := blocksutil.NewBlockGenerator() t.Log("Test ledgers match when one peer sends block to another") - instances := sg.Instances(2) + instances := ig.Instances(2) blocks := bg.Blocks(1) err := instances[0].Exchange.HasBlock(blocks[0]) if err != nil { @@ -668,13 +654,13 @@ func TestBitswapLedgerOneWay(t *testing.T) { func TestBitswapLedgerTwoWay(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sg := NewTestSessionGenerator(net) - defer sg.Close() + ig := testinstance.NewTestInstanceGenerator(net) + defer ig.Close() bg := blocksutil.NewBlockGenerator() t.Log("Test ledgers match when two peers send one block to each other") - instances := sg.Instances(2) + instances := ig.Instances(2) blocks := bg.Blocks(2) err := instances[0].Exchange.HasBlock(blocks[0]) if err != nil { diff --git a/bitswap_with_sessions_test.go b/bitswap_with_sessions_test.go index d4d0cfee..50be52ca 100644 --- a/bitswap_with_sessions_test.go +++ b/bitswap_with_sessions_test.go @@ -1,4 +1,4 @@ -package bitswap +package bitswap_test import ( "context" @@ -7,6 +7,7 @@ import ( "time" bssession "github.com/ipfs/go-bitswap/session" + testinstance "github.com/ipfs/go-bitswap/testinstance" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" blocksutil "github.com/ipfs/go-ipfs-blocksutil" @@ -18,12 +19,12 @@ func TestBasicSessions(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - sesgen := NewTestSessionGenerator(vnet) - defer sesgen.Close() + ig := testinstance.NewTestInstanceGenerator(vnet) + defer ig.Close() bgen := blocksutil.NewBlockGenerator() block := bgen.Next() - inst := sesgen.Instances(2) + inst := ig.Instances(2) a := inst[0] b := inst[1] @@ -66,11 +67,11 @@ func TestSessionBetweenPeers(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - sesgen := NewTestSessionGenerator(vnet) - defer sesgen.Close() + ig := testinstance.NewTestInstanceGenerator(vnet) + defer ig.Close() bgen := blocksutil.NewBlockGenerator() - inst := sesgen.Instances(10) + inst := ig.Instances(10) blks := bgen.Blocks(101) if err := inst[0].Blockstore().PutMany(blks); err != nil { @@ -109,7 +110,7 @@ func TestSessionBetweenPeers(t *testing.T) { t.Fatal(err) } if stat.MessagesReceived > 2 { - t.Fatal("uninvolved nodes should only receive two messages", is.Exchange.counters.messagesRecvd) + t.Fatal("uninvolved nodes should only receive two messages", stat.MessagesReceived) } } } @@ -119,11 +120,11 @@ func TestSessionSplitFetch(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - sesgen := NewTestSessionGenerator(vnet) - defer sesgen.Close() + ig := testinstance.NewTestInstanceGenerator(vnet) + defer ig.Close() bgen := blocksutil.NewBlockGenerator() - inst := sesgen.Instances(11) + inst := ig.Instances(11) blks := bgen.Blocks(100) for i := 0; i < 10; i++ { @@ -162,11 +163,11 @@ func TestFetchNotConnected(t *testing.T) { bssession.SetProviderSearchDelay(10 * time.Millisecond) vnet := getVirtualNetwork() - sesgen := NewTestSessionGenerator(vnet) - defer sesgen.Close() + ig := testinstance.NewTestInstanceGenerator(vnet) + defer ig.Close() bgen := blocksutil.NewBlockGenerator() - other := sesgen.Next() + other := ig.Next() blks := bgen.Blocks(10) for _, block := range blks { @@ -180,7 +181,7 @@ func TestFetchNotConnected(t *testing.T) { cids = append(cids, blk.Cid()) } - thisNode := sesgen.Next() + thisNode := ig.Next() ses := thisNode.Exchange.NewSession(ctx).(*bssession.Session) ses.SetBaseTickDelay(time.Millisecond * 10) @@ -202,12 +203,12 @@ func TestInterestCacheOverflow(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - sesgen := NewTestSessionGenerator(vnet) - defer sesgen.Close() + ig := testinstance.NewTestInstanceGenerator(vnet) + defer ig.Close() bgen := blocksutil.NewBlockGenerator() blks := bgen.Blocks(2049) - inst := sesgen.Instances(2) + inst := ig.Instances(2) a := inst[0] b := inst[1] @@ -254,12 +255,12 @@ func TestPutAfterSessionCacheEvict(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - sesgen := NewTestSessionGenerator(vnet) - defer sesgen.Close() + ig := testinstance.NewTestInstanceGenerator(vnet) + defer ig.Close() bgen := blocksutil.NewBlockGenerator() blks := bgen.Blocks(2500) - inst := sesgen.Instances(1) + inst := ig.Instances(1) a := inst[0] @@ -294,12 +295,12 @@ func TestMultipleSessions(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - sesgen := NewTestSessionGenerator(vnet) - defer sesgen.Close() + ig := testinstance.NewTestInstanceGenerator(vnet) + defer ig.Close() bgen := blocksutil.NewBlockGenerator() blk := bgen.Blocks(1)[0] - inst := sesgen.Instances(2) + inst := ig.Instances(2) a := inst[0] b := inst[1] @@ -337,8 +338,8 @@ func TestWantlistClearsOnCancel(t *testing.T) { defer cancel() vnet := getVirtualNetwork() - sesgen := NewTestSessionGenerator(vnet) - defer sesgen.Close() + ig := testinstance.NewTestInstanceGenerator(vnet) + defer ig.Close() bgen := blocksutil.NewBlockGenerator() blks := bgen.Blocks(10) @@ -347,7 +348,7 @@ func TestWantlistClearsOnCancel(t *testing.T) { cids = append(cids, blk.Cid()) } - inst := sesgen.Instances(1) + inst := ig.Instances(1) a := inst[0] diff --git a/decision/engine.go b/decision/engine.go index 37737c8d..c2de9299 100644 --- a/decision/engine.go +++ b/decision/engine.go @@ -1,4 +1,4 @@ -// package decision implements the decision engine for the bitswap service. +// Package decision implements the decision engine for the bitswap service. package decision import ( @@ -68,6 +68,7 @@ type Envelope struct { Sent func() } +// Engine manages sending requested blocks to peers. type Engine struct { // peerRequestQueue is a priority queue of requests received from peers. // Requests are popped from the queue, packaged up, and placed in the @@ -94,6 +95,7 @@ type Engine struct { ticker *time.Ticker } +// NewEngine creates a new block sending engine for the given block store func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { e := &Engine{ ledgerMap: make(map[peer.ID]*ledger), @@ -107,6 +109,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { return e } +// WantlistForPeer returns the currently understood want list for a given peer func (e *Engine) WantlistForPeer(p peer.ID) (out []wl.Entry) { partner := e.findOrCreate(p) partner.lk.Lock() @@ -114,6 +117,8 @@ func (e *Engine) WantlistForPeer(p peer.ID) (out []wl.Entry) { return partner.wantList.SortedEntries() } +// LedgerForPeer returns aggregated data about blocks swapped and communication +// with a given peer. func (e *Engine) LedgerForPeer(p peer.ID) *Receipt { ledger := e.findOrCreate(p) @@ -295,6 +300,8 @@ func (e *Engine) addBlock(block blocks.Block) { } } +// AddBlock is called to when a new block is received and added to a block store +// meaning there may be peers who want that block that we should send it to. func (e *Engine) AddBlock(block blocks.Block) { e.lock.Lock() defer e.lock.Unlock() @@ -308,6 +315,8 @@ func (e *Engine) AddBlock(block blocks.Block) { // inconsistent. Would need to ensure that Sends and acknowledgement of the // send happen atomically +// MessageSent is called when a message has successfully been sent out, to record +// changes. func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) { l := e.findOrCreate(p) l.lk.Lock() @@ -321,6 +330,8 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) { } +// PeerConnected is called when a new peer connects, meaning we should start +// sending blocks. func (e *Engine) PeerConnected(p peer.ID) { e.lock.Lock() defer e.lock.Unlock() @@ -334,6 +345,7 @@ func (e *Engine) PeerConnected(p peer.ID) { l.ref++ } +// PeerDisconnected is called when a peer disconnects. func (e *Engine) PeerDisconnected(p peer.ID) { e.lock.Lock() defer e.lock.Unlock() diff --git a/decision/ledger.go b/decision/ledger.go index 374f0e7e..37ca5745 100644 --- a/decision/ledger.go +++ b/decision/ledger.go @@ -47,6 +47,9 @@ type ledger struct { lk sync.Mutex } +// Receipt is a summary of the ledger for a given peer +// collecting various pieces of aggregated data for external +// reporting purposes. type Receipt struct { Peer string Value float64 diff --git a/go.mod b/go.mod index 13fd85ed..30629108 100644 --- a/go.mod +++ b/go.mod @@ -30,4 +30,9 @@ require ( github.com/libp2p/go-libp2p-routing v0.0.1 github.com/libp2p/go-testutil v0.0.1 github.com/multiformats/go-multiaddr v0.0.1 + golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 // indirect + golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c // indirect + golang.org/x/sys v0.0.0-20190509141414-a5b02f93d862 // indirect + golang.org/x/text v0.3.2 // indirect + golang.org/x/tools v0.0.0-20190509153222-73554e0f7805 // indirect ) diff --git a/go.sum b/go.sum index 8cf8bc44..8190b7bf 100644 --- a/go.sum +++ b/go.sum @@ -241,21 +241,37 @@ golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 h1:jsG6UpNLt9iAsb0S2AGW28DveNzzgmbXR+ENoPjUeIU= golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo= +golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20180524181706-dfa909b99c79/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190227160552-c95aed5357e7 h1:C2F/nMkR/9sfUTpvR3QrjBuTdvMUC/cFajkphs1YLQo= golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c h1:uOCk1iQW6Vc18bnC13MfzScl+wdKBmM9Y9kU7Z83/lw= +golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e h1:ZytStCyV048ZqDsWHiYDdoI2Vd4msMcrDECFxS+tL9c= golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190509141414-a5b02f93d862 h1:rM0ROo5vb9AdYJi1110yjWGMej9ITfKddS89P3Fkhug= +golang.org/x/sys v0.0.0-20190509141414-a5b02f93d862/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190509153222-73554e0f7805 h1:1ufBXAsTpUhSmmPXEEs5PrGQSfnBhsjAd2SmVhp9xrY= +golang.org/x/tools v0.0.0-20190509153222-73554e0f7805/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= diff --git a/message/message.go b/message/message.go index b9035d8f..8bddc509 100644 --- a/message/message.go +++ b/message/message.go @@ -13,9 +13,8 @@ import ( inet "github.com/libp2p/go-libp2p-net" ) -// TODO move message.go into the bitswap package -// TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb - +// BitSwapMessage is the basic interface for interacting building, encoding, +// and decoding messages sent on the BitSwap protocol. type BitSwapMessage interface { // Wantlist returns a slice of unique keys that represent data wanted by // the sender. @@ -40,6 +39,8 @@ type BitSwapMessage interface { Loggable() map[string]interface{} } +// Exportable is an interface for structures than can be +// encoded in a bitswap protobuf. type Exportable interface { ToProtoV0() *pb.Message ToProtoV1() *pb.Message @@ -53,6 +54,7 @@ type impl struct { blocks map[cid.Cid]blocks.Block } +// New returns a new, empty bitswap message func New(full bool) BitSwapMessage { return newMsg(full) } @@ -65,6 +67,8 @@ func newMsg(full bool) *impl { } } +// Entry is an wantlist entry in a Bitswap message (along with whether it's an +// add or cancel). type Entry struct { wantlist.Entry Cancel bool @@ -163,11 +167,13 @@ func (m *impl) AddBlock(b blocks.Block) { m.blocks[b.Cid()] = b } +// FromNet generates a new BitswapMessage from incoming data on an io.Reader. func FromNet(r io.Reader) (BitSwapMessage, error) { pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax) return FromPBReader(pbr) } +// FromPBReader generates a new Bitswap message from a gogo-protobuf reader func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) { pb := new(pb.Message) if err := pbr.ReadMsg(pb); err != nil { diff --git a/network/interface.go b/network/interface.go index 2d2c9b19..1d7cdc74 100644 --- a/network/interface.go +++ b/network/interface.go @@ -12,10 +12,12 @@ import ( ) var ( - // These two are equivalent, legacy - ProtocolBitswapOne protocol.ID = "/ipfs/bitswap/1.0.0" + // ProtocolBitswapOne is the prefix for the legacy bitswap protocol + ProtocolBitswapOne protocol.ID = "/ipfs/bitswap/1.0.0" + // ProtocolBitswapNoVers is equivalent to the legacy bitswap protocol ProtocolBitswapNoVers protocol.ID = "/ipfs/bitswap" + // ProtocolBitswap is the current version of bitswap protocol, 1.1.0 ProtocolBitswap protocol.ID = "/ipfs/bitswap/1.1.0" ) @@ -38,18 +40,20 @@ type BitSwapNetwork interface { ConnectionManager() ifconnmgr.ConnManager - Stats() NetworkStats + Stats() Stats Routing } +// MessageSender is an interface for sending a series of messages over the bitswap +// network type MessageSender interface { SendMsg(context.Context, bsmsg.BitSwapMessage) error Close() error Reset() error } -// Implement Receiver to receive messages from the BitSwapNetwork. +// Receiver is an interface that can receive messages from the BitSwapNetwork. type Receiver interface { ReceiveMessage( ctx context.Context, @@ -63,6 +67,8 @@ type Receiver interface { PeerDisconnected(peer.ID) } +// Routing is an interface to providing and finding providers on a bitswap +// network. type Routing interface { // FindProvidersAsync returns a channel of providers for the given key. FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID @@ -71,10 +77,10 @@ type Routing interface { Provide(context.Context, cid.Cid) error } -// NetworkStats is a container for statistics about the bitswap network +// Stats is a container for statistics about the bitswap network // the numbers inside are specific to bitswap, and not any other protocols // using the same underlying network. -type NetworkStats struct { +type Stats struct { MessagesSent uint64 MessagesRecvd uint64 } diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index 8c2f5d68..ffb4800d 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -49,7 +49,7 @@ type impl struct { // inbound messages from the network are forwarded to the receiver receiver Receiver - stats NetworkStats + stats Stats } type streamMessageSender struct { @@ -201,8 +201,8 @@ func (bsnet *impl) ConnectionManager() ifconnmgr.ConnManager { return bsnet.host.ConnManager() } -func (bsnet *impl) Stats() NetworkStats { - return NetworkStats{ +func (bsnet *impl) Stats() Stats { + return Stats{ MessagesRecvd: atomic.LoadUint64(&bsnet.stats.MessagesRecvd), MessagesSent: atomic.LoadUint64(&bsnet.stats.MessagesSent), } diff --git a/notifications/notifications.go b/notifications/notifications.go index b29640be..0934fa5f 100644 --- a/notifications/notifications.go +++ b/notifications/notifications.go @@ -11,12 +11,16 @@ import ( const bufferSize = 16 +// PubSub is a simple interface for publishing blocks and being able to subscribe +// for cids. It's used internally by bitswap to decouple receiving blocks +// and actually providing them back to the GetBlocks caller. type PubSub interface { Publish(block blocks.Block) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block Shutdown() } +// New generates a new PubSub interface. func New() PubSub { return &impl{ wrapped: *pubsub.New(bufferSize), diff --git a/stat.go b/stat.go index 99b2def1..af39ecb2 100644 --- a/stat.go +++ b/stat.go @@ -6,6 +6,7 @@ import ( cid "github.com/ipfs/go-cid" ) +// Stat is a struct that provides various statistics on bitswap operations type Stat struct { ProvideBufLen int Wantlist []cid.Cid @@ -19,6 +20,7 @@ type Stat struct { MessagesReceived uint64 } +// Stat returns aggregated statistics about bitswap operations func (bs *Bitswap) Stat() (*Stat, error) { st := new(Stat) st.ProvideBufLen = len(bs.newBlocks) diff --git a/testutils.go b/testinstance/testinstance.go similarity index 55% rename from testutils.go rename to testinstance/testinstance.go index f9be6943..f459065f 100644 --- a/testutils.go +++ b/testinstance/testinstance.go @@ -1,11 +1,12 @@ -package bitswap +package testsession import ( "context" "time" + bitswap "github.com/ipfs/go-bitswap" + bsnet "github.com/ipfs/go-bitswap/network" tn "github.com/ipfs/go-bitswap/testnet" - ds "github.com/ipfs/go-datastore" delayed "github.com/ipfs/go-datastore/delayed" ds_sync "github.com/ipfs/go-datastore/sync" @@ -16,11 +17,12 @@ import ( testutil "github.com/libp2p/go-testutil" ) -// WARNING: this uses RandTestBogusIdentity DO NOT USE for NON TESTS! -func NewTestSessionGenerator( - net tn.Network) SessionGenerator { +// NewTestInstanceGenerator generates a new InstanceGenerator for the given +// testnet +func NewTestInstanceGenerator( + net tn.Network) InstanceGenerator { ctx, cancel := context.WithCancel(context.Background()) - return SessionGenerator{ + return InstanceGenerator{ net: net, seq: 0, ctx: ctx, // TODO take ctx as param to Next, Instances @@ -28,29 +30,32 @@ func NewTestSessionGenerator( } } -// TODO move this SessionGenerator to the core package and export it as the core generator -type SessionGenerator struct { +// InstanceGenerator generates new test instances of bitswap+dependencies +type InstanceGenerator struct { seq int net tn.Network ctx context.Context cancel context.CancelFunc } -func (g *SessionGenerator) Close() error { +// Close closes the clobal context, shutting down all test instances +func (g *InstanceGenerator) Close() error { g.cancel() return nil // for Closer interface } -func (g *SessionGenerator) Next() Instance { +// Next generates a new instance of bitswap + dependencies +func (g *InstanceGenerator) Next() Instance { g.seq++ p, err := p2ptestutil.RandTestBogusIdentity() if err != nil { panic("FIXME") // TODO change signature } - return MkSession(g.ctx, g.net, p) + return NewInstance(g.ctx, g.net, p) } -func (g *SessionGenerator) Instances(n int) []Instance { +// Instances creates N test instances of bitswap + dependencies +func (g *InstanceGenerator) Instances(n int) []Instance { var instances []Instance for j := 0; j < n; j++ { inst := g.Next() @@ -59,34 +64,38 @@ func (g *SessionGenerator) Instances(n int) []Instance { for i, inst := range instances { for j := i + 1; j < len(instances); j++ { oinst := instances[j] - inst.Exchange.network.ConnectTo(context.Background(), oinst.Peer) + inst.Adapter.ConnectTo(context.Background(), oinst.Peer) } } return instances } +// Instance is a test instance of bitswap + dependencies for integration testing type Instance struct { - Peer peer.ID - Exchange *Bitswap - blockstore blockstore.Blockstore - + Peer peer.ID + Exchange *bitswap.Bitswap + blockstore blockstore.Blockstore + Adapter bsnet.BitSwapNetwork blockstoreDelay delay.D } +// Blockstore returns the block store for this test instance func (i *Instance) Blockstore() blockstore.Blockstore { return i.blockstore } +// SetBlockstoreLatency customizes the artificial delay on receiving blocks +// from a blockstore test instance. func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { return i.blockstoreDelay.Set(t) } -// session creates a test bitswap instance. +// NewInstance creates a test bitswap instance. // // NB: It's easy make mistakes by providing the same peer ID to two different -// sessions. To safeguard, use the SessionGenerator to generate sessions. It's +// instances. To safeguard, use the InstanceGenerator to generate instances. It's // just a much better idea. -func MkSession(ctx context.Context, net tn.Network, p testutil.Identity) Instance { +func NewInstance(ctx context.Context, net tn.Network, p testutil.Identity) Instance { bsdelay := delay.Fixed(0) adapter := net.Adapter(p) @@ -99,9 +108,10 @@ func MkSession(ctx context.Context, net tn.Network, p testutil.Identity) Instanc panic(err.Error()) // FIXME perhaps change signature and return error. } - bs := New(ctx, adapter, bstore).(*Bitswap) + bs := bitswap.New(ctx, adapter, bstore).(*bitswap.Bitswap) return Instance{ + Adapter: adapter, Peer: p.ID(), Exchange: bs, blockstore: bstore, diff --git a/testnet/interface.go b/testnet/interface.go index ed7d4b1e..3441f69d 100644 --- a/testnet/interface.go +++ b/testnet/interface.go @@ -6,6 +6,8 @@ import ( "github.com/libp2p/go-testutil" ) +// Network is an interface for generating bitswap network interfaces +// based on a test network. type Network interface { Adapter(testutil.Identity) bsnet.BitSwapNetwork diff --git a/testnet/peernet.go b/testnet/peernet.go index dbad1f65..cea4b727 100644 --- a/testnet/peernet.go +++ b/testnet/peernet.go @@ -17,6 +17,7 @@ type peernet struct { routingserver mockrouting.Server } +// StreamNet is a testnet that uses libp2p's MockNet func StreamNet(ctx context.Context, net mockpeernet.Mocknet, rs mockrouting.Server) (Network, error) { return &peernet{net, rs}, nil } diff --git a/testnet/virtual.go b/testnet/virtual.go index e3af99d0..19cc47d3 100644 --- a/testnet/virtual.go +++ b/testnet/virtual.go @@ -24,6 +24,8 @@ import ( var log = logging.Logger("bstestnet") +// VirtualNetwork generates a new testnet instance - a fake network that +// is used to simulate sending messages. func VirtualNetwork(rs mockrouting.Server, d delay.D) Network { return &network{ latencies: make(map[peer.ID]map[peer.ID]time.Duration), @@ -36,10 +38,13 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network { } } +// RateLimitGenerator is an interface for generating rate limits across peers type RateLimitGenerator interface { NextRateLimit() float64 } +// RateLimitedVirtualNetwork generates a testnet instance where nodes are rate +// limited in the upload/download speed. func RateLimitedVirtualNetwork(rs mockrouting.Server, d delay.D, rateLimitGenerator RateLimitGenerator) Network { return &network{ latencies: make(map[peer.ID]map[peer.ID]time.Duration), @@ -168,7 +173,7 @@ type networkClient struct { bsnet.Receiver network *network routing routing.IpfsRouting - stats bsnet.NetworkStats + stats bsnet.Stats } func (nc *networkClient) SendMessage( @@ -182,8 +187,8 @@ func (nc *networkClient) SendMessage( return nil } -func (nc *networkClient) Stats() bsnet.NetworkStats { - return bsnet.NetworkStats{ +func (nc *networkClient) Stats() bsnet.Stats { + return bsnet.Stats{ MessagesRecvd: atomic.LoadUint64(&nc.stats.MessagesRecvd), MessagesSent: atomic.LoadUint64(&nc.stats.MessagesSent), } @@ -234,11 +239,11 @@ func (mp *messagePasser) Reset() error { return nil } -func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) { +func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) { return &messagePasser{ - net: n, + net: nc, target: p, - local: n.local, + local: nc.local, ctx: ctx, }, nil } diff --git a/testutil/testutil.go b/testutil/testutil.go index 87bd91d2..6f82fede 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -5,7 +5,7 @@ import ( bsmsg "github.com/ipfs/go-bitswap/message" "github.com/ipfs/go-bitswap/wantlist" - "github.com/ipfs/go-block-format" + blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" blocksutil "github.com/ipfs/go-ipfs-blocksutil" peer "github.com/libp2p/go-libp2p-peer" diff --git a/wantlist/wantlist.go b/wantlist/wantlist.go index 999fcd9e..b5c2a602 100644 --- a/wantlist/wantlist.go +++ b/wantlist/wantlist.go @@ -8,14 +8,18 @@ import ( cid "github.com/ipfs/go-cid" ) +// SessionTrackedWantlist is a list of wants that also track which bitswap +// sessions have requested them type SessionTrackedWantlist struct { set map[cid.Cid]*sessionTrackedEntry } +// Wantlist is a raw list of wanted blocks and their priorities type Wantlist struct { set map[cid.Cid]Entry } +// Entry is an entry in a want list, consisting of a cid and its priority type Entry struct { Cid cid.Cid Priority int @@ -40,12 +44,14 @@ func (es entrySlice) Len() int { return len(es) } func (es entrySlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] } func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priority } +// NewSessionTrackedWantlist generates a new SessionTrackedWantList. func NewSessionTrackedWantlist() *SessionTrackedWantlist { return &SessionTrackedWantlist{ set: make(map[cid.Cid]*sessionTrackedEntry), } } +// New generates a new raw Wantlist func New() *Wantlist { return &Wantlist{ set: make(map[cid.Cid]Entry), @@ -116,6 +122,7 @@ func (w *SessionTrackedWantlist) Contains(k cid.Cid) (Entry, bool) { return e.Entry, true } +// Entries returns all wantlist entries for a given session tracked want list. func (w *SessionTrackedWantlist) Entries() []Entry { es := make([]Entry, 0, len(w.set)) for _, e := range w.set { @@ -124,16 +131,20 @@ func (w *SessionTrackedWantlist) Entries() []Entry { return es } +// SortedEntries returns wantlist entries ordered by priority. func (w *SessionTrackedWantlist) SortedEntries() []Entry { es := w.Entries() sort.Sort(entrySlice(es)) return es } +// Len returns the number of entries in a wantlist. func (w *SessionTrackedWantlist) Len() int { return len(w.set) } +// CopyWants copies all wants from one SessionTrackWantlist to another (along with +// the session data) func (w *SessionTrackedWantlist) CopyWants(to *SessionTrackedWantlist) { for _, e := range w.set { for k := range e.sesTrk { @@ -142,10 +153,12 @@ func (w *SessionTrackedWantlist) CopyWants(to *SessionTrackedWantlist) { } } +// Len returns the number of entries in a wantlist. func (w *Wantlist) Len() int { return len(w.set) } +// Add adds an entry in a wantlist from CID & Priority, if not already present. func (w *Wantlist) Add(c cid.Cid, priority int) bool { if _, ok := w.set[c]; ok { return false @@ -159,6 +172,7 @@ func (w *Wantlist) Add(c cid.Cid, priority int) bool { return true } +// AddEntry adds an entry to a wantlist if not already present. func (w *Wantlist) AddEntry(e Entry) bool { if _, ok := w.set[e.Cid]; ok { return false @@ -167,6 +181,7 @@ func (w *Wantlist) AddEntry(e Entry) bool { return true } +// Remove removes the given cid from the wantlist. func (w *Wantlist) Remove(c cid.Cid) bool { _, ok := w.set[c] if !ok { @@ -177,11 +192,14 @@ func (w *Wantlist) Remove(c cid.Cid) bool { return true } +// Contains returns the entry, if present, for the given CID, plus whether it +// was present. func (w *Wantlist) Contains(c cid.Cid) (Entry, bool) { e, ok := w.set[c] return e, ok } +// Entries returns all wantlist entries for a want list. func (w *Wantlist) Entries() []Entry { es := make([]Entry, 0, len(w.set)) for _, e := range w.set { @@ -190,6 +208,7 @@ func (w *Wantlist) Entries() []Entry { return es } +// SortedEntries returns wantlist entries ordered by priority. func (w *Wantlist) SortedEntries() []Entry { es := w.Entries() sort.Sort(entrySlice(es)) diff --git a/workers.go b/workers.go index 6e0bf037..4a6e91dd 100644 --- a/workers.go +++ b/workers.go @@ -11,9 +11,11 @@ import ( procctx "github.com/jbenet/goprocess/context" ) +// TaskWorkerCount is the total number of simultaneous threads sending +// outgoing messages var TaskWorkerCount = 8 -func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { +func (bs *Bitswap) startWorkers(ctx context.Context, px process.Process) { // Start up workers to handle requests from other nodes for the data on this node for i := 0; i < TaskWorkerCount; i++ {