Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement go-ipld-prime readable and writable storage API (v2/storage) #363

Merged
merged 9 commits into from
Feb 8, 2023
177 changes: 36 additions & 141 deletions v2/blockstore/readonly.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package blockstore

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -15,9 +14,8 @@ import (
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/index"
"github.com/ipld/go-car/v2/internal/carv1"
"github.com/ipld/go-car/v2/internal/carv1/util"
internalio "github.com/ipld/go-car/v2/internal/io"
"github.com/multiformats/go-multihash"
"github.com/ipld/go-car/v2/internal/store"
"github.com/multiformats/go-varint"
"golang.org/x/exp/mmap"
)
Expand Down Expand Up @@ -61,29 +59,7 @@ type contextKey string

const asyncErrHandlerKey contextKey = "asyncErrorHandlerKey"

// UseWholeCIDs is a read option which makes a CAR blockstore identify blocks by
// whole CIDs, and not just their multihashes. The default is to use
// multihashes, which matches the current semantics of go-ipfs-blockstore v1.
//
// Enabling this option affects a number of methods, including read-only ones:
//
// • Get, Has, and GetSize will return a block
// only if the entire CID is present in the CAR file.
//
// • AllKeysChan will return the original whole CIDs, instead of with their
// multicodec set to "raw" to just provide multihashes.
//
// • If AllowDuplicatePuts isn't set,
// Put and PutMany will deduplicate by the whole CID,
// allowing different CIDs with equal multihashes.
//
// Note that this option only affects the blockstore, and is ignored by the root
// go-car/v2 package.
func UseWholeCIDs(enable bool) carv2.Option {
return func(o *carv2.Options) {
// The returned option only records the caller's choice on the options
// struct; the blockstore reads BlockstoreUseWholeCIDs later when matching keys.
o.BlockstoreUseWholeCIDs = enable
}
}
// UseWholeCIDs is the option constructor defined as carv2.UseWholeCIDs,
// exposed here under the blockstore package's name.
var UseWholeCIDs = carv2.UseWholeCIDs

// NewReadOnly creates a new ReadOnly blockstore from the backing with an optional index as idx.
// This function accepts both CARv1 and CARv2 backing.
Expand Down Expand Up @@ -203,14 +179,6 @@ func OpenReadOnly(path string, opts ...carv2.Option) (*ReadOnly, error) {
return robs, nil
}

// readBlock reads one section from the backing store, starting at offset idx,
// and returns the CID and bytes that util.ReadNode yields for it. The
// blockstore's ZeroLengthSectionAsEOF and MaxAllowedSectionSize options are
// forwarded to util.ReadNode unchanged. An error from constructing the offset
// reader is returned with a zero CID and nil bytes.
func (b *ReadOnly) readBlock(idx int64) (cid.Cid, []byte, error) {
// Wrap the backing store in a reader positioned at idx.
r, err := internalio.NewOffsetReadSeeker(b.backing, idx)
if err != nil {
return cid.Cid{}, nil, err
}
return util.ReadNode(r, b.opts.ZeroLengthSectionAsEOF, b.opts.MaxAllowedSectionSize)
}

// DeleteBlock is unsupported and always errors.
func (b *ReadOnly) DeleteBlock(_ context.Context, _ cid.Cid) error {
return errReadOnly
Expand All @@ -229,7 +197,7 @@ func (b *ReadOnly) Has(ctx context.Context, key cid.Cid) (bool, error) {
// If we don't store identity CIDs then we can return them straight away as if they are here,
// otherwise we need to check for their existence.
// Note, we do this without locking, since there is no shared information to lock for in order to perform the check.
if _, ok, err := isIdentity(key); err != nil {
if _, ok, err := store.IsIdentity(key); err != nil {
return false, err
} else if ok {
return true, nil
Expand All @@ -243,38 +211,21 @@ func (b *ReadOnly) Has(ctx context.Context, key cid.Cid) (bool, error) {
return false, errClosed
}

var fnFound bool
var fnErr error
err := b.idx.GetAll(key, func(offset uint64) bool {
uar, err := internalio.NewOffsetReadSeeker(b.backing, int64(offset))
if err != nil {
fnErr = err
return false
}
_, err = varint.ReadUvarint(uar)
if err != nil {
fnErr = err
return false
}
_, readCid, err := cid.CidFromReader(uar)
if err != nil {
fnErr = err
return false
}
if b.opts.BlockstoreUseWholeCIDs {
fnFound = readCid.Equals(key)
return !fnFound // continue looking if we haven't found it
} else {
fnFound = bytes.Equal(readCid.Hash(), key.Hash())
return false
}
})
_, _, size, err := store.FindCid(
b.backing,
b.idx,
key,
b.opts.BlockstoreUseWholeCIDs,
b.opts.ZeroLengthSectionAsEOF,
b.opts.MaxAllowedSectionSize,
false,
)
if errors.Is(err, index.ErrNotFound) {
return false, nil
} else if err != nil {
return false, err
}
return fnFound, fnErr
return size > -1, nil
}

// Get gets a block corresponding to the given key.
Expand All @@ -290,7 +241,7 @@ func (b *ReadOnly) Get(ctx context.Context, key cid.Cid) (blocks.Block, error) {
// If we don't store identity CIDs then we can return them straight away as if they are here,
// otherwise we need to check for their existence.
// Note, we do this without locking, since there is no shared information to lock for in order to perform the check.
if digest, ok, err := isIdentity(key); err != nil {
if digest, ok, err := store.IsIdentity(key); err != nil {
return nil, err
} else if ok {
return blocks.NewBlockWithCid(digest, key)
Expand All @@ -304,46 +255,28 @@ func (b *ReadOnly) Get(ctx context.Context, key cid.Cid) (blocks.Block, error) {
return nil, errClosed
}

var fnData []byte
var fnErr error
err := b.idx.GetAll(key, func(offset uint64) bool {
readCid, data, err := b.readBlock(int64(offset))
if err != nil {
fnErr = err
return false
}
if b.opts.BlockstoreUseWholeCIDs {
if readCid.Equals(key) {
fnData = data
return false
} else {
return true // continue looking
}
} else {
if bytes.Equal(readCid.Hash(), key.Hash()) {
fnData = data
}
return false
}
})
data, _, _, err := store.FindCid(
b.backing,
b.idx,
key,
b.opts.BlockstoreUseWholeCIDs,
b.opts.ZeroLengthSectionAsEOF,
b.opts.MaxAllowedSectionSize,
true,
)
if errors.Is(err, index.ErrNotFound) {
return nil, format.ErrNotFound{Cid: key}
} else if err != nil {
return nil, format.ErrNotFound{Cid: key}
} else if fnErr != nil {
return nil, fnErr
}
if fnData == nil {
return nil, format.ErrNotFound{Cid: key}
return nil, err
}
return blocks.NewBlockWithCid(fnData, key)
return blocks.NewBlockWithCid(data, key)
}

// GetSize gets the size of an item corresponding to the given key.
func (b *ReadOnly) GetSize(ctx context.Context, key cid.Cid) (int, error) {
// Check if the given CID has multihash.IDENTITY code
// Note, we do this without locking, since there is no shared information to lock for in order to perform the check.
if digest, ok, err := isIdentity(key); err != nil {
if digest, ok, err := store.IsIdentity(key); err != nil {
return 0, err
} else if ok {
return len(digest), nil
Expand All @@ -356,59 +289,21 @@ func (b *ReadOnly) GetSize(ctx context.Context, key cid.Cid) (int, error) {
return 0, errClosed
}

fnSize := -1
var fnErr error
err := b.idx.GetAll(key, func(offset uint64) bool {
rdr, err := internalio.NewOffsetReadSeeker(b.backing, int64(offset))
if err != nil {
fnErr = err
return false
}
sectionLen, err := varint.ReadUvarint(rdr)
if err != nil {
fnErr = err
return false
}
cidLen, readCid, err := cid.CidFromReader(rdr)
if err != nil {
fnErr = err
return false
}
if b.opts.BlockstoreUseWholeCIDs {
if readCid.Equals(key) {
fnSize = int(sectionLen) - cidLen
return false
} else {
return true // continue looking
}
} else {
if bytes.Equal(readCid.Hash(), key.Hash()) {
fnSize = int(sectionLen) - cidLen
}
return false
}
})
_, _, size, err := store.FindCid(
b.backing,
b.idx,
key,
b.opts.BlockstoreUseWholeCIDs,
b.opts.ZeroLengthSectionAsEOF,
b.opts.MaxAllowedSectionSize,
false,
)
if errors.Is(err, index.ErrNotFound) {
return -1, format.ErrNotFound{Cid: key}
} else if err != nil {
return -1, err
} else if fnErr != nil {
return -1, fnErr
}
if fnSize == -1 {
return -1, format.ErrNotFound{Cid: key}
}
return fnSize, nil
}

func isIdentity(key cid.Cid) (digest []byte, ok bool, err error) {
dmh, err := multihash.Decode(key.Hash())
if err != nil {
return nil, false, err
}
ok = dmh.Code == multihash.IDENTITY
digest = dmh.Digest
return digest, ok, nil
return size, nil
}

// Put is not supported and always returns an error.
Expand Down
Loading