Skip to content

Commit

Permalink
adjust userlog service
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj committed Aug 29, 2023
1 parent 1bfdc43 commit 91176db
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 59 deletions.
4 changes: 2 additions & 2 deletions services/userlog/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func Server(cfg *config.Config) *cli.Command {

defer cancel()

consumer, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
stream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func Server(cfg *config.Config) *cli.Command {
http.Config(cfg),
http.Metrics(mtrcs),
http.Store(st),
http.Consumer(consumer),
http.Stream(stream),
http.GatewaySelector(gatewaySelector),
http.History(hClient),
http.Value(vClient),
Expand Down
2 changes: 1 addition & 1 deletion services/userlog/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Config struct {
Events Events `yaml:"events"`
Persistence Persistence `yaml:"persistence"`

DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE,USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer be able to connect to the sse endpoint."`
DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE,USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer receive sse notifications."`

GlobalNotificationsSecret string `yaml:"global_notifications_secret" env:"USERLOG_GLOBAL_NOTIFICATIONS_SECRET" desc:"The secret to secure the global notifications endpoint. Only system admins and users knowing that secret can call the global notifications POST/DELETE endpoints."`

Expand Down
8 changes: 4 additions & 4 deletions services/userlog/pkg/server/http/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Options struct {
Flags []cli.Flag
Namespace string
Store store.Store
Consumer events.Consumer
Stream events.Stream
GatewaySelector pool.Selectable[gateway.GatewayAPIClient]
HistoryClient ehsvc.EventHistoryService
ValueClient settingssvc.ValueService
Expand Down Expand Up @@ -97,10 +97,10 @@ func Store(store store.Store) Option {
}
}

// Consumer provides a function to configure the consumer
func Consumer(consumer events.Consumer) Option {
// Stream provides a function to configure the stream
func Stream(stream events.Stream) Option {
return func(o *Options) {
o.Consumer = consumer
o.Stream = stream
}
}

Expand Down
2 changes: 1 addition & 1 deletion services/userlog/pkg/server/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func Server(opts ...Option) (http.Service, error) {

handle, err := svc.NewUserlogService(
svc.Logger(options.Logger),
svc.Consumer(options.Consumer),
svc.Stream(options.Stream),
svc.Mux(mux),
svc.Store(options.Store),
svc.Config(options.Config),
Expand Down
27 changes: 0 additions & 27 deletions services/userlog/pkg/service/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,33 +91,6 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
w.Write(b)
}

// HandleSSE is the GET handler for events
func (ul *UserlogService) HandleSSE(w http.ResponseWriter, r *http.Request) {
u, ok := revactx.ContextGetUser(r.Context())
if !ok {
ul.log.Error().Msg("sse: no user in context")
w.WriteHeader(http.StatusInternalServerError)
return
}

uid := u.GetId().GetOpaqueId()
if uid == "" {
ul.log.Error().Msg("sse: user in context is broken")
w.WriteHeader(http.StatusInternalServerError)
return
}

stream := ul.sse.CreateStream(uid)
stream.AutoReplay = false

// add stream to URL
q := r.URL.Query()
q.Set("stream", uid)
r.URL.RawQuery = q.Encode()

ul.sse.ServeHTTP(w, r)
}

// HandlePostGlobaelEvent is the POST handler for global events
func (ul *UserlogService) HandlePostGlobalEvent(w http.ResponseWriter, r *http.Request) {
var req PostEventsRequest
Expand Down
8 changes: 4 additions & 4 deletions services/userlog/pkg/service/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Option func(*Options)
// Options for the userlog service
type Options struct {
Logger log.Logger
Consumer events.Consumer
Stream events.Stream
Mux *chi.Mux
Store store.Store
Config *config.Config
Expand All @@ -38,10 +38,10 @@ func Logger(log log.Logger) Option {
}
}

// Consumer configures an event consumer for the userlog service
func Consumer(c events.Consumer) Option {
// Stream configures an event stream for the userlog service
func Stream(s events.Stream) Option {
return func(o *Options) {
o.Consumer = c
o.Stream = s
}
}

Expand Down
25 changes: 10 additions & 15 deletions services/userlog/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/r3labs/sse/v2"
micrometadata "go-micro.dev/v4/metadata"
"go-micro.dev/v4/store"
"go.opentelemetry.io/otel/trace"
Expand All @@ -42,10 +41,10 @@ type UserlogService struct {
historyClient ehsvc.EventHistoryService
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
valueClient settingssvc.ValueService
sse *sse.Server
registeredEvents map[string]events.Unmarshaller
tp trace.TracerProvider
tracer trace.Tracer
publisher events.Publisher
}

// NewUserlogService returns an EventHistory service
Expand All @@ -55,11 +54,11 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) {
opt(o)
}

if o.Consumer == nil || o.Store == nil {
return nil, fmt.Errorf("need non nil consumer (%v) and store (%v) to work properly", o.Consumer, o.Store)
if o.Stream == nil || o.Store == nil {
return nil, fmt.Errorf("need non nil stream (%v) and store (%v) to work properly", o.Stream, o.Store)
}

ch, err := events.Consume(o.Consumer, "userlog", o.RegisteredEvents...)
ch, err := events.Consume(o.Stream, "userlog", o.RegisteredEvents...)
if err != nil {
return nil, err
}
Expand All @@ -75,10 +74,7 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) {
registeredEvents: make(map[string]events.Unmarshaller),
tp: o.TraceProvider,
tracer: o.TraceProvider.Tracer("github.com/owncloud/ocis/services/userlog/pkg/service"),
}

if !ul.cfg.DisableSSE {
ul.sse = sse.New()
publisher: o.Stream,
}

for _, e := range o.RegisteredEvents {
Expand All @@ -97,10 +93,6 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) {
r.Delete("/", ul.HandleDeleteEvents)
r.Post("/global", RequireAdminOrSecret(&m, o.Config.GlobalNotificationsSecret)(ul.HandlePostGlobalEvent))
r.Delete("/global", RequireAdminOrSecret(&m, o.Config.GlobalNotificationsSecret)(ul.HandleDeleteGlobalEvent))

if !ul.cfg.DisableSSE {
r.Get("/sse", ul.HandleSSE)
}
})

go ul.MemorizeEvents(ch)
Expand Down Expand Up @@ -348,8 +340,11 @@ func (ul *UserlogService) sendSSE(userid string, event events.Event) error {
return err
}

ul.sse.Publish(userid, &sse.Event{Data: b})
return nil
return events.Publish(context.Background(), ul.publisher, events.SendSSE{
UserID: userid,
Type: "userlog-notification",
Message: b,
})
}

func (ul *UserlogService) removeExpiredEvents(userid string, all []string, received []*ehmsg.Event) error {
Expand Down
14 changes: 9 additions & 5 deletions services/userlog/pkg/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var _ = Describe("UserlogService", func() {

ul, err = service.NewUserlogService(
service.Config(cfg),
service.Consumer(bus),
service.Stream(bus),
service.Store(sto),
service.Logger(log.NewLogger()),
service.Mux(chi.NewMux()),
Expand All @@ -96,9 +96,9 @@ var _ = Describe("UserlogService", func() {

It("it stores, returns and deletes a couple of events", func() {
ids := make(map[string]struct{})
ids[bus.Publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{}
ids[bus.Publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{}
ids[bus.Publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{}
ids[bus.publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{}
ids[bus.publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{}
ids[bus.publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{}
// ids[bus.Publish(events.SpaceMembershipExpired{SpaceOwner: &user.UserId{OpaqueId: "userid"}})] = struct{}{}
// ids[bus.Publish(events.ShareCreated{Executant: &user.UserId{OpaqueId: "userid"}})] = struct{}{}

Expand Down Expand Up @@ -156,7 +156,11 @@ func (tb testBus) Consume(_ string, _ ...microevents.ConsumeOption) (<-chan micr
return ch, nil
}

func (tb testBus) Publish(e interface{}) string {
func (tb testBus) Publish(_ string, _ interface{}, _ ...microevents.PublishOption) error {
return nil
}

func (tb testBus) publish(e interface{}) string {
ev := events.Event{
ID: uuid.New().String(),
Type: reflect.TypeOf(e).String(),
Expand Down

0 comments on commit 91176db

Please sign in to comment.