diff --git a/services/notifications/pkg/command/server.go b/services/notifications/pkg/command/server.go index 55b3c7ad3c4..19f47acc70f 100644 --- a/services/notifications/pkg/command/server.go +++ b/services/notifications/pkg/command/server.go @@ -3,14 +3,15 @@ package command import ( "context" "fmt" + "os/signal" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" - "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" "github.com/owncloud/ocis/v2/ocis-pkg/handlers" "github.com/owncloud/ocis/v2/ocis-pkg/registry" + "github.com/owncloud/ocis/v2/ocis-pkg/runner" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" "github.com/owncloud/ocis/v2/ocis-pkg/tracing" @@ -51,17 +52,14 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr := run.Group{} - - ctx, cancel := func() (context.Context, context.CancelFunc) { - if cfg.Context == nil { - return context.WithCancel(context.Background()) - } - return context.WithCancel(cfg.Context) - }() - - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + gr := runner.NewGroup() { server := debug.NewService( debug.Logger(logger), @@ -75,10 +73,7 @@ func Server(cfg *config.Config) *cli.Command { debug.Ready(handlers.Ready), ) - gr.Add(server.ListenAndServe, func(_ error) { - _ = server.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("notifications_debug", server)) } // evs defines a list of events to subscribe to @@ -118,11 +113,21 @@ func Server(cfg *config.Config) *cli.Command { valueService := settingssvc.NewValueService("com.owncloud.api.settings", grpcClient) svc := service.NewEventsNotifier(evts, channel, logger, gatewaySelector, valueService, cfg.ServiceAccount.ServiceAccountID, cfg.ServiceAccount.ServiceAccountSecret, cfg.Notifications.EmailTemplatePath, cfg.Notifications.DefaultLanguage, cfg.WebUIURL) - gr.Add(svc.Run, func(error) { - cancel() - }) + gr.Add(runner.New("notifications_svc", func() error { + return svc.Run() + }, func() { + svc.Close() + })) - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/notifications/pkg/service/service.go b/services/notifications/pkg/service/service.go index 3a48f788cd7..5a492f5745d 100644 --- a/services/notifications/pkg/service/service.go +++ b/services/notifications/pkg/service/service.go @@ -5,11 +5,10 @@ import ( "errors" "fmt" "net/url" - "os" - "os/signal" "path" "strings" - "syscall" + "sync" + "sync/atomic" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" @@ -47,7 +46,6 @@ func NewEventsNotifier( logger: logger, channel: channel, events: events, - signals: make(chan os.Signal, 1), gatewaySelector: gatewaySelector, valueService: valueService, serviceAccountID: serviceAccountID, @@ -55,6 +53,8 @@ func NewEventsNotifier( emailTemplatePath: emailTemplatePath, defaultLanguage: defaultLanguage, ocisURL: ocisURL, + stopCh: make(chan struct{}, 1), + stopped: new(atomic.Bool), } } @@ -62,7 +62,6 @@ type eventsNotifier struct { logger log.Logger channel channels.Channel events <-chan events.Event - signals chan os.Signal gatewaySelector pool.Selectable[gateway.GatewayAPIClient] valueService settingssvc.ValueService emailTemplatePath string @@ -71,16 +70,27 @@ type eventsNotifier struct { ocisURL string serviceAccountID string serviceAccountSecret string + stopCh chan struct{} + stopped *atomic.Bool } func (s eventsNotifier) Run() error { - signal.Notify(s.signals, syscall.SIGINT, syscall.SIGTERM) + var wg sync.WaitGroup + s.logger.Debug(). Msg("eventsNotifier started") +EventLoop: for { select { - case evt := <-s.events: + case evt, ok := <-s.events: + if !ok { + break EventLoop + } + // TODO: needs to be replaced with a worker pool + wg.Add(1) go func() { + defer wg.Done() + switch e := evt.Event.(type) { case events.SpaceShared: s.handleSpaceShared(e) @@ -94,12 +104,24 @@ func (s eventsNotifier) Run() error { s.handleShareExpired(e) } }() - case <-s.signals: + + if s.stopped.Load() { + break EventLoop + } + case <-s.stopCh: s.logger.Debug(). Msg("eventsNotifier stopped") - return nil + break EventLoop } } + // wait until all the goroutines processing events have finished + wg.Wait() + return nil +} + +func (s eventsNotifier) Close() { + s.stopped.Store(true) + close(s.stopCh) } func (s eventsNotifier) render(ctx context.Context, template email.MessageTemplate, diff --git a/services/policies/pkg/command/server.go b/services/policies/pkg/command/server.go index d2e5ffbfe90..e3ac6d68bb2 100644 --- a/services/policies/pkg/command/server.go +++ b/services/policies/pkg/command/server.go @@ -5,11 +5,12 @@ import ( "fmt" "io" "net/http" + "os/signal" "github.com/cs3org/reva/v2/pkg/events/stream" - "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/ocis-pkg/runner" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" "github.com/owncloud/ocis/v2/ocis-pkg/tracing" @@ -33,16 +34,12 @@ func Server(cfg *config.Config) *cli.Command { return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, Action: func(c *cli.Context) error { - var ( - gr = run.Group{} - ctx, cancel = func() (context.Context, context.CancelFunc) { - if cfg.Context == nil { - return context.WithCancel(context.Background()) - } - return context.WithCancel(cfg.Context) - }() - ) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } logger := log.NewLogger( log.Name(cfg.Service.Name), @@ -62,6 +59,7 @@ func Server(cfg *config.Config) *cli.Command { return err } + gr := runner.NewGroup() { grpcClient, err := grpc.NewClient( append( @@ -104,9 +102,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(svc.Run, func(_ error) { - cancel() - }) + gr.Add(runner.NewGoMicroGrpcServerRunner("policies_grpc", svc)) } { @@ -121,9 +117,11 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(eventSvc.Run, func(_ error) { - cancel() - }) + gr.Add(runner.New("policies_svc", func() error { + return eventSvc.Run() + }, func() { + eventSvc.Close() + })) } { @@ -165,13 +163,18 @@ func Server(cfg *config.Config) *cli.Command { ), ) - gr.Add(server.ListenAndServe, func(_ error) { - _ = server.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("policies_debug", server)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/policies/pkg/service/event/service.go b/services/policies/pkg/service/event/service.go index 756b360a900..e6231b44574 100644 --- a/services/policies/pkg/service/event/service.go +++ b/services/policies/pkg/service/event/service.go @@ -2,6 +2,7 @@ package eventSVC import ( "context" + "sync/atomic" "github.com/cs3org/reva/v2/pkg/events" "github.com/owncloud/ocis/v2/ocis-pkg/log" @@ -11,23 +12,27 @@ import ( // Service defines the service handlers. type Service struct { - ctx context.Context - query string - log log.Logger - stream events.Stream - engine engine.Engine - tp trace.TracerProvider + ctx context.Context + query string + log log.Logger + stream events.Stream + engine engine.Engine + tp trace.TracerProvider + stopCh chan struct{} + stopped *atomic.Bool } // New returns a service implementation for Service. func New(ctx context.Context, stream events.Stream, logger log.Logger, tp trace.TracerProvider, engine engine.Engine, query string) (Service, error) { svc := Service{ - ctx: ctx, - log: logger, - query: query, - tp: tp, - engine: engine, - stream: stream, + ctx: ctx, + log: logger, + query: query, + tp: tp, + engine: engine, + stream: stream, + stopCh: make(chan struct{}, 1), + stopped: new(atomic.Bool), } return svc, nil @@ -40,16 +45,41 @@ func (s Service) Run() error { return err } - for e := range ch { - err := s.processEvent(e) - if err != nil { - return err +EventLoop: + for { + select { + case <-s.stopCh: + break EventLoop + case e, ok := <-ch: + if !ok { + break EventLoop + } + + err := s.processEvent(e) + if err != nil { + return err + } + + if s.stopped.Load() { + break EventLoop + } } } return nil } +// Close will make the policies service to stop processing, so the `Run` +// method can finish. +// TODO: Underlying services can't be stopped. This means that some goroutines +// will get stuck trying to push events through a channel nobody is reading +// from, so resources won't be freed and there will be memory leaks. For now, +// if the service is stopped, you should close the app soon after. +func (s Service) Close() { + s.stopped.Store(true) + close(s.stopCh) +} + func (s Service) processEvent(e events.Event) error { ctx := e.GetTraceContext(s.ctx) ctx, span := s.tp.Tracer("policies").Start(ctx, "processEvent")