Skip to content

Commit

Permalink
inform share recipients
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj committed Jan 3, 2023
1 parent e728339 commit 076fcc5
Showing 1 changed file with 118 additions and 77 deletions.
195 changes: 118 additions & 77 deletions services/hub/pkg/service/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/ctx"
revactx "github.com/cs3org/reva/v2/pkg/ctx"
Expand Down Expand Up @@ -46,17 +47,6 @@ func NewSSE(cfg *config.Config) (*SSE, error) {
}, nil
}

// ListenForEvents listens for events to inform clients about changes. Blocking. Start in seperate go routine.
func (s *SSE) ListenForEvents(evts <-chan interface{}) {
for e := range evts {
rcps, ev := s.extractDetails(e)

for r := range rcps {
s.server.Publish(r, &sse.Event{Data: ev})
}
}
}

// ServeHTTP allows clients to subscribe to sse
func (s *SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) {
u, ok := ctx.ContextGetUser(r.Context())
Expand All @@ -83,11 +73,23 @@ func (s *SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) {

}

// ListenForEvents listens for events to inform clients about changes. Blocking. Start in seperate go routine.
func (s *SSE) ListenForEvents(evts <-chan interface{}) {
for e := range evts {
rcps, ev := s.extractDetails(e)

for r := range rcps {
s.server.Publish(r, &sse.Event{Data: ev})
}
}
}

// extracts recipients and builds event to send to client
func (s *SSE) extractDetails(e interface{}) (<-chan string, []byte) {

// determining recipients can take longer. We spawn a seperate go routine to do it
ch := s.determineRecipients(e)
ch := make(chan string)
go s.determineRecipients(e, ch)

var event interface{}

Expand All @@ -113,69 +115,95 @@ func (s *SSE) extractDetails(e interface{}) (<-chan string, []byte) {
return ch, b
}

func (s *SSE) determineRecipients(e interface{}) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)

var (
ref *provider.Reference
user *user.User
)
switch ev := e.(type) {
case events.UploadReady:
ref = ev.FileRef
user = ev.ExecutingUser
func (s *SSE) determineRecipients(e interface{}, ch chan<- string) {
defer close(ch)

}
var (
ref *provider.Reference
user *user.User
)
switch ev := e.(type) {
case events.UploadReady:
ref = ev.FileRef
user = ev.ExecutingUser
}

// impersonate executing user to stat the resource
// FIXME: What to do if executing user is not member of the space?
ctx, err := s.impersonate(user.GetId())
if err != nil {
log.Println("ERROR:", err)
return
}
// impersonate executing user to stat the resource
// FIXME: What to do if executing user is not member of the space?
ctx, err := s.impersonate(user.GetId())
if err != nil {
log.Println("ERROR:", err)
return
}

space, err := s.getStorageSpace(ctx, ref.GetResourceId().GetSpaceId())
if err != nil {
log.Println("ERROR:", err)
return
}
space, err := s.getStorageSpace(ctx, ref.GetResourceId().GetSpaceId())
if err != nil {
log.Println("ERROR:", err)
return
}

informed := make(map[string]struct{})
inform := func(id string) {
informed := make(map[string]struct{})
inform := func(users ...string) {
for _, id := range users {
if _, ok := informed[id]; ok {
return
continue
}
ch <- id
informed[id] = struct{}{}
}
}

// inform executing user first and foremost
inform(user.GetId().GetOpaqueId())

// inform space members next
var grants map[string]*provider.ResourcePermissions
if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "grants", &grants); err == nil {
groups := s.resolveGroups(ctx, space)
// inform executing user first and foremost
inform(user.GetId().GetOpaqueId())

// FIXME: Which space permissions allow me to get this event?
for id := range grants {
users, ok := groups[id]
if !ok {
users = []string{id}
}
// inform space members next
var grants map[string]*provider.ResourcePermissions
if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "grants", &grants); err == nil {
groups := s.resolveGroups(ctx, space)

for _, u := range users {
inform(u)
}
// FIXME: Which space permissions allow me to get this event?
for id := range grants {
users, ok := groups[id]
if !ok {
users = []string{id}
}

inform(users...)
}
}

// inform share recipients also
// TODO: How to get all shares pointing to the resource?
resp, err := s.gwc.ListShares(ctx, listSharesRequest(ref.GetResourceId()))
if err != nil || resp.GetStatus().GetCode() != rpc.Code_CODE_OK {
log.Println("ERROR:", err, resp.GetStatus().GetCode())
return
}

for _, share := range resp.GetShares() {
users := []string{share.GetGrantee().GetUserId().GetOpaqueId()}
if users[0] == "" {
users = s.getGroupMembers(ctx, share.GetGrantee().GetGroupId().GetOpaqueId())
}

// TODO: inform share recipients
}()
return ch
inform(users...)
}
}

// resolve group members for all groups of a space
func (s *SSE) resolveGroups(ctx context.Context, space *provider.StorageSpace) map[string][]string {
groups := make(map[string][]string)

var grpids map[string]struct{}
if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "groups", &grpids); err != nil {
return groups
}

for g := range grpids {
groups[g] = append(groups[g], s.getGroupMembers(ctx, g)...)
}

return groups
}

// returns authenticated context of `userID`
Expand Down Expand Up @@ -221,27 +249,19 @@ func (s *SSE) getStorageSpace(ctx context.Context, id string) (*provider.Storage
return resp.GetStorageSpaces()[0], nil
}

// resolve group members for all groups of a space
func (s *SSE) resolveGroups(ctx context.Context, space *provider.StorageSpace) map[string][]string {
groups := make(map[string][]string)

var grpids map[string]struct{}
if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "groups", &grpids); err != nil {
return groups
// calls gateway for group members
func (s *SSE) getGroupMembers(ctx context.Context, id string) []string {
var members []string
r, err := s.gwc.GetGroup(ctx, getGroupRequest(id))
if err != nil || r.GetStatus().GetCode() != rpc.Code_CODE_OK {
log.Println("ERROR", err, r.GetStatus().GetCode())
}

for g := range grpids {
r, err := s.gwc.GetGroup(ctx, &group.GetGroupRequest{GroupId: &group.GroupId{OpaqueId: g}})
if err != nil || r.GetStatus().GetCode() != rpc.Code_CODE_OK {
continue
}

for _, uid := range r.GetGroup().GetMembers() {
groups[g] = append(groups[g], uid.GetOpaqueId())
}
for _, uid := range r.GetGroup().GetMembers() {
members = append(members, uid.GetOpaqueId())
}

return groups
return members
}

func listStorageSpaceRequest(id string) *provider.ListStorageSpacesRequest {
Expand All @@ -258,3 +278,24 @@ func listStorageSpaceRequest(id string) *provider.ListStorageSpacesRequest {
},
}
}

func getGroupRequest(id string) *group.GetGroupRequest {
return &group.GetGroupRequest{
GroupId: &group.GroupId{
OpaqueId: id,
},
}
}

func listSharesRequest(id *provider.ResourceId) *collaboration.ListSharesRequest {
return &collaboration.ListSharesRequest{
Filters: []*collaboration.Filter{
{
Type: collaboration.Filter_TYPE_RESOURCE_ID,
Term: &collaboration.Filter_ResourceId{
ResourceId: id,
},
},
},
}
}

0 comments on commit 076fcc5

Please sign in to comment.