Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
feat: plumb through context changes
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert committed Nov 19, 2021
1 parent aef743c commit 9537994
Show file tree
Hide file tree
Showing 6 changed files with 1,005 additions and 182 deletions.
44 changes: 22 additions & 22 deletions filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,13 @@ func (f *Filestore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
// blockstore. As expected, in the case of FileManager blocks, only the
// reference is deleted, not its contents. It may return
// ErrNotFound when the block is not stored.
func (f *Filestore) DeleteBlock(c cid.Cid) error {
err1 := f.bs.DeleteBlock(c)
func (f *Filestore) DeleteBlock(ctx context.Context, c cid.Cid) error {
err1 := f.bs.DeleteBlock(ctx, c)
if err1 != nil && err1 != blockstore.ErrNotFound {
return err1
}

err2 := f.fm.DeleteBlock(c)
err2 := f.fm.DeleteBlock(ctx, c)
// if we successfully removed something from the blockstore, but the
// filestore didnt have it, return success

Expand All @@ -140,36 +140,36 @@ func (f *Filestore) DeleteBlock(c cid.Cid) error {

// Get retrieves the block with the given Cid. It may return
// ErrNotFound when the block is not stored.
func (f *Filestore) Get(c cid.Cid) (blocks.Block, error) {
blk, err := f.bs.Get(c)
func (f *Filestore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
blk, err := f.bs.Get(ctx, c)
switch err {
case nil:
return blk, nil
case blockstore.ErrNotFound:
return f.fm.Get(c)
return f.fm.Get(ctx, c)
default:
return nil, err
}
}

// GetSize returns the size of the requested block. It may return ErrNotFound
// when the block is not stored.
func (f *Filestore) GetSize(c cid.Cid) (int, error) {
size, err := f.bs.GetSize(c)
func (f *Filestore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
size, err := f.bs.GetSize(ctx, c)
switch err {
case nil:
return size, nil
case blockstore.ErrNotFound:
return f.fm.GetSize(c)
return f.fm.GetSize(ctx, c)
default:
return -1, err
}
}

// Has returns true if the block with the given Cid is
// stored in the Filestore.
func (f *Filestore) Has(c cid.Cid) (bool, error) {
has, err := f.bs.Has(c)
func (f *Filestore) Has(ctx context.Context, c cid.Cid) (bool, error) {
has, err := f.bs.Has(ctx, c)
if err != nil {
return false, err
}
Expand All @@ -178,15 +178,15 @@ func (f *Filestore) Has(c cid.Cid) (bool, error) {
return true, nil
}

return f.fm.Has(c)
return f.fm.Has(ctx, c)
}

// Put stores a block in the Filestore. For blocks of
// underlying type FilestoreNode, the operation is
// delegated to the FileManager, while the rest of blocks
// are handled by the regular blockstore.
func (f *Filestore) Put(b blocks.Block) error {
has, err := f.Has(b.Cid())
func (f *Filestore) Put(ctx context.Context, b blocks.Block) error {
has, err := f.Has(ctx, b.Cid())
if err != nil {
return err
}
Expand All @@ -197,20 +197,20 @@ func (f *Filestore) Put(b blocks.Block) error {

switch b := b.(type) {
case *posinfo.FilestoreNode:
return f.fm.Put(b)
return f.fm.Put(ctx, b)
default:
return f.bs.Put(b)
return f.bs.Put(ctx, b)
}
}

// PutMany is like Put(), but takes a slice of blocks, allowing
// the underlying blockstore to perform batch transactions.
func (f *Filestore) PutMany(bs []blocks.Block) error {
func (f *Filestore) PutMany(ctx context.Context, bs []blocks.Block) error {
var normals []blocks.Block
var fstores []*posinfo.FilestoreNode

for _, b := range bs {
has, err := f.Has(b.Cid())
has, err := f.Has(ctx, b.Cid())
if err != nil {
return err
}
Expand All @@ -228,14 +228,14 @@ func (f *Filestore) PutMany(bs []blocks.Block) error {
}

if len(normals) > 0 {
err := f.bs.PutMany(normals)
err := f.bs.PutMany(ctx, normals)
if err != nil {
return err
}
}

if len(fstores) > 0 {
err := f.fm.PutMany(fstores)
err := f.fm.PutMany(ctx, fstores)
if err != nil {
return err
}
Expand All @@ -244,8 +244,8 @@ func (f *Filestore) PutMany(bs []blocks.Block) error {
}

// HashOnRead calls blockstore.HashOnRead.
func (f *Filestore) HashOnRead(enabled bool) {
f.bs.HashOnRead(enabled)
func (f *Filestore) HashOnRead(ctx context.Context, enabled bool) {
f.bs.HashOnRead(ctx, enabled)
}

var _ blockstore.Blockstore = (*Filestore)(nil)
12 changes: 7 additions & 5 deletions filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
posinfo "github.com/ipfs/go-ipfs-posinfo"
)

var bg = context.Background()

func newTestFilestore(t *testing.T) (string, *Filestore) {
mds := ds.NewMapDatastore()

Expand Down Expand Up @@ -65,15 +67,15 @@ func TestBasicFilestore(t *testing.T) {
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}

err := fs.Put(n)
err := fs.Put(bg, n)
if err != nil {
t.Fatal(err)
}
cids = append(cids, n.Node.Cid())
}

for i, c := range cids {
blk, err := fs.Get(c)
blk, err := fs.Get(bg, c)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -122,7 +124,7 @@ func randomFileAdd(t *testing.T, fs *Filestore, dir string, size int) (string, [
},
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}
err := fs.Put(n)
err := fs.Put(bg, n)
if err != nil {
t.Fatal(err)
}
Expand All @@ -137,15 +139,15 @@ func TestDeletes(t *testing.T) {
_, cids := randomFileAdd(t, fs, dir, 100)
todelete := cids[:4]
for _, c := range todelete {
err := fs.DeleteBlock(c)
err := fs.DeleteBlock(bg, c)
if err != nil {
t.Fatal(err)
}
}

deleted := make(map[string]bool)
for _, c := range todelete {
_, err := fs.Get(c)
_, err := fs.Get(bg, c)
if err != blockstore.ErrNotFound {
t.Fatal("expected blockstore not found error")
}
Expand Down
50 changes: 25 additions & 25 deletions fsrefstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewFileManager(ds ds.Batching, root string) *FileManager {
func (f *FileManager) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
q := dsq.Query{KeysOnly: true}

res, err := f.ds.Query(q)
res, err := f.ds.Query(ctx, q)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -100,8 +100,8 @@ func (f *FileManager) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {

// DeleteBlock deletes the reference-block from the underlying
// datastore. It does not touch the referenced data.
func (f *FileManager) DeleteBlock(c cid.Cid) error {
err := f.ds.Delete(dshelp.MultihashToDsKey(c.Hash()))
func (f *FileManager) DeleteBlock(ctx context.Context, c cid.Cid) error {
err := f.ds.Delete(ctx, dshelp.MultihashToDsKey(c.Hash()))
if err == ds.ErrNotFound {
return blockstore.ErrNotFound
}
Expand All @@ -112,12 +112,12 @@ func (f *FileManager) DeleteBlock(c cid.Cid) error {
// is done in two steps: the first step retrieves the reference
// block from the datastore. The second step uses the stored
// path and offsets to read the raw block data directly from disk.
func (f *FileManager) Get(c cid.Cid) (blocks.Block, error) {
dobj, err := f.getDataObj(c.Hash())
func (f *FileManager) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
dobj, err := f.getDataObj(ctx, c.Hash())
if err != nil {
return nil, err
}
out, err := f.readDataObj(c.Hash(), dobj)
out, err := f.readDataObj(ctx, c.Hash(), dobj)
if err != nil {
return nil, err
}
Expand All @@ -129,23 +129,23 @@ func (f *FileManager) Get(c cid.Cid) (blocks.Block, error) {
//
// This method may successfully return the size even if returning the block
// would fail because the associated file is no longer available.
func (f *FileManager) GetSize(c cid.Cid) (int, error) {
dobj, err := f.getDataObj(c.Hash())
func (f *FileManager) GetSize(ctx context.Context, c cid.Cid) (int, error) {
dobj, err := f.getDataObj(ctx, c.Hash())
if err != nil {
return -1, err
}
return int(dobj.GetSize_()), nil
}

func (f *FileManager) readDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, error) {
func (f *FileManager) readDataObj(ctx context.Context, m mh.Multihash, d *pb.DataObj) ([]byte, error) {
if IsURL(d.GetFilePath()) {
return f.readURLDataObj(m, d)
return f.readURLDataObj(ctx, m, d)
}
return f.readFileDataObj(m, d)
}

func (f *FileManager) getDataObj(m mh.Multihash) (*pb.DataObj, error) {
o, err := f.ds.Get(dshelp.MultihashToDsKey(m))
func (f *FileManager) getDataObj(ctx context.Context, m mh.Multihash) (*pb.DataObj, error) {
o, err := f.ds.Get(ctx, dshelp.MultihashToDsKey(m))
switch err {
case ds.ErrNotFound:
return nil, blockstore.ErrNotFound
Expand Down Expand Up @@ -213,12 +213,12 @@ func (f *FileManager) readFileDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, er
}

// reads and verifies the block from URL
func (f *FileManager) readURLDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, error) {
func (f *FileManager) readURLDataObj(ctx context.Context, m mh.Multihash, d *pb.DataObj) ([]byte, error) {
if !f.AllowUrls {
return nil, ErrUrlstoreNotEnabled
}

req, err := http.NewRequest("GET", d.GetFilePath(), nil)
req, err := http.NewRequestWithContext(ctx, "GET", d.GetFilePath(), nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -261,24 +261,24 @@ func (f *FileManager) readURLDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, err

// Has returns if the FileManager is storing a block reference. It does not
// validate the data, nor checks if the reference is valid.
func (f *FileManager) Has(c cid.Cid) (bool, error) {
func (f *FileManager) Has(ctx context.Context, c cid.Cid) (bool, error) {
// NOTE: interesting thing to consider. Has doesnt validate the data.
// So the data on disk could be invalid, and we could think we have it.
dsk := dshelp.MultihashToDsKey(c.Hash())
return f.ds.Has(dsk)
return f.ds.Has(ctx, dsk)
}

type putter interface {
Put(ds.Key, []byte) error
Put(context.Context, ds.Key, []byte) error
}

// Put adds a new reference block to the FileManager. It does not check
// that the reference is valid.
func (f *FileManager) Put(b *posinfo.FilestoreNode) error {
return f.putTo(b, f.ds)
func (f *FileManager) Put(ctx context.Context, b *posinfo.FilestoreNode) error {
return f.putTo(ctx, b, f.ds)
}

func (f *FileManager) putTo(b *posinfo.FilestoreNode, to putter) error {
func (f *FileManager) putTo(ctx context.Context, b *posinfo.FilestoreNode, to putter) error {
var dobj pb.DataObj

if IsURL(b.PosInfo.FullPath) {
Expand Down Expand Up @@ -310,24 +310,24 @@ func (f *FileManager) putTo(b *posinfo.FilestoreNode, to putter) error {
return err
}

return to.Put(dshelp.MultihashToDsKey(b.Cid().Hash()), data)
return to.Put(ctx, dshelp.MultihashToDsKey(b.Cid().Hash()), data)
}

// PutMany is like Put() but takes a slice of blocks instead,
// allowing it to create a batch transaction.
func (f *FileManager) PutMany(bs []*posinfo.FilestoreNode) error {
batch, err := f.ds.Batch()
func (f *FileManager) PutMany(ctx context.Context, bs []*posinfo.FilestoreNode) error {
batch, err := f.ds.Batch(ctx)
if err != nil {
return err
}

for _, b := range bs {
if err := f.putTo(b, batch); err != nil {
if err := f.putTo(ctx, b, batch); err != nil {
return err
}
}

return batch.Commit()
return batch.Commit(ctx)
}

// IsURL returns true if the string represents a valid URL that the
Expand Down
18 changes: 9 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ module github.com/ipfs/go-filestore
go 1.16

require (
github.com/gogo/protobuf v1.3.1
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-cid v0.0.5
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-ipfs-blockstore v1.0.0
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/gogo/protobuf v1.3.2
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.5.0
github.com/ipfs/go-ipfs-blockstore v1.1.0
github.com/ipfs/go-ipfs-ds-help v1.1.0
github.com/ipfs/go-ipfs-posinfo v0.0.1
github.com/ipfs/go-log v1.0.2
github.com/ipfs/go-merkledag v0.3.1
github.com/multiformats/go-multihash v0.0.13
github.com/ipfs/go-log v1.0.5
github.com/ipfs/go-merkledag v0.5.1
github.com/multiformats/go-multihash v0.0.15
)
Loading

0 comments on commit 9537994

Please sign in to comment.