Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

write blocks retrieved from the exchange to the blockstore #92

Merged
merged 9 commits into from
Jul 28, 2022
159 changes: 115 additions & 44 deletions blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (
"go.opentelemetry.io/otel/trace"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice/internal"
cid "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-verifcid"

"github.com/ipfs/go-blockservice/internal"
)

var logger = logging.Logger("blockservice")
Expand Down Expand Up @@ -84,7 +85,7 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
}
}

// NewWriteThrough ceates a BlockService that guarantees writes will go
// NewWriteThrough creates a BlockService that guarantees writes will go
// through to the blockstore and are not skipped by cache checks.
func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
if rem == nil {
Expand Down Expand Up @@ -117,21 +118,22 @@ func NewSession(ctx context.Context, bs BlockService) *Session {
exch := bs.Exchange()
if sessEx, ok := exch.(exchange.SessionExchange); ok {
return &Session{
sessCtx: ctx,
ses: nil,
sessEx: sessEx,
bs: bs.Blockstore(),
sessCtx: ctx,
ses: nil,
sessEx: sessEx,
bs: bs.Blockstore(),
notifier: exch,
}
}
return &Session{
ses: exch,
sessCtx: ctx,
bs: bs.Blockstore(),
ses: exch,
sessCtx: ctx,
bs: bs.Blockstore(),
notifier: exch,
}
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
ctx, span := internal.StartSpan(ctx, "blockService.AddBlock")
defer span.End()
Expand All @@ -155,8 +157,8 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
logger.Debugf("BlockService.BlockAdded %s", c)

if s.exchange != nil {
if err := s.exchange.HasBlock(ctx, o); err != nil {
logger.Errorf("HasBlock: %s", err.Error())
if err := s.exchange.NotifyNewBlocks(ctx, o); err != nil {
logger.Errorf("NotifyNewBlocks: %s", err.Error())
}
}

Expand Down Expand Up @@ -200,11 +202,9 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
}

if s.exchange != nil {
for _, o := range toput {
logger.Debugf("BlockService.BlockAdded %s", o.Cid())
if err := s.exchange.HasBlock(ctx, o); err != nil {
logger.Errorf("HasBlock: %s", err.Error())
}
logger.Debugf("BlockService.BlockAdded %d blocks", len(toput))
if err := s.exchange.NotifyNewBlocks(ctx, toput...); err != nil {
logger.Errorf("NotifyNewBlocks: %s", err.Error())
}
}
return nil
Expand All @@ -216,19 +216,19 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e
ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

var f func() exchange.Fetcher
var f func() notifiableFetcher
if s.exchange != nil {
f = s.getExchange
}

return getBlock(ctx, c, s.blockstore, f) // hash security
}

func (s *blockService) getExchange() exchange.Fetcher {
func (s *blockService) getExchange() notifiableFetcher {
return s.exchange
}

func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() exchange.Fetcher) (blocks.Block, error) {
func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() notifiableFetcher) (blocks.Block, error) {
err := verifcid.ValidateCid(c) // hash security
if err != nil {
return nil, err
Expand All @@ -249,6 +249,15 @@ func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget fun
if err != nil {
return nil, err
}
// also write in the blockstore for caching, inform the exchange that the block is available
err = bs.Put(ctx, blk)
Jorropo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
err = f.NotifyNewBlocks(ctx, blk)
if err != nil {
return nil, err
}
logger.Debugf("BlockService.BlockFetched %s", c)
return blk, nil
}
Expand All @@ -264,15 +273,15 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block
ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks")
defer span.End()

var f func() exchange.Fetcher
var f func() notifiableFetcher
if s.exchange != nil {
f = s.getExchange
}

return getBlocks(ctx, ks, s.blockstore, f) // hash security
}

func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() exchange.Fetcher) <-chan blocks.Block {
func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() notifiableFetcher) <-chan blocks.Block {
out := make(chan blocks.Block)

go func() {
Expand Down Expand Up @@ -324,13 +333,53 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget
return
}

for b := range rblocks {
logger.Debugf("BlockService.BlockFetched %s", b.Cid())
select {
case out <- b:
case <-ctx.Done():
// batch available blocks together
const batchSize = 32
batch := make([]blocks.Block, 0, batchSize)
for {
var noMoreBlocks bool
batchLoop:
for len(batch) < batchSize {
select {
case b, ok := <-rblocks:
if !ok {
noMoreBlocks = true
break batchLoop
}

logger.Debugf("BlockService.BlockFetched %s", b.Cid())
batch = append(batch, b)
case <-ctx.Done():
return
default:
break batchLoop
}
}

// also write in the blockstore for caching, inform the exchange that the blocks are available
err = bs.PutMany(ctx, batch)
if err != nil {
logger.Errorf("could not write blocks from the network to the blockstore: %s", err)
return
}

err = f.NotifyNewBlocks(ctx, batch...)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
}

for _, b := range batch {
select {
case out <- b:
case <-ctx.Done():
return
}
}
batch = batch[:0]
if noMoreBlocks {
break
}
}
}()
return out
Expand All @@ -353,47 +402,69 @@ func (s *blockService) Close() error {
return s.exchange.Close()
}

type notifier interface {
NotifyNewBlocks(context.Context, ...blocks.Block) error
}

// Session is a helper type to provide higher level access to bitswap sessions
type Session struct {
bs blockstore.Blockstore
ses exchange.Fetcher
sessEx exchange.SessionExchange
sessCtx context.Context
lk sync.Mutex
bs blockstore.Blockstore
ses exchange.Fetcher
sessEx exchange.SessionExchange
sessCtx context.Context
notifier notifier
lk sync.Mutex
}

func (s *Session) getSession() exchange.Fetcher {
type notifiableFetcher interface {
exchange.Fetcher
notifier
}

type notifiableFetcherWrapper struct {
exchange.Fetcher
notifier
}

func (s *Session) getSession() notifiableFetcher {
s.lk.Lock()
defer s.lk.Unlock()
if s.ses == nil {
s.ses = s.sessEx.NewSession(s.sessCtx)
}

return s.ses
return notifiableFetcherWrapper{s.ses, s.notifier}
}

func (s *Session) getExchange() notifiableFetcher {
return notifiableFetcherWrapper{s.ses, s.notifier}
}

func (s *Session) getFetcherFactory() func() notifiableFetcher {
if s.sessEx != nil {
return s.getSession
}
if s.ses != nil {
// Our exchange isn't session compatible, let's fallback to non sessions fetches
return s.getExchange
}
return nil
}

// GetBlock gets a block in the context of a request session
func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

var f func() exchange.Fetcher
if s.sessEx != nil {
f = s.getSession
}
return getBlock(ctx, c, s.bs, f) // hash security
return getBlock(ctx, c, s.bs, s.getFetcherFactory()) // hash security
}

// GetBlocks gets blocks in the context of a request session
func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
defer span.End()

var f func() exchange.Fetcher
if s.sessEx != nil {
f = s.getSession
}
return getBlocks(ctx, ks, s.bs, f) // hash security
return getBlocks(ctx, ks, s.bs, s.getFetcherFactory()) // hash security
}

var _ BlockGetter = (*Session)(nil)
Loading