From d2b873acff39e5dfd76cb13adc4c0e886d487c81 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Mon, 3 Apr 2023 17:14:16 +0200 Subject: [PATCH 1/7] add sses to userlog Signed-off-by: jkoberg --- go.mod | 2 ++ go.sum | 5 +++ services/userlog/pkg/service/conversion.go | 42 +++++++--------------- services/userlog/pkg/service/http.go | 39 ++++++++++++++++++-- services/userlog/pkg/service/service.go | 21 +++++++---- 5 files changed, 72 insertions(+), 37 deletions(-) diff --git a/go.mod b/go.mod index 9da8ca17804..325344b62c5 100644 --- a/go.mod +++ b/go.mod @@ -280,6 +280,7 @@ require ( github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect github.com/prometheus/statsd_exporter v0.22.8 // indirect + github.com/r3labs/sse/v2 v2.10.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect github.com/rivo/uniseg v0.4.2 // indirect github.com/rs/cors v1.9.0 // indirect @@ -325,6 +326,7 @@ require ( golang.org/x/tools v0.7.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.7 // indirect + gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect diff --git a/go.sum b/go.sum index b3d829fcff5..e3699ad1e41 100644 --- a/go.sum +++ b/go.sum @@ -1469,6 +1469,8 @@ github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9 github.com/prometheus/statsd_exporter v0.22.8 h1:Qo2D9ZzaQG+id9i5NYNGmbf1aa/KxKbB9aKfMS+Yib0= github.com/prometheus/statsd_exporter v0.22.8/go.mod h1:/DzwbTEaFTE0Ojz5PqcSk6+PFHOPWGxdXVr6yC8eFOM= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0= +github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= github.com/rainycape/memcache v0.0.0-20150622160815-1031fa0ce2f2/go.mod h1:7tZKcyumwBO6qip7RNQ5r77yrssm9bfCowcLEBcU5IA= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= @@ -1808,6 +1810,7 @@ golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -2383,6 +2386,8 @@ google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cn google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/Acconut/lockfile.v1 v1.1.0/go.mod h1:6UCz3wJ8tSFUsPR6uP/j8uegEtDuEEqFxlpi0JI4Umw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/services/userlog/pkg/service/conversion.go b/services/userlog/pkg/service/conversion.go index 93fa14d2bdd..e10191f90dc 100644 --- a/services/userlog/pkg/service/conversion.go +++ b/services/userlog/pkg/service/conversion.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "embed" - "errors" "fmt" "io/fs" "strings" @@ -20,7 +19,6 @@ import ( "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" "github.com/leonelquinteros/gotext" - ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0" ) //go:embed l10n/locale @@ -56,7 +54,6 @@ type Converter struct { gatewaySelector pool.Selectable[gateway.GatewayAPIClient] machineAuthAPIKey string serviceName string - registeredEvents map[string]events.Unmarshaller translationPath string // cached within one request not to query other service too much @@ -67,13 +64,12 @@ type Converter struct { } // NewConverter returns a new Converter -func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], machineAuthAPIKey string, name string, translationPath string, registeredEvents map[string]events.Unmarshaller) *Converter { +func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], machineAuthAPIKey string, name string, translationPath string) *Converter { return &Converter{ locale: loc, gatewaySelector: gatewaySelector, machineAuthAPIKey: machineAuthAPIKey, serviceName: name, - registeredEvents: registeredEvents, translationPath: translationPath, spaces: make(map[string]*storageprovider.StorageSpace), users: make(map[string]*user.User), @@ -83,20 +79,8 @@ func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPI } // ConvertEvent converts an eventhistory event to an OC10Notification -func (c *Converter) ConvertEvent(event *ehmsg.Event) (OC10Notification, error) { - etype, ok := c.registeredEvents[event.Type] - if !ok { - // this should not happen - return OC10Notification{}, errors.New("eventtype not registered") - } - - einterface, err := etype.Unmarshal(event.Event) - if err != nil { - // this shouldn't happen either - return OC10Notification{}, errors.New("cant unmarshal event") - } - - switch ev := einterface.(type) { +func (c *Converter) ConvertEvent(eventid string, event interface{}) (OC10Notification, error) { + switch ev := event.(type) { default: return OC10Notification{}, fmt.Errorf("unknown event type: %T", ev) // file related @@ -104,31 +88,31 @@ func (c *Converter) ConvertEvent(event *ehmsg.Event) (OC10Notification, error) { switch ev.FinishedStep { case events.PPStepAntivirus: res := ev.Result.(events.VirusscanResult) - return c.virusMessage(event.Id, VirusFound, ev.ExecutingUser, res.ResourceID, ev.Filename, res.Description, res.Scandate) + return c.virusMessage(eventid, VirusFound, ev.ExecutingUser, res.ResourceID, ev.Filename, res.Description, res.Scandate) case events.PPStepPolicies: - return c.policiesMessage(event.Id, PoliciesEnforced, ev.ExecutingUser, ev.Filename, time.Now()) + return c.policiesMessage(eventid, PoliciesEnforced, ev.ExecutingUser, ev.Filename, time.Now()) default: return OC10Notification{}, fmt.Errorf("unknown postprocessing step: %s", ev.FinishedStep) } // space related case events.SpaceDisabled: - return c.spaceMessage(event.Id, SpaceDisabled, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp) + return c.spaceMessage(eventid, SpaceDisabled, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp) case events.SpaceDeleted: - return c.spaceDeletedMessage(event.Id, ev.Executant, ev.ID.GetOpaqueId(), ev.SpaceName, ev.Timestamp) + return c.spaceDeletedMessage(eventid, ev.Executant, ev.ID.GetOpaqueId(), ev.SpaceName, ev.Timestamp) case events.SpaceShared: - return c.spaceMessage(event.Id, SpaceShared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp) + return c.spaceMessage(eventid, SpaceShared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp) case events.SpaceUnshared: - return c.spaceMessage(event.Id, SpaceUnshared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp) + return c.spaceMessage(eventid, SpaceUnshared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp) case events.SpaceMembershipExpired: - return c.spaceMessage(event.Id, SpaceMembershipExpired, ev.SpaceOwner, ev.SpaceID.GetOpaqueId(), ev.ExpiredAt) + return c.spaceMessage(eventid, SpaceMembershipExpired, ev.SpaceOwner, ev.SpaceID.GetOpaqueId(), ev.ExpiredAt) // share related case events.ShareCreated: - return c.shareMessage(event.Id, ShareCreated, ev.Executant, ev.ItemID, ev.ShareID, utils.TSToTime(ev.CTime)) + return c.shareMessage(eventid, ShareCreated, ev.Executant, ev.ItemID, ev.ShareID, utils.TSToTime(ev.CTime)) case events.ShareExpired: - return c.shareMessage(event.Id, ShareExpired, ev.ShareOwner, ev.ItemID, ev.ShareID, ev.ExpiredAt) + return c.shareMessage(eventid, ShareExpired, ev.ShareOwner, ev.ItemID, ev.ShareID, ev.ExpiredAt) case events.ShareRemoved: - return c.shareMessage(event.Id, ShareRemoved, ev.Executant, ev.ItemID, ev.ShareID, ev.Timestamp) + return c.shareMessage(eventid, ShareRemoved, ev.Executant, ev.ItemID, ev.ShareID, ev.Timestamp) } } diff --git a/services/userlog/pkg/service/http.go b/services/userlog/pkg/service/http.go index f49d383e643..f674fa34601 100644 --- a/services/userlog/pkg/service/http.go +++ b/services/userlog/pkg/service/http.go @@ -4,6 +4,7 @@ import ( "encoding/json" "net/http" + "github.com/cs3org/reva/v2/pkg/ctx" revactx "github.com/cs3org/reva/v2/pkg/ctx" ) @@ -31,11 +32,20 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request return } - conv := NewConverter(r.Header.Get(HeaderAcceptLanguage), ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.registeredEvents) + conv := NewConverter(r.Header.Get(HeaderAcceptLanguage), ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath) resp := GetEventResponseOC10{} for _, e := range evs { - noti, err := conv.ConvertEvent(e) + etype, ok := ul.registeredEvents[e.Type] + if !ok { + // this should not happen + } + + einterface, err := etype.Unmarshal(e.Event) + if err != nil { + // this shouldn't happen either + } + noti, err := conv.ConvertEvent(e.Id, einterface) if err != nil { ul.log.Error().Err(err).Str("eventid", e.Id).Str("eventtype", e.Type).Msg("failed to convert event") continue @@ -49,6 +59,31 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request w.Write(b) } +// HandleSSE is the GET handler for events +func (ul *UserlogService) HandleSSE(w http.ResponseWriter, r *http.Request) { + u, ok := ctx.ContextGetUser(r.Context()) + if !ok { + w.WriteHeader(http.StatusInternalServerError) + return + } + + uid := u.GetId().GetOpaqueId() + if uid == "" { + w.WriteHeader(http.StatusInternalServerError) + return + } + + stream := ul.sse.CreateStream(uid) + stream.AutoReplay = false + + // add stream to URL + q := r.URL.Query() + q.Set("stream", uid) + r.URL.RawQuery = q.Encode() + + ul.sse.ServeHTTP(w, r) +} + // HandleDeleteEvents is the DELETE handler for events func (ul *UserlogService) HandleDeleteEvents(w http.ResponseWriter, r *http.Request) { u, ok := revactx.ContextGetUser(r.Context()) diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index 37e445dbf5c..72798f90acb 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -21,6 +21,7 @@ import ( ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0" ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" + "github.com/r3labs/sse/v2" "go-micro.dev/v4/store" "google.golang.org/grpc/metadata" ) @@ -33,6 +34,7 @@ type UserlogService struct { cfg *config.Config historyClient ehsvc.EventHistoryService gatewaySelector pool.Selectable[gateway.GatewayAPIClient] + sse *sse.Server registeredEvents map[string]events.Unmarshaller translationPath string } @@ -60,6 +62,7 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) { cfg: o.Config, historyClient: o.HistoryClient, gatewaySelector: o.GatewaySelector, + sse: sse.New(), registeredEvents: make(map[string]events.Unmarshaller), } @@ -68,9 +71,10 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) { ul.registeredEvents[typ.String()] = e } - ul.m.Route("/", func(r chi.Router) { - r.Get("/*", ul.HandleGetEvents) - r.Delete("/*", ul.HandleDeleteEvents) + ul.m.Route("/ocs/v2.php/apps/notifications/api/v1/notifications", func(r chi.Router) { + r.Get("/", ul.HandleGetEvents) + r.Get("/sse", ul.HandleSSE) + r.Delete("/", ul.HandleDeleteEvents) }) go ul.MemorizeEvents(ch) @@ -155,7 +159,7 @@ func (ul *UserlogService) MemorizeEvents(ch <-chan events.Event) { // III) store the eventID for each user for _, id := range users { - if err := ul.addEventsToUser(id, event.ID); err != nil { + if err := ul.addEventToUser(id, event); err != nil { ul.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user") continue } @@ -219,9 +223,14 @@ func (ul *UserlogService) DeleteEvents(userid string, evids []string) error { }) } -func (ul *UserlogService) addEventsToUser(userid string, eventids ...string) error { +func (ul *UserlogService) addEventToUser(userid string, event events.Event) error { + loc := "en" // TODO: where to get the locale from? + ev, _ := NewConverter(loc, ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath).ConvertEvent(event.ID, event.Event) + b, _ := json.Marshal(ev) + + ul.sse.Publish(userid, &sse.Event{Data: b}) return ul.alterUserEventList(userid, func(ids []string) []string { - return append(ids, eventids...) + return append(ids, event.ID) }) } From 8953336719551623026b88b67f167b9eeec92f8d Mon Sep 17 00:00:00 2001 From: jkoberg Date: Tue, 23 May 2023 11:34:42 +0200 Subject: [PATCH 2/7] translate sse notifications Signed-off-by: jkoberg --- services/userlog/pkg/command/server.go | 3 + services/userlog/pkg/server/http/option.go | 9 + services/userlog/pkg/server/http/server.go | 1 + services/userlog/pkg/service/options.go | 8 + services/userlog/pkg/service/service.go | 26 +- services/userlog/pkg/service/service_test.go | 7 + vendor/github.com/r3labs/sse/v2/.gitignore | 2 + vendor/github.com/r3labs/sse/v2/.golangci.yml | 15 + .../github.com/r3labs/sse/v2/CONTRIBUTING.md | 80 ++++ vendor/github.com/r3labs/sse/v2/LICENSE | 373 +++++++++++++++++ vendor/github.com/r3labs/sse/v2/Makefile | 20 + vendor/github.com/r3labs/sse/v2/README.md | 191 +++++++++ vendor/github.com/r3labs/sse/v2/client.go | 390 ++++++++++++++++++ vendor/github.com/r3labs/sse/v2/event.go | 114 +++++ vendor/github.com/r3labs/sse/v2/event_log.go | 43 ++ vendor/github.com/r3labs/sse/v2/http.go | 120 ++++++ vendor/github.com/r3labs/sse/v2/server.go | 156 +++++++ vendor/github.com/r3labs/sse/v2/stream.go | 153 +++++++ vendor/github.com/r3labs/sse/v2/subscriber.go | 24 ++ .../gopkg.in/cenkalti/backoff.v1/.gitignore | 22 + .../gopkg.in/cenkalti/backoff.v1/.travis.yml | 9 + vendor/gopkg.in/cenkalti/backoff.v1/LICENSE | 20 + vendor/gopkg.in/cenkalti/backoff.v1/README.md | 30 ++ .../gopkg.in/cenkalti/backoff.v1/backoff.go | 66 +++ .../gopkg.in/cenkalti/backoff.v1/context.go | 60 +++ .../cenkalti/backoff.v1/exponential.go | 156 +++++++ vendor/gopkg.in/cenkalti/backoff.v1/retry.go | 78 ++++ vendor/gopkg.in/cenkalti/backoff.v1/ticker.go | 81 ++++ vendor/gopkg.in/cenkalti/backoff.v1/tries.go | 35 ++ vendor/modules.txt | 6 + 30 files changed, 2297 insertions(+), 1 deletion(-) create mode 100644 vendor/github.com/r3labs/sse/v2/.gitignore create mode 100644 vendor/github.com/r3labs/sse/v2/.golangci.yml create mode 100644 vendor/github.com/r3labs/sse/v2/CONTRIBUTING.md create mode 100644 vendor/github.com/r3labs/sse/v2/LICENSE create mode 100644 vendor/github.com/r3labs/sse/v2/Makefile create mode 100644 vendor/github.com/r3labs/sse/v2/README.md create mode 100644 vendor/github.com/r3labs/sse/v2/client.go create mode 100644 vendor/github.com/r3labs/sse/v2/event.go create mode 100644 vendor/github.com/r3labs/sse/v2/event_log.go create mode 100644 vendor/github.com/r3labs/sse/v2/http.go create mode 100644 vendor/github.com/r3labs/sse/v2/server.go create mode 100644 vendor/github.com/r3labs/sse/v2/stream.go create mode 100644 vendor/github.com/r3labs/sse/v2/subscriber.go create mode 100644 vendor/gopkg.in/cenkalti/backoff.v1/.gitignore create mode 100644 vendor/gopkg.in/cenkalti/backoff.v1/.travis.yml create mode 100644 vendor/gopkg.in/cenkalti/backoff.v1/LICENSE create mode 100644 vendor/gopkg.in/cenkalti/backoff.v1/README.md create mode 100644 vendor/gopkg.in/cenkalti/backoff.v1/backoff.go create mode 100644 vendor/gopkg.in/cenkalti/backoff.v1/context.go create mode 100644 vendor/gopkg.in/cenkalti/backoff.v1/exponential.go create mode 100644 vendor/gopkg.in/cenkalti/backoff.v1/retry.go create mode 100644 vendor/gopkg.in/cenkalti/backoff.v1/ticker.go create mode 100644 vendor/gopkg.in/cenkalti/backoff.v1/tries.go diff --git a/services/userlog/pkg/command/server.go b/services/userlog/pkg/command/server.go index 58a6f209d4a..7417c1709b2 100644 --- a/services/userlog/pkg/command/server.go +++ b/services/userlog/pkg/command/server.go @@ -16,6 +16,7 @@ import ( ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" "github.com/owncloud/ocis/v2/ocis-pkg/version" ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" + settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" "github.com/owncloud/ocis/v2/services/userlog/pkg/config/parser" "github.com/owncloud/ocis/v2/services/userlog/pkg/logging" @@ -102,6 +103,7 @@ func Server(cfg *config.Config) *cli.Command { } hClient := ehsvc.NewEventHistoryService("com.owncloud.api.eventhistory", ogrpc.DefaultClient()) + vClient := settingssvc.NewValueService("com.owncloud.api.settings", ogrpc.DefaultClient()) { server, err := http.Server( @@ -113,6 +115,7 @@ func Server(cfg *config.Config) *cli.Command { http.Consumer(consumer), http.GatewaySelector(gatewaySelector), http.History(hClient), + http.Value(vClient), http.RegisteredEvents(_registeredEvents), ) diff --git a/services/userlog/pkg/server/http/option.go b/services/userlog/pkg/server/http/option.go index 430c854dbca..efe691101cc 100644 --- a/services/userlog/pkg/server/http/option.go +++ b/services/userlog/pkg/server/http/option.go @@ -8,6 +8,7 @@ import ( "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/owncloud/ocis/v2/ocis-pkg/log" ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" + settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" "github.com/owncloud/ocis/v2/services/userlog/pkg/metrics" "github.com/urfave/cli/v2" @@ -29,6 +30,7 @@ type Options struct { Consumer events.Consumer GatewaySelector pool.Selectable[gateway.GatewayAPIClient] HistoryClient ehsvc.EventHistoryService + ValueClient settingssvc.ValueService RegisteredEvents []events.Unmarshaller } @@ -119,3 +121,10 @@ func RegisteredEvents(evs []events.Unmarshaller) Option { o.RegisteredEvents = evs } } + +// Value provides a function to configure the value service client +func Value(vs settingssvc.ValueService) Option { + return func(o *Options) { + o.ValueClient = vs + } +} diff --git a/services/userlog/pkg/server/http/server.go b/services/userlog/pkg/server/http/server.go index 724248439c1..705529ed243 100644 --- a/services/userlog/pkg/server/http/server.go +++ b/services/userlog/pkg/server/http/server.go @@ -76,6 +76,7 @@ func Server(opts ...Option) (http.Service, error) { svc.Config(options.Config), svc.HistoryClient(options.HistoryClient), svc.GatewaySelector(options.GatewaySelector), + svc.ValueClient(options.ValueClient), svc.RegisteredEvents(options.RegisteredEvents), ) if err != nil { diff --git a/services/userlog/pkg/service/options.go b/services/userlog/pkg/service/options.go index acea1a4b796..98628cfd6f0 100644 --- a/services/userlog/pkg/service/options.go +++ b/services/userlog/pkg/service/options.go @@ -7,6 +7,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/owncloud/ocis/v2/ocis-pkg/log" ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" + settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" "go-micro.dev/v4/store" ) @@ -23,6 +24,7 @@ type Options struct { Config *config.Config HistoryClient ehsvc.EventHistoryService GatewaySelector pool.Selectable[gateway.GatewayAPIClient] + ValueClient settingssvc.ValueService RegisteredEvents []events.Unmarshaller } @@ -81,3 +83,9 @@ func RegisteredEvents(e []events.Unmarshaller) Option { o.RegisteredEvents = e } } + +func ValueClient(vs settingssvc.ValueService) Option { + return func(o *Options) { + o.ValueClient = vs + } +} diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index 72798f90acb..d2aec634287 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -18,14 +18,20 @@ import ( "github.com/cs3org/reva/v2/pkg/utils" "github.com/go-chi/chi/v5" "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/ocis-pkg/middleware" ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0" ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" + settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" + "github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" "github.com/r3labs/sse/v2" + micrometadata "go-micro.dev/v4/metadata" "go-micro.dev/v4/store" "google.golang.org/grpc/metadata" ) +var _defaultLocale = "en" + // UserlogService is the service responsible for user activities type UserlogService struct { log log.Logger @@ -34,6 +40,7 @@ type UserlogService struct { cfg *config.Config historyClient ehsvc.EventHistoryService gatewaySelector pool.Selectable[gateway.GatewayAPIClient] + valueClient settingssvc.ValueService sse *sse.Server registeredEvents map[string]events.Unmarshaller translationPath string @@ -62,6 +69,7 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) { cfg: o.Config, historyClient: o.HistoryClient, gatewaySelector: o.GatewaySelector, + valueClient: o.ValueClient, sse: sse.New(), registeredEvents: make(map[string]events.Unmarshaller), } @@ -224,7 +232,7 @@ func (ul *UserlogService) DeleteEvents(userid string, evids []string) error { } func (ul *UserlogService) addEventToUser(userid string, event events.Event) error { - loc := "en" // TODO: where to get the locale from? + loc := getUserLang(context.Background(), userid, ul.valueClient) ev, _ := NewConverter(loc, ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath).ConvertEvent(event.ID, event.Event) b, _ := json.Marshal(ev) @@ -542,3 +550,19 @@ func editor(perms *storageprovider.ResourcePermissions) bool { func manager(perms *storageprovider.ResourcePermissions) bool { return perms.DenyGrant } + +func getUserLang(ctx context.Context, userid string, vs settingssvc.ValueService) string { + granteeCtx := micrometadata.Set(ctx, middleware.AccountID, userid) + if resp, err := vs.GetValueByUniqueIdentifiers(granteeCtx, + &settingssvc.GetValueByUniqueIdentifiersRequest{ + AccountUuid: userid, + SettingId: defaults.SettingUUIDProfileLanguage, + }, + ); err == nil { + val := resp.GetValue().GetValue().GetListValue().GetValues() + if len(val) > 0 && val[0] != nil { + return val[0].GetStringValue() + } + } + return _defaultLocale +} diff --git a/services/userlog/pkg/service/service_test.go b/services/userlog/pkg/service/service_test.go index 02e5c5330d9..284b8742d53 100644 --- a/services/userlog/pkg/service/service_test.go +++ b/services/userlog/pkg/service/service_test.go @@ -22,10 +22,12 @@ import ( "github.com/owncloud/ocis/v2/ocis-pkg/log" ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0" ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" + settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" "github.com/owncloud/ocis/v2/services/userlog/mocks" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" "github.com/owncloud/ocis/v2/services/userlog/pkg/service" "github.com/stretchr/testify/mock" + "go-micro.dev/v4/client" microevents "go-micro.dev/v4/events" microstore "go-micro.dev/v4/store" "google.golang.org/grpc" @@ -43,6 +45,7 @@ var _ = Describe("UserlogService", func() { gatewaySelector pool.Selectable[gateway.GatewayAPIClient] ehc mocks.EventHistoryService + vc settingssvc.MockValueService ) BeforeEach(func() { @@ -69,6 +72,9 @@ var _ = Describe("UserlogService", func() { }, Status: &rpc.Status{Code: rpc.Code_CODE_OK}}, nil) gatewayClient.On("GetUser", mock.Anything, mock.Anything).Return(&user.GetUserResponse{User: &user.User{Id: &user.UserId{OpaqueId: "userid"}}, Status: &rpc.Status{Code: rpc.Code_CODE_OK}}, nil) gatewayClient.On("Authenticate", mock.Anything, mock.Anything).Return(&gateway.AuthenticateResponse{Status: &rpc.Status{Code: rpc.Code_CODE_OK}}, nil) + vc.GetValueByUniqueIdentifiersFunc = func(ctx context.Context, req *settingssvc.GetValueByUniqueIdentifiersRequest, opts ...client.CallOption) (*settingssvc.GetValueResponse, error) { + return nil, nil + } ul, err = service.NewUserlogService( service.Config(cfg), @@ -78,6 +84,7 @@ var _ = Describe("UserlogService", func() { service.Mux(chi.NewMux()), service.GatewaySelector(gatewaySelector), service.HistoryClient(&ehc), + service.ValueClient(&vc), service.RegisteredEvents([]events.Unmarshaller{ events.SpaceDisabled{}, }), diff --git a/vendor/github.com/r3labs/sse/v2/.gitignore b/vendor/github.com/r3labs/sse/v2/.gitignore new file mode 100644 index 00000000000..d48c759d6c9 --- /dev/null +++ b/vendor/github.com/r3labs/sse/v2/.gitignore @@ -0,0 +1,2 @@ +.idea +.vscode \ No newline at end of file diff --git a/vendor/github.com/r3labs/sse/v2/.golangci.yml b/vendor/github.com/r3labs/sse/v2/.golangci.yml new file mode 100644 index 00000000000..5a76e9a0fdd --- /dev/null +++ b/vendor/github.com/r3labs/sse/v2/.golangci.yml @@ -0,0 +1,15 @@ +linters: + enable-all: true + disable: + - gofmt + - gofumpt + - goimports + - golint # deprecated + - interfacer # deprecated + - maligned # deprecated + - scopelint # deprecated + - varnamelen + +linters-settings: + govet: + enable-all: true diff --git a/vendor/github.com/r3labs/sse/v2/CONTRIBUTING.md b/vendor/github.com/r3labs/sse/v2/CONTRIBUTING.md new file mode 100644 index 00000000000..b9c7859d3ce --- /dev/null +++ b/vendor/github.com/r3labs/sse/v2/CONTRIBUTING.md @@ -0,0 +1,80 @@ +# Contributing guidelines + +Looking to contribute something to this project? Here's how you can help: + +Please take a moment to review this document in order to make the contribution process easy and effective for everyone involved. + +Following these guidelines helps to communicate that you respect the time of the developers managing and developing this open source project. In return, they should reciprocate that respect in addressing your issue or assessing patches and features. + +We also have a [code of conduct](https://ernest.io/conduct). + +## Using the issue tracker + +The issue tracker is the preferred channel for [bug reports](#bug-reports), [features requests](#feature-requests) and [submitting pull requests](#pull-requests), but please respect the following restrictions: + +* Please **do not** use the issue tracker for personal support requests. + +* Please **do not** derail issues. Keep the discussion on topic and + respect the opinions of others. + + +## Bug reports + +A bug is a _demonstrable problem_ that is caused by the code in the repository. +Good bug reports are extremely helpful - thank you! + +Guidelines for bug reports: + +1. **Use the GitHub issue search** — check if the issue has already been + reported. + +2. **Check if the issue has been fixed** — try to reproduce it using the + latest `master` or `develop` branch in the repository. + +3. **Isolate the problem** — create a reduced test case and a live example. + +A good bug report shouldn't leave others needing to chase you up for more +information. Please try to be as detailed as possible in your report. What is +your environment? What steps will reproduce the issue? Which environment experience the problem? What would you expect to be the outcome? All these +details will help people to fix any potential bugs. + +Example: + +> Short and descriptive example bug report title +> +> A summary of the issue and the environment in which it occurs. If +> suitable, include the steps required to reproduce the bug. +> +> 1. This is the first step +> 2. This is the second step +> 3. Further steps, etc. +> +> `` - a link to the reduced test case +> +> Any other information you want to share that is relevant to the issue being +> reported. This might include the lines of code that you have identified as +> causing the bug, and potential solutions (and your opinions on their +> merits). + + +## Feature requests + +Feature requests are welcome. But take a moment to find out whether your idea +fits with the scope and aims of the project. It's up to *you* to make a strong +case to convince the project's developers of the merits of this feature. Please +provide as much detail and context as possible. + + +## Pull requests + +Good pull requests - patches, improvements, new features - are a fantastic +help. They should remain focused in scope and avoid containing unrelated +commits. + +[**Please ask first**](https://ernest.io/community) before embarking on any significant pull request (e.g. +implementing features, refactoring code, porting to a different language), +otherwise you risk spending a lot of time working on something that the +project's developers might not want to merge into the project. + +Please adhere to the coding conventions used throughout a project (indentation, +accurate comments, etc.) and any other requirements (such as test coverage). diff --git a/vendor/github.com/r3labs/sse/v2/LICENSE b/vendor/github.com/r3labs/sse/v2/LICENSE new file mode 100644 index 00000000000..a612ad9813b --- /dev/null +++ b/vendor/github.com/r3labs/sse/v2/LICENSE @@ -0,0 +1,373 @@ +Mozilla Public License Version 2.0 +================================== + +1. Definitions +-------------- + +1.1. "Contributor" + means each individual or legal entity that creates, contributes to + the creation of, or owns Covered Software. + +1.2. "Contributor Version" + means the combination of the Contributions of others (if any) used + by a Contributor and that particular Contributor's Contribution. + +1.3. "Contribution" + means Covered Software of a particular Contributor. + +1.4. "Covered Software" + means Source Code Form to which the initial Contributor has attached + the notice in Exhibit A, the Executable Form of such Source Code + Form, and Modifications of such Source Code Form, in each case + including portions thereof. + +1.5. "Incompatible With Secondary Licenses" + means + + (a) that the initial Contributor has attached the notice described + in Exhibit B to the Covered Software; or + + (b) that the Covered Software was made available under the terms of + version 1.1 or earlier of the License, but not also under the + terms of a Secondary License. + +1.6. "Executable Form" + means any form of the work other than Source Code Form. + +1.7. "Larger Work" + means a work that combines Covered Software with other material, in + a separate file or files, that is not Covered Software. + +1.8. "License" + means this document. + +1.9. "Licensable" + means having the right to grant, to the maximum extent possible, + whether at the time of the initial grant or subsequently, any and + all of the rights conveyed by this License. + +1.10. "Modifications" + means any of the following: + + (a) any file in Source Code Form that results from an addition to, + deletion from, or modification of the contents of Covered + Software; or + + (b) any new file in Source Code Form that contains any Covered + Software. + +1.11. "Patent Claims" of a Contributor + means any patent claim(s), including without limitation, method, + process, and apparatus claims, in any patent Licensable by such + Contributor that would be infringed, but for the grant of the + License, by the making, using, selling, offering for sale, having + made, import, or transfer of either its Contributions or its + Contributor Version. + +1.12. "Secondary License" + means either the GNU General Public License, Version 2.0, the GNU + Lesser General Public License, Version 2.1, the GNU Affero General + Public License, Version 3.0, or any later versions of those + licenses. + +1.13. "Source Code Form" + means the form of the work preferred for making modifications. + +1.14. "You" (or "Your") + means an individual or a legal entity exercising rights under this + License. For legal entities, "You" includes any entity that + controls, is controlled by, or is under common control with You. For + purposes of this definition, "control" means (a) the power, direct + or indirect, to cause the direction or management of such entity, + whether by contract or otherwise, or (b) ownership of more than + fifty percent (50%) of the outstanding shares or beneficial + ownership of such entity. + +2. License Grants and Conditions +-------------------------------- + +2.1. Grants + +Each Contributor hereby grants You a world-wide, royalty-free, +non-exclusive license: + +(a) under intellectual property rights (other than patent or trademark) + Licensable by such Contributor to use, reproduce, make available, + modify, display, perform, distribute, and otherwise exploit its + Contributions, either on an unmodified basis, with Modifications, or + as part of a Larger Work; and + +(b) under Patent Claims of such Contributor to make, use, sell, offer + for sale, have made, import, and otherwise transfer either its + Contributions or its Contributor Version. + +2.2. Effective Date + +The licenses granted in Section 2.1 with respect to any Contribution +become effective for each Contribution on the date the Contributor first +distributes such Contribution. + +2.3. Limitations on Grant Scope + +The licenses granted in this Section 2 are the only rights granted under +this License. No additional rights or licenses will be implied from the +distribution or licensing of Covered Software under this License. +Notwithstanding Section 2.1(b) above, no patent license is granted by a +Contributor: + +(a) for any code that a Contributor has removed from Covered Software; + or + +(b) for infringements caused by: (i) Your and any other third party's + modifications of Covered Software, or (ii) the combination of its + Contributions with other software (except as part of its Contributor + Version); or + +(c) under Patent Claims infringed by Covered Software in the absence of + its Contributions. + +This License does not grant any rights in the trademarks, service marks, +or logos of any Contributor (except as may be necessary to comply with +the notice requirements in Section 3.4). + +2.4. Subsequent Licenses + +No Contributor makes additional grants as a result of Your choice to +distribute the Covered Software under a subsequent version of this +License (see Section 10.2) or under the terms of a Secondary License (if +permitted under the terms of Section 3.3). + +2.5. Representation + +Each Contributor represents that the Contributor believes its +Contributions are its original creation(s) or it has sufficient rights +to grant the rights to its Contributions conveyed by this License. + +2.6. Fair Use + +This License is not intended to limit any rights You have under +applicable copyright doctrines of fair use, fair dealing, or other +equivalents. + +2.7. Conditions + +Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted +in Section 2.1. + +3. Responsibilities +------------------- + +3.1. Distribution of Source Form + +All distribution of Covered Software in Source Code Form, including any +Modifications that You create or to which You contribute, must be under +the terms of this License. You must inform recipients that the Source +Code Form of the Covered Software is governed by the terms of this +License, and how they can obtain a copy of this License. You may not +attempt to alter or restrict the recipients' rights in the Source Code +Form. + +3.2. Distribution of Executable Form + +If You distribute Covered Software in Executable Form then: + +(a) such Covered Software must also be made available in Source Code + Form, as described in Section 3.1, and You must inform recipients of + the Executable Form how they can obtain a copy of such Source Code + Form by reasonable means in a timely manner, at a charge no more + than the cost of distribution to the recipient; and + +(b) You may distribute such Executable Form under the terms of this + License, or sublicense it under different terms, provided that the + license for the Executable Form does not attempt to limit or alter + the recipients' rights in the Source Code Form under this License. + +3.3. Distribution of a Larger Work + +You may create and distribute a Larger Work under terms of Your choice, +provided that You also comply with the requirements of this License for +the Covered Software. If the Larger Work is a combination of Covered +Software with a work governed by one or more Secondary Licenses, and the +Covered Software is not Incompatible With Secondary Licenses, this +License permits You to additionally distribute such Covered Software +under the terms of such Secondary License(s), so that the recipient of +the Larger Work may, at their option, further distribute the Covered +Software under the terms of either this License or such Secondary +License(s). + +3.4. Notices + +You may not remove or alter the substance of any license notices +(including copyright notices, patent notices, disclaimers of warranty, +or limitations of liability) contained within the Source Code Form of +the Covered Software, except that You may alter any license notices to +the extent required to remedy known factual inaccuracies. + +3.5. Application of Additional Terms + +You may choose to offer, and to charge a fee for, warranty, support, +indemnity or liability obligations to one or more recipients of Covered +Software. However, You may do so only on Your own behalf, and not on +behalf of any Contributor. You must make it absolutely clear that any +such warranty, support, indemnity, or liability obligation is offered by +You alone, and You hereby agree to indemnify every Contributor for any +liability incurred by such Contributor as a result of warranty, support, +indemnity or liability terms You offer. You may include additional +disclaimers of warranty and limitations of liability specific to any +jurisdiction. + +4. Inability to Comply Due to Statute or Regulation +--------------------------------------------------- + +If it is impossible for You to comply with any of the terms of this +License with respect to some or all of the Covered Software due to +statute, judicial order, or regulation then You must: (a) comply with +the terms of this License to the maximum extent possible; and (b) +describe the limitations and the code they affect. Such description must +be placed in a text file included with all distributions of the Covered +Software under this License. Except to the extent prohibited by statute +or regulation, such description must be sufficiently detailed for a +recipient of ordinary skill to be able to understand it. + +5. Termination +-------------- + +5.1. The rights granted under this License will terminate automatically +if You fail to comply with any of its terms. However, if You become +compliant, then the rights granted under this License from a particular +Contributor are reinstated (a) provisionally, unless and until such +Contributor explicitly and finally terminates Your grants, and (b) on an +ongoing basis, if such Contributor fails to notify You of the +non-compliance by some reasonable means prior to 60 days after You have +come back into compliance. Moreover, Your grants from a particular +Contributor are reinstated on an ongoing basis if such Contributor +notifies You of the non-compliance by some reasonable means, this is the +first time You have received notice of non-compliance with this License +from such Contributor, and You become compliant prior to 30 days after +Your receipt of the notice. + +5.2. If You initiate litigation against any entity by asserting a patent +infringement claim (excluding declaratory judgment actions, +counter-claims, and cross-claims) alleging that a Contributor Version +directly or indirectly infringes any patent, then the rights granted to +You by any and all Contributors for the Covered Software under Section +2.1 of this License shall terminate. + +5.3. In the event of termination under Sections 5.1 or 5.2 above, all +end user license agreements (excluding distributors and resellers) which +have been validly granted by You or Your distributors under this License +prior to termination shall survive termination. + +************************************************************************ +* * +* 6. Disclaimer of Warranty * +* ------------------------- * +* * +* Covered Software is provided under this License on an "as is" * +* basis, without warranty of any kind, either expressed, implied, or * +* statutory, including, without limitation, warranties that the * +* Covered Software is free of defects, merchantable, fit for a * +* particular purpose or non-infringing. The entire risk as to the * +* quality and performance of the Covered Software is with You. * +* Should any Covered Software prove defective in any respect, You * +* (not any Contributor) assume the cost of any necessary servicing, * +* repair, or correction. This disclaimer of warranty constitutes an * +* essential part of this License. No use of any Covered Software is * +* authorized under this License except under this disclaimer. * +* * +************************************************************************ + +************************************************************************ +* * +* 7. Limitation of Liability * +* -------------------------- * +* * +* Under no circumstances and under no legal theory, whether tort * +* (including negligence), contract, or otherwise, shall any * +* Contributor, or anyone who distributes Covered Software as * +* permitted above, be liable to You for any direct, indirect, * +* special, incidental, or consequential damages of any character * +* including, without limitation, damages for lost profits, loss of * +* goodwill, work stoppage, computer failure or malfunction, or any * +* and all other commercial damages or losses, even if such party * +* shall have been informed of the possibility of such damages. This * +* limitation of liability shall not apply to liability for death or * +* personal injury resulting from such party's negligence to the * +* extent applicable law prohibits such limitation. Some * +* jurisdictions do not allow the exclusion or limitation of * +* incidental or consequential damages, so this exclusion and * +* limitation may not apply to You. * +* * +************************************************************************ + +8. Litigation +------------- + +Any litigation relating to this License may be brought only in the +courts of a jurisdiction where the defendant maintains its principal +place of business and such litigation shall be governed by laws of that +jurisdiction, without reference to its conflict-of-law provisions. +Nothing in this Section shall prevent a party's ability to bring +cross-claims or counter-claims. + +9. Miscellaneous +---------------- + +This License represents the complete agreement concerning the subject +matter hereof. If any provision of this License is held to be +unenforceable, such provision shall be reformed only to the extent +necessary to make it enforceable. Any law or regulation which provides +that the language of a contract shall be construed against the drafter +shall not be used to construe this License against a Contributor. + +10. Versions of the License +--------------------------- + +10.1. New Versions + +Mozilla Foundation is the license steward. Except as provided in Section +10.3, no one other than the license steward has the right to modify or +publish new versions of this License. Each version will be given a +distinguishing version number. + +10.2. Effect of New Versions + +You may distribute the Covered Software under the terms of the version +of the License under which You originally received the Covered Software, +or under the terms of any subsequent version published by the license +steward. + +10.3. Modified Versions + +If you create software not governed by this License, and you want to +create a new license for such software, you may create and use a +modified version of this License if you rename the license and remove +any references to the name of the license steward (except to note that +such modified license differs from this License). + +10.4. Distributing Source Code Form that is Incompatible With Secondary +Licenses + +If You choose to distribute Source Code Form that is Incompatible With +Secondary Licenses under the terms of this version of the License, the +notice described in Exhibit B of this License must be attached. + +Exhibit A - Source Code Form License Notice +------------------------------------------- + + This Source Code Form is subject to the terms of the Mozilla Public + License, v. 2.0. If a copy of the MPL was not distributed with this + file, You can obtain one at http://mozilla.org/MPL/2.0/. + +If it is not possible or desirable to put the notice in a particular +file, then You may include the notice in a location (such as a LICENSE +file in a relevant directory) where a recipient would be likely to look +for such a notice. + +You may add additional accurate notices of copyright ownership. + +Exhibit B - "Incompatible With Secondary Licenses" Notice +--------------------------------------------------------- + + This Source Code Form is "Incompatible With Secondary Licenses", as + defined by the Mozilla Public License, v. 2.0. diff --git a/vendor/github.com/r3labs/sse/v2/Makefile b/vendor/github.com/r3labs/sse/v2/Makefile new file mode 100644 index 00000000000..a63b7001e00 --- /dev/null +++ b/vendor/github.com/r3labs/sse/v2/Makefile @@ -0,0 +1,20 @@ +install: + go install -v + +build: + go build -v ./... + +lint: + golint ./... + go vet ./... + +test: + go test -v ./... --cover + +deps: + go get -u gopkg.in/cenkalti/backoff.v1 + go get -u github.com/golang/lint/golint + go get -u github.com/stretchr/testify + +clean: + go clean diff --git a/vendor/github.com/r3labs/sse/v2/README.md b/vendor/github.com/r3labs/sse/v2/README.md new file mode 100644 index 00000000000..c2201be698c --- /dev/null +++ b/vendor/github.com/r3labs/sse/v2/README.md @@ -0,0 +1,191 @@ +# SSE - Server Sent Events Client/Server Library for Go + +## Synopsis + +SSE is a client/server implementation for Server Sent Events for Golang. + +## Build status + +* Master: [![CircleCI Master](https://circleci.com/gh/r3labs/sse.svg?style=svg)](https://circleci.com/gh/r3labs/sse) + +## Quick start + +To install: +``` +go get github.com/r3labs/sse/v2 +``` + +To Test: + +```sh +$ make deps +$ make test +``` + +#### Example Server + +There are two parts of the server. It is comprised of the message scheduler and a http handler function. +The messaging system is started when running: + +```go +func main() { + server := sse.New() +} +``` + +To add a stream to this handler: + +```go +func main() { + server := sse.New() + server.CreateStream("messages") +} +``` + +This creates a new stream inside of the scheduler. Seeing as there are no consumers, publishing a message to this channel will do nothing. +Clients can connect to this stream once the http handler is started by specifying _stream_ as a url parameter, like so: + +``` +http://server/events?stream=messages +``` + + +In order to start the http server: + +```go +func main() { + server := sse.New() + + // Create a new Mux and set the handler + mux := http.NewServeMux() + mux.HandleFunc("/events", server.ServeHTTP) + + http.ListenAndServe(":8080", mux) +} +``` + +To publish messages to a stream: + +```go +func main() { + server := sse.New() + + // Publish a payload to the stream + server.Publish("messages", &sse.Event{ + Data: []byte("ping"), + }) +} +``` + +Please note there must be a stream with the name you specify and there must be subscribers to that stream + +A way to detect disconnected clients: + +```go +func main() { + server := sse.New() + + mux := http.NewServeMux() + mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { + go func() { + // Received Browser Disconnection + <-r.Context().Done() + println("The client is disconnected here") + return + }() + + server.ServeHTTP(w, r) + }) + + http.ListenAndServe(":8080", mux) +} +``` + +#### Example Client + +The client exposes a way to connect to an SSE server. The client can also handle multiple events under the same url. + +To create a new client: + +```go +func main() { + client := sse.NewClient("http://server/events") +} +``` + +To subscribe to an event stream, please use the Subscribe function. This accepts the name of the stream and a handler function: + +```go +func main() { + client := sse.NewClient("http://server/events") + + client.Subscribe("messages", func(msg *sse.Event) { + // Got some data! + fmt.Println(msg.Data) + }) +} +``` + +Please note that this function will block the current thread. You can run this function in a go routine. + +If you wish to have events sent to a channel, you can use SubscribeChan: + +```go +func main() { + events := make(chan *sse.Event) + + client := sse.NewClient("http://server/events") + client.SubscribeChan("messages", events) +} +``` + +#### HTTP client parameters + +To add additional parameters to the http client, such as disabling ssl verification for self signed certs, you can override the http client or update its options: + +```go +func main() { + client := sse.NewClient("http://server/events") + client.Connection.Transport = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } +} +``` + +#### URL query parameters + +To set custom query parameters on the client or disable the stream parameter altogether: + +```go +func main() { + client := sse.NewClient("http://server/events?search=example") + + client.SubscribeRaw(func(msg *sse.Event) { + // Got some data! + fmt.Println(msg.Data) + }) +} +``` + + +## Contributing + +Please read through our +[contributing guidelines](CONTRIBUTING.md). +Included are directions for opening issues, coding standards, and notes on +development. + +Moreover, if your pull request contains patches or features, you must include +relevant unit tests. + +## Versioning + +For transparency into our release cycle and in striving to maintain backward +compatibility, this project is maintained under [the Semantic Versioning guidelines](http://semver.org/). + +## Copyright and License + +Code and documentation copyright since 2015 r3labs.io authors. + +Code released under +[the Mozilla Public License Version 2.0](LICENSE). diff --git a/vendor/github.com/r3labs/sse/v2/client.go b/vendor/github.com/r3labs/sse/v2/client.go new file mode 100644 index 00000000000..61772b624d0 --- /dev/null +++ b/vendor/github.com/r3labs/sse/v2/client.go @@ -0,0 +1,390 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package sse + +import ( + "bytes" + "context" + "encoding/base64" + "errors" + "fmt" + "io" + "net/http" + "sync" + "sync/atomic" + "time" + + "gopkg.in/cenkalti/backoff.v1" +) + +var ( + headerID = []byte("id:") + headerData = []byte("data:") + headerEvent = []byte("event:") + headerRetry = []byte("retry:") +) + +func ClientMaxBufferSize(s int) func(c *Client) { + return func(c *Client) { + c.maxBufferSize = s + } +} + +// ConnCallback defines a function to be called on a particular connection event +type ConnCallback func(c *Client) + +// ResponseValidator validates a response +type ResponseValidator func(c *Client, resp *http.Response) error + +// Client handles an incoming server stream +type Client struct { + Retry time.Time + ReconnectStrategy backoff.BackOff + disconnectcb ConnCallback + connectedcb ConnCallback + subscribed map[chan *Event]chan struct{} + Headers map[string]string + ReconnectNotify backoff.Notify + ResponseValidator ResponseValidator + Connection *http.Client + URL string + LastEventID atomic.Value // []byte + maxBufferSize int + mu sync.Mutex + EncodingBase64 bool + Connected bool +} + +// NewClient creates a new client +func NewClient(url string, opts ...func(c *Client)) *Client { + c := &Client{ + URL: url, + Connection: &http.Client{}, + Headers: make(map[string]string), + subscribed: make(map[chan *Event]chan struct{}), + maxBufferSize: 1 << 16, + } + + for _, opt := range opts { + opt(c) + } + + return c +} + +// Subscribe to a data stream +func (c *Client) Subscribe(stream string, handler func(msg *Event)) error { + return c.SubscribeWithContext(context.Background(), stream, handler) +} + +// SubscribeWithContext to a data stream with context +func (c *Client) SubscribeWithContext(ctx context.Context, stream string, handler func(msg *Event)) error { + operation := func() error { + resp, err := c.request(ctx, stream) + if err != nil { + return err + } + if validator := c.ResponseValidator; validator != nil { + err = validator(c, resp) + if err != nil { + return err + } + } else if resp.StatusCode != 200 { + resp.Body.Close() + return fmt.Errorf("could not connect to stream: %s", http.StatusText(resp.StatusCode)) + } + defer resp.Body.Close() + + reader := NewEventStreamReader(resp.Body, c.maxBufferSize) + eventChan, errorChan := c.startReadLoop(reader) + + for { + select { + case err = <-errorChan: + return err + case msg := <-eventChan: + handler(msg) + } + } + } + + // Apply user specified reconnection strategy or default to standard NewExponentialBackOff() reconnection method + var err error + if c.ReconnectStrategy != nil { + err = backoff.RetryNotify(operation, c.ReconnectStrategy, c.ReconnectNotify) + } else { + err = backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), c.ReconnectNotify) + } + return err +} + +// SubscribeChan sends all events to the provided channel +func (c *Client) SubscribeChan(stream string, ch chan *Event) error { + return c.SubscribeChanWithContext(context.Background(), stream, ch) +} + +// SubscribeChanWithContext sends all events to the provided channel with context +func (c *Client) SubscribeChanWithContext(ctx context.Context, stream string, ch chan *Event) error { + var connected bool + errch := make(chan error) + c.mu.Lock() + c.subscribed[ch] = make(chan struct{}) + c.mu.Unlock() + + operation := func() error { + resp, err := c.request(ctx, stream) + if err != nil { + return err + } + if validator := c.ResponseValidator; validator != nil { + err = validator(c, resp) + if err != nil { + return err + } + } else if resp.StatusCode != 200 { + resp.Body.Close() + return fmt.Errorf("could not connect to stream: %s", http.StatusText(resp.StatusCode)) + } + defer resp.Body.Close() + + if !connected { + // Notify connect + errch <- nil + connected = true + } + + reader := NewEventStreamReader(resp.Body, c.maxBufferSize) + eventChan, errorChan := c.startReadLoop(reader) + + for { + var msg *Event + // Wait for message to arrive or exit + select { + case <-c.subscribed[ch]: + return nil + case err = <-errorChan: + return err + case msg = <-eventChan: + } + + // Wait for message to be sent or exit + if msg != nil { + select { + case <-c.subscribed[ch]: + return nil + case ch <- msg: + // message sent + } + } + } + } + + go func() { + defer c.cleanup(ch) + // Apply user specified reconnection strategy or default to standard NewExponentialBackOff() reconnection method + var err error + if c.ReconnectStrategy != nil { + err = backoff.RetryNotify(operation, c.ReconnectStrategy, c.ReconnectNotify) + } else { + err = backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), c.ReconnectNotify) + } + + // channel closed once connected + if err != nil && !connected { + errch <- err + } + }() + err := <-errch + close(errch) + return err +} + +func (c *Client) startReadLoop(reader *EventStreamReader) (chan *Event, chan error) { + outCh := make(chan *Event) + erChan := make(chan error) + go c.readLoop(reader, outCh, erChan) + return outCh, erChan +} + +func (c *Client) readLoop(reader *EventStreamReader, outCh chan *Event, erChan chan error) { + for { + // Read each new line and process the type of event + event, err := reader.ReadEvent() + if err != nil { + if err == io.EOF { + erChan <- nil + return + } + // run user specified disconnect function + if c.disconnectcb != nil { + c.Connected = false + c.disconnectcb(c) + } + erChan <- err + return + } + + if !c.Connected && c.connectedcb != nil { + c.Connected = true + c.connectedcb(c) + } + + // If we get an error, ignore it. + var msg *Event + if msg, err = c.processEvent(event); err == nil { + if len(msg.ID) > 0 { + c.LastEventID.Store(msg.ID) + } else { + msg.ID, _ = c.LastEventID.Load().([]byte) + } + + // Send downstream if the event has something useful + if msg.hasContent() { + outCh <- msg + } + } + } +} + +// SubscribeRaw to an sse endpoint +func (c *Client) SubscribeRaw(handler func(msg *Event)) error { + return c.Subscribe("", handler) +} + +// SubscribeRawWithContext to an sse endpoint with context +func (c *Client) SubscribeRawWithContext(ctx context.Context, handler func(msg *Event)) error { + return c.SubscribeWithContext(ctx, "", handler) +} + +// SubscribeChanRaw sends all events to the provided channel +func (c *Client) SubscribeChanRaw(ch chan *Event) error { + return c.SubscribeChan("", ch) +} + +// SubscribeChanRawWithContext sends all events to the provided channel with context +func (c *Client) SubscribeChanRawWithContext(ctx context.Context, ch chan *Event) error { + return c.SubscribeChanWithContext(ctx, "", ch) +} + +// Unsubscribe unsubscribes a channel +func (c *Client) Unsubscribe(ch chan *Event) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.subscribed[ch] != nil { + c.subscribed[ch] <- struct{}{} + } +} + +// OnDisconnect specifies the function to run when the connection disconnects +func (c *Client) OnDisconnect(fn ConnCallback) { + c.disconnectcb = fn +} + +// OnConnect specifies the function to run when the connection is successful +func (c *Client) OnConnect(fn ConnCallback) { + c.connectedcb = fn +} + +func (c *Client) request(ctx context.Context, stream string) (*http.Response, error) { + req, err := http.NewRequest("GET", c.URL, nil) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + + // Setup request, specify stream to connect to + if stream != "" { + query := req.URL.Query() + query.Add("stream", stream) + req.URL.RawQuery = query.Encode() + } + + req.Header.Set("Cache-Control", "no-cache") + req.Header.Set("Accept", "text/event-stream") + req.Header.Set("Connection", "keep-alive") + + lastID, exists := c.LastEventID.Load().([]byte) + if exists && lastID != nil { + req.Header.Set("Last-Event-ID", string(lastID)) + } + + // Add user specified headers + for k, v := range c.Headers { + req.Header.Set(k, v) + } + + return c.Connection.Do(req) +} + +func (c *Client) processEvent(msg []byte) (event *Event, err error) { + var e Event + + if len(msg) < 1 { + return nil, errors.New("event message was empty") + } + + // Normalize the crlf to lf to make it easier to split the lines. + // Split the line by "\n" or "\r", per the spec. + for _, line := range bytes.FieldsFunc(msg, func(r rune) bool { return r == '\n' || r == '\r' }) { + switch { + case bytes.HasPrefix(line, headerID): + e.ID = append([]byte(nil), trimHeader(len(headerID), line)...) + case bytes.HasPrefix(line, headerData): + // The spec allows for multiple data fields per event, concatenated them with "\n". + e.Data = append(e.Data[:], append(trimHeader(len(headerData), line), byte('\n'))...) + // The spec says that a line that simply contains the string "data" should be treated as a data field with an empty body. + case bytes.Equal(line, bytes.TrimSuffix(headerData, []byte(":"))): + e.Data = append(e.Data, byte('\n')) + case bytes.HasPrefix(line, headerEvent): + e.Event = append([]byte(nil), trimHeader(len(headerEvent), line)...) + case bytes.HasPrefix(line, headerRetry): + e.Retry = append([]byte(nil), trimHeader(len(headerRetry), line)...) + default: + // Ignore any garbage that doesn't match what we're looking for. + } + } + + // Trim the last "\n" per the spec. + e.Data = bytes.TrimSuffix(e.Data, []byte("\n")) + + if c.EncodingBase64 { + buf := make([]byte, base64.StdEncoding.DecodedLen(len(e.Data))) + + n, err := base64.StdEncoding.Decode(buf, e.Data) + if err != nil { + err = fmt.Errorf("failed to decode event message: %s", err) + } + e.Data = buf[:n] + } + return &e, err +} + +func (c *Client) cleanup(ch chan *Event) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.subscribed[ch] != nil { + close(c.subscribed[ch]) + delete(c.subscribed, ch) + } +} + +func trimHeader(size int, data []byte) []byte { + if data == nil || len(data) < size { + return data + } + + data = data[size:] + // Remove optional leading whitespace + if len(data) > 0 && data[0] == 32 { + data = data[1:] + } + // Remove trailing new line + if len(data) > 0 && data[len(data)-1] == 10 { + data = data[:len(data)-1] + } + return data +} diff --git a/vendor/github.com/r3labs/sse/v2/event.go b/vendor/github.com/r3labs/sse/v2/event.go new file mode 100644 index 00000000000..12580387867 --- /dev/null +++ b/vendor/github.com/r3labs/sse/v2/event.go @@ -0,0 +1,114 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package sse + +import ( + "bufio" + "bytes" + "context" + "io" + "time" +) + +// Event holds all of the event source fields +type Event struct { + timestamp time.Time + ID []byte + Data []byte + Event []byte + Retry []byte + Comment []byte +} + +func (e *Event) hasContent() bool { + return len(e.ID) > 0 || len(e.Data) > 0 || len(e.Event) > 0 || len(e.Retry) > 0 +} + +// EventStreamReader scans an io.Reader looking for EventStream messages. +type EventStreamReader struct { + scanner *bufio.Scanner +} + +// NewEventStreamReader creates an instance of EventStreamReader. +func NewEventStreamReader(eventStream io.Reader, maxBufferSize int) *EventStreamReader { + scanner := bufio.NewScanner(eventStream) + initBufferSize := minPosInt(4096, maxBufferSize) + scanner.Buffer(make([]byte, initBufferSize), maxBufferSize) + + split := func(data []byte, atEOF bool) (int, []byte, error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + + // We have a full event payload to parse. + if i, nlen := containsDoubleNewline(data); i >= 0 { + return i + nlen, data[0:i], nil + } + // If we're at EOF, we have all of the data. + if atEOF { + return len(data), data, nil + } + // Request more data. + return 0, nil, nil + } + // Set the split function for the scanning operation. + scanner.Split(split) + + return &EventStreamReader{ + scanner: scanner, + } +} + +// Returns a tuple containing the index of a double newline, and the number of bytes +// represented by that sequence. If no double newline is present, the first value +// will be negative. +func containsDoubleNewline(data []byte) (int, int) { + // Search for each potentially valid sequence of newline characters + crcr := bytes.Index(data, []byte("\r\r")) + lflf := bytes.Index(data, []byte("\n\n")) + crlflf := bytes.Index(data, []byte("\r\n\n")) + lfcrlf := bytes.Index(data, []byte("\n\r\n")) + crlfcrlf := bytes.Index(data, []byte("\r\n\r\n")) + // Find the earliest position of a double newline combination + minPos := minPosInt(crcr, minPosInt(lflf, minPosInt(crlflf, minPosInt(lfcrlf, crlfcrlf)))) + // Detemine the length of the sequence + nlen := 2 + if minPos == crlfcrlf { + nlen = 4 + } else if minPos == crlflf || minPos == lfcrlf { + nlen = 3 + } + return minPos, nlen +} + +// Returns the minimum non-negative value out of the two values. If both +// are negative, a negative value is returned. +func minPosInt(a, b int) int { + if a < 0 { + return b + } + if b < 0 { + return a + } + if a > b { + return b + } + return a +} + +// ReadEvent scans the EventStream for events. +func (e *EventStreamReader) ReadEvent() ([]byte, error) { + if e.scanner.Scan() { + event := e.scanner.Bytes() + return event, nil + } + if err := e.scanner.Err(); err != nil { + if err == context.Canceled { + return nil, io.EOF + } + return nil, err + } + return nil, io.EOF +} diff --git a/vendor/github.com/r3labs/sse/v2/event_log.go b/vendor/github.com/r3labs/sse/v2/event_log.go new file mode 100644 index 00000000000..aa17dad0584 --- /dev/null +++ b/vendor/github.com/r3labs/sse/v2/event_log.go @@ -0,0 +1,43 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package sse + +import ( + "strconv" + "time" +) + +// EventLog holds all of previous events +type EventLog []*Event + +// Add event to eventlog +func (e *EventLog) Add(ev *Event) { + if !ev.hasContent() { + return + } + + ev.ID = []byte(e.currentindex()) + ev.timestamp = time.Now() + *e = append(*e, ev) +} + +// Clear events from eventlog +func (e *EventLog) Clear() { + *e = nil +} + +// Replay events to a subscriber +func (e *EventLog) Replay(s *Subscriber) { + for i := 0; i < len(*e); i++ { + id, _ := strconv.Atoi(string((*e)[i].ID)) + if id >= s.eventid { + s.connection <- (*e)[i] + } + } +} + +func (e *EventLog) currentindex() string { + return strconv.Itoa(len(*e)) +} diff --git a/vendor/github.com/r3labs/sse/v2/http.go b/vendor/github.com/r3labs/sse/v2/http.go new file mode 100644 index 00000000000..c7a2b434a9b --- /dev/null +++ b/vendor/github.com/r3labs/sse/v2/http.go @@ -0,0 +1,120 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package sse + +import ( + "bytes" + "fmt" + "net/http" + "strconv" + "time" +) + +// ServeHTTP serves new connections with events for a given stream ... +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + flusher, err := w.(http.Flusher) + if !err { + http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + for k, v := range s.Headers { + w.Header().Set(k, v) + } + + // Get the StreamID from the URL + streamID := r.URL.Query().Get("stream") + if streamID == "" { + http.Error(w, "Please specify a stream!", http.StatusInternalServerError) + return + } + + stream := s.getStream(streamID) + + if stream == nil { + if !s.AutoStream { + http.Error(w, "Stream not found!", http.StatusInternalServerError) + return + } + + stream = s.CreateStream(streamID) + } + + eventid := 0 + if id := r.Header.Get("Last-Event-ID"); id != "" { + var err error + eventid, err = strconv.Atoi(id) + if err != nil { + http.Error(w, "Last-Event-ID must be a number!", http.StatusBadRequest) + return + } + } + + // Create the stream subscriber + sub := stream.addSubscriber(eventid, r.URL) + + go func() { + <-r.Context().Done() + + sub.close() + + if s.AutoStream && !s.AutoReplay && stream.getSubscriberCount() == 0 { + s.RemoveStream(streamID) + } + }() + + w.WriteHeader(http.StatusOK) + flusher.Flush() + + // Push events to client + for ev := range sub.connection { + // If the data buffer is an empty string abort. + if len(ev.Data) == 0 && len(ev.Comment) == 0 { + break + } + + // if the event has expired, dont send it + if s.EventTTL != 0 && time.Now().After(ev.timestamp.Add(s.EventTTL)) { + continue + } + + if len(ev.Data) > 0 { + fmt.Fprintf(w, "id: %s\n", ev.ID) + + if s.SplitData { + sd := bytes.Split(ev.Data, []byte("\n")) + for i := range sd { + fmt.Fprintf(w, "data: %s\n", sd[i]) + } + } else { + if bytes.HasPrefix(ev.Data, []byte(":")) { + fmt.Fprintf(w, "%s\n", ev.Data) + } else { + fmt.Fprintf(w, "data: %s\n", ev.Data) + } + } + + if len(ev.Event) > 0 { + fmt.Fprintf(w, "event: %s\n", ev.Event) + } + + if len(ev.Retry) > 0 { + fmt.Fprintf(w, "retry: %s\n", ev.Retry) + } + } + + if len(ev.Comment) > 0 { + fmt.Fprintf(w, ": %s\n", ev.Comment) + } + + fmt.Fprint(w, "\n") + + flusher.Flush() + } +} diff --git a/vendor/github.com/r3labs/sse/v2/server.go b/vendor/github.com/r3labs/sse/v2/server.go new file mode 100644 index 00000000000..d1b27af3257 --- /dev/null +++ b/vendor/github.com/r3labs/sse/v2/server.go @@ -0,0 +1,156 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package sse + +import ( + "encoding/base64" + "sync" + "time" +) + +// DefaultBufferSize size of the queue that holds the streams messages. +const DefaultBufferSize = 1024 + +// Server Is our main struct +type Server struct { + // Extra headers adding to the HTTP response to each client + Headers map[string]string + // Sets a ttl that prevents old events from being transmitted + EventTTL time.Duration + // Specifies the size of the message buffer for each stream + BufferSize int + // Encodes all data as base64 + EncodeBase64 bool + // Splits an events data into multiple data: entries + SplitData bool + // Enables creation of a stream when a client connects + AutoStream bool + // Enables automatic replay for each new subscriber that connects + AutoReplay bool + + // Specifies the function to run when client subscribe or un-subscribe + OnSubscribe func(streamID string, sub *Subscriber) + OnUnsubscribe func(streamID string, sub *Subscriber) + + streams map[string]*Stream + muStreams sync.RWMutex +} + +// New will create a server and setup defaults +func New() *Server { + return &Server{ + BufferSize: DefaultBufferSize, + AutoStream: false, + AutoReplay: true, + streams: make(map[string]*Stream), + Headers: map[string]string{}, + } +} + +// NewWithCallback will create a server and setup defaults with callback function +func NewWithCallback(onSubscribe, onUnsubscribe func(streamID string, sub *Subscriber)) *Server { + return &Server{ + BufferSize: DefaultBufferSize, + AutoStream: false, + AutoReplay: true, + streams: make(map[string]*Stream), + Headers: map[string]string{}, + OnSubscribe: onSubscribe, + OnUnsubscribe: onUnsubscribe, + } +} + +// Close shuts down the server, closes all of the streams and connections +func (s *Server) Close() { + s.muStreams.Lock() + defer s.muStreams.Unlock() + + for id := range s.streams { + s.streams[id].close() + delete(s.streams, id) + } +} + +// CreateStream will create a new stream and register it +func (s *Server) CreateStream(id string) *Stream { + s.muStreams.Lock() + defer s.muStreams.Unlock() + + if s.streams[id] != nil { + return s.streams[id] + } + + str := newStream(id, s.BufferSize, s.AutoReplay, s.AutoStream, s.OnSubscribe, s.OnUnsubscribe) + str.run() + + s.streams[id] = str + + return str +} + +// RemoveStream will remove a stream +func (s *Server) RemoveStream(id string) { + s.muStreams.Lock() + defer s.muStreams.Unlock() + + if s.streams[id] != nil { + s.streams[id].close() + delete(s.streams, id) + } +} + +// StreamExists checks whether a stream by a given id exists +func (s *Server) StreamExists(id string) bool { + return s.getStream(id) != nil +} + +// Publish sends a mesage to every client in a streamID. +// If the stream's buffer is full, it blocks until the message is sent out to +// all subscribers (but not necessarily arrived the clients), or when the +// stream is closed. +func (s *Server) Publish(id string, event *Event) { + stream := s.getStream(id) + if stream == nil { + return + } + + select { + case <-stream.quit: + case stream.event <- s.process(event): + } +} + +// TryPublish is the same as Publish except that when the operation would cause +// the call to be blocked, it simply drops the message and returns false. +// Together with a small BufferSize, it can be useful when publishing the +// latest message ASAP is more important than reliable delivery. +func (s *Server) TryPublish(id string, event *Event) bool { + stream := s.getStream(id) + if stream == nil { + return false + } + + select { + case stream.event <- s.process(event): + return true + default: + return false + } +} + +func (s *Server) getStream(id string) *Stream { + s.muStreams.RLock() + defer s.muStreams.RUnlock() + return s.streams[id] +} + +func (s *Server) process(event *Event) *Event { + if s.EncodeBase64 { + output := make([]byte, base64.StdEncoding.EncodedLen(len(event.Data))) + base64.StdEncoding.Encode(output, event.Data) + event.Data = output + } + return event +} diff --git a/vendor/github.com/r3labs/sse/v2/stream.go b/vendor/github.com/r3labs/sse/v2/stream.go new file mode 100644 index 00000000000..bfbcb9b5238 --- /dev/null +++ b/vendor/github.com/r3labs/sse/v2/stream.go @@ -0,0 +1,153 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package sse + +import ( + "net/url" + "sync" + "sync/atomic" +) + +// Stream ... +type Stream struct { + ID string + event chan *Event + quit chan struct{} + quitOnce sync.Once + register chan *Subscriber + deregister chan *Subscriber + subscribers []*Subscriber + Eventlog EventLog + subscriberCount int32 + // Enables replaying of eventlog to newly added subscribers + AutoReplay bool + isAutoStream bool + + // Specifies the function to run when client subscribe or un-subscribe + OnSubscribe func(streamID string, sub *Subscriber) + OnUnsubscribe func(streamID string, sub *Subscriber) +} + +// newStream returns a new stream +func newStream(id string, buffSize int, replay, isAutoStream bool, onSubscribe, onUnsubscribe func(string, *Subscriber)) *Stream { + return &Stream{ + ID: id, + AutoReplay: replay, + subscribers: make([]*Subscriber, 0), + isAutoStream: isAutoStream, + register: make(chan *Subscriber), + deregister: make(chan *Subscriber), + event: make(chan *Event, buffSize), + quit: make(chan struct{}), + Eventlog: make(EventLog, 0), + OnSubscribe: onSubscribe, + OnUnsubscribe: onUnsubscribe, + } +} + +func (str *Stream) run() { + go func(str *Stream) { + for { + select { + // Add new subscriber + case subscriber := <-str.register: + str.subscribers = append(str.subscribers, subscriber) + if str.AutoReplay { + str.Eventlog.Replay(subscriber) + } + + // Remove closed subscriber + case subscriber := <-str.deregister: + i := str.getSubIndex(subscriber) + if i != -1 { + str.removeSubscriber(i) + } + + if str.OnUnsubscribe != nil { + go str.OnUnsubscribe(str.ID, subscriber) + } + + // Publish event to subscribers + case event := <-str.event: + if str.AutoReplay { + str.Eventlog.Add(event) + } + for i := range str.subscribers { + str.subscribers[i].connection <- event + } + + // Shutdown if the server closes + case <-str.quit: + // remove connections + str.removeAllSubscribers() + return + } + } + }(str) +} + +func (str *Stream) close() { + str.quitOnce.Do(func() { + close(str.quit) + }) +} + +func (str *Stream) getSubIndex(sub *Subscriber) int { + for i := range str.subscribers { + if str.subscribers[i] == sub { + return i + } + } + return -1 +} + +// addSubscriber will create a new subscriber on a stream +func (str *Stream) addSubscriber(eventid int, url *url.URL) *Subscriber { + atomic.AddInt32(&str.subscriberCount, 1) + sub := &Subscriber{ + eventid: eventid, + quit: str.deregister, + connection: make(chan *Event, 64), + URL: url, + } + + if str.isAutoStream { + sub.removed = make(chan struct{}, 1) + } + + str.register <- sub + + if str.OnSubscribe != nil { + go str.OnSubscribe(str.ID, sub) + } + + return sub +} + +func (str *Stream) removeSubscriber(i int) { + atomic.AddInt32(&str.subscriberCount, -1) + close(str.subscribers[i].connection) + if str.subscribers[i].removed != nil { + str.subscribers[i].removed <- struct{}{} + close(str.subscribers[i].removed) + } + str.subscribers = append(str.subscribers[:i], str.subscribers[i+1:]...) +} + +func (str *Stream) removeAllSubscribers() { + for i := 0; i < len(str.subscribers); i++ { + close(str.subscribers[i].connection) + if str.subscribers[i].removed != nil { + str.subscribers[i].removed <- struct{}{} + close(str.subscribers[i].removed) + } + } + atomic.StoreInt32(&str.subscriberCount, 0) + str.subscribers = str.subscribers[:0] +} + +func (str *Stream) getSubscriberCount() int { + return int(atomic.LoadInt32(&str.subscriberCount)) +} diff --git a/vendor/github.com/r3labs/sse/v2/subscriber.go b/vendor/github.com/r3labs/sse/v2/subscriber.go new file mode 100644 index 00000000000..4b54c204f30 --- /dev/null +++ b/vendor/github.com/r3labs/sse/v2/subscriber.go @@ -0,0 +1,24 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package sse + +import "net/url" + +// Subscriber ... +type Subscriber struct { + quit chan *Subscriber + connection chan *Event + removed chan struct{} + eventid int + URL *url.URL +} + +// Close will let the stream know that the clients connection has terminated +func (s *Subscriber) close() { + s.quit <- s + if s.removed != nil { + <-s.removed + } +} diff --git a/vendor/gopkg.in/cenkalti/backoff.v1/.gitignore b/vendor/gopkg.in/cenkalti/backoff.v1/.gitignore new file mode 100644 index 00000000000..00268614f04 --- /dev/null +++ b/vendor/gopkg.in/cenkalti/backoff.v1/.gitignore @@ -0,0 +1,22 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe diff --git a/vendor/gopkg.in/cenkalti/backoff.v1/.travis.yml b/vendor/gopkg.in/cenkalti/backoff.v1/.travis.yml new file mode 100644 index 00000000000..1040404bfbc --- /dev/null +++ b/vendor/gopkg.in/cenkalti/backoff.v1/.travis.yml @@ -0,0 +1,9 @@ +language: go +go: + - 1.3.3 + - tip +before_install: + - go get github.com/mattn/goveralls + - go get golang.org/x/tools/cmd/cover +script: + - $HOME/gopath/bin/goveralls -service=travis-ci diff --git a/vendor/gopkg.in/cenkalti/backoff.v1/LICENSE b/vendor/gopkg.in/cenkalti/backoff.v1/LICENSE new file mode 100644 index 00000000000..89b81799655 --- /dev/null +++ b/vendor/gopkg.in/cenkalti/backoff.v1/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2014 Cenk Altı + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/gopkg.in/cenkalti/backoff.v1/README.md b/vendor/gopkg.in/cenkalti/backoff.v1/README.md new file mode 100644 index 00000000000..13b347fb951 --- /dev/null +++ b/vendor/gopkg.in/cenkalti/backoff.v1/README.md @@ -0,0 +1,30 @@ +# Exponential Backoff [![GoDoc][godoc image]][godoc] [![Build Status][travis image]][travis] [![Coverage Status][coveralls image]][coveralls] + +This is a Go port of the exponential backoff algorithm from [Google's HTTP Client Library for Java][google-http-java-client]. + +[Exponential backoff][exponential backoff wiki] +is an algorithm that uses feedback to multiplicatively decrease the rate of some process, +in order to gradually find an acceptable rate. +The retries exponentially increase and stop increasing when a certain threshold is met. + +## Usage + +See https://godoc.org/github.com/cenkalti/backoff#pkg-examples + +## Contributing + +* I would like to keep this library as small as possible. +* Please don't send a PR without opening an issue and discussing it first. +* If proposed change is not a common use case, I will probably not accept it. + +[godoc]: https://godoc.org/github.com/cenkalti/backoff +[godoc image]: https://godoc.org/github.com/cenkalti/backoff?status.png +[travis]: https://travis-ci.org/cenkalti/backoff +[travis image]: https://travis-ci.org/cenkalti/backoff.png?branch=master +[coveralls]: https://coveralls.io/github/cenkalti/backoff?branch=master +[coveralls image]: https://coveralls.io/repos/github/cenkalti/backoff/badge.svg?branch=master + +[google-http-java-client]: https://github.com/google/google-http-java-client +[exponential backoff wiki]: http://en.wikipedia.org/wiki/Exponential_backoff + +[advanced example]: https://godoc.org/github.com/cenkalti/backoff#example_ diff --git a/vendor/gopkg.in/cenkalti/backoff.v1/backoff.go b/vendor/gopkg.in/cenkalti/backoff.v1/backoff.go new file mode 100644 index 00000000000..2102c5f2de9 --- /dev/null +++ b/vendor/gopkg.in/cenkalti/backoff.v1/backoff.go @@ -0,0 +1,66 @@ +// Package backoff implements backoff algorithms for retrying operations. +// +// Use Retry function for retrying operations that may fail. +// If Retry does not meet your needs, +// copy/paste the function into your project and modify as you wish. +// +// There is also Ticker type similar to time.Ticker. +// You can use it if you need to work with channels. +// +// See Examples section below for usage examples. +package backoff + +import "time" + +// BackOff is a backoff policy for retrying an operation. +type BackOff interface { + // NextBackOff returns the duration to wait before retrying the operation, + // or backoff.Stop to indicate that no more retries should be made. + // + // Example usage: + // + // duration := backoff.NextBackOff(); + // if (duration == backoff.Stop) { + // // Do not retry operation. + // } else { + // // Sleep for duration and retry operation. + // } + // + NextBackOff() time.Duration + + // Reset to initial state. + Reset() +} + +// Stop indicates that no more retries should be made for use in NextBackOff(). +const Stop time.Duration = -1 + +// ZeroBackOff is a fixed backoff policy whose backoff time is always zero, +// meaning that the operation is retried immediately without waiting, indefinitely. +type ZeroBackOff struct{} + +func (b *ZeroBackOff) Reset() {} + +func (b *ZeroBackOff) NextBackOff() time.Duration { return 0 } + +// StopBackOff is a fixed backoff policy that always returns backoff.Stop for +// NextBackOff(), meaning that the operation should never be retried. +type StopBackOff struct{} + +func (b *StopBackOff) Reset() {} + +func (b *StopBackOff) NextBackOff() time.Duration { return Stop } + +// ConstantBackOff is a backoff policy that always returns the same backoff delay. +// This is in contrast to an exponential backoff policy, +// which returns a delay that grows longer as you call NextBackOff() over and over again. +type ConstantBackOff struct { + Interval time.Duration +} + +func (b *ConstantBackOff) Reset() {} +func (b *ConstantBackOff) NextBackOff() time.Duration { return b.Interval } + +func NewConstantBackOff(d time.Duration) *ConstantBackOff { + return &ConstantBackOff{Interval: d} +} diff --git a/vendor/gopkg.in/cenkalti/backoff.v1/context.go b/vendor/gopkg.in/cenkalti/backoff.v1/context.go new file mode 100644 index 00000000000..5d157092544 --- /dev/null +++ b/vendor/gopkg.in/cenkalti/backoff.v1/context.go @@ -0,0 +1,60 @@ +package backoff + +import ( + "time" + + "golang.org/x/net/context" +) + +// BackOffContext is a backoff policy that stops retrying after the context +// is canceled. +type BackOffContext interface { + BackOff + Context() context.Context +} + +type backOffContext struct { + BackOff + ctx context.Context +} + +// WithContext returns a BackOffContext with context ctx +// +// ctx must not be nil +func WithContext(b BackOff, ctx context.Context) BackOffContext { + if ctx == nil { + panic("nil context") + } + + if b, ok := b.(*backOffContext); ok { + return &backOffContext{ + BackOff: b.BackOff, + ctx: ctx, + } + } + + return &backOffContext{ + BackOff: b, + ctx: ctx, + } +} + +func ensureContext(b BackOff) BackOffContext { + if cb, ok := b.(BackOffContext); ok { + return cb + } + return WithContext(b, context.Background()) +} + +func (b *backOffContext) Context() context.Context { + return b.ctx +} + +func (b *backOffContext) NextBackOff() time.Duration { + select { + case <-b.Context().Done(): + return Stop + default: + return b.BackOff.NextBackOff() + } +} diff --git a/vendor/gopkg.in/cenkalti/backoff.v1/exponential.go b/vendor/gopkg.in/cenkalti/backoff.v1/exponential.go new file mode 100644 index 00000000000..9a6addf0750 --- /dev/null +++ b/vendor/gopkg.in/cenkalti/backoff.v1/exponential.go @@ -0,0 +1,156 @@ +package backoff + +import ( + "math/rand" + "time" +) + +/* +ExponentialBackOff is a backoff implementation that increases the backoff +period for each retry attempt using a randomization function that grows exponentially. + +NextBackOff() is calculated using the following formula: + + randomized interval = + RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor]) + +In other words NextBackOff() will range between the randomization factor +percentage below and above the retry interval. + +For example, given the following parameters: + + RetryInterval = 2 + RandomizationFactor = 0.5 + Multiplier = 2 + +the actual backoff period used in the next retry attempt will range between 1 and 3 seconds, +multiplied by the exponential, that is, between 2 and 6 seconds. + +Note: MaxInterval caps the RetryInterval and not the randomized interval. + +If the time elapsed since an ExponentialBackOff instance is created goes past the +MaxElapsedTime, then the method NextBackOff() starts returning backoff.Stop. + +The elapsed time can be reset by calling Reset(). + +Example: Given the following default arguments, for 10 tries the sequence will be, +and assuming we go over the MaxElapsedTime on the 10th try: + + Request # RetryInterval (seconds) Randomized Interval (seconds) + + 1 0.5 [0.25, 0.75] + 2 0.75 [0.375, 1.125] + 3 1.125 [0.562, 1.687] + 4 1.687 [0.8435, 2.53] + 5 2.53 [1.265, 3.795] + 6 3.795 [1.897, 5.692] + 7 5.692 [2.846, 8.538] + 8 8.538 [4.269, 12.807] + 9 12.807 [6.403, 19.210] + 10 19.210 backoff.Stop + +Note: Implementation is not thread-safe. +*/ +type ExponentialBackOff struct { + InitialInterval time.Duration + RandomizationFactor float64 + Multiplier float64 + MaxInterval time.Duration + // After MaxElapsedTime the ExponentialBackOff stops. + // It never stops if MaxElapsedTime == 0. + MaxElapsedTime time.Duration + Clock Clock + + currentInterval time.Duration + startTime time.Time + random *rand.Rand +} + +// Clock is an interface that returns current time for BackOff. +type Clock interface { + Now() time.Time +} + +// Default values for ExponentialBackOff. +const ( + DefaultInitialInterval = 500 * time.Millisecond + DefaultRandomizationFactor = 0.5 + DefaultMultiplier = 1.5 + DefaultMaxInterval = 60 * time.Second + DefaultMaxElapsedTime = 15 * time.Minute +) + +// NewExponentialBackOff creates an instance of ExponentialBackOff using default values. +func NewExponentialBackOff() *ExponentialBackOff { + b := &ExponentialBackOff{ + InitialInterval: DefaultInitialInterval, + RandomizationFactor: DefaultRandomizationFactor, + Multiplier: DefaultMultiplier, + MaxInterval: DefaultMaxInterval, + MaxElapsedTime: DefaultMaxElapsedTime, + Clock: SystemClock, + random: rand.New(rand.NewSource(time.Now().UnixNano())), + } + b.Reset() + return b +} + +type systemClock struct{} + +func (t systemClock) Now() time.Time { + return time.Now() +} + +// SystemClock implements Clock interface that uses time.Now(). +var SystemClock = systemClock{} + +// Reset the interval back to the initial retry interval and restarts the timer. +func (b *ExponentialBackOff) Reset() { + b.currentInterval = b.InitialInterval + b.startTime = b.Clock.Now() +} + +// NextBackOff calculates the next backoff interval using the formula: +// Randomized interval = RetryInterval +/- (RandomizationFactor * RetryInterval) +func (b *ExponentialBackOff) NextBackOff() time.Duration { + // Make sure we have not gone over the maximum elapsed time. + if b.MaxElapsedTime != 0 && b.GetElapsedTime() > b.MaxElapsedTime { + return Stop + } + defer b.incrementCurrentInterval() + if b.random == nil { + b.random = rand.New(rand.NewSource(time.Now().UnixNano())) + } + return getRandomValueFromInterval(b.RandomizationFactor, b.random.Float64(), b.currentInterval) +} + +// GetElapsedTime returns the elapsed time since an ExponentialBackOff instance +// is created and is reset when Reset() is called. +// +// The elapsed time is computed using time.Now().UnixNano(). +func (b *ExponentialBackOff) GetElapsedTime() time.Duration { + return b.Clock.Now().Sub(b.startTime) +} + +// Increments the current interval by multiplying it with the multiplier. +func (b *ExponentialBackOff) incrementCurrentInterval() { + // Check for overflow, if overflow is detected set the current interval to the max interval. + if float64(b.currentInterval) >= float64(b.MaxInterval)/b.Multiplier { + b.currentInterval = b.MaxInterval + } else { + b.currentInterval = time.Duration(float64(b.currentInterval) * b.Multiplier) + } +} + +// Returns a random value from the following interval: +// [randomizationFactor * currentInterval, randomizationFactor * currentInterval]. +func getRandomValueFromInterval(randomizationFactor, random float64, currentInterval time.Duration) time.Duration { + var delta = randomizationFactor * float64(currentInterval) + var minInterval = float64(currentInterval) - delta + var maxInterval = float64(currentInterval) + delta + + // Get a random value from the range [minInterval, maxInterval]. + // The formula used below has a +1 because if the minInterval is 1 and the maxInterval is 3 then + // we want a 33% chance for selecting either 1, 2 or 3. + return time.Duration(minInterval + (random * (maxInterval - minInterval + 1))) +} diff --git a/vendor/gopkg.in/cenkalti/backoff.v1/retry.go b/vendor/gopkg.in/cenkalti/backoff.v1/retry.go new file mode 100644 index 00000000000..5dbd825b5c8 --- /dev/null +++ b/vendor/gopkg.in/cenkalti/backoff.v1/retry.go @@ -0,0 +1,78 @@ +package backoff + +import "time" + +// An Operation is executing by Retry() or RetryNotify(). +// The operation will be retried using a backoff policy if it returns an error. +type Operation func() error + +// Notify is a notify-on-error function. It receives an operation error and +// backoff delay if the operation failed (with an error). +// +// NOTE that if the backoff policy stated to stop retrying, +// the notify function isn't called. +type Notify func(error, time.Duration) + +// Retry the operation o until it does not return error or BackOff stops. +// o is guaranteed to be run at least once. +// It is the caller's responsibility to reset b after Retry returns. +// +// If o returns a *PermanentError, the operation is not retried, and the +// wrapped error is returned. +// +// Retry sleeps the goroutine for the duration returned by BackOff after a +// failed operation returns. +func Retry(o Operation, b BackOff) error { return RetryNotify(o, b, nil) } + +// RetryNotify calls notify function with the error and wait duration +// for each failed attempt before sleep. +func RetryNotify(operation Operation, b BackOff, notify Notify) error { + var err error + var next time.Duration + + cb := ensureContext(b) + + b.Reset() + for { + if err = operation(); err == nil { + return nil + } + + if permanent, ok := err.(*PermanentError); ok { + return permanent.Err + } + + if next = b.NextBackOff(); next == Stop { + return err + } + + if notify != nil { + notify(err, next) + } + + t := time.NewTimer(next) + + select { + case <-cb.Context().Done(): + t.Stop() + return err + case <-t.C: + } + } +} + +// PermanentError signals that the operation should not be retried. +type PermanentError struct { + Err error +} + +func (e *PermanentError) Error() string { + return e.Err.Error() +} + +// Permanent wraps the given err in a *PermanentError. +func Permanent(err error) *PermanentError { + return &PermanentError{ + Err: err, + } +} diff --git a/vendor/gopkg.in/cenkalti/backoff.v1/ticker.go b/vendor/gopkg.in/cenkalti/backoff.v1/ticker.go new file mode 100644 index 00000000000..49a99718d74 --- /dev/null +++ b/vendor/gopkg.in/cenkalti/backoff.v1/ticker.go @@ -0,0 +1,81 @@ +package backoff + +import ( + "runtime" + "sync" + "time" +) + +// Ticker holds a channel that delivers `ticks' of a clock at times reported by a BackOff. +// +// Ticks will continue to arrive when the previous operation is still running, +// so operations that take a while to fail could run in quick succession. +type Ticker struct { + C <-chan time.Time + c chan time.Time + b BackOffContext + stop chan struct{} + stopOnce sync.Once +} + +// NewTicker returns a new Ticker containing a channel that will send the time at times +// specified by the BackOff argument. Ticker is guaranteed to tick at least once. +// The channel is closed when Stop method is called or BackOff stops. +func NewTicker(b BackOff) *Ticker { + c := make(chan time.Time) + t := &Ticker{ + C: c, + c: c, + b: ensureContext(b), + stop: make(chan struct{}), + } + go t.run() + runtime.SetFinalizer(t, (*Ticker).Stop) + return t +} + +// Stop turns off a ticker. After Stop, no more ticks will be sent. +func (t *Ticker) Stop() { + t.stopOnce.Do(func() { close(t.stop) }) +} + +func (t *Ticker) run() { + c := t.c + defer close(c) + t.b.Reset() + + // Ticker is guaranteed to tick at least once. + afterC := t.send(time.Now()) + + for { + if afterC == nil { + return + } + + select { + case tick := <-afterC: + afterC = t.send(tick) + case <-t.stop: + t.c = nil // Prevent future ticks from being sent to the channel. + return + case <-t.b.Context().Done(): + return + } + } +} + +func (t *Ticker) send(tick time.Time) <-chan time.Time { + select { + case t.c <- tick: + case <-t.stop: + return nil + } + + next := t.b.NextBackOff() + if next == Stop { + t.Stop() + return nil + } + + return time.After(next) +} diff --git a/vendor/gopkg.in/cenkalti/backoff.v1/tries.go b/vendor/gopkg.in/cenkalti/backoff.v1/tries.go new file mode 100644 index 00000000000..d2da7308b6a --- /dev/null +++ b/vendor/gopkg.in/cenkalti/backoff.v1/tries.go @@ -0,0 +1,35 @@ +package backoff + +import "time" + +/* +WithMaxTries creates a wrapper around another BackOff, which will +return Stop if NextBackOff() has been called too many times since +the last time Reset() was called + +Note: Implementation is not thread-safe. +*/ +func WithMaxTries(b BackOff, max uint64) BackOff { + return &backOffTries{delegate: b, maxTries: max} +} + +type backOffTries struct { + delegate BackOff + maxTries uint64 + numTries uint64 +} + +func (b *backOffTries) NextBackOff() time.Duration { + if b.maxTries > 0 { + if b.maxTries <= b.numTries { + return Stop + } + b.numTries++ + } + return b.delegate.NextBackOff() +} + +func (b *backOffTries) Reset() { + b.numTries = 0 + b.delegate.Reset() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index e45cb3a618f..5b139594d13 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1555,6 +1555,9 @@ github.com/prometheus/procfs/internal/util github.com/prometheus/statsd_exporter/pkg/level github.com/prometheus/statsd_exporter/pkg/mapper github.com/prometheus/statsd_exporter/pkg/mapper/fsm +# github.com/r3labs/sse/v2 v2.10.0 +## explicit; go 1.13 +github.com/r3labs/sse/v2 # github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 ## explicit github.com/rcrowley/go-metrics @@ -2150,6 +2153,9 @@ google.golang.org/protobuf/types/known/fieldmaskpb google.golang.org/protobuf/types/known/structpb google.golang.org/protobuf/types/known/timestamppb google.golang.org/protobuf/types/known/wrapperspb +# gopkg.in/cenkalti/backoff.v1 v1.1.0 +## explicit +gopkg.in/cenkalti/backoff.v1 # gopkg.in/ini.v1 v1.67.0 ## explicit gopkg.in/ini.v1 From 77aa84171fb5024bd38b6a9f4fa85bb5a8a07cdc Mon Sep 17 00:00:00 2001 From: jkoberg Date: Mon, 26 Jun 2023 16:51:56 +0200 Subject: [PATCH 3/7] add sse info to readme Signed-off-by: jkoberg --- services/userlog/README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/services/userlog/README.md b/services/userlog/README.md index 009c561bfb5..fe0901781a5 100644 --- a/services/userlog/README.md +++ b/services/userlog/README.md @@ -30,6 +30,10 @@ For the time being, the configuration which user related events are of interest The `userlog` service provides an API to retrieve configured events. For now, this API is mostly following the [oc10 notification GET API](https://doc.owncloud.com/server/next/developer_manual/core/apis/ocs-notification-endpoint-v1.html#get-user-notifications). +## Subscribing + +Additionaly to the oc10 API the `userlog` service also provides an `/sse` (Server-Sent Events) endpoint to be informed by the server when an event happens. See [here](https://medium.com/yemeksepeti-teknoloji/what-is-server-sent-events-sse-and-how-to-implement-it-904938bffd73) for example for simple introduction to server sent events. The `sse` endpoint will respect language changes of the user without needing to reconnect. + ## Deleting To delete events for an user, use a `DELETE` request to `ocs/v2.php/apps/notifications/api/v1/notifications` containing the IDs to delete. From d9d6da856f5d7d7e4070eab2a723425fdf3906fa Mon Sep 17 00:00:00 2001 From: jkoberg Date: Wed, 28 Jun 2023 10:58:49 +0200 Subject: [PATCH 4/7] improve logging and error handling Signed-off-by: jkoberg --- services/userlog/pkg/service/http.go | 11 +++-- services/userlog/pkg/service/service.go | 64 ++++++++++++++++--------- 2 files changed, 49 insertions(+), 26 deletions(-) diff --git a/services/userlog/pkg/service/http.go b/services/userlog/pkg/service/http.go index f674fa34601..b1a9e82588f 100644 --- a/services/userlog/pkg/service/http.go +++ b/services/userlog/pkg/service/http.go @@ -32,19 +32,22 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request return } - conv := NewConverter(r.Header.Get(HeaderAcceptLanguage), ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath) + conv := ul.getConverter(r.Header.Get(HeaderAcceptLanguage)) resp := GetEventResponseOC10{} for _, e := range evs { etype, ok := ul.registeredEvents[e.Type] if !ok { - // this should not happen + ul.log.Error().Str("eventid", e.Id).Str("eventtype", e.Type).Msg("event not registered") + continue } einterface, err := etype.Unmarshal(e.Event) if err != nil { - // this shouldn't happen either + ul.log.Error().Str("eventid", e.Id).Str("eventtype", e.Type).Msg("failed to umarshal event") + continue } + noti, err := conv.ConvertEvent(e.Id, einterface) if err != nil { ul.log.Error().Err(err).Str("eventid", e.Id).Str("eventtype", e.Type).Msg("failed to convert event") @@ -63,12 +66,14 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request func (ul *UserlogService) HandleSSE(w http.ResponseWriter, r *http.Request) { u, ok := ctx.ContextGetUser(r.Context()) if !ok { + ul.log.Error().Msg("sse: no user in context") w.WriteHeader(http.StatusInternalServerError) return } uid := u.GetId().GetOpaqueId() if uid == "" { + ul.log.Error().Msg("sse: user in context is broken") w.WriteHeader(http.StatusInternalServerError) return } diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index d2aec634287..717e8c77c1d 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -30,8 +30,6 @@ import ( "google.golang.org/grpc/metadata" ) -var _defaultLocale = "en" - // UserlogService is the service responsible for user activities type UserlogService struct { log log.Logger @@ -232,16 +230,29 @@ func (ul *UserlogService) DeleteEvents(userid string, evids []string) error { } func (ul *UserlogService) addEventToUser(userid string, event events.Event) error { - loc := getUserLang(context.Background(), userid, ul.valueClient) - ev, _ := NewConverter(loc, ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath).ConvertEvent(event.ID, event.Event) - b, _ := json.Marshal(ev) - - ul.sse.Publish(userid, &sse.Event{Data: b}) + if err := ul.sendSSE(userid, event); err != nil { + ul.log.Error().Err(err).Str("userid", userid).Str("eventid", event.ID).Msg("cannot create sse event") + } return ul.alterUserEventList(userid, func(ids []string) []string { return append(ids, event.ID) }) } +func (ul *UserlogService) sendSSE(userid string, event events.Event) error { + ev, err := ul.getConverter(ul.getUserLocale(userid)).ConvertEvent(event.ID, event.Event) + if err != nil { + return err + } + + b, err := json.Marshal(ev) + if err != nil { + return err + } + + ul.sse.Publish(userid, &sse.Event{Data: b}) + return nil +} + func (ul *UserlogService) removeExpiredEvents(userid string, all []string, received []*ehmsg.Event) error { exists := make(map[string]struct{}, len(received)) for _, e := range received { @@ -412,6 +423,29 @@ func (ul *UserlogService) impersonate(uid *user.UserId) context.Context { return ctx } +func (ul *UserlogService) getUserLocale(userid string) string { + resp, err := ul.valueClient.GetValueByUniqueIdentifiers( + micrometadata.Set(context.Background(), middleware.AccountID, userid), + &settingssvc.GetValueByUniqueIdentifiersRequest{ + AccountUuid: userid, + SettingId: defaults.SettingUUIDProfileLanguage, + }, + ) + if err != nil { + ul.log.Error().Err(err).Str("userid", userid).Msg("cannot get users locale") + return "" + } + val := resp.GetValue().GetValue().GetListValue().GetValues() + if len(val) == 0 { + return "" + } + return val[0].GetStringValue() +} + +func (ul *UserlogService) getConverter(locale string) *Converter { + return NewConverter(locale, ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath) +} + func authenticate(usr *user.User, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], machineAuthAPIKey string) (context.Context, error) { gatewayClient, err := gatewaySelector.Next() if err != nil { @@ -550,19 +584,3 @@ func editor(perms *storageprovider.ResourcePermissions) bool { func manager(perms *storageprovider.ResourcePermissions) bool { return perms.DenyGrant } - -func getUserLang(ctx context.Context, userid string, vs settingssvc.ValueService) string { - granteeCtx := micrometadata.Set(ctx, middleware.AccountID, userid) - if resp, err := vs.GetValueByUniqueIdentifiers(granteeCtx, - &settingssvc.GetValueByUniqueIdentifiersRequest{ - AccountUuid: userid, - SettingId: defaults.SettingUUIDProfileLanguage, - }, - ); err == nil { - val := resp.GetValue().GetValue().GetListValue().GetValues() - if len(val) > 0 && val[0] != nil { - return val[0].GetStringValue() - } - } - return _defaultLocale -} From dc17ac7b2b1e10797771349b4d7707a3d47f47d9 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Wed, 28 Jun 2023 16:53:00 +0200 Subject: [PATCH 5/7] allow disabling sses Signed-off-by: jkoberg --- services/userlog/pkg/config/config.go | 2 ++ services/userlog/pkg/service/service.go | 16 ++++++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/services/userlog/pkg/config/config.go b/services/userlog/pkg/config/config.go index 609eefe7b1f..237fb0e3cc3 100644 --- a/services/userlog/pkg/config/config.go +++ b/services/userlog/pkg/config/config.go @@ -28,6 +28,8 @@ type Config struct { Events Events `yaml:"events"` Persistence Persistence `yaml:"persistence"` + DisableSSE bool `yaml:"disable_sse" env:"USERLOG_DISABLE_SSE" desc:"Disables server-sent events. Clients will no longer be able to connect to the sse endpoint."` + Context context.Context `yaml:"-"` } diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index 717e8c77c1d..1c105bccd00 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -68,10 +68,13 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) { historyClient: o.HistoryClient, gatewaySelector: o.GatewaySelector, valueClient: o.ValueClient, - sse: sse.New(), registeredEvents: make(map[string]events.Unmarshaller), } + if !ul.cfg.DisableSSE { + ul.sse = sse.New() + } + for _, e := range o.RegisteredEvents { typ := reflect.TypeOf(e) ul.registeredEvents[typ.String()] = e @@ -79,8 +82,11 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) { ul.m.Route("/ocs/v2.php/apps/notifications/api/v1/notifications", func(r chi.Router) { r.Get("/", ul.HandleGetEvents) - r.Get("/sse", ul.HandleSSE) r.Delete("/", ul.HandleDeleteEvents) + + if !ul.cfg.DisableSSE { + r.Get("/sse", ul.HandleSSE) + } }) go ul.MemorizeEvents(ch) @@ -230,8 +236,10 @@ func (ul *UserlogService) DeleteEvents(userid string, evids []string) error { } func (ul *UserlogService) addEventToUser(userid string, event events.Event) error { - if err := ul.sendSSE(userid, event); err != nil { - ul.log.Error().Err(err).Str("userid", userid).Str("eventid", event.ID).Msg("cannot create sse event") + if !ul.cfg.DisableSSE { + if err := ul.sendSSE(userid, event); err != nil { + ul.log.Error().Err(err).Str("userid", userid).Str("eventid", event.ID).Msg("cannot create sse event") + } } return ul.alterUserEventList(userid, func(ids []string) []string { return append(ids, event.ID) From c74da3d05b16163673f990f4ab8663dbac587381 Mon Sep 17 00:00:00 2001 From: kobergj Date: Thu, 29 Jun 2023 09:24:26 +0200 Subject: [PATCH 6/7] improve userlog docu Co-authored-by: Martin --- services/userlog/README.md | 2 +- services/userlog/pkg/config/config.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/services/userlog/README.md b/services/userlog/README.md index fe0901781a5..1f9aa0e0a99 100644 --- a/services/userlog/README.md +++ b/services/userlog/README.md @@ -32,7 +32,7 @@ The `userlog` service provides an API to retrieve configured events. For now, th ## Subscribing -Additionaly to the oc10 API the `userlog` service also provides an `/sse` (Server-Sent Events) endpoint to be informed by the server when an event happens. See [here](https://medium.com/yemeksepeti-teknoloji/what-is-server-sent-events-sse-and-how-to-implement-it-904938bffd73) for example for simple introduction to server sent events. The `sse` endpoint will respect language changes of the user without needing to reconnect. +Additionaly to the oc10 API, the `userlog` service also provides an `/sse` (Server-Sent Events) endpoint to be informed by the server when an event happens. See [What is Server-Sent Events](https://medium.com/yemeksepeti-teknoloji/what-is-server-sent-events-sse-and-how-to-implement-it-904938bffd73) for a simple introduction and examples to server sent events. The `sse` endpoint will respect language changes of the user without needing to reconnect. ## Deleting diff --git a/services/userlog/pkg/config/config.go b/services/userlog/pkg/config/config.go index 237fb0e3cc3..b369ac22fbe 100644 --- a/services/userlog/pkg/config/config.go +++ b/services/userlog/pkg/config/config.go @@ -28,7 +28,7 @@ type Config struct { Events Events `yaml:"events"` Persistence Persistence `yaml:"persistence"` - DisableSSE bool `yaml:"disable_sse" env:"USERLOG_DISABLE_SSE" desc:"Disables server-sent events. Clients will no longer be able to connect to the sse endpoint."` + DisableSSE bool `yaml:"disable_sse" env:"USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer be able to connect to the sse endpoint."` Context context.Context `yaml:"-"` } From 47d04d68c9841181b35a3882ee143092c314e7d2 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Thu, 29 Jun 2023 10:36:25 +0200 Subject: [PATCH 7/7] add a changelog Signed-off-by: jkoberg --- changelog/unreleased/add-sse-endpoint.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/unreleased/add-sse-endpoint.md diff --git a/changelog/unreleased/add-sse-endpoint.md b/changelog/unreleased/add-sse-endpoint.md new file mode 100644 index 00000000000..6986359a49f --- /dev/null +++ b/changelog/unreleased/add-sse-endpoint.md @@ -0,0 +1,5 @@ +Enhancement: Add SSE Endpoint + +Add a server-sent events (sse) endpoint for the userlog service + +https://github.com/owncloud/ocis/pull/5998