diff --git a/services/hub/pkg/service/sse.go b/services/hub/pkg/service/sse.go index 7c42142a691..b8c7c988773 100644 --- a/services/hub/pkg/service/sse.go +++ b/services/hub/pkg/service/sse.go @@ -11,6 +11,7 @@ import ( group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/ctx" revactx "github.com/cs3org/reva/v2/pkg/ctx" @@ -46,17 +47,6 @@ func NewSSE(cfg *config.Config) (*SSE, error) { }, nil } -// ListenForEvents listens for events to inform clients about changes. Blocking. Start in seperate go routine. -func (s *SSE) ListenForEvents(evts <-chan interface{}) { - for e := range evts { - rcps, ev := s.extractDetails(e) - - for r := range rcps { - s.server.Publish(r, &sse.Event{Data: ev}) - } - } -} - // ServeHTTP allows clients to subscribe to sse func (s *SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) { u, ok := ctx.ContextGetUser(r.Context()) @@ -83,11 +73,23 @@ func (s *SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) { } +// ListenForEvents listens for events to inform clients about changes. Blocking. Start in seperate go routine. +func (s *SSE) ListenForEvents(evts <-chan interface{}) { + for e := range evts { + rcps, ev := s.extractDetails(e) + + for r := range rcps { + s.server.Publish(r, &sse.Event{Data: ev}) + } + } +} + // extracts recipients and builds event to send to client func (s *SSE) extractDetails(e interface{}) (<-chan string, []byte) { // determining recipients can take longer. We spawn a seperate go routine to do it - ch := s.determineRecipients(e) + ch := make(chan string) + go s.determineRecipients(e, ch) var event interface{} @@ -113,69 +115,95 @@ func (s *SSE) extractDetails(e interface{}) (<-chan string, []byte) { return ch, b } -func (s *SSE) determineRecipients(e interface{}) <-chan string { - ch := make(chan string) - go func() { - defer close(ch) - - var ( - ref *provider.Reference - user *user.User - ) - switch ev := e.(type) { - case events.UploadReady: - ref = ev.FileRef - user = ev.ExecutingUser +func (s *SSE) determineRecipients(e interface{}, ch chan<- string) { + defer close(ch) - } + var ( + ref *provider.Reference + user *user.User + ) + switch ev := e.(type) { + case events.UploadReady: + ref = ev.FileRef + user = ev.ExecutingUser + } - // impersonate executing user to stat the resource - // FIXME: What to do if executing user is not member of the space? - ctx, err := s.impersonate(user.GetId()) - if err != nil { - log.Println("ERROR:", err) - return - } + // impersonate executing user to stat the resource + // FIXME: What to do if executing user is not member of the space? + ctx, err := s.impersonate(user.GetId()) + if err != nil { + log.Println("ERROR:", err) + return + } - space, err := s.getStorageSpace(ctx, ref.GetResourceId().GetSpaceId()) - if err != nil { - log.Println("ERROR:", err) - return - } + space, err := s.getStorageSpace(ctx, ref.GetResourceId().GetSpaceId()) + if err != nil { + log.Println("ERROR:", err) + return + } - informed := make(map[string]struct{}) - inform := func(id string) { + informed := make(map[string]struct{}) + inform := func(users ...string) { + for _, id := range users { if _, ok := informed[id]; ok { - return + continue } ch <- id informed[id] = struct{}{} } + } - // inform executing user first and foremost - inform(user.GetId().GetOpaqueId()) - - // inform space members next - var grants map[string]*provider.ResourcePermissions - if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "grants", &grants); err == nil { - groups := s.resolveGroups(ctx, space) + // inform executing user first and foremost + inform(user.GetId().GetOpaqueId()) - // FIXME: Which space permissions allow me to get this event? - for id := range grants { - users, ok := groups[id] - if !ok { - users = []string{id} - } + // inform space members next + var grants map[string]*provider.ResourcePermissions + if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "grants", &grants); err == nil { + groups := s.resolveGroups(ctx, space) - for _, u := range users { - inform(u) - } + // FIXME: Which space permissions allow me to get this event? + for id := range grants { + users, ok := groups[id] + if !ok { + users = []string{id} } + + inform(users...) + } + } + + // inform share recipients also + // TODO: How to get all shares pointing to the resource? + resp, err := s.gwc.ListShares(ctx, listSharesRequest(ref.GetResourceId())) + if err != nil || resp.GetStatus().GetCode() != rpc.Code_CODE_OK { + log.Println("ERROR:", err, resp.GetStatus().GetCode()) + return + } + + for _, share := range resp.GetShares() { + users := []string{share.GetGrantee().GetUserId().GetOpaqueId()} + if users[0] == "" { + users = s.getGroupMembers(ctx, share.GetGrantee().GetGroupId().GetOpaqueId()) } - // TODO: inform share recipients - }() - return ch + inform(users...) + } +} + +// resolve group members for all groups of a space +func (s *SSE) resolveGroups(ctx context.Context, space *provider.StorageSpace) map[string][]string { + groups := make(map[string][]string) + + var grpids map[string]struct{} + if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "groups", &grpids); err != nil { + return groups + } + + for g := range grpids { + groups[g] = append(groups[g], s.getGroupMembers(ctx, g)...) + } + + return groups } // returns authenticated context of `userID` @@ -221,27 +249,19 @@ func (s *SSE) getStorageSpace(ctx context.Context, id string) (*provider.Storage return resp.GetStorageSpaces()[0], nil } -// resolve group members for all groups of a space -func (s *SSE) resolveGroups(ctx context.Context, space *provider.StorageSpace) map[string][]string { - groups := make(map[string][]string) - - var grpids map[string]struct{} - if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "groups", &grpids); err != nil { - return groups +// calls gateway for group members +func (s *SSE) getGroupMembers(ctx context.Context, id string) []string { + var members []string + r, err := s.gwc.GetGroup(ctx, getGroupRequest(id)) + if err != nil || r.GetStatus().GetCode() != rpc.Code_CODE_OK { + log.Println("ERROR", err, r.GetStatus().GetCode()) } - for g := range grpids { - r, err := s.gwc.GetGroup(ctx, &group.GetGroupRequest{GroupId: &group.GroupId{OpaqueId: g}}) - if err != nil || r.GetStatus().GetCode() != rpc.Code_CODE_OK { - continue - } - - for _, uid := range r.GetGroup().GetMembers() { - groups[g] = append(groups[g], uid.GetOpaqueId()) - } + for _, uid := range r.GetGroup().GetMembers() { + members = append(members, uid.GetOpaqueId()) } - return groups + return members } func listStorageSpaceRequest(id string) *provider.ListStorageSpacesRequest { @@ -258,3 +278,24 @@ func listStorageSpaceRequest(id string) *provider.ListStorageSpacesRequest { }, } } + +func getGroupRequest(id string) *group.GetGroupRequest { + return &group.GetGroupRequest{ + GroupId: &group.GroupId{ + OpaqueId: id, + }, + } +} + +func listSharesRequest(id *provider.ResourceId) *collaboration.ListSharesRequest { + return &collaboration.ListSharesRequest{ + Filters: []*collaboration.Filter{ + { + Type: collaboration.Filter_TYPE_RESOURCE_ID, + Term: &collaboration.Filter_ResourceId{ + ResourceId: id, + }, + }, + }, + } +}