Skip to content

Commit

Permalink
feat: add antivirus, audit, clientlog and ocdav
Browse files Browse the repository at this point in the history
  • Loading branch information
jvillafanez committed May 20, 2024
1 parent 4dc128d commit 4619f75
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 178 deletions.
58 changes: 33 additions & 25 deletions services/antivirus/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package command
import (
"context"
"fmt"
"os/signal"

"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
Expand All @@ -27,36 +28,38 @@ func Server(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
var (
gr = run.Group{}
ctx, cancel = func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()
logger = log.NewLogger(
log.Name(cfg.Service.Name),
log.Level(cfg.Log.Level),
log.Pretty(cfg.Log.Pretty),
log.Color(cfg.Log.Color),
log.File(cfg.Log.File),
)
var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}

logger := log.NewLogger(
log.Name(cfg.Service.Name),
log.Level(cfg.Log.Level),
log.Pretty(cfg.Log.Pretty),
log.Color(cfg.Log.Color),
log.File(cfg.Log.File),
)
defer cancel()

traceProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name)
if err != nil {
return err
}

gr := runner.NewGroup()
{
svc, err := service.NewAntivirus(cfg, logger, traceProvider)
if err != nil {
return err
}

gr.Add(svc.Run, func(_ error) {
cancel()
})
gr.Add(runner.New("antivirus_svc", func() error {
return svc.Run()
}, func() {
svc.Close()
}))
}

{
Expand All @@ -72,13 +75,18 @@ func Server(cfg *config.Config) *cli.Command {
debug.Ready(handlers.Ready),
)

gr.Add(server.ListenAndServe, func(_ error) {
_ = server.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("antivirus_debug", server))
}

return gr.Run()
grResults := gr.Run(ctx)

// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}
57 changes: 43 additions & 14 deletions services/antivirus/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"net/http"
"os"
"sync/atomic"
"time"

"github.com/cs3org/reva/v2/pkg/bytesize"
Expand Down Expand Up @@ -52,7 +53,15 @@ func NewAntivirus(c *config.Config, l log.Logger, tp trace.TracerProvider) (Anti
return Antivirus{}, err
}

av := Antivirus{c: c, l: l, tp: tp, s: scanner, client: rhttp.GetHTTPClient(rhttp.Insecure(true))}
av := Antivirus{
c: c,
l: l,
tp: tp,
s: scanner,
client: rhttp.GetHTTPClient(rhttp.Insecure(true)),
stopCh: make(chan struct{}, 1),
stopped: new(atomic.Bool),
}

switch o := events.PostprocessingOutcome(c.InfectedFileHandling); o {
case events.PPOutcomeContinue, events.PPOutcomeAbort, events.PPOutcomeDelete:
Expand Down Expand Up @@ -82,7 +91,9 @@ type Antivirus struct {
m uint64
tp trace.TracerProvider

client *http.Client
client *http.Client
stopCh chan struct{}
stopped *atomic.Bool
}

// Run runs the service
Expand Down Expand Up @@ -116,25 +127,43 @@ func (av Antivirus) Run() error {
return err
}

for e := range ch {
err := av.processEvent(e, natsStream)
if err != nil {
switch {
case errors.Is(err, ErrFatal):
return err
case errors.Is(err, ErrEvent):
// Right now logging of these happens in the processEvent method, might be cleaner to do it here.
continue
default:
av.l.Fatal().Err(err).Msg("unknown error - exiting")
EventLoop:
for {
select {
case e, ok := <-ch:
if !ok {
break EventLoop
}

err := av.processEvent(e, natsStream)
if err != nil {
switch {
case errors.Is(err, ErrFatal):
return err
case errors.Is(err, ErrEvent):
// Right now logging of these happens in the processEvent method, might be cleaner to do it here.
continue
default:
av.l.Fatal().Err(err).Msg("unknown error - exiting")
}
}
}

if av.stopped.Load() {
break EventLoop
}
case <-av.stopCh:
break EventLoop
}
}

return nil
}

func (av Antivirus) Close() {
av.stopped.Store(true)
close(av.stopCh)
}

func (av Antivirus) processEvent(e events.Event, s events.Publisher) error {
ctx := e.GetTraceContext(context.Background())
ctx, span := av.tp.Tracer("antivirus").Start(ctx, "processEvent")
Expand Down
58 changes: 31 additions & 27 deletions services/audit/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package command
import (
"context"
"fmt"
"os"
"os/signal"

"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
"github.com/owncloud/ocis/v2/services/audit/pkg/config"
Expand All @@ -30,18 +30,15 @@ func Server(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
var (
gr = run.Group{}
logger = logging.Configure(cfg.Service.Name, cfg.Log)
var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}

ctx, cancel = func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()
)
defer cancel()
logger := logging.Configure(cfg.Service.Name, cfg.Log)
gr := runner.NewGroup()

client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
if err != nil {
Expand All @@ -52,16 +49,17 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr.Add(func() error {
svc.AuditLoggerFromConfig(ctx, cfg.Auditlog, evts, logger)
// we need an additional context for the audit server in order to
// cancel it anytime
svcCtx, svcCancel := context.WithCancel(ctx)
defer svcCancel()

gr.Add(runner.New("audit_svc", func() error {
svc.AuditLoggerFromConfig(svcCtx, cfg.Auditlog, evts, logger)
return nil
}, func(err error) {
logger.Error().
Err(err).
Msg("Shutting down server")
cancel()
os.Exit(1)
})
}, func() {
svcCancel()
}))

{
server := debug.NewService(
Expand All @@ -76,12 +74,18 @@ func Server(cfg *config.Config) *cli.Command {
debug.Ready(handlers.Ready),
)

gr.Add(server.ListenAndServe, func(_ error) {
_ = server.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("audit_debug", server))
}

grResults := gr.Run(ctx)

// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return gr.Run()
return nil
},
}
}
17 changes: 16 additions & 1 deletion services/audit/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ func StartAuditLogger(ctx context.Context, ch <-chan events.Event, log log.Logge
select {
case <-ctx.Done():
return
case i := <-ch:
case i, ok := <-ch:
if !ok {
return
}

var auditEvent interface{}
switch ev := i.Event.(type) {
case events.ShareCreated:
Expand Down Expand Up @@ -111,19 +115,30 @@ func StartAuditLogger(ctx context.Context, ch <-chan events.Event, log log.Logge
auditEvent = types.GroupMemberRemoved(ev)
default:
log.Error().Interface("event", ev).Msg(fmt.Sprintf("can't handle event of type '%T'", ev))
if ctx.Err() != nil {
// if context is done, do not process more events
return
}
continue

}

b, err := marshaller(auditEvent)
if err != nil {
log.Error().Err(err).Msg("error marshaling the event")
if ctx.Err() != nil {
return
}
continue
}

for _, l := range logto {
l(b)
}

if ctx.Err() != nil {
return
}
}
}

Expand Down
Loading

0 comments on commit 4619f75

Please sign in to comment.