From 4b1e54c3d51509049ce52fd661341107e9e69ccc Mon Sep 17 00:00:00 2001 From: jkoberg Date: Mon, 3 Apr 2023 17:14:16 +0200 Subject: [PATCH] add sses to userlog Signed-off-by: jkoberg --- go.mod | 2 ++ go.sum | 5 +++ services/userlog/pkg/service/conversion.go | 42 +++++++--------------- services/userlog/pkg/service/http.go | 39 ++++++++++++++++++-- services/userlog/pkg/service/service.go | 21 +++++++---- 5 files changed, 72 insertions(+), 37 deletions(-) diff --git a/go.mod b/go.mod index 9c3f644ec96..294dd7bdd48 100644 --- a/go.mod +++ b/go.mod @@ -273,6 +273,7 @@ require ( github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/prometheus/statsd_exporter v0.22.8 // indirect + github.com/r3labs/sse/v2 v2.10.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect github.com/rivo/uniseg v0.4.2 // indirect github.com/rs/cors v1.8.2 // indirect @@ -312,6 +313,7 @@ require ( golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.7.0 // indirect google.golang.org/appengine v1.6.7 // indirect + gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect diff --git a/go.sum b/go.sum index e87613ad01d..1b6289df63b 100644 --- a/go.sum +++ b/go.sum @@ -1463,6 +1463,8 @@ github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9 github.com/prometheus/statsd_exporter v0.22.8 h1:Qo2D9ZzaQG+id9i5NYNGmbf1aa/KxKbB9aKfMS+Yib0= github.com/prometheus/statsd_exporter v0.22.8/go.mod h1:/DzwbTEaFTE0Ojz5PqcSk6+PFHOPWGxdXVr6yC8eFOM= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0= +github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= github.com/rainycape/memcache v0.0.0-20150622160815-1031fa0ce2f2/go.mod h1:7tZKcyumwBO6qip7RNQ5r77yrssm9bfCowcLEBcU5IA= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= @@ -1793,6 +1795,7 @@ golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -2363,6 +2366,8 @@ google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175 google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/Acconut/lockfile.v1 v1.1.0/go.mod h1:6UCz3wJ8tSFUsPR6uP/j8uegEtDuEEqFxlpi0JI4Umw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/services/userlog/pkg/service/conversion.go b/services/userlog/pkg/service/conversion.go index c2cd6081a93..bceedea4e8f 100644 --- a/services/userlog/pkg/service/conversion.go +++ b/services/userlog/pkg/service/conversion.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "embed" - "errors" "fmt" "io/fs" "strings" @@ -19,7 +18,6 @@ import ( "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" "github.com/leonelquinteros/gotext" - ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0" ) //go:embed l10n/locale @@ -55,7 +53,6 @@ type Converter struct { gwClient gateway.GatewayAPIClient machineAuthAPIKey string serviceName string - registeredEvents map[string]events.Unmarshaller translationPath string // cached within one request not to query other service too much @@ -66,13 +63,12 @@ type Converter struct { } // NewConverter returns a new Converter -func NewConverter(loc string, gwc gateway.GatewayAPIClient, machineAuthAPIKey string, name string, translationPath string, registeredEvents map[string]events.Unmarshaller) *Converter { +func NewConverter(loc string, gwc gateway.GatewayAPIClient, machineAuthAPIKey string, name string, translationPath string) *Converter { return &Converter{ locale: loc, gwClient: gwc, machineAuthAPIKey: machineAuthAPIKey, serviceName: name, - registeredEvents: registeredEvents, translationPath: translationPath, spaces: make(map[string]*storageprovider.StorageSpace), users: make(map[string]*user.User), @@ -82,20 +78,8 @@ func NewConverter(loc string, gwc gateway.GatewayAPIClient, machineAuthAPIKey st } // ConvertEvent converts an eventhistory event to an OC10Notification -func (c *Converter) ConvertEvent(event *ehmsg.Event) (OC10Notification, error) { - etype, ok := c.registeredEvents[event.Type] - if !ok { - // this should not happen - return OC10Notification{}, errors.New("eventtype not registered") - } - - einterface, err := etype.Unmarshal(event.Event) - if err != nil { - // this shouldn't happen either - return OC10Notification{}, errors.New("cant unmarshal event") - } - - switch ev := einterface.(type) { +func (c *Converter) ConvertEvent(eventid string, event interface{}) (OC10Notification, error) { + switch ev := event.(type) { default: return OC10Notification{}, fmt.Errorf("unknown event type: %T", ev) // file related @@ -103,31 +87,31 @@ func (c *Converter) ConvertEvent(event *ehmsg.Event) (OC10Notification, error) { switch ev.FinishedStep { case events.PPStepAntivirus: res := ev.Result.(events.VirusscanResult) - return c.virusMessage(event.Id, VirusFound, ev.ExecutingUser, res.ResourceID, ev.Filename, res.Description, res.Scandate) + return c.virusMessage(eventid, VirusFound, ev.ExecutingUser, res.ResourceID, ev.Filename, res.Description, res.Scandate) case events.PPStepPolicies: - return c.policiesMessage(event.Id, PoliciesEnforced, ev.ExecutingUser, ev.Filename, time.Now()) + return c.policiesMessage(eventid, PoliciesEnforced, ev.ExecutingUser, ev.Filename, time.Now()) default: return OC10Notification{}, fmt.Errorf("unknown postprocessing step: %s", ev.FinishedStep) } // space related case events.SpaceDisabled: - return c.spaceMessage(event.Id, SpaceDisabled, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp) + return c.spaceMessage(eventid, SpaceDisabled, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp) case events.SpaceDeleted: - return c.spaceDeletedMessage(event.Id, ev.Executant, ev.ID.GetOpaqueId(), ev.SpaceName, ev.Timestamp) + return c.spaceDeletedMessage(eventid, ev.Executant, ev.ID.GetOpaqueId(), ev.SpaceName, ev.Timestamp) case events.SpaceShared: - return c.spaceMessage(event.Id, SpaceShared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp) + return c.spaceMessage(eventid, SpaceShared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp) case events.SpaceUnshared: - return c.spaceMessage(event.Id, SpaceUnshared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp) + return c.spaceMessage(eventid, SpaceUnshared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp) case events.SpaceMembershipExpired: - return c.spaceMessage(event.Id, SpaceMembershipExpired, ev.SpaceOwner, ev.SpaceID.GetOpaqueId(), ev.ExpiredAt) + return c.spaceMessage(eventid, SpaceMembershipExpired, ev.SpaceOwner, ev.SpaceID.GetOpaqueId(), ev.ExpiredAt) // share related case events.ShareCreated: - return c.shareMessage(event.Id, ShareCreated, ev.Executant, ev.ItemID, ev.ShareID, utils.TSToTime(ev.CTime)) + return c.shareMessage(eventid, ShareCreated, ev.Executant, ev.ItemID, ev.ShareID, utils.TSToTime(ev.CTime)) case events.ShareExpired: - return c.shareMessage(event.Id, ShareExpired, ev.ShareOwner, ev.ItemID, ev.ShareID, ev.ExpiredAt) + return c.shareMessage(eventid, ShareExpired, ev.ShareOwner, ev.ItemID, ev.ShareID, ev.ExpiredAt) case events.ShareRemoved: - return c.shareMessage(event.Id, ShareRemoved, ev.Executant, ev.ItemID, ev.ShareID, ev.Timestamp) + return c.shareMessage(eventid, ShareRemoved, ev.Executant, ev.ItemID, ev.ShareID, ev.Timestamp) } } diff --git a/services/userlog/pkg/service/http.go b/services/userlog/pkg/service/http.go index 31d7c9b640d..70b659d7ae4 100644 --- a/services/userlog/pkg/service/http.go +++ b/services/userlog/pkg/service/http.go @@ -4,6 +4,7 @@ import ( "encoding/json" "net/http" + "github.com/cs3org/reva/v2/pkg/ctx" revactx "github.com/cs3org/reva/v2/pkg/ctx" ) @@ -31,11 +32,20 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request return } - conv := NewConverter(r.Header.Get(HeaderAcceptLanguage), ul.gwClient, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.registeredEvents) + conv := NewConverter(r.Header.Get(HeaderAcceptLanguage), ul.gwClient, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath) resp := GetEventResponseOC10{} for _, e := range evs { - noti, err := conv.ConvertEvent(e) + etype, ok := ul.registeredEvents[e.Type] + if !ok { + // this should not happen + } + + einterface, err := etype.Unmarshal(e.Event) + if err != nil { + // this shouldn't happen either + } + noti, err := conv.ConvertEvent(e.Id, einterface) if err != nil { ul.log.Error().Err(err).Str("eventid", e.Id).Str("eventtype", e.Type).Msg("failed to convert event") continue @@ -49,6 +59,31 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request w.Write(b) } +// HandleSSE is the GET handler for events +func (ul *UserlogService) HandleSSE(w http.ResponseWriter, r *http.Request) { + u, ok := ctx.ContextGetUser(r.Context()) + if !ok { + w.WriteHeader(http.StatusInternalServerError) + return + } + + uid := u.GetId().GetOpaqueId() + if uid == "" { + w.WriteHeader(http.StatusInternalServerError) + return + } + + stream := ul.sse.CreateStream(uid) + stream.AutoReplay = false + + // add stream to URL + q := r.URL.Query() + q.Set("stream", uid) + r.URL.RawQuery = q.Encode() + + ul.sse.ServeHTTP(w, r) +} + // HandleDeleteEvents is the DELETE handler for events func (ul *UserlogService) HandleDeleteEvents(w http.ResponseWriter, r *http.Request) { u, ok := revactx.ContextGetUser(r.Context()) diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index f3f028aad4b..4918ad54182 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -20,6 +20,7 @@ import ( ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0" ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" + "github.com/r3labs/sse/v2" "go-micro.dev/v4/store" "google.golang.org/grpc/metadata" ) @@ -32,6 +33,7 @@ type UserlogService struct { cfg *config.Config historyClient ehsvc.EventHistoryService gwClient gateway.GatewayAPIClient + sse *sse.Server registeredEvents map[string]events.Unmarshaller translationPath string } @@ -59,6 +61,7 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) { cfg: o.Config, historyClient: o.HistoryClient, gwClient: o.GatewayClient, + sse: sse.New(), registeredEvents: make(map[string]events.Unmarshaller), } @@ -67,9 +70,10 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) { ul.registeredEvents[typ.String()] = e } - ul.m.Route("/", func(r chi.Router) { - r.Get("/*", ul.HandleGetEvents) - r.Delete("/*", ul.HandleDeleteEvents) + ul.m.Route("/ocs/v2.php/apps/notifications/api/v1/notifications", func(r chi.Router) { + r.Get("/", ul.HandleGetEvents) + r.Get("/sse", ul.HandleSSE) + r.Delete("/", ul.HandleDeleteEvents) }) go ul.MemorizeEvents(ch) @@ -154,7 +158,7 @@ func (ul *UserlogService) MemorizeEvents(ch <-chan events.Event) { // III) store the eventID for each user for _, id := range users { - if err := ul.addEventsToUser(id, event.ID); err != nil { + if err := ul.addEventToUser(id, event); err != nil { ul.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user") continue } @@ -218,9 +222,14 @@ func (ul *UserlogService) DeleteEvents(userid string, evids []string) error { }) } -func (ul *UserlogService) addEventsToUser(userid string, eventids ...string) error { +func (ul *UserlogService) addEventToUser(userid string, event events.Event) error { + loc := "en" // TODO: where to get the locale from? + ev, _ := NewConverter(loc, ul.gwClient, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath).ConvertEvent(event.ID, event.Event) + b, _ := json.Marshal(ev) + + ul.sse.Publish(userid, &sse.Event{Data: b}) return ul.alterUserEventList(userid, func(ids []string) []string { - return append(ids, eventids...) + return append(ids, event.ID) }) }