Skip to content

Commit

Permalink
move event methods into dedicated engine struct for better reusability.
Browse files Browse the repository at this point in the history
implement recursive search item indexing.
  • Loading branch information
fschade committed Dec 6, 2022
1 parent a1bc229 commit eb6ac63
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 119 deletions.
168 changes: 168 additions & 0 deletions services/search/pkg/search/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package search

import (
"context"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/storage/utils/walker"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
searchsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0"
"github.com/owncloud/ocis/v2/services/search/pkg/content"
"github.com/owncloud/ocis/v2/services/search/pkg/engine"
"path/filepath"
"time"
)

type EngineHandler struct {
logger log.Logger
engine engine.Engine
gateway gateway.GatewayAPIClient
extractor content.Extractor
secret string
}

// NewProvider creates a new Provider instance.
func NewEngineHandler(gw gateway.GatewayAPIClient, eng engine.Engine, extractor content.Extractor, logger log.Logger, secret string) *EngineHandler {
return &EngineHandler{
gateway: gw,
engine: eng,
secret: secret,
logger: logger,
extractor: extractor,
}
}

func (eh *EngineHandler) trashItem(rid *provider.ResourceId) {
err := eh.engine.Delete(storagespace.FormatResourceID(*rid))
if err != nil {
eh.logger.Error().Err(err).Interface("Id", rid).Msg("failed to remove item from index")
}
}

func (eh *EngineHandler) upsertItem(ref *provider.Reference, uid *user.UserId) {
ctx, stat, path := eh.resInfo(uid, ref)
if ctx == nil || stat == nil || path == "" {
return
}

doc, err := eh.extractor.Extract(ctx, stat.Info)
if err != nil {
eh.logger.Error().Err(err).Msg("failed to extract resource content")
return
}

r := engine.Resource{
ID: storagespace.FormatResourceID(*stat.Info.Id),
RootID: storagespace.FormatResourceID(provider.ResourceId{
StorageId: stat.Info.Id.StorageId,
OpaqueId: stat.Info.Id.SpaceId,
SpaceId: stat.Info.Id.SpaceId,
}),
Path: utils.MakeRelativePath(path),
Type: uint64(stat.Info.Type),
Document: doc,
}

if parentId := stat.GetInfo().GetParentId(); parentId != nil {
r.ParentID = storagespace.FormatResourceID(*parentId)
}

if err = eh.engine.Upsert(r.ID, r); err != nil {
eh.logger.Error().Err(err).Msg("error adding updating the resource in the index")
} else {
logDocCount(eh.engine, eh.logger)
}
}

func (eh *EngineHandler) restoreItem(ref *provider.Reference, uid *user.UserId) {
ctx, stat, path := eh.resInfo(uid, ref)
if ctx == nil || stat == nil || path == "" {
return
}

if err := eh.engine.Restore(storagespace.FormatResourceID(*stat.Info.Id)); err != nil {
eh.logger.Error().Err(err).Msg("failed to restore the changed resource in the index")
}
}

func (eh *EngineHandler) moveItem(ref *provider.Reference, uid *user.UserId) {
ctx, stat, path := eh.resInfo(uid, ref)
if ctx == nil || stat == nil || path == "" {
return
}

if err := eh.engine.Move(storagespace.FormatResourceID(*stat.GetInfo().GetId()), storagespace.FormatResourceID(*stat.GetInfo().GetParentId()), path); err != nil {
eh.logger.Error().Err(err).Msg("failed to move the changed resource in the index")
}
}

func (eh *EngineHandler) resInfo(uid *user.UserId, ref *provider.Reference) (context.Context, *provider.StatResponse, string) {
ownerCtx, err := getAuthContext(&user.User{Id: uid}, eh.gateway, eh.secret, eh.logger)
if err != nil {
return nil, nil, ""
}

statRes, err := statResource(ownerCtx, ref, eh.gateway, eh.logger)
if err != nil {
return nil, nil, ""
}

r, err := ResolveReference(ownerCtx, ref, statRes.GetInfo(), eh.gateway)
if err != nil {
return nil, nil, ""
}

return ownerCtx, statRes, r.GetPath()
}

func (eh *EngineHandler) indexSpace(entry *provider.Reference, uId *user.UserId) {
ownerCtx, err := getAuthContext(&user.User{Id: uId}, eh.gateway, eh.secret, eh.logger)
if err != nil {
return
}

rootId := &provider.ResourceId{
StorageId: entry.GetResourceId().GetStorageId(),
SpaceId: entry.GetResourceId().GetSpaceId(),
}

w := walker.NewWalker(eh.gateway)
err = w.Walk(ownerCtx, rootId, func(wd string, info *provider.ResourceInfo, err error) error {
if err != nil {
eh.logger.Error().Err(err).Msg("error walking the tree")
return err
}

if info == nil {
return nil
}

ref := &provider.Reference{
Path: utils.MakeRelativePath(filepath.Join(wd, info.Path)),
ResourceId: rootId,
}
eh.logger.Debug().Str("path", ref.Path).Msg("Walking tree")

searchRes, err := eh.engine.Search(ownerCtx, &searchsvc.SearchIndexRequest{
Query: "+ID:" + storagespace.FormatResourceID(*info.Id) + ` +Mtime:>="` + utils.TSToTime(info.Mtime).Format(time.RFC3339Nano) + `"`,
})

if err == nil && len(searchRes.Matches) >= 1 {
if info.Type == provider.ResourceType_RESOURCE_TYPE_CONTAINER {
eh.logger.Debug().Str("path", ref.Path).Msg("subtree hasn't changed. Skipping.")
return filepath.SkipDir
}
eh.logger.Debug().Str("path", ref.Path).Msg("element hasn't changed. Skipping.")
return nil
}

eh.upsertItem(ref, info.Owner)

return nil
})

logDocCount(eh.engine, eh.logger)
}
131 changes: 14 additions & 117 deletions services/search/pkg/search/events.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,13 @@
package search

import (
"context"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/search/pkg/config"
"github.com/owncloud/ocis/v2/services/search/pkg/content"
"github.com/owncloud/ocis/v2/services/search/pkg/engine"
)

type eventHandler struct {
logger log.Logger
engine engine.Engine
gateway gateway.GatewayAPIClient
extractor content.Extractor
secret string
}

// HandleEvents listens to the needed events,
// it handles the whole resource indexing livecycle.
func HandleEvents(eng engine.Engine, extractor content.Extractor, gw gateway.GatewayAPIClient, bus events.Consumer, logger log.Logger, cfg *config.Config) error {
func HandleEvents(eh *EngineHandler, bus events.Consumer, cfg *config.Config) error {
evts := []events.Unmarshaller{
events.ItemTrashed{},
events.ItemRestored{},
Expand Down Expand Up @@ -53,124 +35,39 @@ func HandleEvents(eng engine.Engine, extractor content.Extractor, gw gateway.Gat
}

for i := 0; i < cfg.Events.NumConsumers; i++ {
go func(eh *eventHandler, ch <-chan interface{}) {
go func(eh *EngineHandler, ch <-chan interface{}) {
for e := range ch {
eh.logger.Debug().Interface("event", e).Msg("updating index")

switch ev := e.(type) {

case events.ItemTrashed:
eh.trashItem(ev.ID)
eh.indexSpace(ev.Ref, ev.Executant)
case events.ItemMoved:
eh.moveItem(ev.Ref, ev.Executant)
eh.indexSpace(ev.Ref, ev.Executant)
case events.ItemRestored:
eh.restoreItem(ev.Ref, ev.Executant)
eh.indexSpace(ev.Ref, ev.Executant)
case events.ContainerCreated:
eh.upsertItem(ev.Ref, ev.Executant)
eh.indexSpace(ev.Ref, ev.Executant)
case events.FileTouched:
eh.upsertItem(ev.Ref, ev.Executant)
eh.indexSpace(ev.Ref, ev.Executant)
case events.FileVersionRestored:
eh.upsertItem(ev.Ref, ev.Executant)
eh.indexSpace(ev.Ref, ev.Executant)
case events.FileUploaded:
eh.upsertItem(ev.Ref, ev.Executant)
eh.indexSpace(ev.Ref, ev.Executant)
case events.UploadReady:
eh.upsertItem(ev.FileRef, ev.ExecutingUser.Id)
eh.indexSpace(ev.FileRef, ev.ExecutingUser.Id)
case events.TagsAdded:
eh.upsertItem(ev.Ref, ev.Executant)
eh.indexSpace(ev.Ref, ev.Executant)
case events.TagsRemoved:
eh.upsertItem(ev.Ref, ev.Executant)
eh.indexSpace(ev.Ref, ev.Executant)
}
}
}(
&eventHandler{
logger: logger,
engine: eng,
secret: cfg.MachineAuthAPIKey,
gateway: gw,
extractor: extractor,
},
ch,
)
}(eh, ch)
}

return nil
}

func (eh *eventHandler) trashItem(rid *provider.ResourceId) {
err := eh.engine.Delete(storagespace.FormatResourceID(*rid))
if err != nil {
eh.logger.Error().Err(err).Interface("Id", rid).Msg("failed to remove item from index")
}
}

func (eh *eventHandler) upsertItem(ref *provider.Reference, uid *user.UserId) {
ctx, stat, path := eh.resInfo(uid, ref)
if ctx == nil || stat == nil || path == "" {
return
}

doc, err := eh.extractor.Extract(ctx, stat.Info)
if err != nil {
eh.logger.Error().Err(err).Msg("failed to extract resource content")
return
}

r := engine.Resource{
ID: storagespace.FormatResourceID(*stat.Info.Id),
RootID: storagespace.FormatResourceID(provider.ResourceId{
StorageId: stat.Info.Id.StorageId,
OpaqueId: stat.Info.Id.SpaceId,
SpaceId: stat.Info.Id.SpaceId,
}),
ParentID: storagespace.FormatResourceID(*stat.GetInfo().GetParentId()),
Path: utils.MakeRelativePath(path),
Type: uint64(stat.Info.Type),
Document: doc,
}

if err = eh.engine.Upsert(r.ID, r); err != nil {
eh.logger.Error().Err(err).Msg("error adding updating the resource in the index")
} else {
logDocCount(eh.engine, eh.logger)
}
}

func (eh *eventHandler) restoreItem(ref *provider.Reference, uid *user.UserId) {
ctx, stat, path := eh.resInfo(uid, ref)
if ctx == nil || stat == nil || path == "" {
return
}

if err := eh.engine.Restore(storagespace.FormatResourceID(*stat.Info.Id)); err != nil {
eh.logger.Error().Err(err).Msg("failed to restore the changed resource in the index")
}
}

func (eh *eventHandler) moveItem(ref *provider.Reference, uid *user.UserId) {
ctx, stat, path := eh.resInfo(uid, ref)
if ctx == nil || stat == nil || path == "" {
return
}

if err := eh.engine.Move(storagespace.FormatResourceID(*stat.GetInfo().GetId()), storagespace.FormatResourceID(*stat.GetInfo().GetParentId()), path); err != nil {
eh.logger.Error().Err(err).Msg("failed to move the changed resource in the index")
}
}

func (eh *eventHandler) resInfo(uid *user.UserId, ref *provider.Reference) (context.Context, *provider.StatResponse, string) {
ownerCtx, err := getAuthContext(&user.User{Id: uid}, eh.gateway, eh.secret, eh.logger)
if err != nil {
return nil, nil, ""
}

statRes, err := statResource(ownerCtx, ref, eh.gateway, eh.logger)
if err != nil {
return nil, nil, ""
}

r, err := ResolveReference(ownerCtx, ref, statRes.GetInfo(), eh.gateway)
if err != nil {
return nil, nil, ""
}

return ownerCtx, statRes, r.GetPath()
}
3 changes: 2 additions & 1 deletion services/search/pkg/search/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ var _ = Describe("Events", func() {
engine = &engineMocks.Engine{}
bus, _ = mEvents.NewStream()
extractor = &contentMocks.Extractor{}
engineHandler := search.NewEngineHandler(gw, engine, extractor, logger, "")

_ = search.HandleEvents(engine, extractor, gw, bus, logger, &config.Config{})
_ = search.HandleEvents(engineHandler, bus, &config.Config{})

gw.On("Authenticate", mock.Anything, mock.Anything).Return(&gateway.AuthenticateResponse{
Status: status.NewOK(ctx),
Expand Down
4 changes: 3 additions & 1 deletion services/search/pkg/service/grpc/v0/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,10 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error)
return nil, teardown, err
}

eh := search.NewEngineHandler(gw, eng, extractor, logger, cfg.MachineAuthAPIKey)

// setup event handling
if err := search.HandleEvents(eng, extractor, gw, bus, logger, cfg); err != nil {
if err := search.HandleEvents(eh, bus, cfg); err != nil {
return nil, teardown, err
}

Expand Down

0 comments on commit eb6ac63

Please sign in to comment.