diff --git a/services/hub/pkg/service/service.go b/services/hub/pkg/service/service.go index ffb98e125f9..fb4e6315911 100644 --- a/services/hub/pkg/service/service.go +++ b/services/hub/pkg/service/service.go @@ -30,12 +30,23 @@ func New(cfg *config.Config) Service { account.JWTSecret(cfg.TokenManager.JWTSecret), ), ) + + s, err := NewSSE(cfg) + if err != nil { + log.Fatal("cant initiate sse", err) + } + ch, err := eventsConsumer(cfg.Events) if err != nil { log.Fatal("cant consume events", err) } + + go s.ListenForEvents(ch) + m.Route("/hub", func(r chi.Router) { - r.Route("/sse", ServeSSE(ch, cfg)) + r.Route("/sse", func(r chi.Router) { + r.Get("/", s.ServeHTTP) + }) }) svc := Service{ diff --git a/services/hub/pkg/service/sse.go b/services/hub/pkg/service/sse.go index b08ce65bdc0..7c42142a691 100644 --- a/services/hub/pkg/service/sse.go +++ b/services/hub/pkg/service/sse.go @@ -18,67 +18,76 @@ import ( "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" - "github.com/go-chi/chi/v5" "github.com/owncloud/ocis/v2/services/hub/pkg/config" "github.com/r3labs/sse/v2" "google.golang.org/grpc/metadata" ) -// ServeSSE provides server sent events functionality -func ServeSSE(evts <-chan interface{}, cfg *config.Config) func(chi.Router) { +// SSE provides server sent events functionality +type SSE struct { + cfg *config.Config + gwc gateway.GatewayAPIClient + server *sse.Server +} + +// NewSSE returns a new SSE instance +func NewSSE(cfg *config.Config) (*SSE, error) { server := sse.New() gwc, err := pool.GetGatewayServiceClient(cfg.Reva.Address) if err != nil { - log.Fatal(err) + return nil, err } - // TODO: start multiple eventListeners? - go eventListener(evts, server, gwc, cfg) - - return func(r chi.Router) { - r.Get("/", func(w http.ResponseWriter, r *http.Request) { - u, ok := ctx.ContextGetUser(r.Context()) - if !ok { - w.WriteHeader(http.StatusInternalServerError) - return - } + return &SSE{ + cfg: cfg, + gwc: gwc, + server: server, + }, nil +} - uid := u.GetId().GetOpaqueId() - if uid == "" { - w.WriteHeader(http.StatusInternalServerError) - return - } +// ListenForEvents listens for events to inform clients about changes. Blocking. Start in seperate go routine. +func (s *SSE) ListenForEvents(evts <-chan interface{}) { + for e := range evts { + rcps, ev := s.extractDetails(e) - stream := server.CreateStream(uid) - stream.AutoReplay = false + for r := range rcps { + s.server.Publish(r, &sse.Event{Data: ev}) + } + } +} - // add stream to URL - q := r.URL.Query() - q.Set("stream", uid) - r.URL.RawQuery = q.Encode() +// ServeHTTP allows clients to subscribe to sse +func (s *SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) { + u, ok := ctx.ContextGetUser(r.Context()) + if !ok { + w.WriteHeader(http.StatusInternalServerError) + return + } - server.ServeHTTP(w, r) - }) + uid := u.GetId().GetOpaqueId() + if uid == "" { + w.WriteHeader(http.StatusInternalServerError) + return } -} + stream := s.server.CreateStream(uid) + stream.AutoReplay = false -func eventListener(evts <-chan interface{}, server *sse.Server, gwc gateway.GatewayAPIClient, cfg *config.Config) { - for e := range evts { - rcps, ev := extractDetails(e, gwc, cfg) + // add stream to URL + q := r.URL.Query() + q.Set("stream", uid) + r.URL.RawQuery = q.Encode() + + s.server.ServeHTTP(w, r) - for r := range rcps { - server.Publish(r, &sse.Event{Data: ev}) - } - } } // extracts recipients and builds event to send to client -func extractDetails(e interface{}, gwc gateway.GatewayAPIClient, cfg *config.Config) (<-chan string, []byte) { +func (s *SSE) extractDetails(e interface{}) (<-chan string, []byte) { // determining recipients can take longer. We spawn a seperate go routine to do it - ch := determineRecipients(e, gwc, cfg) + ch := s.determineRecipients(e) var event interface{} @@ -104,7 +113,7 @@ func extractDetails(e interface{}, gwc gateway.GatewayAPIClient, cfg *config.Con return ch, b } -func determineRecipients(e interface{}, gwc gateway.GatewayAPIClient, cfg *config.Config) <-chan string { +func (s *SSE) determineRecipients(e interface{}) <-chan string { ch := make(chan string) go func() { defer close(ch) @@ -122,44 +131,34 @@ func determineRecipients(e interface{}, gwc gateway.GatewayAPIClient, cfg *confi // impersonate executing user to stat the resource // FIXME: What to do if executing user is not member of the space? - ctx, err := impersonate(user.GetId(), gwc, cfg) + ctx, err := s.impersonate(user.GetId()) if err != nil { log.Println("ERROR:", err) return } - space, err := getStorageSpace(ctx, gwc, ref.GetResourceId().GetSpaceId()) + space, err := s.getStorageSpace(ctx, ref.GetResourceId().GetSpaceId()) if err != nil { log.Println("ERROR:", err) return } informed := make(map[string]struct{}) + inform := func(id string) { + if _, ok := informed[id]; ok { + return + } + ch <- id + informed[id] = struct{}{} + } // inform executing user first and foremost - ch <- user.Id.OpaqueId - informed[user.GetId().GetOpaqueId()] = struct{}{} + inform(user.GetId().GetOpaqueId()) // inform space members next var grants map[string]*provider.ResourcePermissions if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "grants", &grants); err == nil { - var ( - groups = make(map[string][]string) - grpids map[string]struct{} - ) - if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "groups", &grpids); err == nil { - for g := range grpids { - r, err := gwc.GetGroup(ctx, &group.GetGroupRequest{GroupId: &group.GroupId{OpaqueId: g}}) - if err != nil || r.GetStatus().GetCode() != rpc.Code_CODE_OK { - log.Println(err) - } - - for _, uid := range r.GetGroup().GetMembers() { - groups[g] = append(groups[g], uid.GetOpaqueId()) - } - } - - } + groups := s.resolveGroups(ctx, space) // FIXME: Which space permissions allow me to get this event? for id := range grants { @@ -169,12 +168,7 @@ func determineRecipients(e interface{}, gwc gateway.GatewayAPIClient, cfg *confi } for _, u := range users { - if _, ok := informed[u]; ok { - continue - } - - ch <- u - informed[u] = struct{}{} + inform(u) } } } @@ -184,8 +178,9 @@ func determineRecipients(e interface{}, gwc gateway.GatewayAPIClient, cfg *confi return ch } -func impersonate(userID *user.UserId, gwc gateway.GatewayAPIClient, cfg *config.Config) (context.Context, error) { - getUserResponse, err := gwc.GetUser(context.Background(), &user.GetUserRequest{ +// returns authenticated context of `userID` +func (s *SSE) impersonate(userID *user.UserId) (context.Context, error) { + getUserResponse, err := s.gwc.GetUser(context.Background(), &user.GetUserRequest{ UserId: userID, }) if err != nil { @@ -197,10 +192,10 @@ func impersonate(userID *user.UserId, gwc gateway.GatewayAPIClient, cfg *config. // Get auth context ownerCtx := revactx.ContextSetUser(context.Background(), getUserResponse.User) - authRes, err := gwc.Authenticate(ownerCtx, &gateway.AuthenticateRequest{ + authRes, err := s.gwc.Authenticate(ownerCtx, &gateway.AuthenticateRequest{ Type: "machine", ClientId: "userid:" + userID.OpaqueId, - ClientSecret: cfg.MachineAuthAPIKey, + ClientSecret: s.cfg.MachineAuthAPIKey, }) if err != nil { return nil, err @@ -212,8 +207,9 @@ func impersonate(userID *user.UserId, gwc gateway.GatewayAPIClient, cfg *config. return metadata.AppendToOutgoingContext(context.Background(), revactx.TokenHeader, authRes.Token), nil } -func getStorageSpace(ctx context.Context, gwc gateway.GatewayAPIClient, id string) (*provider.StorageSpace, error) { - resp, err := gwc.ListStorageSpaces(ctx, listStorageSpaceRequest(id)) +// calls gateway for storage space +func (s *SSE) getStorageSpace(ctx context.Context, id string) (*provider.StorageSpace, error) { + resp, err := s.gwc.ListStorageSpaces(ctx, listStorageSpaceRequest(id)) if err != nil { return nil, err } @@ -225,6 +221,29 @@ func getStorageSpace(ctx context.Context, gwc gateway.GatewayAPIClient, id strin return resp.GetStorageSpaces()[0], nil } +// resolve group members for all groups of a space +func (s *SSE) resolveGroups(ctx context.Context, space *provider.StorageSpace) map[string][]string { + groups := make(map[string][]string) + + var grpids map[string]struct{} + if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "groups", &grpids); err != nil { + return groups + } + + for g := range grpids { + r, err := s.gwc.GetGroup(ctx, &group.GetGroupRequest{GroupId: &group.GroupId{OpaqueId: g}}) + if err != nil || r.GetStatus().GetCode() != rpc.Code_CODE_OK { + continue + } + + for _, uid := range r.GetGroup().GetMembers() { + groups[g] = append(groups[g], uid.GetOpaqueId()) + } + } + + return groups +} + func listStorageSpaceRequest(id string) *provider.ListStorageSpacesRequest { return &provider.ListStorageSpacesRequest{ Filters: []*provider.ListStorageSpacesRequest_Filter{