Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix the keytransform datastore's query implementation #127

Merged
merged 7 commits into from
Apr 19, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions keytransform/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,3 @@ type KeyTransform interface {
ConvertKey(ds.Key) ds.Key
InvertKey(ds.Key) ds.Key
}

// Pair is a convince struct for constructing a key transform.
type Pair struct {
Convert KeyMapping
Invert KeyMapping
}

func (t *Pair) ConvertKey(k ds.Key) ds.Key {
return t.Convert(k)
}

func (t *Pair) InvertKey(k ds.Key) ds.Key {
return t.Invert(k)
}

var _ KeyTransform = (*Pair)(nil)
104 changes: 99 additions & 5 deletions keytransform/keytransform.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,16 @@ func (d *Datastore) Delete(key ds.Key) (err error) {

// Query implements Query, inverting keys on the way back out.
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
qr, err := d.child.Query(q)
nq, cq := d.prepareQuery(q)

cqr, err := d.child.Query(cq)
if err != nil {
return nil, err
}

return dsq.ResultsFromIterator(q, dsq.Iterator{
qr := dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
r, ok := qr.NextSync()
r, ok := cqr.NextSync()
if !ok {
return r, false
}
Expand All @@ -78,9 +80,101 @@ func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
return r, true
},
Close: func() error {
return qr.Close()
return cqr.Close()
},
}), nil
})
return dsq.NaiveQueryApply(nq, qr), nil
}

// Split the query into a child query and a naive query. That way, we can make
// the child datastore do as much work as possible.
func (d *Datastore) prepareQuery(q dsq.Query) (naive, child dsq.Query) {

// First, put everything in the child query. Then, start taking things
// out.
child = q

// Always let the child handle the key prefix.
child.Prefix = d.ConvertKey(ds.NewKey(child.Prefix)).String()

// Check if the key transform is order-preserving so we can use the
// child datastore's built-in ordering.
orderPreserving := false
switch d.KeyTransform.(type) {
case PrefixTransform, *PrefixTransform:
orderPreserving = true
}

// Try to let the child handle ordering.
orders:
for i, o := range child.Orders {
switch o.(type) {
case dsq.OrderByValue, *dsq.OrderByValue,
dsq.OrderByValueDescending, *dsq.OrderByValueDescending:
// Key doesn't matter.
continue
case dsq.OrderByKey, *dsq.OrderByKey,
dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
if orderPreserving {
child.Orders = child.Orders[:i+1]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like I'm missing something here. Why drop other orders here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We sort hierarchically. That is, when comparing two entries, we compare with the first "order", then the second order if equal, then the third order if still equal, etc. Given that keys are unique, "order by key" will never return "equal" so there's no point in applying further orders.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a comment and improved the test coverage more to get codecov to stop complaining.

break orders
}
}

// Can't handle this order under transform, punt it to a naive
// ordering.
naive.Orders = q.Orders
child.Orders = nil
naive.Offset = q.Offset
child.Offset = 0
naive.Limit = q.Limit
child.Limit = 0
break
}

// Try to let the child handle the filters.

// don't modify the original filters.
child.Filters = append([]dsq.Filter(nil), child.Filters...)

for i, f := range child.Filters {
switch f := f.(type) {
case dsq.FilterValueCompare, *dsq.FilterValueCompare:
continue
case dsq.FilterKeyCompare:
child.Filters[i] = dsq.FilterKeyCompare{
Op: f.Op,
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
}
continue
case *dsq.FilterKeyCompare:
child.Filters[i] = &dsq.FilterKeyCompare{
Op: f.Op,
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
}
continue
case dsq.FilterKeyPrefix:
child.Filters[i] = dsq.FilterKeyPrefix{
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
}
continue
case *dsq.FilterKeyPrefix:
child.Filters[i] = &dsq.FilterKeyPrefix{
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
}
continue
}

// Not a known filter, defer to the naive implementation.
naive.Filters = q.Filters
child.Filters = nil
naive.Offset = q.Offset
child.Offset = 0
naive.Limit = q.Limit
child.Limit = 0
break
}
return
}

func (d *Datastore) Close() error {
Expand Down
49 changes: 49 additions & 0 deletions keytransform/transforms.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package keytransform

import ds "github.com/ipfs/go-datastore"

// Pair is a convince struct for constructing a key transform.
type Pair struct {
Convert KeyMapping
Invert KeyMapping
}

func (t *Pair) ConvertKey(k ds.Key) ds.Key {
return t.Convert(k)
}

func (t *Pair) InvertKey(k ds.Key) ds.Key {
return t.Invert(k)
}

var _ KeyTransform = (*Pair)(nil)

// PrefixTransform constructs a KeyTransform with a pair of functions that
// add or remove the given prefix key.
//
// Warning: will panic if prefix not found when it should be there. This is
// to avoid insidious data inconsistency errors.
type PrefixTransform struct {
Prefix ds.Key
}

// ConvertKey adds the prefix.
func (p PrefixTransform) ConvertKey(k ds.Key) ds.Key {
return p.Prefix.Child(k)
}

// InvertKey removes the prefix. panics if prefix not found.
func (p PrefixTransform) InvertKey(k ds.Key) ds.Key {
if p.Prefix.String() == "/" {
return k
}

if !p.Prefix.IsAncestorOf(k) {
panic("expected prefix not found")
}

s := k.String()[len(p.Prefix.String()):]
return ds.RawKey(s)
}

var _ KeyTransform = (*PrefixTransform)(nil)
72 changes: 6 additions & 66 deletions namespace/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,84 +3,24 @@ package namespace
import (
ds "github.com/ipfs/go-datastore"
ktds "github.com/ipfs/go-datastore/keytransform"
dsq "github.com/ipfs/go-datastore/query"
)

// PrefixTransform constructs a KeyTransform with a pair of functions that
// add or remove the given prefix key.
//
// Warning: will panic if prefix not found when it should be there. This is
// to avoid insidious data inconsistency errors.
func PrefixTransform(prefix ds.Key) ktds.KeyTransform {
return &ktds.Pair{

// Convert adds the prefix
Convert: func(k ds.Key) ds.Key {
return prefix.Child(k)
},

// Invert removes the prefix. panics if prefix not found.
Invert: func(k ds.Key) ds.Key {
if prefix.String() == "/" {
return k
}

if !prefix.IsAncestorOf(k) {
panic("expected prefix not found")
}

s := k.String()[len(prefix.String()):]
return ds.RawKey(s)
},
}
//
// DEPRECATED: Use ktds.PrefixTransform directly.
func PrefixTransform(prefix ds.Key) ktds.PrefixTransform {
return ktds.PrefixTransform{Prefix: prefix}
}

// Wrap wraps a given datastore with a key-prefix.
func Wrap(child ds.Datastore, prefix ds.Key) *Datastore {
func Wrap(child ds.Datastore, prefix ds.Key) *ktds.Datastore {
if child == nil {
panic("child (ds.Datastore) is nil")
}

d := ktds.Wrap(child, PrefixTransform(prefix))
return &Datastore{Datastore: d, raw: child, prefix: prefix}
}

type Datastore struct {
*ktds.Datastore
prefix ds.Key
raw ds.Datastore
}

// Query implements Query, inverting keys on the way back out.
// This function assumes that child datastore.Query returns ordered results
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
q.Prefix = d.prefix.Child(ds.NewKey(q.Prefix)).String()
qr, err := d.raw.Query(q)
if err != nil {
return nil, err
}

return dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
for {
r, ok := qr.NextSync()
if !ok {
return r, false
}
if r.Error != nil {
return r, true
}
k := ds.RawKey(r.Entry.Key)
if !d.prefix.IsAncestorOf(k) {
return dsq.Result{}, false
}

r.Entry.Key = d.Datastore.InvertKey(k).String()
return r, true
}
},
Close: func() error {
return qr.Close()
},
}), nil
return ktds.Wrap(child, PrefixTransform(prefix))
}