Skip to content

Commit

Permalink
Merge pull request #127 from ipfs/fix/ktds
Browse files Browse the repository at this point in the history
fix the keytransform datastore's query implementation
  • Loading branch information
Stebalien authored Apr 19, 2019
2 parents b19d692 + ffc9f91 commit 9db638c
Show file tree
Hide file tree
Showing 12 changed files with 422 additions and 195 deletions.
21 changes: 0 additions & 21 deletions keytransform/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,3 @@ type KeyTransform interface {
ConvertKey(ds.Key) ds.Key
InvertKey(ds.Key) ds.Key
}

// Datastore is a keytransform.Datastore
type Datastore interface {
ds.Shim
KeyTransform
}

// Wrap wraps a given datastore with a KeyTransform function.
// The resulting wrapped datastore will use the transform on all Datastore
// operations.
func Wrap(child ds.Datastore, t KeyTransform) *ktds {
if t == nil {
panic("t (KeyTransform) is nil")
}

if child == nil {
panic("child (ds.Datastore) is nil")
}

return &ktds{child: child, KeyTransform: t}
}
167 changes: 138 additions & 29 deletions keytransform/keytransform.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,68 +5,72 @@ import (
dsq "github.com/ipfs/go-datastore/query"
)

type Pair struct {
Convert KeyMapping
Invert KeyMapping
}
// Wrap wraps a given datastore with a KeyTransform function.
// The resulting wrapped datastore will use the transform on all Datastore
// operations.
func Wrap(child ds.Datastore, t KeyTransform) *Datastore {
if t == nil {
panic("t (KeyTransform) is nil")
}

func (t *Pair) ConvertKey(k ds.Key) ds.Key {
return t.Convert(k)
}
if child == nil {
panic("child (ds.Datastore) is nil")
}

func (t *Pair) InvertKey(k ds.Key) ds.Key {
return t.Invert(k)
return &Datastore{child: child, KeyTransform: t}
}

// ktds keeps a KeyTransform function
type ktds struct {
// Datastore keeps a KeyTransform function
type Datastore struct {
child ds.Datastore

KeyTransform
}

// Children implements ds.Shim
func (d *ktds) Children() []ds.Datastore {
func (d *Datastore) Children() []ds.Datastore {
return []ds.Datastore{d.child}
}

// Put stores the given value, transforming the key first.
func (d *ktds) Put(key ds.Key, value []byte) (err error) {
func (d *Datastore) Put(key ds.Key, value []byte) (err error) {
return d.child.Put(d.ConvertKey(key), value)
}

// Get returns the value for given key, transforming the key first.
func (d *ktds) Get(key ds.Key) (value []byte, err error) {
func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
return d.child.Get(d.ConvertKey(key))
}

// Has returns whether the datastore has a value for a given key, transforming
// the key first.
func (d *ktds) Has(key ds.Key) (exists bool, err error) {
func (d *Datastore) Has(key ds.Key) (exists bool, err error) {
return d.child.Has(d.ConvertKey(key))
}

// GetSize returns the size of the value named by the given key, transforming
// the key first.
func (d *ktds) GetSize(key ds.Key) (size int, err error) {
func (d *Datastore) GetSize(key ds.Key) (size int, err error) {
return d.child.GetSize(d.ConvertKey(key))
}

// Delete removes the value for given key
func (d *ktds) Delete(key ds.Key) (err error) {
func (d *Datastore) Delete(key ds.Key) (err error) {
return d.child.Delete(d.ConvertKey(key))
}

// Query implements Query, inverting keys on the way back out.
func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
qr, err := d.child.Query(q)
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
nq, cq := d.prepareQuery(q)

cqr, err := d.child.Query(cq)
if err != nil {
return nil, err
}

return dsq.ResultsFromIterator(q, dsq.Iterator{
qr := dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
r, ok := qr.NextSync()
r, ok := cqr.NextSync()
if !ok {
return r, false
}
Expand All @@ -76,21 +80,120 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
return r, true
},
Close: func() error {
return qr.Close()
return cqr.Close()
},
}), nil
})
return dsq.NaiveQueryApply(nq, qr), nil
}

func (d *ktds) Close() error {
// Split the query into a child query and a naive query. That way, we can make
// the child datastore do as much work as possible.
func (d *Datastore) prepareQuery(q dsq.Query) (naive, child dsq.Query) {

// First, put everything in the child query. Then, start taking things
// out.
child = q

// Always let the child handle the key prefix.
child.Prefix = d.ConvertKey(ds.NewKey(child.Prefix)).String()

// Check if the key transform is order-preserving so we can use the
// child datastore's built-in ordering.
orderPreserving := false
switch d.KeyTransform.(type) {
case PrefixTransform, *PrefixTransform:
orderPreserving = true
}

// Try to let the child handle ordering.
orders:
for i, o := range child.Orders {
switch o.(type) {
case dsq.OrderByValue, *dsq.OrderByValue,
dsq.OrderByValueDescending, *dsq.OrderByValueDescending:
// Key doesn't matter.
continue
case dsq.OrderByKey, *dsq.OrderByKey,
dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
// if the key transform preserves order, we can delegate
// to the child datastore.
if orderPreserving {
// When sorting, we compare with the first
// Order, then, if equal, we compare with the
// second Order, etc. However, keys are _unique_
// so we'll never apply any additional orders
// after ordering by key.
child.Orders = child.Orders[:i+1]
break orders
}
}

// Can't handle this order under transform, punt it to a naive
// ordering.
naive.Orders = q.Orders
child.Orders = nil
naive.Offset = q.Offset
child.Offset = 0
naive.Limit = q.Limit
child.Limit = 0
break
}

// Try to let the child handle the filters.

// don't modify the original filters.
child.Filters = append([]dsq.Filter(nil), child.Filters...)

for i, f := range child.Filters {
switch f := f.(type) {
case dsq.FilterValueCompare, *dsq.FilterValueCompare:
continue
case dsq.FilterKeyCompare:
child.Filters[i] = dsq.FilterKeyCompare{
Op: f.Op,
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
}
continue
case *dsq.FilterKeyCompare:
child.Filters[i] = &dsq.FilterKeyCompare{
Op: f.Op,
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
}
continue
case dsq.FilterKeyPrefix:
child.Filters[i] = dsq.FilterKeyPrefix{
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
}
continue
case *dsq.FilterKeyPrefix:
child.Filters[i] = &dsq.FilterKeyPrefix{
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
}
continue
}

// Not a known filter, defer to the naive implementation.
naive.Filters = q.Filters
child.Filters = nil
naive.Offset = q.Offset
child.Offset = 0
naive.Limit = q.Limit
child.Limit = 0
break
}
return
}

func (d *Datastore) Close() error {
return d.child.Close()
}

// DiskUsage implements the PersistentDatastore interface.
func (d *ktds) DiskUsage() (uint64, error) {
func (d *Datastore) DiskUsage() (uint64, error) {
return ds.DiskUsage(d.child)
}

func (d *ktds) Batch() (ds.Batch, error) {
func (d *Datastore) Batch() (ds.Batch, error) {
bds, ok := d.child.(ds.Batching)
if !ok {
return nil, ds.ErrBatchUnsupported
Expand Down Expand Up @@ -124,23 +227,29 @@ func (t *transformBatch) Commit() error {
return t.dst.Commit()
}

func (d *ktds) Check() error {
func (d *Datastore) Check() error {
if c, ok := d.child.(ds.CheckedDatastore); ok {
return c.Check()
}
return nil
}

func (d *ktds) Scrub() error {
func (d *Datastore) Scrub() error {
if c, ok := d.child.(ds.ScrubbedDatastore); ok {
return c.Scrub()
}
return nil
}

func (d *ktds) CollectGarbage() error {
func (d *Datastore) CollectGarbage() error {
if c, ok := d.child.(ds.GCDatastore); ok {
return c.CollectGarbage()
}
return nil
}

var _ ds.Datastore = (*Datastore)(nil)
var _ ds.GCDatastore = (*Datastore)(nil)
var _ ds.Batching = (*Datastore)(nil)
var _ ds.PersistentDatastore = (*Datastore)(nil)
var _ ds.ScrubbedDatastore = (*Datastore)(nil)
42 changes: 28 additions & 14 deletions keytransform/keytransform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,24 @@ type DSSuite struct{}

var _ = Suite(&DSSuite{})

func (ks *DSSuite) TestBasic(c *C) {
func testDatastore() {
}

pair := &kt.Pair{
Convert: func(k ds.Key) ds.Key {
return ds.NewKey("/abc").Child(k)
},
Invert: func(k ds.Key) ds.Key {
// remove abc prefix
l := k.List()
if l[0] != "abc" {
panic("key does not have prefix. convert failed?")
}
return ds.KeyWithNamespaces(l[1:])
},
}
var pair = &kt.Pair{
Convert: func(k ds.Key) ds.Key {
return ds.NewKey("/abc").Child(k)
},
Invert: func(k ds.Key) ds.Key {
// remove abc prefix
l := k.List()
if l[0] != "abc" {
panic("key does not have prefix. convert failed?")
}
return ds.KeyWithNamespaces(l[1:])
},
}

func (ks *DSSuite) TestBasic(c *C) {
mpds := dstest.NewTestDatastore(true)
ktds := kt.Wrap(mpds, pair)

Expand Down Expand Up @@ -110,3 +112,15 @@ func strsToKeys(strs []string) []ds.Key {
}
return keys
}

func TestSuiteDefaultPair(t *testing.T) {
mpds := dstest.NewTestDatastore(true)
ktds := kt.Wrap(mpds, pair)
dstest.SubtestAll(t, ktds)
}

func TestSuitePrefixTransform(t *testing.T) {
mpds := dstest.NewTestDatastore(true)
ktds := kt.Wrap(mpds, kt.PrefixTransform{Prefix: ds.NewKey("/foo")})
dstest.SubtestAll(t, ktds)
}
49 changes: 49 additions & 0 deletions keytransform/transforms.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package keytransform

import ds "github.com/ipfs/go-datastore"

// Pair is a convince struct for constructing a key transform.
type Pair struct {
Convert KeyMapping
Invert KeyMapping
}

func (t *Pair) ConvertKey(k ds.Key) ds.Key {
return t.Convert(k)
}

func (t *Pair) InvertKey(k ds.Key) ds.Key {
return t.Invert(k)
}

var _ KeyTransform = (*Pair)(nil)

// PrefixTransform constructs a KeyTransform with a pair of functions that
// add or remove the given prefix key.
//
// Warning: will panic if prefix not found when it should be there. This is
// to avoid insidious data inconsistency errors.
type PrefixTransform struct {
Prefix ds.Key
}

// ConvertKey adds the prefix.
func (p PrefixTransform) ConvertKey(k ds.Key) ds.Key {
return p.Prefix.Child(k)
}

// InvertKey removes the prefix. panics if prefix not found.
func (p PrefixTransform) InvertKey(k ds.Key) ds.Key {
if p.Prefix.String() == "/" {
return k
}

if !p.Prefix.IsAncestorOf(k) {
panic("expected prefix not found")
}

s := k.String()[len(p.Prefix.String()):]
return ds.RawKey(s)
}

var _ KeyTransform = (*PrefixTransform)(nil)
Loading

0 comments on commit 9db638c

Please sign in to comment.