diff --git a/services/userlog/pkg/command/server.go b/services/userlog/pkg/command/server.go index 6002bf4f3e5..3ec49220cdb 100644 --- a/services/userlog/pkg/command/server.go +++ b/services/userlog/pkg/command/server.go @@ -81,7 +81,7 @@ func Server(cfg *config.Config) *cli.Command { defer cancel() - consumer, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) + stream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) if err != nil { return err } @@ -121,7 +121,7 @@ func Server(cfg *config.Config) *cli.Command { http.Config(cfg), http.Metrics(mtrcs), http.Store(st), - http.Consumer(consumer), + http.Stream(stream), http.GatewaySelector(gatewaySelector), http.History(hClient), http.Value(vClient), diff --git a/services/userlog/pkg/config/config.go b/services/userlog/pkg/config/config.go index 11e81ccab65..782fb079172 100644 --- a/services/userlog/pkg/config/config.go +++ b/services/userlog/pkg/config/config.go @@ -28,7 +28,7 @@ type Config struct { Events Events `yaml:"events"` Persistence Persistence `yaml:"persistence"` - DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE,USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer be able to connect to the sse endpoint."` + DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE,USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer receive sse notifications."` GlobalNotificationsSecret string `yaml:"global_notifications_secret" env:"USERLOG_GLOBAL_NOTIFICATIONS_SECRET" desc:"The secret to secure the global notifications endpoint. Only system admins and users knowing that secret can call the global notifications POST/DELETE endpoints."` diff --git a/services/userlog/pkg/server/http/option.go b/services/userlog/pkg/server/http/option.go index 02ffe0d6deb..44443d57bd4 100644 --- a/services/userlog/pkg/server/http/option.go +++ b/services/userlog/pkg/server/http/option.go @@ -28,7 +28,7 @@ type Options struct { Flags []cli.Flag Namespace string Store store.Store - Consumer events.Consumer + Stream events.Stream GatewaySelector pool.Selectable[gateway.GatewayAPIClient] HistoryClient ehsvc.EventHistoryService ValueClient settingssvc.ValueService @@ -97,10 +97,10 @@ func Store(store store.Store) Option { } } -// Consumer provides a function to configure the consumer -func Consumer(consumer events.Consumer) Option { +// Stream provides a function to configure the stream +func Stream(stream events.Stream) Option { return func(o *Options) { - o.Consumer = consumer + o.Stream = stream } } diff --git a/services/userlog/pkg/server/http/server.go b/services/userlog/pkg/server/http/server.go index 4adcaec018a..1766f1d566e 100644 --- a/services/userlog/pkg/server/http/server.go +++ b/services/userlog/pkg/server/http/server.go @@ -80,7 +80,7 @@ func Server(opts ...Option) (http.Service, error) { handle, err := svc.NewUserlogService( svc.Logger(options.Logger), - svc.Consumer(options.Consumer), + svc.Stream(options.Stream), svc.Mux(mux), svc.Store(options.Store), svc.Config(options.Config), diff --git a/services/userlog/pkg/service/http.go b/services/userlog/pkg/service/http.go index c689e18eb04..74167e4219f 100644 --- a/services/userlog/pkg/service/http.go +++ b/services/userlog/pkg/service/http.go @@ -91,33 +91,6 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request w.Write(b) } -// HandleSSE is the GET handler for events -func (ul *UserlogService) HandleSSE(w http.ResponseWriter, r *http.Request) { - u, ok := revactx.ContextGetUser(r.Context()) - if !ok { - ul.log.Error().Msg("sse: no user in context") - w.WriteHeader(http.StatusInternalServerError) - return - } - - uid := u.GetId().GetOpaqueId() - if uid == "" { - ul.log.Error().Msg("sse: user in context is broken") - w.WriteHeader(http.StatusInternalServerError) - return - } - - stream := ul.sse.CreateStream(uid) - stream.AutoReplay = false - - // add stream to URL - q := r.URL.Query() - q.Set("stream", uid) - r.URL.RawQuery = q.Encode() - - ul.sse.ServeHTTP(w, r) -} - // HandlePostGlobaelEvent is the POST handler for global events func (ul *UserlogService) HandlePostGlobalEvent(w http.ResponseWriter, r *http.Request) { var req PostEventsRequest diff --git a/services/userlog/pkg/service/options.go b/services/userlog/pkg/service/options.go index a4422926df8..9cd9f97ec01 100644 --- a/services/userlog/pkg/service/options.go +++ b/services/userlog/pkg/service/options.go @@ -19,7 +19,7 @@ type Option func(*Options) // Options for the userlog service type Options struct { Logger log.Logger - Consumer events.Consumer + Stream events.Stream Mux *chi.Mux Store store.Store Config *config.Config @@ -38,10 +38,10 @@ func Logger(log log.Logger) Option { } } -// Consumer configures an event consumer for the userlog service -func Consumer(c events.Consumer) Option { +// Stream configures an event stream for the userlog service +func Stream(s events.Stream) Option { return func(o *Options) { - o.Consumer = c + o.Stream = s } } diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index dd82fae18d7..7ebee777684 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -26,7 +26,6 @@ import ( settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" "github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" - "github.com/r3labs/sse/v2" micrometadata "go-micro.dev/v4/metadata" "go-micro.dev/v4/store" "go.opentelemetry.io/otel/trace" @@ -42,10 +41,10 @@ type UserlogService struct { historyClient ehsvc.EventHistoryService gatewaySelector pool.Selectable[gateway.GatewayAPIClient] valueClient settingssvc.ValueService - sse *sse.Server registeredEvents map[string]events.Unmarshaller tp trace.TracerProvider tracer trace.Tracer + publisher events.Publisher } // NewUserlogService returns an EventHistory service @@ -55,11 +54,11 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) { opt(o) } - if o.Consumer == nil || o.Store == nil { - return nil, fmt.Errorf("need non nil consumer (%v) and store (%v) to work properly", o.Consumer, o.Store) + if o.Stream == nil || o.Store == nil { + return nil, fmt.Errorf("need non nil stream (%v) and store (%v) to work properly", o.Stream, o.Store) } - ch, err := events.Consume(o.Consumer, "userlog", o.RegisteredEvents...) + ch, err := events.Consume(o.Stream, "userlog", o.RegisteredEvents...) if err != nil { return nil, err } @@ -75,10 +74,7 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) { registeredEvents: make(map[string]events.Unmarshaller), tp: o.TraceProvider, tracer: o.TraceProvider.Tracer("github.com/owncloud/ocis/services/userlog/pkg/service"), - } - - if !ul.cfg.DisableSSE { - ul.sse = sse.New() + publisher: o.Stream, } for _, e := range o.RegisteredEvents { @@ -97,10 +93,6 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) { r.Delete("/", ul.HandleDeleteEvents) r.Post("/global", RequireAdminOrSecret(&m, o.Config.GlobalNotificationsSecret)(ul.HandlePostGlobalEvent)) r.Delete("/global", RequireAdminOrSecret(&m, o.Config.GlobalNotificationsSecret)(ul.HandleDeleteGlobalEvent)) - - if !ul.cfg.DisableSSE { - r.Get("/sse", ul.HandleSSE) - } }) go ul.MemorizeEvents(ch) @@ -348,8 +340,11 @@ func (ul *UserlogService) sendSSE(userid string, event events.Event) error { return err } - ul.sse.Publish(userid, &sse.Event{Data: b}) - return nil + return events.Publish(context.Background(), ul.publisher, events.SendSSE{ + UserID: userid, + Type: "userlog-notification", + Message: b, + }) } func (ul *UserlogService) removeExpiredEvents(userid string, all []string, received []*ehmsg.Event) error { diff --git a/services/userlog/pkg/service/service_test.go b/services/userlog/pkg/service/service_test.go index 6c79b5f6b35..cc34f9c0c42 100644 --- a/services/userlog/pkg/service/service_test.go +++ b/services/userlog/pkg/service/service_test.go @@ -79,7 +79,7 @@ var _ = Describe("UserlogService", func() { ul, err = service.NewUserlogService( service.Config(cfg), - service.Consumer(bus), + service.Stream(bus), service.Store(sto), service.Logger(log.NewLogger()), service.Mux(chi.NewMux()), @@ -96,9 +96,9 @@ var _ = Describe("UserlogService", func() { It("it stores, returns and deletes a couple of events", func() { ids := make(map[string]struct{}) - ids[bus.Publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{} - ids[bus.Publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{} - ids[bus.Publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{} + ids[bus.publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{} + ids[bus.publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{} + ids[bus.publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{} // ids[bus.Publish(events.SpaceMembershipExpired{SpaceOwner: &user.UserId{OpaqueId: "userid"}})] = struct{}{} // ids[bus.Publish(events.ShareCreated{Executant: &user.UserId{OpaqueId: "userid"}})] = struct{}{} @@ -156,7 +156,11 @@ func (tb testBus) Consume(_ string, _ ...microevents.ConsumeOption) (<-chan micr return ch, nil } -func (tb testBus) Publish(e interface{}) string { +func (tb testBus) Publish(_ string, _ interface{}, _ ...microevents.PublishOption) error { + return nil +} + +func (tb testBus) publish(e interface{}) string { ev := events.Event{ ID: uuid.New().String(), Type: reflect.TypeOf(e).String(),