Skip to content

Commit

Permalink
bitswap: redo: don't re-provide blocks we've provided very recently
Browse files Browse the repository at this point in the history
Redo how pull request #3105 is implemented.  Instead of calling Has()
is the blockservice modify the blockstore interface to return the
blocks actually added.

License: MIT
Signed-off-by: Kevin Atkinson <[email protected]>
  • Loading branch information
kevina committed Sep 23, 2016
1 parent 300187a commit 12c9078
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 75 deletions.
16 changes: 8 additions & 8 deletions blocks/blockstore/arc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ func (b *arccache) Get(k key.Key) (blocks.Block, error) {
return bl, err
}

func (b *arccache) Put(bl blocks.Block) error {
func (b *arccache) Put(bl blocks.Block) (error, blocks.Block) {
if has, ok := b.hasCached(bl.Key()); ok && has {
return nil
return nil, nil
}

err := b.blockstore.Put(bl)
err, added := b.blockstore.Put(bl)
if err == nil {
b.arc.Add(bl.Key(), true)
}
return err
return err, added
}

func (b *arccache) PutMany(bs []blocks.Block) error {
func (b *arccache) PutMany(bs []blocks.Block) (error, []blocks.Block) {
var good []blocks.Block
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
Expand All @@ -112,14 +112,14 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
good = append(good, block)
}
}
err := b.blockstore.PutMany(good)
err, added := b.blockstore.PutMany(good)
if err != nil {
return err
return err, nil
}
for _, block := range good {
b.arc.Add(block.Key(), true)
}
return nil
return nil, added
}

func (b *arccache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
Expand Down
4 changes: 2 additions & 2 deletions blocks/blockstore/arc_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestHasRequestTriggersCache(t *testing.T) {
}

untrap(cd)
err := arc.Put(exampleBlock)
err, _ := arc.Put(exampleBlock)
if err != nil {
t.Fatal(err)
}
Expand All @@ -112,7 +112,7 @@ func TestGetFillsCache(t *testing.T) {

untrap(cd)

if err := arc.Put(exampleBlock); err != nil {
if err, _ := arc.Put(exampleBlock); err != nil {
t.Fatal(err)
}

Expand Down
26 changes: 16 additions & 10 deletions blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ type Blockstore interface {
DeleteBlock(key.Key) error
Has(key.Key) (bool, error)
Get(key.Key) (blocks.Block, error)
Put(blocks.Block) error
PutMany([]blocks.Block) error

// Put and PutMany return the blocks(s) actually added to the
// blockstore. If a block already exists it will not be returned.

Put(blocks.Block) (error, blocks.Block)
PutMany([]blocks.Block) (error, []blocks.Block)

AllKeysChan(ctx context.Context) (<-chan key.Key, error)
}
Expand Down Expand Up @@ -109,23 +113,24 @@ func (bs *blockstore) Get(k key.Key) (blocks.Block, error) {
}
}

func (bs *blockstore) Put(block blocks.Block) error {
func (bs *blockstore) Put(block blocks.Block) (error, blocks.Block) {
k := block.Key().DsKey()

// Has is cheaper than Put, so see if we already have it
exists, err := bs.datastore.Has(k)
if err == nil && exists {
return nil // already stored.
return nil, nil // already stored.
}
return bs.datastore.Put(k, block.RawData())
return bs.datastore.Put(k, block.RawData()), block
}

func (bs *blockstore) PutMany(blocks []blocks.Block) error {
func (bs *blockstore) PutMany(blks []blocks.Block) (error, []blocks.Block) {
t, err := bs.datastore.Batch()
if err != nil {
return err
return err, nil
}
for _, b := range blocks {
added := make([]blocks.Block, 0, len(blks))
for _, b := range blks {
k := b.Key().DsKey()
exists, err := bs.datastore.Has(k)
if err == nil && exists {
Expand All @@ -134,10 +139,11 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error {

err = t.Put(k, b.RawData())
if err != nil {
return err
return err, nil
}
added = append(added, b)
}
return t.Commit()
return t.Commit(), added
}

func (bs *blockstore) Has(k key.Key) (bool, error) {
Expand Down
4 changes: 2 additions & 2 deletions blocks/blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPutThenGetBlock(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))

err := bs.Put(block)
err, _ := bs.Put(block)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func newBlockStoreWithKeys(t *testing.T, d ds.Datastore, N int) (Blockstore, []k
keys := make([]key.Key, N)
for i := 0; i < N; i++ {
block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
err := bs.Put(block)
err, _ := bs.Put(block)
if err != nil {
t.Fatal(err)
}
Expand Down
16 changes: 8 additions & 8 deletions blocks/blockstore/bloom_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,31 +140,31 @@ func (b *bloomcache) Get(k key.Key) (blocks.Block, error) {
return b.blockstore.Get(k)
}

func (b *bloomcache) Put(bl blocks.Block) error {
func (b *bloomcache) Put(bl blocks.Block) (error, blocks.Block) {
if has, ok := b.hasCached(bl.Key()); ok && has {
return nil
return nil, nil
}

err := b.blockstore.Put(bl)
err, added := b.blockstore.Put(bl)
if err == nil {
b.bloom.AddTS([]byte(bl.Key()))
}
return err
return err, added
}

func (b *bloomcache) PutMany(bs []blocks.Block) error {
func (b *bloomcache) PutMany(bs []blocks.Block) (error, []blocks.Block) {
// bloom cache gives only conclusive resulty if key is not contained
// to reduce number of puts we need conclusive infomration if block is contained
// this means that PutMany can't be improved with bloom cache so we just
// just do a passthrough.
err := b.blockstore.PutMany(bs)
err, added := b.blockstore.PutMany(bs)
if err != nil {
return err
return err, nil
}
for _, bl := range bs {
b.bloom.AddTS([]byte(bl.Key()))
}
return nil
return nil, added
}

func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
Expand Down
52 changes: 12 additions & 40 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package blockservice

import (
"errors"
"fmt"

blocks "github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
Expand Down Expand Up @@ -51,67 +50,40 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *BlockService) AddObject(o Object) (*cid.Cid, error) {
// TODO: while this is a great optimization, we should think about the
// possibility of streaming writes directly to disk. If we can pass this object
// all the way down to the datastore without having to 'buffer' its data,
// we could implement a `WriteTo` method on it that could do a streaming write
// of the content, saving us (probably) considerable memory.
c := o.Cid()
has, err := s.Blockstore.Has(key.Key(c.Hash()))
err, added := s.Blockstore.Put(o)
if err != nil {
return nil, err
}

if has {
return c, nil
}

err = s.Blockstore.Put(o)
if err != nil {
return nil, err
if added == nil {
return o.Cid(), nil
}

if err := s.Exchange.HasBlock(o); err != nil {
return nil, errors.New("blockservice is closed")
}

return c, nil
return o.Cid(), nil
}

func (s *BlockService) AddObjects(bs []Object) ([]*cid.Cid, error) {
var toput []blocks.Block
var toputcids []*cid.Cid
cids := make([]*cid.Cid, 0, len(bs))
blks := make([]blocks.Block, 0, len(bs))
for _, b := range bs {
c := b.Cid()

has, err := s.Blockstore.Has(key.Key(c.Hash()))
if err != nil {
return nil, err
}

if has {
continue
}

toput = append(toput, b)
toputcids = append(toputcids, c)
cids = append(cids, b.Cid())
blks = append(blks, b)
}

err := s.Blockstore.PutMany(toput)
err, added := s.Blockstore.PutMany(blks)
if err != nil {
return nil, err
}

var ks []*cid.Cid
for _, o := range toput {
for _, o := range added {
if err := s.Exchange.HasBlock(o); err != nil {
return nil, fmt.Errorf("blockservice is closed (%s)", err)
return nil, errors.New("blockservice is closed")
}

c := o.(Object).Cid() // cast is safe, we created these
ks = append(ks, c)
}
return ks, nil
return cids, nil
}

// GetBlock retrieves a particular block from the service,
Expand Down
2 changes: 1 addition & 1 deletion exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
default:
}

err := bs.blockstore.Put(blk)
err,_ := bs.blockstore.Put(blk)
if err != nil {
log.Errorf("Error writing block to datastore: %s", err)
return err
Expand Down
2 changes: 1 addition & 1 deletion exchange/bitswap/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
for _, letter := range alphabet {
block := blocks.NewBlock([]byte(letter))
if err := bs.Put(block); err != nil {
if err, _ := bs.Put(block); err != nil {
t.Fatal(err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion exchange/offline/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func (e *offlineExchange) GetBlock(_ context.Context, k key.Key) (blocks.Block,

// HasBlock always returns nil.
func (e *offlineExchange) HasBlock(b blocks.Block) error {
return e.bs.Put(b)
err, _ := e.bs.Put(b)
return err
}

// Close always returns nil.
Expand Down
4 changes: 2 additions & 2 deletions test/integration/bitswap_wo_routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestBitswapWithoutRouting(t *testing.T) {
block1 := blocks.NewBlock([]byte("block1"))

// put 1 before
if err := nodes[0].Blockstore.Put(block0); err != nil {
if err, _ := nodes[0].Blockstore.Put(block0); err != nil {
t.Fatal(err)
}

Expand All @@ -81,7 +81,7 @@ func TestBitswapWithoutRouting(t *testing.T) {
}

// put 1 after
if err := nodes[1].Blockstore.Put(block1); err != nil {
if err, _ := nodes[1].Blockstore.Put(block1); err != nil {
t.Fatal(err)
}

Expand Down

0 comments on commit 12c9078

Please sign in to comment.