Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SSE to userlog #5998

Merged
merged 7 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/add-sse-endpoint.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Add SSE Endpoint

Add a server-sent events (sse) endpoint for the userlog service

https://github.com/owncloud/ocis/pull/5998
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ require (
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/prometheus/statsd_exporter v0.22.8 // indirect
github.com/r3labs/sse/v2 v2.10.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/rivo/uniseg v0.4.2 // indirect
github.com/rs/cors v1.9.0 // indirect
Expand Down Expand Up @@ -325,6 +326,7 @@ require (
golang.org/x/tools v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1469,6 +1469,8 @@ github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9
github.com/prometheus/statsd_exporter v0.22.8 h1:Qo2D9ZzaQG+id9i5NYNGmbf1aa/KxKbB9aKfMS+Yib0=
github.com/prometheus/statsd_exporter v0.22.8/go.mod h1:/DzwbTEaFTE0Ojz5PqcSk6+PFHOPWGxdXVr6yC8eFOM=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=
github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
github.com/rainycape/memcache v0.0.0-20150622160815-1031fa0ce2f2/go.mod h1:7tZKcyumwBO6qip7RNQ5r77yrssm9bfCowcLEBcU5IA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
Expand Down Expand Up @@ -1808,6 +1810,7 @@ golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand Down Expand Up @@ -2383,6 +2386,8 @@ google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cn
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/Acconut/lockfile.v1 v1.1.0/go.mod h1:6UCz3wJ8tSFUsPR6uP/j8uegEtDuEEqFxlpi0JI4Umw=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
4 changes: 4 additions & 0 deletions services/userlog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ For the time being, the configuration which user related events are of interest

The `userlog` service provides an API to retrieve configured events. For now, this API is mostly following the [oc10 notification GET API](https://doc.owncloud.com/server/next/developer_manual/core/apis/ocs-notification-endpoint-v1.html#get-user-notifications).

## Subscribing

Additionaly to the oc10 API, the `userlog` service also provides an `/sse` (Server-Sent Events) endpoint to be informed by the server when an event happens. See [What is Server-Sent Events](https://medium.com/yemeksepeti-teknoloji/what-is-server-sent-events-sse-and-how-to-implement-it-904938bffd73) for a simple introduction and examples to server sent events. The `sse` endpoint will respect language changes of the user without needing to reconnect.

## Deleting

To delete events for an user, use a `DELETE` request to `ocs/v2.php/apps/notifications/api/v1/notifications` containing the IDs to delete.
Expand Down
3 changes: 3 additions & 0 deletions services/userlog/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/userlog/pkg/logging"
Expand Down Expand Up @@ -102,6 +103,7 @@ func Server(cfg *config.Config) *cli.Command {
}

hClient := ehsvc.NewEventHistoryService("com.owncloud.api.eventhistory", ogrpc.DefaultClient())
vClient := settingssvc.NewValueService("com.owncloud.api.settings", ogrpc.DefaultClient())

{
server, err := http.Server(
Expand All @@ -113,6 +115,7 @@ func Server(cfg *config.Config) *cli.Command {
http.Consumer(consumer),
http.GatewaySelector(gatewaySelector),
http.History(hClient),
http.Value(vClient),
http.RegisteredEvents(_registeredEvents),
)

Expand Down
2 changes: 2 additions & 0 deletions services/userlog/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type Config struct {
Events Events `yaml:"events"`
Persistence Persistence `yaml:"persistence"`

DisableSSE bool `yaml:"disable_sse" env:"USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer be able to connect to the sse endpoint."`

Context context.Context `yaml:"-"`
}

Expand Down
9 changes: 9 additions & 0 deletions services/userlog/pkg/server/http/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/owncloud/ocis/v2/services/userlog/pkg/metrics"
"github.com/urfave/cli/v2"
Expand All @@ -29,6 +30,7 @@ type Options struct {
Consumer events.Consumer
GatewaySelector pool.Selectable[gateway.GatewayAPIClient]
HistoryClient ehsvc.EventHistoryService
ValueClient settingssvc.ValueService
RegisteredEvents []events.Unmarshaller
}

Expand Down Expand Up @@ -119,3 +121,10 @@ func RegisteredEvents(evs []events.Unmarshaller) Option {
o.RegisteredEvents = evs
}
}

// Value provides a function to configure the value service client
func Value(vs settingssvc.ValueService) Option {
return func(o *Options) {
o.ValueClient = vs
}
}
1 change: 1 addition & 0 deletions services/userlog/pkg/server/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func Server(opts ...Option) (http.Service, error) {
svc.Config(options.Config),
svc.HistoryClient(options.HistoryClient),
svc.GatewaySelector(options.GatewaySelector),
svc.ValueClient(options.ValueClient),
svc.RegisteredEvents(options.RegisteredEvents),
)
if err != nil {
Expand Down
42 changes: 13 additions & 29 deletions services/userlog/pkg/service/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"embed"
"errors"
"fmt"
"io/fs"
"strings"
Expand All @@ -20,7 +19,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/leonelquinteros/gotext"
ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
)

//go:embed l10n/locale
Expand Down Expand Up @@ -56,7 +54,6 @@ type Converter struct {
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
machineAuthAPIKey string
serviceName string
registeredEvents map[string]events.Unmarshaller
translationPath string

// cached within one request not to query other service too much
Expand All @@ -67,13 +64,12 @@ type Converter struct {
}

// NewConverter returns a new Converter
func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], machineAuthAPIKey string, name string, translationPath string, registeredEvents map[string]events.Unmarshaller) *Converter {
func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], machineAuthAPIKey string, name string, translationPath string) *Converter {
return &Converter{
locale: loc,
gatewaySelector: gatewaySelector,
machineAuthAPIKey: machineAuthAPIKey,
serviceName: name,
registeredEvents: registeredEvents,
translationPath: translationPath,
spaces: make(map[string]*storageprovider.StorageSpace),
users: make(map[string]*user.User),
Expand All @@ -83,52 +79,40 @@ func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPI
}

// ConvertEvent converts an eventhistory event to an OC10Notification
func (c *Converter) ConvertEvent(event *ehmsg.Event) (OC10Notification, error) {
etype, ok := c.registeredEvents[event.Type]
if !ok {
// this should not happen
return OC10Notification{}, errors.New("eventtype not registered")
}

einterface, err := etype.Unmarshal(event.Event)
if err != nil {
// this shouldn't happen either
return OC10Notification{}, errors.New("cant unmarshal event")
}

switch ev := einterface.(type) {
func (c *Converter) ConvertEvent(eventid string, event interface{}) (OC10Notification, error) {
switch ev := event.(type) {
default:
return OC10Notification{}, fmt.Errorf("unknown event type: %T", ev)
// file related
case events.PostprocessingStepFinished:
switch ev.FinishedStep {
case events.PPStepAntivirus:
res := ev.Result.(events.VirusscanResult)
return c.virusMessage(event.Id, VirusFound, ev.ExecutingUser, res.ResourceID, ev.Filename, res.Description, res.Scandate)
return c.virusMessage(eventid, VirusFound, ev.ExecutingUser, res.ResourceID, ev.Filename, res.Description, res.Scandate)
case events.PPStepPolicies:
return c.policiesMessage(event.Id, PoliciesEnforced, ev.ExecutingUser, ev.Filename, time.Now())
return c.policiesMessage(eventid, PoliciesEnforced, ev.ExecutingUser, ev.Filename, time.Now())
default:
return OC10Notification{}, fmt.Errorf("unknown postprocessing step: %s", ev.FinishedStep)
}
// space related
case events.SpaceDisabled:
return c.spaceMessage(event.Id, SpaceDisabled, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
return c.spaceMessage(eventid, SpaceDisabled, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
case events.SpaceDeleted:
return c.spaceDeletedMessage(event.Id, ev.Executant, ev.ID.GetOpaqueId(), ev.SpaceName, ev.Timestamp)
return c.spaceDeletedMessage(eventid, ev.Executant, ev.ID.GetOpaqueId(), ev.SpaceName, ev.Timestamp)
case events.SpaceShared:
return c.spaceMessage(event.Id, SpaceShared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
return c.spaceMessage(eventid, SpaceShared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
case events.SpaceUnshared:
return c.spaceMessage(event.Id, SpaceUnshared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
return c.spaceMessage(eventid, SpaceUnshared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
case events.SpaceMembershipExpired:
return c.spaceMessage(event.Id, SpaceMembershipExpired, ev.SpaceOwner, ev.SpaceID.GetOpaqueId(), ev.ExpiredAt)
return c.spaceMessage(eventid, SpaceMembershipExpired, ev.SpaceOwner, ev.SpaceID.GetOpaqueId(), ev.ExpiredAt)

// share related
case events.ShareCreated:
return c.shareMessage(event.Id, ShareCreated, ev.Executant, ev.ItemID, ev.ShareID, utils.TSToTime(ev.CTime))
return c.shareMessage(eventid, ShareCreated, ev.Executant, ev.ItemID, ev.ShareID, utils.TSToTime(ev.CTime))
case events.ShareExpired:
return c.shareMessage(event.Id, ShareExpired, ev.ShareOwner, ev.ItemID, ev.ShareID, ev.ExpiredAt)
return c.shareMessage(eventid, ShareExpired, ev.ShareOwner, ev.ItemID, ev.ShareID, ev.ExpiredAt)
case events.ShareRemoved:
return c.shareMessage(event.Id, ShareRemoved, ev.Executant, ev.ItemID, ev.ShareID, ev.Timestamp)
return c.shareMessage(eventid, ShareRemoved, ev.Executant, ev.ItemID, ev.ShareID, ev.Timestamp)
}
}

Expand Down
44 changes: 42 additions & 2 deletions services/userlog/pkg/service/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"net/http"

"github.com/cs3org/reva/v2/pkg/ctx"
revactx "github.com/cs3org/reva/v2/pkg/ctx"
)

Expand Down Expand Up @@ -31,11 +32,23 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
return
}

conv := NewConverter(r.Header.Get(HeaderAcceptLanguage), ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.registeredEvents)
conv := ul.getConverter(r.Header.Get(HeaderAcceptLanguage))

resp := GetEventResponseOC10{}
for _, e := range evs {
noti, err := conv.ConvertEvent(e)
etype, ok := ul.registeredEvents[e.Type]
if !ok {
ul.log.Error().Str("eventid", e.Id).Str("eventtype", e.Type).Msg("event not registered")
continue
}

einterface, err := etype.Unmarshal(e.Event)
if err != nil {
ul.log.Error().Str("eventid", e.Id).Str("eventtype", e.Type).Msg("failed to umarshal event")
continue
}

noti, err := conv.ConvertEvent(e.Id, einterface)
if err != nil {
ul.log.Error().Err(err).Str("eventid", e.Id).Str("eventtype", e.Type).Msg("failed to convert event")
continue
Expand All @@ -49,6 +62,33 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
w.Write(b)
}

// HandleSSE is the GET handler for events
func (ul *UserlogService) HandleSSE(w http.ResponseWriter, r *http.Request) {
u, ok := ctx.ContextGetUser(r.Context())
if !ok {
ul.log.Error().Msg("sse: no user in context")
w.WriteHeader(http.StatusInternalServerError)
return
}

uid := u.GetId().GetOpaqueId()
if uid == "" {
ul.log.Error().Msg("sse: user in context is broken")
w.WriteHeader(http.StatusInternalServerError)
return
}

stream := ul.sse.CreateStream(uid)
stream.AutoReplay = false

// add stream to URL
q := r.URL.Query()
q.Set("stream", uid)
r.URL.RawQuery = q.Encode()

ul.sse.ServeHTTP(w, r)
}

// HandleDeleteEvents is the DELETE handler for events
func (ul *UserlogService) HandleDeleteEvents(w http.ResponseWriter, r *http.Request) {
u, ok := revactx.ContextGetUser(r.Context())
Expand Down
8 changes: 8 additions & 0 deletions services/userlog/pkg/service/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"go-micro.dev/v4/store"
)
Expand All @@ -23,6 +24,7 @@ type Options struct {
Config *config.Config
HistoryClient ehsvc.EventHistoryService
GatewaySelector pool.Selectable[gateway.GatewayAPIClient]
ValueClient settingssvc.ValueService
RegisteredEvents []events.Unmarshaller
}

Expand Down Expand Up @@ -81,3 +83,9 @@ func RegisteredEvents(e []events.Unmarshaller) Option {
o.RegisteredEvents = e
}
}

func ValueClient(vs settingssvc.ValueService) Option {
return func(o *Options) {
o.ValueClient = vs
}
}
Loading