Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bitswap sessions #3867

Merged
merged 11 commits into from
Jul 7, 2017
Prev Previous commit
Next Next commit
WIP: wire sessions up through into FetchGraph
License: MIT
Signed-off-by: Jeromy <[email protected]>
whyrusleeping committed Jul 5, 2017

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit b1247d3323d2c13fa451210fc2e3ea31177380a9
55 changes: 49 additions & 6 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
@@ -10,9 +10,10 @@ import (

"github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"

logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
)

@@ -31,6 +32,7 @@ type BlockService interface {
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
DeleteBlock(o blocks.Block) error
NewSession(context.Context) *Session
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a public interface and Session has private fields.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really didnt feel like making yet another only-implemented-once interface.

Copy link
Member

@Stebalien Stebalien Jul 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't this just be a helper function?

func NewSession(ctx context.Context, bs BlockService) *Session {
	exchange := bs.Exchange()
	if bswap, ok := exchange.(*bitswap.Bitswap); ok {
		ses := bswap.NewSession(ctx)
		return &Session{
			ses: ses,
			bs:  bs.Blockstore(),
		}
	}
	return &Session{
		ses: exchange,
		bs:  bs.Blockstore(),
	}
}

(we could also make this non-bitswap specific by adding a NewSession(context.Context) exchange.Interface method to exchange.Interface but we don't need that now).

Close() error
}

@@ -77,6 +79,21 @@ func (bs *blockService) Exchange() exchange.Interface {
return bs.exchange
}

func (bs *blockService) NewSession(ctx context.Context) *Session {
bswap, ok := bs.Exchange().(*bitswap.Bitswap)
if ok {
ses := bswap.NewSession(ctx)
return &Session{
ses: ses,
bs: bs.blockstore,
}
}
return &Session{
ses: bs.exchange,
bs: bs.blockstore,
}
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
@@ -141,16 +158,25 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", c)

block, err := s.blockstore.Get(c)
var f exchange.Fetcher
if s.exchange != nil {
f = s.exchange
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed, can't we just pass s.exchange?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope, because then the response wouldnt be nil (it would be a typed nil, which in interface form is != nil)


return getBlock(ctx, c, s.blockstore, f)
}

func getBlock(ctx context.Context, c *cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) (blocks.Block, error) {
block, err := bs.Get(c)
if err == nil {
return block, nil
}

if err == blockstore.ErrNotFound && s.exchange != nil {
if err == blockstore.ErrNotFound && f != nil {
// TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break.
log.Debug("Blockservice: Searching bitswap")
blk, err := s.exchange.GetBlock(ctx, c)
blk, err := f.GetBlock(ctx, c)
if err != nil {
if err == blockstore.ErrNotFound {
return nil, ErrNotFound
@@ -172,12 +198,16 @@ func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block,
// the returned channel.
// NB: No guarantees are made about order.
func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
return getBlocks(ctx, ks, s.blockstore, s.exchange)
}

func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) <-chan blocks.Block {
out := make(chan blocks.Block)
go func() {
defer close(out)
var misses []*cid.Cid
for _, c := range ks {
hit, err := s.blockstore.Get(c)
hit, err := bs.Get(c)
if err != nil {
misses = append(misses, c)
continue
@@ -194,7 +224,7 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
return
}

rblocks, err := s.exchange.GetBlocks(ctx, misses)
rblocks, err := f.GetBlocks(ctx, misses)
if err != nil {
log.Debugf("Error with GetBlocks: %s", err)
return
@@ -220,3 +250,16 @@ func (s *blockService) Close() error {
log.Debug("blockservice is shutting down...")
return s.exchange.Close()
}

type Session struct {
bs blockstore.Blockstore
ses exchange.Fetcher
}

func (s *Session) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
return getBlock(ctx, c, s.bs, s.ses)
}

func (s *Session) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
return getBlocks(ctx, ks, s.bs, s.ses)
}
1 change: 0 additions & 1 deletion exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@ import (
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
2 changes: 1 addition & 1 deletion exchange/bitswap/get.go
Original file line number Diff line number Diff line change
@@ -4,9 +4,9 @@ import (
"context"
"errors"

blocks "github.com/ipfs/go-ipfs/blocks"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"

cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
)
2 changes: 1 addition & 1 deletion exchange/bitswap/session.go
Original file line number Diff line number Diff line change
@@ -4,8 +4,8 @@ import (
"context"
"time"

blocks "github.com/ipfs/go-ipfs/blocks"
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"

logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
2 changes: 1 addition & 1 deletion exchange/bitswap/session_test.go
Original file line number Diff line number Diff line change
@@ -6,8 +6,8 @@ import (
"testing"
"time"

blocks "github.com/ipfs/go-ipfs/blocks"
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"

cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spacing

)
2 changes: 1 addition & 1 deletion exchange/bitswap/wantlist/wantlist_test.go
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ package wantlist
import (
"testing"

cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
)

var testcids []*cid.Cid
11 changes: 7 additions & 4 deletions exchange/interface.go
Original file line number Diff line number Diff line change
@@ -13,10 +13,7 @@ import (
// Any type that implements exchange.Interface may be used as an IPFS block
// exchange protocol.
type Interface interface { // type Exchanger interface
// GetBlock returns the block associated with a given key.
GetBlock(context.Context, *cid.Cid) (blocks.Block, error)

GetBlocks(context.Context, []*cid.Cid) (<-chan blocks.Block, error)
Fetcher

// TODO Should callers be concerned with whether the block was made
// available on the network?
@@ -26,3 +23,9 @@ type Interface interface { // type Exchanger interface

io.Closer
}

type Fetcher interface {
// GetBlock returns the block associated with a given key.
GetBlock(context.Context, *cid.Cid) (blocks.Block, error)
GetBlocks(context.Context, []*cid.Cid) (<-chan blocks.Block, error)
}
23 changes: 21 additions & 2 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
@@ -161,11 +161,30 @@ func GetLinksDirect(serv node.NodeGetter) GetLinks {
}
}

type sesGetter struct {
bs *bserv.Session
}

func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
blk, err := sg.bs.GetBlock(ctx, c)
if err != nil {
return nil, err
}

return decodeBlock(blk)
}

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
var ng node.NodeGetter = serv
ds, ok := serv.(*dagService)
if ok {
ng = &sesGetter{ds.Blocks.NewSession(ctx)}
}

v, _ := ctx.Value("progress").(*ProgressTracker)
if v == nil {
return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, cid.NewSet().Visit)
return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit)
}
set := cid.NewSet()
visit := func(c *cid.Cid) bool {
@@ -176,7 +195,7 @@ func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
return false
}
}
return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, visit)
return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit)
}

// FindLinks searches this nodes links for the given key,