From eb6ac63e511fa0061b4396230d2607a681e3a46f Mon Sep 17 00:00:00 2001 From: Florian Schade Date: Tue, 6 Dec 2022 19:07:01 +0100 Subject: [PATCH] move event methods into dedicated engine struct for better reusability. implement recursive search item indexing. --- services/search/pkg/search/engine.go | 168 ++++++++++++++++++ services/search/pkg/search/events.go | 131 ++------------ services/search/pkg/search/events_test.go | 3 +- .../search/pkg/service/grpc/v0/service.go | 4 +- 4 files changed, 187 insertions(+), 119 deletions(-) create mode 100644 services/search/pkg/search/engine.go diff --git a/services/search/pkg/search/engine.go b/services/search/pkg/search/engine.go new file mode 100644 index 00000000000..a2af05d6160 --- /dev/null +++ b/services/search/pkg/search/engine.go @@ -0,0 +1,168 @@ +package search + +import ( + "context" + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/storage/utils/walker" + "github.com/cs3org/reva/v2/pkg/storagespace" + "github.com/cs3org/reva/v2/pkg/utils" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + searchsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0" + "github.com/owncloud/ocis/v2/services/search/pkg/content" + "github.com/owncloud/ocis/v2/services/search/pkg/engine" + "path/filepath" + "time" +) + +type EngineHandler struct { + logger log.Logger + engine engine.Engine + gateway gateway.GatewayAPIClient + extractor content.Extractor + secret string +} + +// NewProvider creates a new Provider instance. +func NewEngineHandler(gw gateway.GatewayAPIClient, eng engine.Engine, extractor content.Extractor, logger log.Logger, secret string) *EngineHandler { + return &EngineHandler{ + gateway: gw, + engine: eng, + secret: secret, + logger: logger, + extractor: extractor, + } +} + +func (eh *EngineHandler) trashItem(rid *provider.ResourceId) { + err := eh.engine.Delete(storagespace.FormatResourceID(*rid)) + if err != nil { + eh.logger.Error().Err(err).Interface("Id", rid).Msg("failed to remove item from index") + } +} + +func (eh *EngineHandler) upsertItem(ref *provider.Reference, uid *user.UserId) { + ctx, stat, path := eh.resInfo(uid, ref) + if ctx == nil || stat == nil || path == "" { + return + } + + doc, err := eh.extractor.Extract(ctx, stat.Info) + if err != nil { + eh.logger.Error().Err(err).Msg("failed to extract resource content") + return + } + + r := engine.Resource{ + ID: storagespace.FormatResourceID(*stat.Info.Id), + RootID: storagespace.FormatResourceID(provider.ResourceId{ + StorageId: stat.Info.Id.StorageId, + OpaqueId: stat.Info.Id.SpaceId, + SpaceId: stat.Info.Id.SpaceId, + }), + Path: utils.MakeRelativePath(path), + Type: uint64(stat.Info.Type), + Document: doc, + } + + if parentId := stat.GetInfo().GetParentId(); parentId != nil { + r.ParentID = storagespace.FormatResourceID(*parentId) + } + + if err = eh.engine.Upsert(r.ID, r); err != nil { + eh.logger.Error().Err(err).Msg("error adding updating the resource in the index") + } else { + logDocCount(eh.engine, eh.logger) + } +} + +func (eh *EngineHandler) restoreItem(ref *provider.Reference, uid *user.UserId) { + ctx, stat, path := eh.resInfo(uid, ref) + if ctx == nil || stat == nil || path == "" { + return + } + + if err := eh.engine.Restore(storagespace.FormatResourceID(*stat.Info.Id)); err != nil { + eh.logger.Error().Err(err).Msg("failed to restore the changed resource in the index") + } +} + +func (eh *EngineHandler) moveItem(ref *provider.Reference, uid *user.UserId) { + ctx, stat, path := eh.resInfo(uid, ref) + if ctx == nil || stat == nil || path == "" { + return + } + + if err := eh.engine.Move(storagespace.FormatResourceID(*stat.GetInfo().GetId()), storagespace.FormatResourceID(*stat.GetInfo().GetParentId()), path); err != nil { + eh.logger.Error().Err(err).Msg("failed to move the changed resource in the index") + } +} + +func (eh *EngineHandler) resInfo(uid *user.UserId, ref *provider.Reference) (context.Context, *provider.StatResponse, string) { + ownerCtx, err := getAuthContext(&user.User{Id: uid}, eh.gateway, eh.secret, eh.logger) + if err != nil { + return nil, nil, "" + } + + statRes, err := statResource(ownerCtx, ref, eh.gateway, eh.logger) + if err != nil { + return nil, nil, "" + } + + r, err := ResolveReference(ownerCtx, ref, statRes.GetInfo(), eh.gateway) + if err != nil { + return nil, nil, "" + } + + return ownerCtx, statRes, r.GetPath() +} + +func (eh *EngineHandler) indexSpace(entry *provider.Reference, uId *user.UserId) { + ownerCtx, err := getAuthContext(&user.User{Id: uId}, eh.gateway, eh.secret, eh.logger) + if err != nil { + return + } + + rootId := &provider.ResourceId{ + StorageId: entry.GetResourceId().GetStorageId(), + SpaceId: entry.GetResourceId().GetSpaceId(), + } + + w := walker.NewWalker(eh.gateway) + err = w.Walk(ownerCtx, rootId, func(wd string, info *provider.ResourceInfo, err error) error { + if err != nil { + eh.logger.Error().Err(err).Msg("error walking the tree") + return err + } + + if info == nil { + return nil + } + + ref := &provider.Reference{ + Path: utils.MakeRelativePath(filepath.Join(wd, info.Path)), + ResourceId: rootId, + } + eh.logger.Debug().Str("path", ref.Path).Msg("Walking tree") + + searchRes, err := eh.engine.Search(ownerCtx, &searchsvc.SearchIndexRequest{ + Query: "+ID:" + storagespace.FormatResourceID(*info.Id) + ` +Mtime:>="` + utils.TSToTime(info.Mtime).Format(time.RFC3339Nano) + `"`, + }) + + if err == nil && len(searchRes.Matches) >= 1 { + if info.Type == provider.ResourceType_RESOURCE_TYPE_CONTAINER { + eh.logger.Debug().Str("path", ref.Path).Msg("subtree hasn't changed. Skipping.") + return filepath.SkipDir + } + eh.logger.Debug().Str("path", ref.Path).Msg("element hasn't changed. Skipping.") + return nil + } + + eh.upsertItem(ref, info.Owner) + + return nil + }) + + logDocCount(eh.engine, eh.logger) +} diff --git a/services/search/pkg/search/events.go b/services/search/pkg/search/events.go index 1717657aa71..d4469c2db57 100644 --- a/services/search/pkg/search/events.go +++ b/services/search/pkg/search/events.go @@ -1,31 +1,13 @@ package search import ( - "context" - - gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" - user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" - provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/events" - "github.com/cs3org/reva/v2/pkg/storagespace" - "github.com/cs3org/reva/v2/pkg/utils" - "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/search/pkg/config" - "github.com/owncloud/ocis/v2/services/search/pkg/content" - "github.com/owncloud/ocis/v2/services/search/pkg/engine" ) -type eventHandler struct { - logger log.Logger - engine engine.Engine - gateway gateway.GatewayAPIClient - extractor content.Extractor - secret string -} - // HandleEvents listens to the needed events, // it handles the whole resource indexing livecycle. -func HandleEvents(eng engine.Engine, extractor content.Extractor, gw gateway.GatewayAPIClient, bus events.Consumer, logger log.Logger, cfg *config.Config) error { +func HandleEvents(eh *EngineHandler, bus events.Consumer, cfg *config.Config) error { evts := []events.Unmarshaller{ events.ItemTrashed{}, events.ItemRestored{}, @@ -53,124 +35,39 @@ func HandleEvents(eng engine.Engine, extractor content.Extractor, gw gateway.Gat } for i := 0; i < cfg.Events.NumConsumers; i++ { - go func(eh *eventHandler, ch <-chan interface{}) { + go func(eh *EngineHandler, ch <-chan interface{}) { for e := range ch { eh.logger.Debug().Interface("event", e).Msg("updating index") switch ev := e.(type) { + case events.ItemTrashed: eh.trashItem(ev.ID) + eh.indexSpace(ev.Ref, ev.Executant) case events.ItemMoved: eh.moveItem(ev.Ref, ev.Executant) + eh.indexSpace(ev.Ref, ev.Executant) case events.ItemRestored: eh.restoreItem(ev.Ref, ev.Executant) + eh.indexSpace(ev.Ref, ev.Executant) case events.ContainerCreated: - eh.upsertItem(ev.Ref, ev.Executant) + eh.indexSpace(ev.Ref, ev.Executant) case events.FileTouched: - eh.upsertItem(ev.Ref, ev.Executant) + eh.indexSpace(ev.Ref, ev.Executant) case events.FileVersionRestored: - eh.upsertItem(ev.Ref, ev.Executant) + eh.indexSpace(ev.Ref, ev.Executant) case events.FileUploaded: - eh.upsertItem(ev.Ref, ev.Executant) + eh.indexSpace(ev.Ref, ev.Executant) case events.UploadReady: - eh.upsertItem(ev.FileRef, ev.ExecutingUser.Id) + eh.indexSpace(ev.FileRef, ev.ExecutingUser.Id) case events.TagsAdded: - eh.upsertItem(ev.Ref, ev.Executant) + eh.indexSpace(ev.Ref, ev.Executant) case events.TagsRemoved: - eh.upsertItem(ev.Ref, ev.Executant) + eh.indexSpace(ev.Ref, ev.Executant) } } - }( - &eventHandler{ - logger: logger, - engine: eng, - secret: cfg.MachineAuthAPIKey, - gateway: gw, - extractor: extractor, - }, - ch, - ) + }(eh, ch) } return nil } - -func (eh *eventHandler) trashItem(rid *provider.ResourceId) { - err := eh.engine.Delete(storagespace.FormatResourceID(*rid)) - if err != nil { - eh.logger.Error().Err(err).Interface("Id", rid).Msg("failed to remove item from index") - } -} - -func (eh *eventHandler) upsertItem(ref *provider.Reference, uid *user.UserId) { - ctx, stat, path := eh.resInfo(uid, ref) - if ctx == nil || stat == nil || path == "" { - return - } - - doc, err := eh.extractor.Extract(ctx, stat.Info) - if err != nil { - eh.logger.Error().Err(err).Msg("failed to extract resource content") - return - } - - r := engine.Resource{ - ID: storagespace.FormatResourceID(*stat.Info.Id), - RootID: storagespace.FormatResourceID(provider.ResourceId{ - StorageId: stat.Info.Id.StorageId, - OpaqueId: stat.Info.Id.SpaceId, - SpaceId: stat.Info.Id.SpaceId, - }), - ParentID: storagespace.FormatResourceID(*stat.GetInfo().GetParentId()), - Path: utils.MakeRelativePath(path), - Type: uint64(stat.Info.Type), - Document: doc, - } - - if err = eh.engine.Upsert(r.ID, r); err != nil { - eh.logger.Error().Err(err).Msg("error adding updating the resource in the index") - } else { - logDocCount(eh.engine, eh.logger) - } -} - -func (eh *eventHandler) restoreItem(ref *provider.Reference, uid *user.UserId) { - ctx, stat, path := eh.resInfo(uid, ref) - if ctx == nil || stat == nil || path == "" { - return - } - - if err := eh.engine.Restore(storagespace.FormatResourceID(*stat.Info.Id)); err != nil { - eh.logger.Error().Err(err).Msg("failed to restore the changed resource in the index") - } -} - -func (eh *eventHandler) moveItem(ref *provider.Reference, uid *user.UserId) { - ctx, stat, path := eh.resInfo(uid, ref) - if ctx == nil || stat == nil || path == "" { - return - } - - if err := eh.engine.Move(storagespace.FormatResourceID(*stat.GetInfo().GetId()), storagespace.FormatResourceID(*stat.GetInfo().GetParentId()), path); err != nil { - eh.logger.Error().Err(err).Msg("failed to move the changed resource in the index") - } -} - -func (eh *eventHandler) resInfo(uid *user.UserId, ref *provider.Reference) (context.Context, *provider.StatResponse, string) { - ownerCtx, err := getAuthContext(&user.User{Id: uid}, eh.gateway, eh.secret, eh.logger) - if err != nil { - return nil, nil, "" - } - - statRes, err := statResource(ownerCtx, ref, eh.gateway, eh.logger) - if err != nil { - return nil, nil, "" - } - - r, err := ResolveReference(ownerCtx, ref, statRes.GetInfo(), eh.gateway) - if err != nil { - return nil, nil, "" - } - - return ownerCtx, statRes, r.GetPath() -} diff --git a/services/search/pkg/search/events_test.go b/services/search/pkg/search/events_test.go index 3f98c3e17ce..6a6d061eafc 100644 --- a/services/search/pkg/search/events_test.go +++ b/services/search/pkg/search/events_test.go @@ -63,8 +63,9 @@ var _ = Describe("Events", func() { engine = &engineMocks.Engine{} bus, _ = mEvents.NewStream() extractor = &contentMocks.Extractor{} + engineHandler := search.NewEngineHandler(gw, engine, extractor, logger, "") - _ = search.HandleEvents(engine, extractor, gw, bus, logger, &config.Config{}) + _ = search.HandleEvents(engineHandler, bus, &config.Config{}) gw.On("Authenticate", mock.Anything, mock.Anything).Return(&gateway.AuthenticateResponse{ Status: status.NewOK(ctx), diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index 549887309b1..baa5a1a7e40 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -103,8 +103,10 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) return nil, teardown, err } + eh := search.NewEngineHandler(gw, eng, extractor, logger, cfg.MachineAuthAPIKey) + // setup event handling - if err := search.HandleEvents(eng, extractor, gw, bus, logger, cfg); err != nil { + if err := search.HandleEvents(eh, bus, cfg); err != nil { return nil, teardown, err }