From 6bcdd347d2a4baf8ab1e36889fa3954ffc6c3f4f Mon Sep 17 00:00:00 2001 From: jkoberg Date: Thu, 29 Dec 2022 15:49:28 +0100 Subject: [PATCH] sse stream per user Signed-off-by: jkoberg --- ocis-pkg/config/config.go | 2 +- .../hub/pkg/config/defaults/defaultconfig.go | 25 +- services/hub/pkg/config/parser/parse.go | 3 + services/hub/pkg/service/events.go | 11 + services/hub/pkg/service/service.go | 68 +++- services/hub/pkg/service/sse.go | 301 ++++++++++++++++-- 6 files changed, 386 insertions(+), 24 deletions(-) create mode 100644 services/hub/pkg/service/events.go diff --git a/ocis-pkg/config/config.go b/ocis-pkg/config/config.go index ff57992ab55..bb9b3489872 100644 --- a/ocis-pkg/config/config.go +++ b/ocis-pkg/config/config.go @@ -98,7 +98,7 @@ type Config struct { Policies *policies.Config `yaml:"policies"` Proxy *proxy.Config `yaml:"proxy"` Settings *settings.Config `yaml:"settings"` - Hub *hub.Config `yaml:"settings"` + Hub *hub.Config `yaml:"hub"` Sharing *sharing.Config `yaml:"sharing"` StorageSystem *storagesystem.Config `yaml:"storage_system"` StoragePublicLink *storagepublic.Config `yaml:"storage_public"` diff --git a/services/hub/pkg/config/defaults/defaultconfig.go b/services/hub/pkg/config/defaults/defaultconfig.go index d640863aeef..14d3b55cc5a 100644 --- a/services/hub/pkg/config/defaults/defaultconfig.go +++ b/services/hub/pkg/config/defaults/defaultconfig.go @@ -1,8 +1,10 @@ package defaults import ( - "github.com/owncloud/ocis/v2/services/hub/pkg/config" "strings" + + "github.com/owncloud/ocis/v2/ocis-pkg/shared" + "github.com/owncloud/ocis/v2/services/hub/pkg/config" ) // FullDefaultConfig used by doc generation @@ -24,9 +26,16 @@ func DefaultConfig() *config.Config { Namespace: "com.owncloud.web", Root: "/", }, + Reva: shared.DefaultRevaConfig(), + Events: config.Events{ + Endpoint: "127.0.0.1:9233", + Cluster: "ocis-cluster", + EnableTLS: false, + }, } } +// EnsureDefaults ensures default values func EnsureDefaults(cfg *config.Config) { if cfg.TokenManager == nil && cfg.Commons != nil && cfg.Commons.TokenManager != nil { cfg.TokenManager = &config.TokenManager{ @@ -35,8 +44,22 @@ func EnsureDefaults(cfg *config.Config) { } else if cfg.TokenManager == nil { cfg.TokenManager = &config.TokenManager{} } + + if cfg.MachineAuthAPIKey == "" && cfg.Commons != nil && cfg.Commons.MachineAuthAPIKey != "" { + cfg.MachineAuthAPIKey = cfg.Commons.MachineAuthAPIKey + } + + if cfg.Reva == nil && cfg.Commons != nil && cfg.Commons.Reva != nil { + cfg.Reva = &shared.Reva{ + Address: cfg.Commons.Reva.Address, + TLS: cfg.Commons.Reva.TLS, + } + } else if cfg.Reva == nil { + cfg.Reva = &shared.Reva{} + } } +// Sanitize saniztizes the config func Sanitize(cfg *config.Config) { // sanitize config if cfg.HTTP.Root != "/" { diff --git a/services/hub/pkg/config/parser/parse.go b/services/hub/pkg/config/parser/parse.go index 2337017bd9e..b98e42aba8c 100644 --- a/services/hub/pkg/config/parser/parse.go +++ b/services/hub/pkg/config/parser/parse.go @@ -38,5 +38,8 @@ func Validate(cfg *config.Config) error { return shared.MissingJWTTokenError(cfg.Service.Name) } + if cfg.MachineAuthAPIKey == "" { + return shared.MissingMachineAuthApiKeyError(cfg.Service.Name) + } return nil } diff --git a/services/hub/pkg/service/events.go b/services/hub/pkg/service/events.go new file mode 100644 index 00000000000..abb797b3f95 --- /dev/null +++ b/services/hub/pkg/service/events.go @@ -0,0 +1,11 @@ +package service + +// UploadReady informs an client that an upload is ready to work with +type UploadReady struct { + FileID string + SpaceID string + Filename string + Timestamp string + + Message string +} diff --git a/services/hub/pkg/service/service.go b/services/hub/pkg/service/service.go index d7166329fa8..6865087dbd1 100644 --- a/services/hub/pkg/service/service.go +++ b/services/hub/pkg/service/service.go @@ -1,15 +1,23 @@ package service import ( + "crypto/tls" + "crypto/x509" + "log" + "net/http" + "os" + + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/go-chi/chi/v5" + "github.com/go-micro/plugins/v4/events/natsjs" "github.com/owncloud/ocis/v2/ocis-pkg/account" + "github.com/owncloud/ocis/v2/ocis-pkg/crypto" opkgm "github.com/owncloud/ocis/v2/ocis-pkg/middleware" "github.com/owncloud/ocis/v2/services/hub/pkg/config" - "net/http" ) // Service defines the service handlers. - type Service struct { m *chi.Mux } @@ -23,8 +31,22 @@ func New(cfg *config.Config) Service { ), ) + s, err := NewSSE(cfg) + if err != nil { + log.Fatal("cant initiate sse", err) + } + + ch, err := eventsConsumer(cfg.Events) + if err != nil { + log.Fatal("cant consume events", err) + } + + go s.ListenForEvents(ch) + m.Route("/hub", func(r chi.Router) { - r.Route("/sse", ServeSSE) + r.Route("/sse", func(r chi.Router) { + r.Get("/", s.ServeHTTP) + }) }) svc := Service{ @@ -34,6 +56,46 @@ func New(cfg *config.Config) Service { return svc } +func eventsConsumer(evtsCfg config.Events) (<-chan events.Event, error) { + var tlsConf *tls.Config + if evtsCfg.EnableTLS { + var rootCAPool *x509.CertPool + if evtsCfg.TLSRootCACertificate != "" { + rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) + if err != nil { + return nil, err + } + + rootCAPool, err = crypto.NewCertPoolFromPEM(rootCrtFile) + if err != nil { + return nil, err + } + evtsCfg.TLSInsecure = false + } + + tlsConf = &tls.Config{ + MinVersion: tls.VersionTLS12, + InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec + RootCAs: rootCAPool, + } + } + client, err := stream.Nats( + natsjs.TLSConfig(tlsConf), + natsjs.Address(evtsCfg.Endpoint), + natsjs.ClusterID(evtsCfg.Cluster), + ) + if err != nil { + return nil, err + } + + evts, err := events.Consume(client, "hub", events.UploadReady{}) + if err != nil { + return nil, err + } + + return evts, nil +} + // ServeHTTP implements the Service interface. func (s Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.m.ServeHTTP(w, r) diff --git a/services/hub/pkg/service/sse.go b/services/hub/pkg/service/sse.go index 335c79c9ef6..dd6fe446812 100644 --- a/services/hub/pkg/service/sse.go +++ b/services/hub/pkg/service/sse.go @@ -1,38 +1,301 @@ package service import ( + "context" + "encoding/json" "fmt" + "log" + "net/http" + + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/ctx" - "github.com/go-chi/chi/v5" + revactx "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" + "github.com/cs3org/reva/v2/pkg/storagespace" + "github.com/cs3org/reva/v2/pkg/utils" + "github.com/owncloud/ocis/v2/services/hub/pkg/config" "github.com/r3labs/sse/v2" - "net/http" - "time" + "google.golang.org/grpc/metadata" ) -type SSE struct{} +// SSE provides server sent events functionality +type SSE struct { + cfg *config.Config + gwc gateway.GatewayAPIClient + server *sse.Server +} -func ServeSSE(r chi.Router) { +// NewSSE returns a new SSE instance +func NewSSE(cfg *config.Config) (*SSE, error) { server := sse.New() - stream := server.CreateStream("messages") + + gwc, err := pool.GetGatewayServiceClient(cfg.Reva.Address) + if err != nil { + return nil, err + } + + return &SSE{ + cfg: cfg, + gwc: gwc, + server: server, + }, nil +} + +// ServeHTTP allows clients to subscribe to sse +func (s *SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) { + u, ok := ctx.ContextGetUser(r.Context()) + if !ok { + w.WriteHeader(http.StatusInternalServerError) + return + } + + uid := u.GetId().GetOpaqueId() + if uid == "" { + w.WriteHeader(http.StatusInternalServerError) + return + } + + stream := s.server.CreateStream(uid) stream.AutoReplay = false - r.Get("/", func(w http.ResponseWriter, r *http.Request) { - u, ok := ctx.ContextGetUser(r.Context()) - if !ok { - w.WriteHeader(http.StatusInternalServerError) - return + // add stream to URL + q := r.URL.Query() + q.Set("stream", uid) + r.URL.RawQuery = q.Encode() + + s.server.ServeHTTP(w, r) + +} + +// ListenForEvents listens for events to inform clients about changes. Blocking. Start in seperate go routine. +func (s *SSE) ListenForEvents(evts <-chan events.Event) { + for e := range evts { + rcps, ev := s.extractDetails(e.Event) + + for r := range rcps { + s.server.Publish(r, &sse.Event{Data: ev}) + } + } +} + +// extracts recipients and builds event to send to client +func (s *SSE) extractDetails(e interface{}) (<-chan string, []byte) { + + // determining recipients can take longer. We spawn a seperate go routine to do it + ch := make(chan string) + go s.determineRecipients(e, ch) + + var event interface{} + + switch ev := e.(type) { + case events.UploadReady: + t := ev.Timestamp.Format("2006-01-02 15:04:05") + id, _ := storagespace.FormatReference(ev.FileRef) + event = UploadReady{ + FileID: id, + SpaceID: ev.FileRef.GetResourceId().GetSpaceId(), + Filename: ev.Filename, + Timestamp: t, + Message: fmt.Sprintf("[%s] Hello! The file %s is ready to work with", t, ev.Filename), + } + + } + + b, err := json.Marshal(event) + if err != nil { + log.Println("ERROR:", err) + } + + return ch, b +} + +func (s *SSE) determineRecipients(e interface{}, ch chan<- string) { + defer close(ch) + + var ( + ref *provider.Reference + user *user.User + ) + switch ev := e.(type) { + case events.UploadReady: + ref = ev.FileRef + user = ev.ExecutingUser + } + + // impersonate executing user to stat the resource + // FIXME: What to do if executing user is not member of the space? + ctx, err := s.impersonate(user.GetId()) + if err != nil { + log.Println("ERROR:", err) + return + } + + space, err := s.getStorageSpace(ctx, ref.GetResourceId().GetSpaceId()) + if err != nil { + log.Println("ERROR:", err) + return + } + + informed := make(map[string]struct{}) + inform := func(users ...string) { + for _, id := range users { + if _, ok := informed[id]; ok { + continue + } + ch <- id + informed[id] = struct{}{} } + } + + // inform executing user first and foremost + inform(user.GetId().GetOpaqueId()) - go func() { - for range time.Tick(time.Second * 4) { - t := time.Now() - server.Publish("messages", &sse.Event{ - Data: []byte(fmt.Sprintf("[%s] Hello %s, new push notification from server!", t.Format("2006-01-02 15:04:05"), u.Username)), - }) + // inform space members next + var grants map[string]*provider.ResourcePermissions + if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "grants", &grants); err == nil { + groups := s.resolveGroups(ctx, space) + + // FIXME: Which space permissions allow me to get this event? + for id := range grants { + users, ok := groups[id] + if !ok { + users = []string{id} } - }() - server.ServeHTTP(w, r) + inform(users...) + } + } + + // inform share recipients also + // TODO: How to get all shares pointing to the resource? + resp, err := s.gwc.ListShares(ctx, listSharesRequest(ref.GetResourceId())) + if err != nil || resp.GetStatus().GetCode() != rpc.Code_CODE_OK { + log.Println("ERROR:", err, resp.GetStatus().GetCode()) + return + } + + for _, share := range resp.GetShares() { + users := []string{share.GetGrantee().GetUserId().GetOpaqueId()} + if users[0] == "" { + users = s.getGroupMembers(ctx, share.GetGrantee().GetGroupId().GetOpaqueId()) + } + + inform(users...) + } +} + +// resolve group members for all groups of a space +func (s *SSE) resolveGroups(ctx context.Context, space *provider.StorageSpace) map[string][]string { + groups := make(map[string][]string) + + var grpids map[string]struct{} + if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "groups", &grpids); err != nil { + return groups + } + + for g := range grpids { + groups[g] = append(groups[g], s.getGroupMembers(ctx, g)...) + } + + return groups +} + +// returns authenticated context of `userID` +func (s *SSE) impersonate(userID *user.UserId) (context.Context, error) { + getUserResponse, err := s.gwc.GetUser(context.Background(), &user.GetUserRequest{ + UserId: userID, }) + if err != nil { + return nil, err + } + if getUserResponse.Status.Code != rpc.Code_CODE_OK { + return nil, fmt.Errorf("error getting user: %s", getUserResponse.Status.Message) + } + + // Get auth context + ownerCtx := revactx.ContextSetUser(context.Background(), getUserResponse.User) + authRes, err := s.gwc.Authenticate(ownerCtx, &gateway.AuthenticateRequest{ + Type: "machine", + ClientId: "userid:" + userID.OpaqueId, + ClientSecret: s.cfg.MachineAuthAPIKey, + }) + if err != nil { + return nil, err + } + if authRes.GetStatus().GetCode() != rpc.Code_CODE_OK { + return nil, fmt.Errorf("error impersonating user: %s", authRes.Status.Message) + } + + return metadata.AppendToOutgoingContext(context.Background(), revactx.TokenHeader, authRes.Token), nil +} + +// calls gateway for storage space +func (s *SSE) getStorageSpace(ctx context.Context, id string) (*provider.StorageSpace, error) { + resp, err := s.gwc.ListStorageSpaces(ctx, listStorageSpaceRequest(id)) + if err != nil { + return nil, err + } + + if resp.GetStatus().GetCode() != rpc.Code_CODE_OK || len(resp.GetStorageSpaces()) != 1 { + return nil, fmt.Errorf("can't fetch storage space: %s", resp.GetStatus().GetCode()) + } + + return resp.GetStorageSpaces()[0], nil +} + +// calls gateway for group members +func (s *SSE) getGroupMembers(ctx context.Context, id string) []string { + var members []string + r, err := s.gwc.GetGroup(ctx, getGroupRequest(id)) + if err != nil || r.GetStatus().GetCode() != rpc.Code_CODE_OK { + log.Println("ERROR", err, r.GetStatus().GetCode()) + } + + for _, uid := range r.GetGroup().GetMembers() { + members = append(members, uid.GetOpaqueId()) + } + + return members +} + +func listStorageSpaceRequest(id string) *provider.ListStorageSpacesRequest { + return &provider.ListStorageSpacesRequest{ + Filters: []*provider.ListStorageSpacesRequest_Filter{ + { + Type: provider.ListStorageSpacesRequest_Filter_TYPE_ID, + Term: &provider.ListStorageSpacesRequest_Filter_Id{ + Id: &provider.StorageSpaceId{ + OpaqueId: id, + }, + }, + }, + }, + } +} + +func getGroupRequest(id string) *group.GetGroupRequest { + return &group.GetGroupRequest{ + GroupId: &group.GroupId{ + OpaqueId: id, + }, + } +} +func listSharesRequest(id *provider.ResourceId) *collaboration.ListSharesRequest { + return &collaboration.ListSharesRequest{ + Filters: []*collaboration.Filter{ + { + Type: collaboration.Filter_TYPE_RESOURCE_ID, + Term: &collaboration.Filter_ResourceId{ + ResourceId: id, + }, + }, + }, + } }