Skip to content

Commit

Permalink
rename provider to searchService
Browse files Browse the repository at this point in the history
move event methods into searchService.
add recursive search item indexing.
  • Loading branch information
fschade committed Dec 7, 2022
1 parent a1bc229 commit b8efc99
Show file tree
Hide file tree
Showing 7 changed files with 359 additions and 454 deletions.
144 changes: 26 additions & 118 deletions services/search/pkg/search/events.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,15 @@
package search

import (
"context"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/search/pkg/config"
"github.com/owncloud/ocis/v2/services/search/pkg/content"
"github.com/owncloud/ocis/v2/services/search/pkg/engine"
)

type eventHandler struct {
logger log.Logger
engine engine.Engine
gateway gateway.GatewayAPIClient
extractor content.Extractor
secret string
}

// HandleEvents listens to the needed events,
// it handles the whole resource indexing livecycle.
func HandleEvents(eng engine.Engine, extractor content.Extractor, gw gateway.GatewayAPIClient, bus events.Consumer, logger log.Logger, cfg *config.Config) error {
func HandleEvents(s Searcher, bus events.Consumer, logger log.Logger, cfg *config.Config) error {
evts := []events.Unmarshaller{
events.ItemTrashed{},
events.ItemRestored{},
Expand Down Expand Up @@ -53,124 +37,48 @@ func HandleEvents(eng engine.Engine, extractor content.Extractor, gw gateway.Gat
}

for i := 0; i < cfg.Events.NumConsumers; i++ {
go func(eh *eventHandler, ch <-chan interface{}) {
go func(s Searcher, ch <-chan interface{}) {
for e := range ch {
eh.logger.Debug().Interface("event", e).Msg("updating index")
logger.Debug().Interface("event", e).Msg("updating index")

spaceId := func(ref *provider.Reference) *provider.ResourceId {
return &provider.ResourceId{
StorageId: ref.GetResourceId().GetStorageId(),
SpaceId: ref.GetResourceId().GetSpaceId(),
}
}

switch ev := e.(type) {
case events.ItemTrashed:
eh.trashItem(ev.ID)
s.TrashItem(ev.ID)
s.IndexSpace(spaceId(ev.Ref), ev.Executant)
case events.ItemMoved:
eh.moveItem(ev.Ref, ev.Executant)
s.MoveItem(ev.Ref, ev.Executant)
s.IndexSpace(spaceId(ev.Ref), ev.Executant)
case events.ItemRestored:
eh.restoreItem(ev.Ref, ev.Executant)
s.RestoreItem(ev.Ref, ev.Executant)
s.IndexSpace(spaceId(ev.Ref), ev.Executant)
case events.ContainerCreated:
eh.upsertItem(ev.Ref, ev.Executant)
s.IndexSpace(spaceId(ev.Ref), ev.Executant)
case events.FileTouched:
eh.upsertItem(ev.Ref, ev.Executant)
s.IndexSpace(spaceId(ev.Ref), ev.Executant)
case events.FileVersionRestored:
eh.upsertItem(ev.Ref, ev.Executant)
case events.FileUploaded:
eh.upsertItem(ev.Ref, ev.Executant)
case events.UploadReady:
eh.upsertItem(ev.FileRef, ev.ExecutingUser.Id)
s.IndexSpace(spaceId(ev.Ref), ev.Executant)
case events.TagsAdded:
eh.upsertItem(ev.Ref, ev.Executant)
s.IndexSpace(spaceId(ev.Ref), ev.Executant)
case events.TagsRemoved:
eh.upsertItem(ev.Ref, ev.Executant)
s.IndexSpace(spaceId(ev.Ref), ev.Executant)
case events.FileUploaded:
s.IndexSpace(spaceId(ev.Ref), ev.Executant)
case events.UploadReady:
s.IndexSpace(spaceId(ev.FileRef), ev.ExecutingUser.Id)
}
}
}(
&eventHandler{
logger: logger,
engine: eng,
secret: cfg.MachineAuthAPIKey,
gateway: gw,
extractor: extractor,
},
s,
ch,
)
}

return nil
}

func (eh *eventHandler) trashItem(rid *provider.ResourceId) {
err := eh.engine.Delete(storagespace.FormatResourceID(*rid))
if err != nil {
eh.logger.Error().Err(err).Interface("Id", rid).Msg("failed to remove item from index")
}
}

func (eh *eventHandler) upsertItem(ref *provider.Reference, uid *user.UserId) {
ctx, stat, path := eh.resInfo(uid, ref)
if ctx == nil || stat == nil || path == "" {
return
}

doc, err := eh.extractor.Extract(ctx, stat.Info)
if err != nil {
eh.logger.Error().Err(err).Msg("failed to extract resource content")
return
}

r := engine.Resource{
ID: storagespace.FormatResourceID(*stat.Info.Id),
RootID: storagespace.FormatResourceID(provider.ResourceId{
StorageId: stat.Info.Id.StorageId,
OpaqueId: stat.Info.Id.SpaceId,
SpaceId: stat.Info.Id.SpaceId,
}),
ParentID: storagespace.FormatResourceID(*stat.GetInfo().GetParentId()),
Path: utils.MakeRelativePath(path),
Type: uint64(stat.Info.Type),
Document: doc,
}

if err = eh.engine.Upsert(r.ID, r); err != nil {
eh.logger.Error().Err(err).Msg("error adding updating the resource in the index")
} else {
logDocCount(eh.engine, eh.logger)
}
}

func (eh *eventHandler) restoreItem(ref *provider.Reference, uid *user.UserId) {
ctx, stat, path := eh.resInfo(uid, ref)
if ctx == nil || stat == nil || path == "" {
return
}

if err := eh.engine.Restore(storagespace.FormatResourceID(*stat.Info.Id)); err != nil {
eh.logger.Error().Err(err).Msg("failed to restore the changed resource in the index")
}
}

func (eh *eventHandler) moveItem(ref *provider.Reference, uid *user.UserId) {
ctx, stat, path := eh.resInfo(uid, ref)
if ctx == nil || stat == nil || path == "" {
return
}

if err := eh.engine.Move(storagespace.FormatResourceID(*stat.GetInfo().GetId()), storagespace.FormatResourceID(*stat.GetInfo().GetParentId()), path); err != nil {
eh.logger.Error().Err(err).Msg("failed to move the changed resource in the index")
}
}

func (eh *eventHandler) resInfo(uid *user.UserId, ref *provider.Reference) (context.Context, *provider.StatResponse, string) {
ownerCtx, err := getAuthContext(&user.User{Id: uid}, eh.gateway, eh.secret, eh.logger)
if err != nil {
return nil, nil, ""
}

statRes, err := statResource(ownerCtx, ref, eh.gateway, eh.logger)
if err != nil {
return nil, nil, ""
}

r, err := ResolveReference(ownerCtx, ref, statRes.GetInfo(), eh.gateway)
if err != nil {
return nil, nil, ""
}

return ownerCtx, statRes, r.GetPath()
}
Loading

0 comments on commit b8efc99

Please sign in to comment.