diff --git a/services/search/pkg/search/events.go b/services/search/pkg/search/events.go index 1717657aa71..8e50d7416a5 100644 --- a/services/search/pkg/search/events.go +++ b/services/search/pkg/search/events.go @@ -1,31 +1,15 @@ package search import ( - "context" - - gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" - user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/events" - "github.com/cs3org/reva/v2/pkg/storagespace" - "github.com/cs3org/reva/v2/pkg/utils" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/search/pkg/config" - "github.com/owncloud/ocis/v2/services/search/pkg/content" - "github.com/owncloud/ocis/v2/services/search/pkg/engine" ) -type eventHandler struct { - logger log.Logger - engine engine.Engine - gateway gateway.GatewayAPIClient - extractor content.Extractor - secret string -} - // HandleEvents listens to the needed events, // it handles the whole resource indexing livecycle. -func HandleEvents(eng engine.Engine, extractor content.Extractor, gw gateway.GatewayAPIClient, bus events.Consumer, logger log.Logger, cfg *config.Config) error { +func HandleEvents(s Searcher, bus events.Consumer, logger log.Logger, cfg *config.Config) error { evts := []events.Unmarshaller{ events.ItemTrashed{}, events.ItemRestored{}, @@ -53,124 +37,48 @@ func HandleEvents(eng engine.Engine, extractor content.Extractor, gw gateway.Gat } for i := 0; i < cfg.Events.NumConsumers; i++ { - go func(eh *eventHandler, ch <-chan interface{}) { + go func(s Searcher, ch <-chan interface{}) { for e := range ch { - eh.logger.Debug().Interface("event", e).Msg("updating index") + logger.Debug().Interface("event", e).Msg("updating index") + + spaceId := func(ref *provider.Reference) *provider.ResourceId { + return &provider.ResourceId{ + StorageId: ref.GetResourceId().GetStorageId(), + SpaceId: ref.GetResourceId().GetSpaceId(), + } + } switch ev := e.(type) { case events.ItemTrashed: - eh.trashItem(ev.ID) + s.TrashItem(ev.ID) + s.IndexSpace(spaceId(ev.Ref), ev.Executant) case events.ItemMoved: - eh.moveItem(ev.Ref, ev.Executant) + s.MoveItem(ev.Ref, ev.Executant) + s.IndexSpace(spaceId(ev.Ref), ev.Executant) case events.ItemRestored: - eh.restoreItem(ev.Ref, ev.Executant) + s.RestoreItem(ev.Ref, ev.Executant) + s.IndexSpace(spaceId(ev.Ref), ev.Executant) case events.ContainerCreated: - eh.upsertItem(ev.Ref, ev.Executant) + s.IndexSpace(spaceId(ev.Ref), ev.Executant) case events.FileTouched: - eh.upsertItem(ev.Ref, ev.Executant) + s.IndexSpace(spaceId(ev.Ref), ev.Executant) case events.FileVersionRestored: - eh.upsertItem(ev.Ref, ev.Executant) - case events.FileUploaded: - eh.upsertItem(ev.Ref, ev.Executant) - case events.UploadReady: - eh.upsertItem(ev.FileRef, ev.ExecutingUser.Id) + s.IndexSpace(spaceId(ev.Ref), ev.Executant) case events.TagsAdded: - eh.upsertItem(ev.Ref, ev.Executant) + s.IndexSpace(spaceId(ev.Ref), ev.Executant) case events.TagsRemoved: - eh.upsertItem(ev.Ref, ev.Executant) + s.IndexSpace(spaceId(ev.Ref), ev.Executant) + case events.FileUploaded: + s.IndexSpace(spaceId(ev.Ref), ev.Executant) + case events.UploadReady: + s.IndexSpace(spaceId(ev.FileRef), ev.ExecutingUser.Id) } } }( - &eventHandler{ - logger: logger, - engine: eng, - secret: cfg.MachineAuthAPIKey, - gateway: gw, - extractor: extractor, - }, + s, ch, ) } return nil } - -func (eh *eventHandler) trashItem(rid *provider.ResourceId) { - err := eh.engine.Delete(storagespace.FormatResourceID(*rid)) - if err != nil { - eh.logger.Error().Err(err).Interface("Id", rid).Msg("failed to remove item from index") - } -} - -func (eh *eventHandler) upsertItem(ref *provider.Reference, uid *user.UserId) { - ctx, stat, path := eh.resInfo(uid, ref) - if ctx == nil || stat == nil || path == "" { - return - } - - doc, err := eh.extractor.Extract(ctx, stat.Info) - if err != nil { - eh.logger.Error().Err(err).Msg("failed to extract resource content") - return - } - - r := engine.Resource{ - ID: storagespace.FormatResourceID(*stat.Info.Id), - RootID: storagespace.FormatResourceID(provider.ResourceId{ - StorageId: stat.Info.Id.StorageId, - OpaqueId: stat.Info.Id.SpaceId, - SpaceId: stat.Info.Id.SpaceId, - }), - ParentID: storagespace.FormatResourceID(*stat.GetInfo().GetParentId()), - Path: utils.MakeRelativePath(path), - Type: uint64(stat.Info.Type), - Document: doc, - } - - if err = eh.engine.Upsert(r.ID, r); err != nil { - eh.logger.Error().Err(err).Msg("error adding updating the resource in the index") - } else { - logDocCount(eh.engine, eh.logger) - } -} - -func (eh *eventHandler) restoreItem(ref *provider.Reference, uid *user.UserId) { - ctx, stat, path := eh.resInfo(uid, ref) - if ctx == nil || stat == nil || path == "" { - return - } - - if err := eh.engine.Restore(storagespace.FormatResourceID(*stat.Info.Id)); err != nil { - eh.logger.Error().Err(err).Msg("failed to restore the changed resource in the index") - } -} - -func (eh *eventHandler) moveItem(ref *provider.Reference, uid *user.UserId) { - ctx, stat, path := eh.resInfo(uid, ref) - if ctx == nil || stat == nil || path == "" { - return - } - - if err := eh.engine.Move(storagespace.FormatResourceID(*stat.GetInfo().GetId()), storagespace.FormatResourceID(*stat.GetInfo().GetParentId()), path); err != nil { - eh.logger.Error().Err(err).Msg("failed to move the changed resource in the index") - } -} - -func (eh *eventHandler) resInfo(uid *user.UserId, ref *provider.Reference) (context.Context, *provider.StatResponse, string) { - ownerCtx, err := getAuthContext(&user.User{Id: uid}, eh.gateway, eh.secret, eh.logger) - if err != nil { - return nil, nil, "" - } - - statRes, err := statResource(ownerCtx, ref, eh.gateway, eh.logger) - if err != nil { - return nil, nil, "" - } - - r, err := ResolveReference(ownerCtx, ref, statRes.GetInfo(), eh.gateway) - if err != nil { - return nil, nil, "" - } - - return ownerCtx, statRes, r.GetPath() -} diff --git a/services/search/pkg/search/events_test.go b/services/search/pkg/search/events_test.go index 3f98c3e17ce..02f1f61ce38 100644 --- a/services/search/pkg/search/events_test.go +++ b/services/search/pkg/search/events_test.go @@ -1,186 +1,54 @@ package search_test import ( - "context" - - gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" - provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/events" - "github.com/cs3org/reva/v2/pkg/rgrpc/status" - cs3mocks "github.com/cs3org/reva/v2/tests/cs3mocks/mocks" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/search/pkg/config" - "github.com/owncloud/ocis/v2/services/search/pkg/content" - contentMocks "github.com/owncloud/ocis/v2/services/search/pkg/content/mocks" - engineMocks "github.com/owncloud/ocis/v2/services/search/pkg/engine/mocks" "github.com/owncloud/ocis/v2/services/search/pkg/search" + searchMocks "github.com/owncloud/ocis/v2/services/search/pkg/search/mocks" "github.com/stretchr/testify/mock" mEvents "go-micro.dev/v4/events" ) -var _ = Describe("Events", func() { - var ( - gw *cs3mocks.GatewayAPIClient - engine *engineMocks.Engine - extractor *contentMocks.Extractor - bus events.Stream - ctx context.Context +var _ = DescribeTable("eevents", + func(mcks []string, e interface{}, asyncUploads bool) { + var ( + s = &searchMocks.Searcher{} + calls int + ) - logger = log.NewLogger() - user = &userv1beta1.User{ - Id: &userv1beta1.UserId{ - OpaqueId: "user", - }, - } + bus, _ := mEvents.NewStream() - ref = &provider.Reference{ - ResourceId: &provider.ResourceId{ - StorageId: "storageid", - OpaqueId: "rootopaqueid", - }, - Path: "./foo.pdf", - } - ri = &provider.ResourceInfo{ - Id: &provider.ResourceId{ - StorageId: "storageid", - OpaqueId: "opaqueid", - }, - ParentId: &provider.ResourceId{ - StorageId: "storageid", - OpaqueId: "parentopaqueid", + search.HandleEvents(s, bus, log.NewLogger(), &config.Config{ + Events: config.Events{ + AsyncUploads: asyncUploads, }, - Path: "foo.pdf", - Size: 12345, - } - ) - - BeforeEach(func() { - ctx = context.Background() - gw = &cs3mocks.GatewayAPIClient{} - engine = &engineMocks.Engine{} - bus, _ = mEvents.NewStream() - extractor = &contentMocks.Extractor{} - - _ = search.HandleEvents(engine, extractor, gw, bus, logger, &config.Config{}) - - gw.On("Authenticate", mock.Anything, mock.Anything).Return(&gateway.AuthenticateResponse{ - Status: status.NewOK(ctx), - Token: "authtoken", - }, nil) - gw.On("Stat", mock.Anything, mock.Anything).Return(&provider.StatResponse{ - Status: status.NewOK(ctx), - Info: ri, - }, nil) - gw.On("GetPath", mock.Anything, mock.Anything).Return(&provider.GetPathResponse{ - Status: status.NewOK(ctx), - Path: "", - }, nil) - engine.On("DocCount").Return(uint64(1), nil) - }) - Describe("events", func() { - It("triggers an index update when a file has been uploaded", func() { - called := false - extractor.Mock.On("Extract", mock.Anything, mock.Anything).Return(content.Document{}, nil) - engine.On("Upsert", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { - called = true - }) - - _ = events.Publish(bus, events.FileUploaded{ - Ref: ref, - Executant: user.Id, - }) - - Eventually(func() bool { - return called - }, "2s").Should(BeTrue()) - }) - - It("triggers an index update when a file has been touched", func() { - called := false - extractor.Mock.On("Extract", mock.Anything, mock.Anything, mock.Anything).Return(content.Document{}, nil) - engine.On("Upsert", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { - called = true - }) - _ = events.Publish(bus, events.FileTouched{ - Ref: ref, - Executant: user.Id, - }) - - Eventually(func() bool { - return called - }, "2s").Should(BeTrue()) - }) - - It("removes an entry from the index when the file has been deleted", func() { - called := false - gw.On("Stat", mock.Anything, mock.Anything).Return(&provider.StatResponse{ - Status: status.NewNotFound(context.Background(), ""), - }, nil) - engine.On("Delete", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - called = true - }) - - _ = events.Publish(bus, events.ItemTrashed{ - Ref: ref, - ID: ri.Id, - Executant: user.Id, - }) - - Eventually(func() bool { - return called - }, "2s").Should(BeTrue()) }) - It("indexes items when they are being restored", func() { - called := false - engine.On("Restore", mock.Anything).Return(nil).Run(func(args mock.Arguments) { - called = true - }) - - _ = events.Publish(bus, events.ItemRestored{ - Ref: ref, - Executant: user.Id, - }) - - Eventually(func() bool { - return called - }, "2s").Should(BeTrue()) - }) - - It("indexes items when a version has been restored", func() { - called := false - extractor.Mock.On("Extract", mock.Anything, mock.Anything, mock.Anything).Return(content.Document{}, nil) - engine.On("Upsert", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { - called = true - }) - - _ = events.Publish(bus, events.FileVersionRestored{ - Ref: ref, - Executant: user.Id, - }) - - Eventually(func() bool { - return called - }, "2s").Should(BeTrue()) - }) - - It("indexes items when they are being moved", func() { - called := false - engine.On("Move", mock.Anything, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { - called = true - }) - - _ = events.Publish(bus, events.ItemMoved{ - Ref: ref, - Executant: user.Id, + for _, mck := range mcks { + s.On(mck, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { + calls += 1 }) + } - Eventually(func() bool { - return called - }, "2s").Should(BeTrue()) - }) - }) -}) + err := events.Publish(bus, e) + + Expect(err).To(BeNil()) + Eventually(func() int { + return calls + }, "2s").Should(Equal(len(mcks))) + }, + Entry("ItemTrashed", []string{"TrashItem", "IndexSpace"}, events.ItemTrashed{}, false), + Entry("ItemMoved", []string{"MoveItem", "IndexSpace"}, events.ItemMoved{}, false), + Entry("ItemRestored", []string{"RestoreItem", "IndexSpace"}, events.ItemRestored{}, false), + Entry("ContainerCreated", []string{"IndexSpace"}, events.ContainerCreated{}, false), + Entry("FileTouched", []string{"IndexSpace"}, events.FileTouched{}, false), + Entry("FileVersionRestored", []string{"IndexSpace"}, events.FileVersionRestored{}, false), + Entry("TagsAdded", []string{"IndexSpace"}, events.TagsAdded{}, false), + Entry("TagsRemoved", []string{"IndexSpace"}, events.TagsRemoved{}, false), + Entry("FileUploaded", []string{"IndexSpace"}, events.FileUploaded{}, false), + Entry("UploadReady", []string{"IndexSpace"}, events.UploadReady{ExecutingUser: &userv1beta1.User{}}, true), +) diff --git a/services/search/pkg/search/mocks/Searcher.go b/services/search/pkg/search/mocks/Searcher.go new file mode 100644 index 00000000000..1c0e858aeb4 --- /dev/null +++ b/services/search/pkg/search/mocks/Searcher.go @@ -0,0 +1,91 @@ +// Code generated by mockery v2.15.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + mock "github.com/stretchr/testify/mock" + + userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + + v0 "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0" +) + +// Searcher is an autogenerated mock type for the Searcher type +type Searcher struct { + mock.Mock +} + +// IndexSpace provides a mock function with given fields: rid, uId +func (_m *Searcher) IndexSpace(rid *providerv1beta1.ResourceId, uId *userv1beta1.UserId) error { + ret := _m.Called(rid, uId) + + var r0 error + if rf, ok := ret.Get(0).(func(*providerv1beta1.ResourceId, *userv1beta1.UserId) error); ok { + r0 = rf(rid, uId) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MoveItem provides a mock function with given fields: ref, uid +func (_m *Searcher) MoveItem(ref *providerv1beta1.Reference, uid *userv1beta1.UserId) { + _m.Called(ref, uid) +} + +// RestoreItem provides a mock function with given fields: ref, uid +func (_m *Searcher) RestoreItem(ref *providerv1beta1.Reference, uid *userv1beta1.UserId) { + _m.Called(ref, uid) +} + +// Search provides a mock function with given fields: ctx, req +func (_m *Searcher) Search(ctx context.Context, req *v0.SearchRequest) (*v0.SearchResponse, error) { + ret := _m.Called(ctx, req) + + var r0 *v0.SearchResponse + if rf, ok := ret.Get(0).(func(context.Context, *v0.SearchRequest) *v0.SearchResponse); ok { + r0 = rf(ctx, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v0.SearchResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *v0.SearchRequest) error); ok { + r1 = rf(ctx, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// TrashItem provides a mock function with given fields: rid +func (_m *Searcher) TrashItem(rid *providerv1beta1.ResourceId) { + _m.Called(rid) +} + +// UpsertItem provides a mock function with given fields: ref, uid +func (_m *Searcher) UpsertItem(ref *providerv1beta1.Reference, uid *userv1beta1.UserId) { + _m.Called(ref, uid) +} + +type mockConstructorTestingTNewSearcher interface { + mock.TestingT + Cleanup(func()) +} + +// NewSearcher creates a new instance of Searcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewSearcher(t mockConstructorTestingTNewSearcher) *Searcher { + mock := &Searcher{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/services/search/pkg/search/search.go b/services/search/pkg/search/search.go index 662a14ae7ce..954ea55f546 100644 --- a/services/search/pkg/search/search.go +++ b/services/search/pkg/search/search.go @@ -3,6 +3,8 @@ package search import ( "context" "errors" + "fmt" + "strings" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" @@ -93,3 +95,47 @@ func statResource(ctx context.Context, ref *provider.Reference, gw gateway.Gatew return res, nil } + +// NOTE: this converts CS3 to WebDAV permissions +// since conversions pkg is reva internal we have no other choice than to duplicate the logic +func convertToWebDAVPermissions(isShared, isMountpoint, isDir bool, p *provider.ResourcePermissions) string { + if p == nil { + return "" + } + var b strings.Builder + if isShared { + fmt.Fprintf(&b, "S") + } + if p.ListContainer && + p.ListFileVersions && + p.ListRecycle && + p.Stat && + p.GetPath && + p.GetQuota && + p.InitiateFileDownload { + fmt.Fprintf(&b, "R") + } + if isMountpoint { + fmt.Fprintf(&b, "M") + } + if p.Delete && + p.PurgeRecycle { + fmt.Fprintf(&b, "D") + } + if p.InitiateFileUpload && + p.RestoreFileVersion && + p.RestoreRecycleItem { + fmt.Fprintf(&b, "NV") + if !isDir { + fmt.Fprintf(&b, "W") + } + } + if isDir && + p.ListContainer && + p.Stat && + p.CreateContainer && + p.InitiateFileUpload { + fmt.Fprintf(&b, "CK") + } + return b.String() +} diff --git a/services/search/pkg/search/provider.go b/services/search/pkg/search/service.go similarity index 51% rename from services/search/pkg/search/provider.go rename to services/search/pkg/search/service.go index 157364036b5..4fa82df9965 100644 --- a/services/search/pkg/search/provider.go +++ b/services/search/pkg/search/service.go @@ -2,64 +2,41 @@ package search import ( "context" - "fmt" - "path/filepath" - "sort" - "strings" - gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" - rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" - "github.com/cs3org/reva/v2/pkg/events" sdk "github.com/cs3org/reva/v2/pkg/sdk/common" "github.com/cs3org/reva/v2/pkg/storage/utils/walker" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" "github.com/owncloud/ocis/v2/ocis-pkg/log" - "github.com/owncloud/ocis/v2/services/search/pkg/content" - "github.com/owncloud/ocis/v2/services/search/pkg/engine" - "google.golang.org/grpc/metadata" - searchmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/search/v0" searchsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0" + "github.com/owncloud/ocis/v2/services/search/pkg/content" + "github.com/owncloud/ocis/v2/services/search/pkg/engine" + "path/filepath" + "sort" + "strings" + "time" ) -// Permissions is copied from reva internal conversion pkg -type Permissions uint - -// consts are copied from reva internal conversion pkg -const ( - // PermissionInvalid represents an invalid permission - PermissionInvalid Permissions = 0 - // PermissionRead grants read permissions on a resource - PermissionRead Permissions = 1 << (iota - 1) - // PermissionWrite grants write permissions on a resource - PermissionWrite - // PermissionCreate grants create permissions on a resource - PermissionCreate - // PermissionDelete grants delete permissions on a resource - PermissionDelete - // PermissionShare grants share permissions on a resource - PermissionShare -) +//go:generate mockery --name=Searcher -// ListenEvents are the events the search service is listening to -var ListenEvents = []events.Unmarshaller{ - events.ItemTrashed{}, - events.ItemRestored{}, - events.ItemMoved{}, - events.ContainerCreated{}, - events.FileUploaded{}, - events.FileTouched{}, - events.FileVersionRestored{}, +// Searcher is the interface to the SearchService +type Searcher interface { + Search(ctx context.Context, req *searchsvc.SearchRequest) (*searchsvc.SearchResponse, error) + IndexSpace(rid *provider.ResourceId, uId *user.UserId) error + TrashItem(rid *provider.ResourceId) + UpsertItem(ref *provider.Reference, uid *user.UserId) + RestoreItem(ref *provider.Reference, uid *user.UserId) + MoveItem(ref *provider.Reference, uid *user.UserId) } -// Provider is responsible for indexing spaces and pass on a search +// SearchService is responsible for indexing spaces and pass on a search // to it's underlying engine. -type Provider struct { +type Service struct { logger log.Logger gateway gateway.GatewayAPIClient engine engine.Engine @@ -67,9 +44,9 @@ type Provider struct { secret string } -// NewProvider creates a new Provider instance. -func NewProvider(gw gateway.GatewayAPIClient, eng engine.Engine, extractor content.Extractor, logger log.Logger, secret string) *Provider { - return &Provider{ +// NewService creates a new Provider instance. +func NewService(gw gateway.GatewayAPIClient, eng engine.Engine, extractor content.Extractor, logger log.Logger, secret string) *Service { + return &Service{ gateway: gw, engine: eng, secret: secret, @@ -79,13 +56,13 @@ func NewProvider(gw gateway.GatewayAPIClient, eng engine.Engine, extractor conte } // Search processes a search request and passes it down to the engine. -func (p *Provider) Search(ctx context.Context, req *searchsvc.SearchRequest) (*searchsvc.SearchResponse, error) { +func (s *Service) Search(ctx context.Context, req *searchsvc.SearchRequest) (*searchsvc.SearchResponse, error) { if req.Query == "" { return nil, errtypes.BadRequest("empty query provided") } - p.logger.Debug().Str("query", req.Query).Msg("performing a search") + s.logger.Debug().Str("query", req.Query).Msg("performing a search") - listSpacesRes, err := p.gateway.ListStorageSpaces(ctx, &provider.ListStorageSpacesRequest{ + listSpacesRes, err := s.gateway.ListStorageSpaces(ctx, &provider.ListStorageSpacesRequest{ Filters: []*provider.ListStorageSpacesRequest_Filter{ { Type: provider.ListStorageSpacesRequest_Filter_TYPE_SPACE_TYPE, @@ -94,7 +71,7 @@ func (p *Provider) Search(ctx context.Context, req *searchsvc.SearchRequest) (*s }, }) if err != nil { - p.logger.Error().Err(err).Msg("failed to list the user's storage spaces") + s.logger.Error().Err(err).Msg("failed to list the user's storage spaces") return nil, err } @@ -142,24 +119,24 @@ func (p *Provider) Search(ctx context.Context, req *searchsvc.SearchRequest) (*s searchRootID.OpaqueId = space.Root.SpaceId mountpointID, ok := mountpointMap[space.Id.OpaqueId] if !ok { - p.logger.Warn().Interface("space", space).Msg("could not find mountpoint space for grant space") + s.logger.Warn().Interface("space", space).Msg("could not find mountpoint space for grant space") continue } - gpRes, err := p.gateway.GetPath(ctx, &provider.GetPathRequest{ + gpRes, err := s.gateway.GetPath(ctx, &provider.GetPathRequest{ ResourceId: space.Root, }) if err != nil { - p.logger.Error().Err(err).Str("space", space.Id.OpaqueId).Msg("failed to get path for grant space root") + s.logger.Error().Err(err).Str("space", space.Id.OpaqueId).Msg("failed to get path for grant space root") continue } if gpRes.Status.Code != rpcv1beta1.Code_CODE_OK { - p.logger.Error().Interface("status", gpRes.Status).Str("space", space.Id.OpaqueId).Msg("failed to get path for grant space root") + s.logger.Error().Interface("status", gpRes.Status).Str("space", space.Id.OpaqueId).Msg("failed to get path for grant space root") continue } mountpointPrefix = utils.MakeRelativePath(gpRes.Path) sid, spid, oid, err := storagespace.SplitID(mountpointID) if err != nil { - p.logger.Error().Err(err).Str("space", space.Id.OpaqueId).Str("mountpointId", mountpointID).Msg("invalid mountpoint space id") + s.logger.Error().Err(err).Str("space", space.Id.OpaqueId).Str("mountpointId", mountpointID).Msg("invalid mountpoint space id") continue } mountpointRootID = &searchmsg.ResourceID{ @@ -169,12 +146,12 @@ func (p *Provider) Search(ctx context.Context, req *searchsvc.SearchRequest) (*s } rootName = space.GetRootInfo().GetPath() permissions = space.GetRootInfo().GetPermissionSet() - p.logger.Debug().Interface("grantSpace", space).Interface("mountpointRootId", mountpointRootID).Msg("searching a grant") + s.logger.Debug().Interface("grantSpace", space).Interface("mountpointRootId", mountpointRootID).Msg("searching a grant") case "personal": permissions = space.GetRootInfo().GetPermissionSet() } - res, err := p.engine.Search(ctx, &searchsvc.SearchIndexRequest{ + res, err := s.engine.Search(ctx, &searchsvc.SearchIndexRequest{ Query: req.Query, Ref: &searchmsg.Reference{ ResourceId: searchRootID, @@ -183,10 +160,10 @@ func (p *Provider) Search(ctx context.Context, req *searchsvc.SearchRequest) (*s PageSize: req.PageSize, }) if err != nil { - p.logger.Error().Err(err).Str("space", space.Id.OpaqueId).Msg("failed to search the index") + s.logger.Error().Err(err).Str("space", space.Id.OpaqueId).Msg("failed to search the index") return nil, err } - p.logger.Debug().Str("space", space.Id.OpaqueId).Int("hits", len(res.Matches)).Msg("space search done") + s.logger.Debug().Str("space", space.Id.OpaqueId).Int("hits", len(res.Matches)).Msg("space search done") total += res.TotalMatches for _, match := range res.Matches { @@ -223,120 +200,131 @@ func (p *Provider) Search(ctx context.Context, req *searchsvc.SearchRequest) (*s } // IndexSpace (re)indexes all resources of a given space. -func (p *Provider) IndexSpace(ctx context.Context, req *searchsvc.IndexSpaceRequest) (*searchsvc.IndexSpaceResponse, error) { - // Get auth context - authRes, err := p.gateway.Authenticate(ctx, &gateway.AuthenticateRequest{ - Type: "machine", - ClientId: "userid:" + req.UserId, - ClientSecret: p.secret, - }) - if err != nil || authRes.GetStatus().GetCode() != rpc.Code_CODE_OK { - return nil, err - } - - if authRes.GetStatus().GetCode() != rpc.Code_CODE_OK { - return nil, fmt.Errorf("could not get authenticated context for user") - } - ownerCtx := ctxpkg.ContextSetUser(context.Background(), authRes.User) - ownerCtx = metadata.AppendToOutgoingContext(ownerCtx, ctxpkg.TokenHeader, authRes.Token) - - // Walk the space and index all files - w := walker.NewWalker(p.gateway) - rootID, err := storagespace.ParseID(req.SpaceId) +func (s *Service) IndexSpace(rid *provider.ResourceId, uId *user.UserId) error { + ownerCtx, err := getAuthContext(&user.User{Id: uId}, s.gateway, s.secret, s.logger) if err != nil { - p.logger.Error().Err(err).Msg(err.Error()) - return nil, err + return err } - err = w.Walk(ownerCtx, &rootID, func(wd string, info *provider.ResourceInfo, err error) error { + + w := walker.NewWalker(s.gateway) + err = w.Walk(ownerCtx, rid, func(wd string, info *provider.ResourceInfo, err error) error { if err != nil { - p.logger.Error().Err(err).Msg("error walking the tree") + s.logger.Error().Err(err).Msg("error walking the tree") + return err } - ref, err := ResolveReference(ownerCtx, &provider.Reference{ - Path: utils.MakeRelativePath(filepath.Join(wd, info.Path)), - ResourceId: &rootID, - }, info, p.gateway) - if err != nil { - p.logger.Error().Err(err).Msg("error resolving reference") + + if info == nil { return nil } - doc, err := p.extractor.Extract(ownerCtx, info) - if err != nil { - p.logger.Error().Err(err).Msg("error extracting content") + ref := &provider.Reference{ + Path: utils.MakeRelativePath(filepath.Join(wd, info.Path)), + ResourceId: rid, } + s.logger.Debug().Str("path", ref.Path).Msg("Walking tree") - var pid string - if info.ParentId != nil { - pid = storagespace.FormatResourceID(*info.ParentId) - } - r := engine.Resource{ - ID: storagespace.FormatResourceID(*info.Id), - RootID: storagespace.FormatResourceID(provider.ResourceId{ - StorageId: info.Id.StorageId, - OpaqueId: info.Id.SpaceId, - SpaceId: info.Id.SpaceId, - }), - ParentID: pid, - Path: ref.Path, - Type: uint64(info.Type), - Document: doc, - } + searchRes, err := s.engine.Search(ownerCtx, &searchsvc.SearchIndexRequest{ + Query: "+ID:" + storagespace.FormatResourceID(*info.Id) + ` +Mtime:>="` + utils.TSToTime(info.Mtime).Format(time.RFC3339Nano) + `"`, + }) - err = p.engine.Upsert(r.ID, r) - if err != nil { - p.logger.Error().Err(err).Msg("error adding resource to the index") - } else { - p.logger.Debug().Interface("ref", ref).Msg("added resource to index") + if err == nil && len(searchRes.Matches) >= 1 { + if info.Type == provider.ResourceType_RESOURCE_TYPE_CONTAINER { + s.logger.Debug().Str("path", ref.Path).Msg("subtree hasn't changed. Skipping.") + return filepath.SkipDir + } + s.logger.Debug().Str("path", ref.Path).Msg("element hasn't changed. Skipping.") + return nil } + + s.UpsertItem(ref, info.Owner) + return nil }) + + logDocCount(s.engine, s.logger) + + return err +} + +func (s *Service) TrashItem(rid *provider.ResourceId) { + err := s.engine.Delete(storagespace.FormatResourceID(*rid)) if err != nil { - return nil, err + s.logger.Error().Err(err).Interface("Id", rid).Msg("failed to remove item from index") + } +} + +func (s *Service) UpsertItem(ref *provider.Reference, uid *user.UserId) { + ctx, stat, path := s.resInfo(uid, ref) + if ctx == nil || stat == nil || path == "" { + return + } + + doc, err := s.extractor.Extract(ctx, stat.Info) + if err != nil { + s.logger.Error().Err(err).Msg("failed to extract resource content") + return + } + + r := engine.Resource{ + ID: storagespace.FormatResourceID(*stat.Info.Id), + RootID: storagespace.FormatResourceID(provider.ResourceId{ + StorageId: stat.Info.Id.StorageId, + OpaqueId: stat.Info.Id.SpaceId, + SpaceId: stat.Info.Id.SpaceId, + }), + Path: utils.MakeRelativePath(path), + Type: uint64(stat.Info.Type), + Document: doc, } - return &searchsvc.IndexSpaceResponse{}, nil + if parentId := stat.GetInfo().GetParentId(); parentId != nil { + r.ParentID = storagespace.FormatResourceID(*parentId) + } + + if err = s.engine.Upsert(r.ID, r); err != nil { + s.logger.Error().Err(err).Msg("error adding updating the resource in the index") + } else { + logDocCount(s.engine, s.logger) + } } -// NOTE: this converts CS3 to WebDAV permissions -// since conversions pkg is reva internal we have no other choice than to duplicate the logic -func convertToWebDAVPermissions(isShared, isMountpoint, isDir bool, p *provider.ResourcePermissions) string { - if p == nil { - return "" +func (s *Service) RestoreItem(ref *provider.Reference, uid *user.UserId) { + ctx, stat, path := s.resInfo(uid, ref) + if ctx == nil || stat == nil || path == "" { + return } - var b strings.Builder - if isShared { - fmt.Fprintf(&b, "S") + + if err := s.engine.Restore(storagespace.FormatResourceID(*stat.Info.Id)); err != nil { + s.logger.Error().Err(err).Msg("failed to restore the changed resource in the index") } - if p.ListContainer && - p.ListFileVersions && - p.ListRecycle && - p.Stat && - p.GetPath && - p.GetQuota && - p.InitiateFileDownload { - fmt.Fprintf(&b, "R") +} + +func (s *Service) MoveItem(ref *provider.Reference, uid *user.UserId) { + ctx, stat, path := s.resInfo(uid, ref) + if ctx == nil || stat == nil || path == "" { + return } - if isMountpoint { - fmt.Fprintf(&b, "M") + + if err := s.engine.Move(storagespace.FormatResourceID(*stat.GetInfo().GetId()), storagespace.FormatResourceID(*stat.GetInfo().GetParentId()), path); err != nil { + s.logger.Error().Err(err).Msg("failed to move the changed resource in the index") } - if p.Delete && - p.PurgeRecycle { - fmt.Fprintf(&b, "D") +} + +func (s *Service) resInfo(uid *user.UserId, ref *provider.Reference) (context.Context, *provider.StatResponse, string) { + ownerCtx, err := getAuthContext(&user.User{Id: uid}, s.gateway, s.secret, s.logger) + if err != nil { + return nil, nil, "" } - if p.InitiateFileUpload && - p.RestoreFileVersion && - p.RestoreRecycleItem { - fmt.Fprintf(&b, "NV") - if !isDir { - fmt.Fprintf(&b, "W") - } + + statRes, err := statResource(ownerCtx, ref, s.gateway, s.logger) + if err != nil { + return nil, nil, "" } - if isDir && - p.ListContainer && - p.Stat && - p.CreateContainer && - p.InitiateFileUpload { - fmt.Fprintf(&b, "CK") + + r, err := ResolveReference(ownerCtx, ref, statRes.GetInfo(), s.gateway) + if err != nil { + return nil, nil, "" } - return b.String() + + return ownerCtx, statRes, r.GetPath() } diff --git a/services/search/pkg/search/provider_test.go b/services/search/pkg/search/service_test.go similarity index 94% rename from services/search/pkg/search/provider_test.go rename to services/search/pkg/search/service_test.go index 16bd13f4059..422d23ab025 100644 --- a/services/search/pkg/search/provider_test.go +++ b/services/search/pkg/search/service_test.go @@ -23,7 +23,7 @@ import ( var _ = Describe("Searchprovider", func() { var ( - p *search.Provider + s search.Searcher extractor *contentMocks.Extractor gw *cs3mocks.GatewayAPIClient indexClient *engineMocks.Engine @@ -73,7 +73,7 @@ var _ = Describe("Searchprovider", func() { indexClient = &engineMocks.Engine{} extractor = &contentMocks.Extractor{} - p = search.NewProvider(gw, indexClient, extractor, logger, "") + s = search.NewService(gw, indexClient, extractor, logger, "") gw.On("Authenticate", mock.Anything, mock.Anything).Return(&gateway.AuthenticateResponse{ Status: status.NewOK(ctx), @@ -94,8 +94,8 @@ var _ = Describe("Searchprovider", func() { Describe("New", func() { It("returns a new instance", func() { - p := search.NewProvider(gw, indexClient, extractor, logger, "") - Expect(p).ToNot(BeNil()) + s := search.NewService(gw, indexClient, extractor, logger, "") + Expect(s).ToNot(BeNil()) }) }) @@ -107,19 +107,16 @@ var _ = Describe("Searchprovider", func() { }, nil) extractor.Mock.On("Extract", mock.Anything, mock.Anything, mock.Anything).Return(content.Document{}, nil) indexClient.On("Upsert", mock.Anything, mock.Anything).Return(nil) + indexClient.On("Search", mock.Anything, mock.Anything).Return(&searchsvc.SearchIndexResponse{}, nil) - res, err := p.IndexSpace(ctx, &searchsvc.IndexSpaceRequest{ - SpaceId: "storageid$spaceid!spaceid", - UserId: "user", - }) - Expect(err).ToNot(HaveOccurred()) - Expect(res).ToNot(BeNil()) + err := s.IndexSpace(&sprovider.ResourceId{}, user.Id) + Expect(err).ShouldNot(HaveOccurred()) }) }) Describe("Search", func() { It("fails when an empty query is given", func() { - res, err := p.Search(ctx, &searchsvc.SearchRequest{ + res, err := s.Search(ctx, &searchsvc.SearchRequest{ Query: "", }) Expect(err).To(HaveOccurred()) @@ -158,7 +155,7 @@ var _ = Describe("Searchprovider", func() { }) It("does not mess with field-based searches", func() { - _, err := p.Search(ctx, &searchsvc.SearchRequest{ + _, err := s.Search(ctx, &searchsvc.SearchRequest{ Query: "Size:<10", }) Expect(err).ToNot(HaveOccurred()) @@ -168,7 +165,7 @@ var _ = Describe("Searchprovider", func() { }) It("searches the personal user space", func() { - res, err := p.Search(ctx, &searchsvc.SearchRequest{ + res, err := s.Search(ctx, &searchsvc.SearchRequest{ Query: "foo", }) Expect(err).ToNot(HaveOccurred()) @@ -245,7 +242,7 @@ var _ = Describe("Searchprovider", func() { }, }, nil) - res, err := p.Search(ctx, &searchsvc.SearchRequest{ + res, err := s.Search(ctx, &searchsvc.SearchRequest{ Query: "Foo", }) Expect(err).ToNot(HaveOccurred()) @@ -337,7 +334,7 @@ var _ = Describe("Searchprovider", func() { }) It("considers the search Ref parameter", func() { - res, err := p.Search(ctx, &searchsvc.SearchRequest{ + res, err := s.Search(ctx, &searchsvc.SearchRequest{ Query: "foo", Ref: &searchmsg.Reference{ ResourceId: &searchmsg.ResourceID{ @@ -354,7 +351,7 @@ var _ = Describe("Searchprovider", func() { }) It("finds matches in both the personal space AND the grant", func() { - res, err := p.Search(ctx, &searchsvc.SearchRequest{ + res, err := s.Search(ctx, &searchsvc.SearchRequest{ Query: "foo", }) Expect(err).ToNot(HaveOccurred()) @@ -365,7 +362,7 @@ var _ = Describe("Searchprovider", func() { }) It("sorts and limits the combined results from all spaces", func() { - res, err := p.Search(ctx, &searchsvc.SearchRequest{ + res, err := s.Search(ctx, &searchsvc.SearchRequest{ Query: "foo", PageSize: 2, }) diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index 549887309b1..11fbbdc2aef 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -6,6 +6,7 @@ import ( "crypto/x509" "errors" "fmt" + "github.com/cs3org/reva/v2/pkg/storagespace" "os" "time" @@ -103,8 +104,10 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) return nil, teardown, err } + ss := search.NewService(gw, eng, extractor, logger, cfg.MachineAuthAPIKey) + // setup event handling - if err := search.HandleEvents(eng, extractor, gw, bus, logger, cfg); err != nil { + if err := search.HandleEvents(ss, bus, logger, cfg); err != nil { return nil, teardown, err } @@ -116,7 +119,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) return &Service{ id: cfg.GRPC.Namespace + "." + cfg.Service.Name, log: logger, - provider: search.NewProvider(gw, eng, extractor, logger, cfg.MachineAuthAPIKey), + searcher: ss, cache: cache, }, teardown, nil } @@ -125,7 +128,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) type Service struct { id string log log.Logger - provider *search.Provider + searcher search.Searcher cache *ttlcache.Cache } @@ -144,7 +147,7 @@ func (s Service) Search(ctx context.Context, in *searchsvc.SearchRequest, out *s res, ok := s.FromCache(key) if !ok { var err error - res, err = s.provider.Search(ctx, &searchsvc.SearchRequest{ + res, err = s.searcher.Search(ctx, &searchsvc.SearchRequest{ Query: in.Query, PageSize: in.PageSize, Ref: in.Ref, @@ -169,8 +172,12 @@ func (s Service) Search(ctx context.Context, in *searchsvc.SearchRequest, out *s // IndexSpace (re)indexes all resources of a given space. func (s Service) IndexSpace(ctx context.Context, in *searchsvc.IndexSpaceRequest, _ *searchsvc.IndexSpaceResponse) error { - _, err := s.provider.IndexSpace(ctx, in) - return err + rid, err := storagespace.ParseID(in.SpaceId) + if err != nil { + return err + } + + return s.searcher.IndexSpace(&rid, &user.UserId{OpaqueId: in.UserId}) } // FromCache pulls a search result from cache