Skip to content

Commit

Permalink
refactor sse into struct
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj committed Jan 3, 2023
1 parent 655c786 commit e728339
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 72 deletions.
13 changes: 12 additions & 1 deletion services/hub/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,23 @@ func New(cfg *config.Config) Service {
account.JWTSecret(cfg.TokenManager.JWTSecret),
),
)

s, err := NewSSE(cfg)
if err != nil {
log.Fatal("cant initiate sse", err)
}

ch, err := eventsConsumer(cfg.Events)
if err != nil {
log.Fatal("cant consume events", err)
}

go s.ListenForEvents(ch)

m.Route("/hub", func(r chi.Router) {
r.Route("/sse", ServeSSE(ch, cfg))
r.Route("/sse", func(r chi.Router) {
r.Get("/", s.ServeHTTP)
})
})

svc := Service{
Expand Down
161 changes: 90 additions & 71 deletions services/hub/pkg/service/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,67 +18,76 @@ import (
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/go-chi/chi/v5"
"github.com/owncloud/ocis/v2/services/hub/pkg/config"
"github.com/r3labs/sse/v2"
"google.golang.org/grpc/metadata"
)

// ServeSSE provides server sent events functionality
func ServeSSE(evts <-chan interface{}, cfg *config.Config) func(chi.Router) {
// SSE provides server sent events functionality
type SSE struct {
cfg *config.Config
gwc gateway.GatewayAPIClient
server *sse.Server
}

// NewSSE returns a new SSE instance
func NewSSE(cfg *config.Config) (*SSE, error) {
server := sse.New()

gwc, err := pool.GetGatewayServiceClient(cfg.Reva.Address)
if err != nil {
log.Fatal(err)
return nil, err
}

// TODO: start multiple eventListeners?
go eventListener(evts, server, gwc, cfg)

return func(r chi.Router) {
r.Get("/", func(w http.ResponseWriter, r *http.Request) {
u, ok := ctx.ContextGetUser(r.Context())
if !ok {
w.WriteHeader(http.StatusInternalServerError)
return
}
return &SSE{
cfg: cfg,
gwc: gwc,
server: server,
}, nil
}

uid := u.GetId().GetOpaqueId()
if uid == "" {
w.WriteHeader(http.StatusInternalServerError)
return
}
// ListenForEvents listens for events to inform clients about changes. Blocking. Start in seperate go routine.
func (s *SSE) ListenForEvents(evts <-chan interface{}) {
for e := range evts {
rcps, ev := s.extractDetails(e)

stream := server.CreateStream(uid)
stream.AutoReplay = false
for r := range rcps {
s.server.Publish(r, &sse.Event{Data: ev})
}
}
}

// add stream to URL
q := r.URL.Query()
q.Set("stream", uid)
r.URL.RawQuery = q.Encode()
// ServeHTTP allows clients to subscribe to sse
func (s *SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) {
u, ok := ctx.ContextGetUser(r.Context())
if !ok {
w.WriteHeader(http.StatusInternalServerError)
return
}

server.ServeHTTP(w, r)
})
uid := u.GetId().GetOpaqueId()
if uid == "" {
w.WriteHeader(http.StatusInternalServerError)
return
}

}
stream := s.server.CreateStream(uid)
stream.AutoReplay = false

func eventListener(evts <-chan interface{}, server *sse.Server, gwc gateway.GatewayAPIClient, cfg *config.Config) {
for e := range evts {
rcps, ev := extractDetails(e, gwc, cfg)
// add stream to URL
q := r.URL.Query()
q.Set("stream", uid)
r.URL.RawQuery = q.Encode()

s.server.ServeHTTP(w, r)

for r := range rcps {
server.Publish(r, &sse.Event{Data: ev})
}
}
}

// extracts recipients and builds event to send to client
func extractDetails(e interface{}, gwc gateway.GatewayAPIClient, cfg *config.Config) (<-chan string, []byte) {
func (s *SSE) extractDetails(e interface{}) (<-chan string, []byte) {

// determining recipients can take longer. We spawn a seperate go routine to do it
ch := determineRecipients(e, gwc, cfg)
ch := s.determineRecipients(e)

var event interface{}

Expand All @@ -104,7 +113,7 @@ func extractDetails(e interface{}, gwc gateway.GatewayAPIClient, cfg *config.Con
return ch, b
}

func determineRecipients(e interface{}, gwc gateway.GatewayAPIClient, cfg *config.Config) <-chan string {
func (s *SSE) determineRecipients(e interface{}) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
Expand All @@ -122,44 +131,34 @@ func determineRecipients(e interface{}, gwc gateway.GatewayAPIClient, cfg *confi

// impersonate executing user to stat the resource
// FIXME: What to do if executing user is not member of the space?
ctx, err := impersonate(user.GetId(), gwc, cfg)
ctx, err := s.impersonate(user.GetId())
if err != nil {
log.Println("ERROR:", err)
return
}

space, err := getStorageSpace(ctx, gwc, ref.GetResourceId().GetSpaceId())
space, err := s.getStorageSpace(ctx, ref.GetResourceId().GetSpaceId())
if err != nil {
log.Println("ERROR:", err)
return
}

informed := make(map[string]struct{})
inform := func(id string) {
if _, ok := informed[id]; ok {
return
}
ch <- id
informed[id] = struct{}{}
}

// inform executing user first and foremost
ch <- user.Id.OpaqueId
informed[user.GetId().GetOpaqueId()] = struct{}{}
inform(user.GetId().GetOpaqueId())

// inform space members next
var grants map[string]*provider.ResourcePermissions
if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "grants", &grants); err == nil {
var (
groups = make(map[string][]string)
grpids map[string]struct{}
)
if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "groups", &grpids); err == nil {
for g := range grpids {
r, err := gwc.GetGroup(ctx, &group.GetGroupRequest{GroupId: &group.GroupId{OpaqueId: g}})
if err != nil || r.GetStatus().GetCode() != rpc.Code_CODE_OK {
log.Println(err)
}

for _, uid := range r.GetGroup().GetMembers() {
groups[g] = append(groups[g], uid.GetOpaqueId())
}
}

}
groups := s.resolveGroups(ctx, space)

// FIXME: Which space permissions allow me to get this event?
for id := range grants {
Expand All @@ -169,12 +168,7 @@ func determineRecipients(e interface{}, gwc gateway.GatewayAPIClient, cfg *confi
}

for _, u := range users {
if _, ok := informed[u]; ok {
continue
}

ch <- u
informed[u] = struct{}{}
inform(u)
}
}
}
Expand All @@ -184,8 +178,9 @@ func determineRecipients(e interface{}, gwc gateway.GatewayAPIClient, cfg *confi
return ch
}

func impersonate(userID *user.UserId, gwc gateway.GatewayAPIClient, cfg *config.Config) (context.Context, error) {
getUserResponse, err := gwc.GetUser(context.Background(), &user.GetUserRequest{
// returns authenticated context of `userID`
func (s *SSE) impersonate(userID *user.UserId) (context.Context, error) {
getUserResponse, err := s.gwc.GetUser(context.Background(), &user.GetUserRequest{
UserId: userID,
})
if err != nil {
Expand All @@ -197,10 +192,10 @@ func impersonate(userID *user.UserId, gwc gateway.GatewayAPIClient, cfg *config.

// Get auth context
ownerCtx := revactx.ContextSetUser(context.Background(), getUserResponse.User)
authRes, err := gwc.Authenticate(ownerCtx, &gateway.AuthenticateRequest{
authRes, err := s.gwc.Authenticate(ownerCtx, &gateway.AuthenticateRequest{
Type: "machine",
ClientId: "userid:" + userID.OpaqueId,
ClientSecret: cfg.MachineAuthAPIKey,
ClientSecret: s.cfg.MachineAuthAPIKey,
})
if err != nil {
return nil, err
Expand All @@ -212,8 +207,9 @@ func impersonate(userID *user.UserId, gwc gateway.GatewayAPIClient, cfg *config.
return metadata.AppendToOutgoingContext(context.Background(), revactx.TokenHeader, authRes.Token), nil
}

func getStorageSpace(ctx context.Context, gwc gateway.GatewayAPIClient, id string) (*provider.StorageSpace, error) {
resp, err := gwc.ListStorageSpaces(ctx, listStorageSpaceRequest(id))
// calls gateway for storage space
func (s *SSE) getStorageSpace(ctx context.Context, id string) (*provider.StorageSpace, error) {
resp, err := s.gwc.ListStorageSpaces(ctx, listStorageSpaceRequest(id))
if err != nil {
return nil, err
}
Expand All @@ -225,6 +221,29 @@ func getStorageSpace(ctx context.Context, gwc gateway.GatewayAPIClient, id strin
return resp.GetStorageSpaces()[0], nil
}

// resolve group members for all groups of a space
func (s *SSE) resolveGroups(ctx context.Context, space *provider.StorageSpace) map[string][]string {
groups := make(map[string][]string)

var grpids map[string]struct{}
if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "groups", &grpids); err != nil {
return groups
}

for g := range grpids {
r, err := s.gwc.GetGroup(ctx, &group.GetGroupRequest{GroupId: &group.GroupId{OpaqueId: g}})
if err != nil || r.GetStatus().GetCode() != rpc.Code_CODE_OK {
continue
}

for _, uid := range r.GetGroup().GetMembers() {
groups[g] = append(groups[g], uid.GetOpaqueId())
}
}

return groups
}

func listStorageSpaceRequest(id string) *provider.ListStorageSpacesRequest {
return &provider.ListStorageSpacesRequest{
Filters: []*provider.ListStorageSpacesRequest_Filter{
Expand Down

0 comments on commit e728339

Please sign in to comment.