Skip to content

Commit

Permalink
indexer: introduce LookupCtx (#3043)
Browse files Browse the repository at this point in the history
* indexer: introduce LookupCtx

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

lint

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

add changelog

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* use sets instead af arrays

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* Update pkg/storage/utils/indexer/index/autoincrement.go

Co-authored-by: David Christofas <[email protected]>

* fix logic

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* ignore result order

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

Co-authored-by: David Christofas <[email protected]>
  • Loading branch information
butonic and C0rby authored Jul 8, 2022
1 parent 0b65112 commit 047ab30
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 61 deletions.
6 changes: 6 additions & 0 deletions changelog/unreleased/lookupctx.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Enhancement: introduce LookupCtx for index interface

The index interface now has a new LookupCtx that can look up multiple values so we can more efficiently look up multiple shares by id.
It also takes a context so we can pass on the trace context to the CS3 backend

https://github.com/cs3org/reva/pull/3043
32 changes: 17 additions & 15 deletions internal/http/services/owncloud/ocdav/propfind/propfind.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,26 +366,28 @@ func (p *Handler) propfindResponse(ctx context.Context, w http.ResponseWriter, r
ctx, span := appctx.GetTracerProvider(r.Context()).Tracer(tracerName).Start(ctx, "propfind_response")
defer span.End()

filters := make([]*link.ListPublicSharesRequest_Filter, 0, len(resourceInfos))
for i := range resourceInfos {
// the list of filters grows with every public link in a folder
filters = append(filters, publicshare.ResourceIDFilter(resourceInfos[i].Id))
}

client, err := p.getClient()
if err != nil {
log.Error().Err(err).Msg("error getting grpc client")
w.WriteHeader(http.StatusInternalServerError)
return
}

var linkshares map[string]struct{}
// public link access does not show share-types
// oc:share-type is not part of an allprops response
if namespace != "/public" {
// only fetch this if property was queried
for _, p := range pf.Prop {
if p.Space == net.NsOwncloud && (p.Local == "share-types" || p.Local == "permissions") {
for _, prop := range pf.Prop {
if prop.Space == net.NsOwncloud && (prop.Local == "share-types" || prop.Local == "permissions") {
filters := make([]*link.ListPublicSharesRequest_Filter, 0, len(resourceInfos))
for i := range resourceInfos {
// FIXME this is expensive
// the filters array grow by one for every file in a folder
// TODO store public links as grants on the storage, reassembling them here is too costly
// we can then add the filter if the file has share-types=3 in the opaque,
// same as user / group shares for share indicators
filters = append(filters, publicshare.ResourceIDFilter(resourceInfos[i].Id))
}
client, err := p.getClient()
if err != nil {
log.Error().Err(err).Msg("error getting grpc client")
w.WriteHeader(http.StatusInternalServerError)
return
}
listResp, err := client.ListPublicShares(ctx, &link.ListPublicSharesRequest{Filters: filters})
if err == nil {
linkshares = make(map[string]struct{}, len(listResp.Share))
Expand Down
36 changes: 18 additions & 18 deletions pkg/publicshare/manager/cs3/cs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,33 +397,37 @@ func (m *Manager) ListPublicShares(ctx context.Context, u *user.User, filters []
return result, nil
}

tokensByResourceID := make(map[string]*provider.ResourceId)
for _, filter := range idFilter {
resourceID := filter.GetResourceId()
tokens, err := m.indexer.FindBy(&link.PublicShare{},
indexer.NewField("ResourceId", resourceIDToIndex(resourceID)),
)
if err != nil {
continue
var tokens []string
if len(idFilter) > 0 {
idFilters := make([]indexer.Field, 0, len(idFilter))
for _, filter := range idFilter {
resourceID := filter.GetResourceId()
idFilters = append(idFilters, indexer.NewField("ResourceId", resourceIDToIndex(resourceID)))
}
for _, token := range tokens {
tokensByResourceID[token] = resourceID
tokens, err = m.indexer.FindBy(&link.PublicShare{}, idFilters...)
if err != nil {
return nil, err
}
}

// statMem is used as a local cache to prevent statting resources which
// already have been checked.
statMem := make(map[string]struct{})
for token, resourceID := range tokensByResourceID {
for _, token := range tokens {
if _, handled := shareMem[token]; handled {
// We don't want to add a share multiple times when we added it
// already.
continue
}

if _, checked := statMem[resourceIDToIndex(resourceID)]; !checked {
s, err := m.getByToken(ctx, token)
if err != nil {
return nil, err
}

if _, checked := statMem[resourceIDToIndex(s.PublicShare.GetResourceId())]; !checked {
sReq := &provider.StatRequest{
Ref: &provider.Reference{ResourceId: resourceID},
Ref: &provider.Reference{ResourceId: s.PublicShare.GetResourceId()},
}
sRes, err := m.gatewayClient.Stat(ctx, sReq)
if err != nil {
Expand All @@ -435,13 +439,9 @@ func (m *Manager) ListPublicShares(ctx context.Context, u *user.User, filters []
if !sRes.Info.PermissionSet.ListGrants {
continue
}
statMem[resourceIDToIndex(resourceID)] = struct{}{}
statMem[resourceIDToIndex(s.PublicShare.GetResourceId())] = struct{}{}
}

s, err := m.getByToken(ctx, token)
if err != nil {
return nil, err
}
if publicshare.MatchesFilters(s.PublicShare, filters) {
result = append(result, &s.PublicShare)
shareMem[s.PublicShare.Token] = struct{}{}
Expand Down
53 changes: 46 additions & 7 deletions pkg/storage/utils/indexer/index/autoincrement.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,56 @@ func (idx *Autoincrement) Init() error {

// Lookup exact lookup by value.
func (idx *Autoincrement) Lookup(v string) ([]string, error) {
searchPath := path.Join(idx.indexRootDir, v)
oldname, err := idx.storage.ResolveSymlink(context.Background(), searchPath)
if err != nil {
if os.IsNotExist(err) {
err = &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v}
return idx.LookupCtx(context.Background(), v)
}

// LookupCtx retieves multiple exact values and allows passing in a context
func (idx *Autoincrement) LookupCtx(ctx context.Context, values ...string) ([]string, error) {
var allValues map[string]struct{}
if len(values) != 1 {
// prefetch all values with one request
entries, err := idx.storage.ReadDir(context.Background(), path.Join("/", idx.indexRootDir))
if err != nil {
return nil, err
}
// convert known values to set
allValues = make(map[string]struct{}, len(entries))
for _, e := range entries {
allValues[path.Base(e)] = struct{}{}
}
}

return nil, err
// convert requested values to set
valueSet := make(map[string]struct{}, len(values))
for _, v := range values {
valueSet[v] = struct{}{}
}

return []string{oldname}, nil
var matches = []string{}
for v := range valueSet {
if _, ok := allValues[v]; ok || len(allValues) == 0 {
oldname, err := idx.storage.ResolveSymlink(context.Background(), path.Join("/", idx.indexRootDir, v))
if err != nil {
continue
}
matches = append(matches, oldname)
}
}

if len(matches) == 0 {
var v string
switch len(values) {
case 0:
v = "none"
case 1:
v = values[0]
default:
v = "multiple"
}
return nil, &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v}
}

return matches, nil
}

// Add a new value to the index.
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/utils/indexer/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@

package index

import "github.com/cs3org/reva/v2/pkg/storage/utils/indexer/option"
import (
"context"

"github.com/cs3org/reva/v2/pkg/storage/utils/indexer/option"
)

// Index can be implemented to create new indexer-strategies. See Unique for example.
// Each indexer implementation is bound to one data-column (IndexBy) and a data-type (TypeName)
type Index interface {
Init() error
Lookup(v string) ([]string, error)
LookupCtx(ctx context.Context, v ...string) ([]string, error)
Add(id, v string) (string, error)
Remove(id string, v string) error
Update(id, oldV, newV string) error
Expand Down
57 changes: 49 additions & 8 deletions pkg/storage/utils/indexer/index/non_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,24 +79,65 @@ func (idx *NonUnique) Init() error {

// Lookup exact lookup by value.
func (idx *NonUnique) Lookup(v string) ([]string, error) {
if idx.caseInsensitive {
v = strings.ToLower(v)
}
paths, err := idx.storage.ReadDir(context.Background(), path.Join("/", idx.indexRootDir, v))
return idx.LookupCtx(context.Background(), v)
}

// LookupCtx retieves multiple exact values and allows passing in a context
func (idx *NonUnique) LookupCtx(ctx context.Context, values ...string) ([]string, error) {
// prefetch all values with one request
entries, err := idx.storage.ReadDir(context.Background(), path.Join("/", idx.indexRootDir))
if err != nil {
return nil, err
}
// convert known values to set
allValues := make(map[string]struct{}, len(entries))
for _, e := range entries {
allValues[path.Base(e)] = struct{}{}
}

var matches = make([]string, 0)
for _, p := range paths {
matches = append(matches, path.Base(p))
// convert requested values to set
valueSet := make(map[string]struct{}, len(values))
if idx.caseInsensitive {
for _, v := range values {
valueSet[strings.ToLower(v)] = struct{}{}
}
} else {
for _, v := range values {
valueSet[v] = struct{}{}
}
}

var matches = map[string]struct{}{}
for v := range valueSet {
if _, ok := allValues[v]; ok {
children, err := idx.storage.ReadDir(context.Background(), path.Join("/", idx.indexRootDir, v))
if err != nil {
continue
}
for _, c := range children {
matches[path.Base(c)] = struct{}{}
}
}
}

if len(matches) == 0 {
var v string
switch len(values) {
case 0:
v = "none"
case 1:
v = values[0]
default:
v = "multiple"
}
return nil, &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v}
}

return matches, nil
ret := make([]string, 0, len(matches))
for m := range matches {
ret = append(ret, m)
}
return ret, nil
}

// Add a new value to the index.
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/utils/indexer/index/non_unique_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func TestNonUniqueIndexAdd(t *testing.T) {

ids, err := sut.Lookup("Green")
assert.NoError(t, err)
assert.EqualValues(t, []string{"goefe-789", "xadaf-189"}, ids)
assert.Len(t, ids, 2)
assert.Contains(t, ids, "goefe-789")
assert.Contains(t, ids, "xadaf-189")

ids, err = sut.Lookup("White")
assert.NoError(t, err)
Expand Down
58 changes: 50 additions & 8 deletions pkg/storage/utils/indexer/index/unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,62 @@ func (idx *Unique) Init() error {

// Lookup exact lookup by value.
func (idx *Unique) Lookup(v string) ([]string, error) {
return idx.LookupCtx(context.Background(), v)
}

// LookupCtx retieves multiple exact values and allows passing in a context
func (idx *Unique) LookupCtx(ctx context.Context, values ...string) ([]string, error) {
var allValues map[string]struct{}
if len(values) != 1 {
// prefetch all values with one request
entries, err := idx.storage.ReadDir(context.Background(), path.Join("/", idx.indexRootDir))
if err != nil {
return nil, err
}
// convert known values to set
allValues = make(map[string]struct{}, len(entries))
for _, e := range entries {
allValues[path.Base(e)] = struct{}{}
}
}

// convert requested values to set
valueSet := make(map[string]struct{}, len(values))
if idx.caseInsensitive {
v = strings.ToLower(v)
for _, v := range values {
valueSet[strings.ToLower(v)] = struct{}{}
}
} else {
for _, v := range values {
valueSet[v] = struct{}{}
}
}
searchPath := path.Join(idx.indexRootDir, v)
oldname, err := idx.storage.ResolveSymlink(context.Background(), searchPath)
if err != nil {
if os.IsNotExist(err) {
err = &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v}

var matches = make([]string, 0)
for v := range valueSet {
if _, ok := allValues[v]; ok || len(allValues) == 0 {
oldname, err := idx.storage.ResolveSymlink(context.Background(), path.Join(idx.indexRootDir, v))
if err != nil {
continue
}
matches = append(matches, oldname)
}
}

return nil, err
if len(matches) == 0 {
var v string
switch len(values) {
case 0:
v = "none"
case 1:
v = values[0]
default:
v = "multiple"
}
return nil, &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v}
}

return []string{oldname}, nil
return matches, nil
}

// Add adds a value to the index, returns the path to the root-document
Expand Down
20 changes: 17 additions & 3 deletions pkg/storage/utils/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,14 @@ func (i *StorageIndexer) FindBy(t interface{}, queryFields ...Field) ([]string,

resultPaths := make(map[string]struct{})
if fields, ok := i.indices[typeName]; ok {
for _, field := range queryFields {
for _, idx := range fields.IndicesByField[strcase.ToCamel(field.Name)] {
res, err := idx.Lookup(field.Value)
for fieldName, queryFields := range groupFieldsByName(queryFields) {
idxes := fields.IndicesByField[strcase.ToCamel(fieldName)]
values := make([]string, 0, len(queryFields))
for _, f := range queryFields {
values = append(values, f.Value)
}
for _, idx := range idxes {
res, err := idx.LookupCtx(context.Background(), values...)
if err != nil {
if _, ok := err.(errtypes.IsNotFound); ok {
continue
Expand All @@ -193,6 +198,15 @@ func (i *StorageIndexer) FindBy(t interface{}, queryFields ...Field) ([]string,
return result, nil
}

// groupFieldsByName groups the given filters and returns a map using the filter type as the key.
func groupFieldsByName(queryFields []Field) map[string][]Field {
grouped := make(map[string][]Field)
for _, f := range queryFields {
grouped[f.Name] = append(grouped[f.Name], f)
}
return grouped
}

// Delete deletes all indexed fields of a given type t on the Indexer.
func (i *StorageIndexer) Delete(t interface{}) error {
typeName := getTypeFQN(t)
Expand Down

0 comments on commit 047ab30

Please sign in to comment.