From 4619f75e7541554af17e3e9a9077110790bd5be9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Pablo=20Villaf=C3=A1=C3=B1ez?= Date: Mon, 20 May 2024 15:29:38 +0200 Subject: [PATCH] feat: add antivirus, audit, clientlog and ocdav --- services/antivirus/pkg/command/server.go | 58 ++++---- services/antivirus/pkg/service/service.go | 57 ++++++-- services/audit/pkg/command/server.go | 58 ++++---- services/audit/pkg/service/service.go | 17 ++- services/clientlog/pkg/command/server.go | 49 ++++--- services/clientlog/pkg/service/service.go | 26 +++- services/ocdav/pkg/command/server.go | 160 +++++++++++----------- 7 files changed, 247 insertions(+), 178 deletions(-) diff --git a/services/antivirus/pkg/command/server.go b/services/antivirus/pkg/command/server.go index 32e2fd8b44f..ba4e1371cc3 100644 --- a/services/antivirus/pkg/command/server.go +++ b/services/antivirus/pkg/command/server.go @@ -3,11 +3,12 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" "github.com/owncloud/ocis/v2/ocis-pkg/handlers" "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/ocis-pkg/runner" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" "github.com/owncloud/ocis/v2/ocis-pkg/tracing" "github.com/owncloud/ocis/v2/ocis-pkg/version" @@ -27,36 +28,38 @@ func Server(cfg *config.Config) *cli.Command { return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, Action: func(c *cli.Context) error { - var ( - gr = run.Group{} - ctx, cancel = func() (context.Context, context.CancelFunc) { - if cfg.Context == nil { - return context.WithCancel(context.Background()) - } - return context.WithCancel(cfg.Context) - }() - logger = log.NewLogger( - log.Name(cfg.Service.Name), - log.Level(cfg.Log.Level), - log.Pretty(cfg.Log.Pretty), - log.Color(cfg.Log.Color), - log.File(cfg.Log.File), - ) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + + logger := log.NewLogger( + log.Name(cfg.Service.Name), + log.Level(cfg.Log.Level), + log.Pretty(cfg.Log.Pretty), + log.Color(cfg.Log.Color), + log.File(cfg.Log.File), ) - defer cancel() + traceProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name) if err != nil { return err } + + gr := runner.NewGroup() { svc, err := service.NewAntivirus(cfg, logger, traceProvider) if err != nil { return err } - gr.Add(svc.Run, func(_ error) { - cancel() - }) + gr.Add(runner.New("antivirus_svc", func() error { + return svc.Run() + }, func() { + svc.Close() + })) } { @@ -72,13 +75,18 @@ func Server(cfg *config.Config) *cli.Command { debug.Ready(handlers.Ready), ) - gr.Add(server.ListenAndServe, func(_ error) { - _ = server.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("antivirus_debug", server)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index a1d41c651c9..d71532784be 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "os" + "sync/atomic" "time" "github.com/cs3org/reva/v2/pkg/bytesize" @@ -52,7 +53,15 @@ func NewAntivirus(c *config.Config, l log.Logger, tp trace.TracerProvider) (Anti return Antivirus{}, err } - av := Antivirus{c: c, l: l, tp: tp, s: scanner, client: rhttp.GetHTTPClient(rhttp.Insecure(true))} + av := Antivirus{ + c: c, + l: l, + tp: tp, + s: scanner, + client: rhttp.GetHTTPClient(rhttp.Insecure(true)), + stopCh: make(chan struct{}, 1), + stopped: new(atomic.Bool), + } switch o := events.PostprocessingOutcome(c.InfectedFileHandling); o { case events.PPOutcomeContinue, events.PPOutcomeAbort, events.PPOutcomeDelete: @@ -82,7 +91,9 @@ type Antivirus struct { m uint64 tp trace.TracerProvider - client *http.Client + client *http.Client + stopCh chan struct{} + stopped *atomic.Bool } // Run runs the service @@ -116,25 +127,43 @@ func (av Antivirus) Run() error { return err } - for e := range ch { - err := av.processEvent(e, natsStream) - if err != nil { - switch { - case errors.Is(err, ErrFatal): - return err - case errors.Is(err, ErrEvent): - // Right now logging of these happens in the processEvent method, might be cleaner to do it here. - continue - default: - av.l.Fatal().Err(err).Msg("unknown error - exiting") +EventLoop: + for { + select { + case e, ok := <-ch: + if !ok { + break EventLoop + } + + err := av.processEvent(e, natsStream) + if err != nil { + switch { + case errors.Is(err, ErrFatal): + return err + case errors.Is(err, ErrEvent): + // Right now logging of these happens in the processEvent method, might be cleaner to do it here. + continue + default: + av.l.Fatal().Err(err).Msg("unknown error - exiting") + } } - } + if av.stopped.Load() { + break EventLoop + } + case <-av.stopCh: + break EventLoop + } } return nil } +func (av Antivirus) Close() { + av.stopped.Store(true) + close(av.stopCh) +} + func (av Antivirus) processEvent(e events.Event, s events.Publisher) error { ctx := e.GetTraceContext(context.Background()) ctx, span := av.tp.Tracer("antivirus").Start(ctx, "processEvent") diff --git a/services/audit/pkg/command/server.go b/services/audit/pkg/command/server.go index 6c55e638c1f..36e53086d3a 100644 --- a/services/audit/pkg/command/server.go +++ b/services/audit/pkg/command/server.go @@ -3,13 +3,13 @@ package command import ( "context" "fmt" - "os" + "os/signal" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" - "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" "github.com/owncloud/ocis/v2/ocis-pkg/handlers" + "github.com/owncloud/ocis/v2/ocis-pkg/runner" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" "github.com/owncloud/ocis/v2/ocis-pkg/version" "github.com/owncloud/ocis/v2/services/audit/pkg/config" @@ -30,18 +30,15 @@ func Server(cfg *config.Config) *cli.Command { return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, Action: func(c *cli.Context) error { - var ( - gr = run.Group{} - logger = logging.Configure(cfg.Service.Name, cfg.Log) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - ctx, cancel = func() (context.Context, context.CancelFunc) { - if cfg.Context == nil { - return context.WithCancel(context.Background()) - } - return context.WithCancel(cfg.Context) - }() - ) - defer cancel() + logger := logging.Configure(cfg.Service.Name, cfg.Log) + gr := runner.NewGroup() client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) if err != nil { @@ -52,16 +49,17 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - svc.AuditLoggerFromConfig(ctx, cfg.Auditlog, evts, logger) + // we need an additional context for the audit server in order to + // cancel it anytime + svcCtx, svcCancel := context.WithCancel(ctx) + defer svcCancel() + + gr.Add(runner.New("audit_svc", func() error { + svc.AuditLoggerFromConfig(svcCtx, cfg.Auditlog, evts, logger) return nil - }, func(err error) { - logger.Error(). - Err(err). - Msg("Shutting down server") - cancel() - os.Exit(1) - }) + }, func() { + svcCancel() + })) { server := debug.NewService( @@ -76,12 +74,18 @@ func Server(cfg *config.Config) *cli.Command { debug.Ready(handlers.Ready), ) - gr.Add(server.ListenAndServe, func(_ error) { - _ = server.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("audit_debug", server)) + } + + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } } - return gr.Run() + return nil }, } } diff --git a/services/audit/pkg/service/service.go b/services/audit/pkg/service/service.go index 929bb262140..832728d39b9 100644 --- a/services/audit/pkg/service/service.go +++ b/services/audit/pkg/service/service.go @@ -42,7 +42,11 @@ func StartAuditLogger(ctx context.Context, ch <-chan events.Event, log log.Logge select { case <-ctx.Done(): return - case i := <-ch: + case i, ok := <-ch: + if !ok { + return + } + var auditEvent interface{} switch ev := i.Event.(type) { case events.ShareCreated: @@ -111,6 +115,10 @@ func StartAuditLogger(ctx context.Context, ch <-chan events.Event, log log.Logge auditEvent = types.GroupMemberRemoved(ev) default: log.Error().Interface("event", ev).Msg(fmt.Sprintf("can't handle event of type '%T'", ev)) + if ctx.Err() != nil { + // if context is done, do not process more events + return + } continue } @@ -118,12 +126,19 @@ func StartAuditLogger(ctx context.Context, ch <-chan events.Event, log log.Logge b, err := marshaller(auditEvent) if err != nil { log.Error().Err(err).Msg("error marshaling the event") + if ctx.Err() != nil { + return + } continue } for _, l := range logto { l(b) } + + if ctx.Err() != nil { + return + } } } diff --git a/services/clientlog/pkg/command/server.go b/services/clientlog/pkg/command/server.go index 0723a7485a8..b41e5549726 100644 --- a/services/clientlog/pkg/command/server.go +++ b/services/clientlog/pkg/command/server.go @@ -3,15 +3,15 @@ package command import ( "context" "fmt" - "os" + "os/signal" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" - "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" "github.com/owncloud/ocis/v2/ocis-pkg/handlers" "github.com/owncloud/ocis/v2/ocis-pkg/registry" + "github.com/owncloud/ocis/v2/ocis-pkg/runner" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" "github.com/owncloud/ocis/v2/ocis-pkg/tracing" "github.com/owncloud/ocis/v2/ocis-pkg/version" @@ -60,19 +60,16 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr := run.Group{} - ctx, cancel := func() (context.Context, context.CancelFunc) { - if cfg.Context == nil { - return context.WithCancel(context.Background()) - } - return context.WithCancel(cfg.Context) - }() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } mtrcs := metrics.New() mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1) - defer cancel() - s, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) if err != nil { return err @@ -93,6 +90,7 @@ func Server(cfg *config.Config) *cli.Command { return fmt.Errorf("could not get reva client selector: %s", err) } + gr := runner.NewGroup() { svc, err := service.NewClientlogService( service.Logger(logger), @@ -108,17 +106,11 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { + gr.Add(runner.New("clientlog_svc", func() error { return svc.Run() - }, func(err error) { - logger.Error(). - Str("transport", "http"). - Err(err). - Msg("Shutting down server") - - cancel() - os.Exit(1) - }) + }, func() { + svc.Close() + })) } { @@ -134,13 +126,18 @@ func Server(cfg *config.Config) *cli.Command { debug.Ready(handlers.Ready), ) - gr.Add(server.ListenAndServe, func(_ error) { - _ = server.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("clientlog_debug", server)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/clientlog/pkg/service/service.go b/services/clientlog/pkg/service/service.go index dbd9ae1cece..bb20d93f529 100644 --- a/services/clientlog/pkg/service/service.go +++ b/services/clientlog/pkg/service/service.go @@ -7,6 +7,7 @@ import ( "fmt" "path/filepath" "reflect" + "sync/atomic" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" @@ -34,6 +35,8 @@ type ClientlogService struct { tracer trace.Tracer publisher events.Publisher ch <-chan events.Event + stopCh chan struct{} + stopped atomic.Bool } // NewClientlogService returns a clientlog service @@ -61,6 +64,7 @@ func NewClientlogService(opts ...Option) (*ClientlogService, error) { tracer: o.TraceProvider.Tracer("github.com/owncloud/ocis/services/clientlog/pkg/service"), publisher: o.Stream, ch: ch, + stopCh: make(chan struct{}, 1), } for _, e := range o.RegisteredEvents { @@ -73,13 +77,31 @@ func NewClientlogService(opts ...Option) (*ClientlogService, error) { // Run runs the service func (cl *ClientlogService) Run() error { - for event := range cl.ch { - cl.processEvent(event) +EventLoop: + for { + select { + case event, ok := <-cl.ch: + if !ok { + break EventLoop + } + cl.processEvent(event) + + if cl.stopped.Load() { + break EventLoop + } + case <-cl.stopCh: + break EventLoop + } } return nil } +func (cl *ClientlogService) Close() { + cl.stopped.Store(true) + close(cl.stopCh) +} + func (cl *ClientlogService) processEvent(event events.Event) { gwc, err := cl.gatewaySelector.Next() if err != nil { diff --git a/services/ocdav/pkg/command/server.go b/services/ocdav/pkg/command/server.go index 447e72d8c78..9efb698731f 100644 --- a/services/ocdav/pkg/command/server.go +++ b/services/ocdav/pkg/command/server.go @@ -3,13 +3,14 @@ package command import ( "context" "fmt" - "os" + "os/signal" "github.com/cs3org/reva/v2/pkg/micro/ocdav" "github.com/cs3org/reva/v2/pkg/sharedconf" - "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/broker" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + "github.com/owncloud/ocis/v2/ocis-pkg/runner" + ohttp "github.com/owncloud/ocis/v2/ocis-pkg/service/http" "github.com/owncloud/ocis/v2/ocis-pkg/tracing" "github.com/owncloud/ocis/v2/ocis-pkg/version" "github.com/owncloud/ocis/v2/services/ocdav/pkg/config" @@ -34,75 +35,74 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := defineContext(cfg) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - gr.Add(func() error { - // init reva shared config explicitly as the go-micro based ocdav does not use - // the reva runtime. But we need e.g. the shared client settings to be initialized - sc := map[string]interface{}{ - "jwt_secret": cfg.TokenManager.JWTSecret, - "gatewaysvc": cfg.Reva.Address, - "skip_user_groups_in_token": cfg.SkipUserGroupsInToken, - "grpc_client_options": cfg.Reva.GetGRPCClientConfig(), - } - if err := sharedconf.Decode(sc); err != nil { - logger.Error().Err(err).Msg("error decoding shared config for ocdav") - } - opts := []ocdav.Option{ - ocdav.Name(cfg.HTTP.Namespace + "." + cfg.Service.Name), - ocdav.Version(version.GetString()), - ocdav.Context(ctx), - ocdav.Logger(logger.Logger), - ocdav.Address(cfg.HTTP.Addr), - ocdav.AllowCredentials(cfg.HTTP.CORS.AllowCredentials), - ocdav.AllowedMethods(cfg.HTTP.CORS.AllowedMethods), - ocdav.AllowedHeaders(cfg.HTTP.CORS.AllowedHeaders), - ocdav.AllowedOrigins(cfg.HTTP.CORS.AllowedOrigins), - ocdav.FilesNamespace(cfg.FilesNamespace), - ocdav.WebdavNamespace(cfg.WebdavNamespace), - ocdav.OCMNamespace(cfg.OCMNamespace), - ocdav.AllowDepthInfinity(cfg.AllowPropfindDepthInfinity), - ocdav.SharesNamespace(cfg.SharesNamespace), - ocdav.Timeout(cfg.Timeout), - ocdav.Insecure(cfg.Insecure), - ocdav.PublicURL(cfg.PublicURL), - ocdav.Prefix(cfg.HTTP.Prefix), - ocdav.GatewaySvc(cfg.Reva.Address), - ocdav.JWTSecret(cfg.TokenManager.JWTSecret), - ocdav.ProductName(cfg.Status.ProductName), - ocdav.ProductVersion(cfg.Status.ProductVersion), - ocdav.Product(cfg.Status.Product), - ocdav.Version(cfg.Status.Version), - ocdav.VersionString(cfg.Status.VersionString), - ocdav.Edition(cfg.Status.Edition), - ocdav.MachineAuthAPIKey(cfg.MachineAuthAPIKey), - ocdav.Broker(broker.NoOp{}), - // ocdav.FavoriteManager() // FIXME needs a proper persistence implementation https://github.com/owncloud/ocis/issues/1228 - // ocdav.LockSystem(), // will default to the CS3 lock system - // ocdav.TLSConfig() // tls config for the http server - ocdav.MetricsEnabled(true), - ocdav.MetricsNamespace("ocis"), - ocdav.Tracing("Adding these strings is a workaround for ->", "https://github.com/cs3org/reva/issues/4131"), - ocdav.WithTraceProvider(traceProvider), - } + gr := runner.NewGroup() - s, err := ocdav.Service(opts...) - if err != nil { - return err - } + // init reva shared config explicitly as the go-micro based ocdav does not use + // the reva runtime. But we need e.g. the shared client settings to be initialized + sc := map[string]interface{}{ + "jwt_secret": cfg.TokenManager.JWTSecret, + "gatewaysvc": cfg.Reva.Address, + "skip_user_groups_in_token": cfg.SkipUserGroupsInToken, + "grpc_client_options": cfg.Reva.GetGRPCClientConfig(), + } + if err := sharedconf.Decode(sc); err != nil { + logger.Error().Err(err).Msg("error decoding shared config for ocdav") + } + opts := []ocdav.Option{ + ocdav.Name(cfg.HTTP.Namespace + "." + cfg.Service.Name), + ocdav.Version(version.GetString()), + ocdav.Context(ctx), + ocdav.Logger(logger.Logger), + ocdav.Address(cfg.HTTP.Addr), + ocdav.AllowCredentials(cfg.HTTP.CORS.AllowCredentials), + ocdav.AllowedMethods(cfg.HTTP.CORS.AllowedMethods), + ocdav.AllowedHeaders(cfg.HTTP.CORS.AllowedHeaders), + ocdav.AllowedOrigins(cfg.HTTP.CORS.AllowedOrigins), + ocdav.FilesNamespace(cfg.FilesNamespace), + ocdav.WebdavNamespace(cfg.WebdavNamespace), + ocdav.OCMNamespace(cfg.OCMNamespace), + ocdav.AllowDepthInfinity(cfg.AllowPropfindDepthInfinity), + ocdav.SharesNamespace(cfg.SharesNamespace), + ocdav.Timeout(cfg.Timeout), + ocdav.Insecure(cfg.Insecure), + ocdav.PublicURL(cfg.PublicURL), + ocdav.Prefix(cfg.HTTP.Prefix), + ocdav.GatewaySvc(cfg.Reva.Address), + ocdav.JWTSecret(cfg.TokenManager.JWTSecret), + ocdav.ProductName(cfg.Status.ProductName), + ocdav.ProductVersion(cfg.Status.ProductVersion), + ocdav.Product(cfg.Status.Product), + ocdav.Version(cfg.Status.Version), + ocdav.VersionString(cfg.Status.VersionString), + ocdav.Edition(cfg.Status.Edition), + ocdav.MachineAuthAPIKey(cfg.MachineAuthAPIKey), + ocdav.Broker(broker.NoOp{}), + // ocdav.FavoriteManager() // FIXME needs a proper persistence implementation https://github.com/owncloud/ocis/issues/1228 + // ocdav.LockSystem(), // will default to the CS3 lock system + // ocdav.TLSConfig() // tls config for the http server + ocdav.MetricsEnabled(true), + ocdav.MetricsNamespace("ocis"), + ocdav.Tracing("Adding these strings is a workaround for ->", "https://github.com/cs3org/reva/issues/4131"), + ocdav.WithTraceProvider(traceProvider), + } - return s.Run() - }, func(err error) { - logger.Error(). - Err(err). - Str("server", c.Command.Name). - Msg("Shutting down server") - cancel() - os.Exit(1) - }) + s, err := ocdav.Service(opts...) + if err != nil { + return err + } + + // creating a runner for a go-micro service is a bit complex, so we'll + // wrap the go-micro service with an ocis service the same way as + // ocis-pkg/service/http is doing in order to reuse the factory. + gr.Add(runner.NewGoMicroHttpServerRunner("ocdav_http", ohttp.Service{s})) debugServer, err := debug.Server( debug.Logger(logger), @@ -115,23 +115,17 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("ocdav_debug", debugServer)) + + grResults := gr.Run(ctx) - return gr.Run() + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } - -// defineContext sets the context for the service. If there is a context configured it will create a new child from it, -// if not, it will create a root context that can be cancelled. -func defineContext(cfg *config.Config) (context.Context, context.CancelFunc) { - return func() (context.Context, context.CancelFunc) { - if cfg.Context == nil { - return context.WithCancel(context.Background()) - } - return context.WithCancel(cfg.Context) - }() -}