Skip to content

Commit

Permalink
feat: include policies and notifications services to the runners
Browse files Browse the repository at this point in the history
  • Loading branch information
jvillafanez committed May 17, 2024
1 parent 430da22 commit f692c7c
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 66 deletions.
43 changes: 24 additions & 19 deletions services/notifications/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package command
import (
"context"
"fmt"
"os/signal"

"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
Expand Down Expand Up @@ -51,17 +52,14 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr := run.Group{}

ctx, cancel := func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()

defer cancel()
var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}

gr := runner.NewGroup()
{
server := debug.NewService(
debug.Logger(logger),
Expand All @@ -75,10 +73,7 @@ func Server(cfg *config.Config) *cli.Command {
debug.Ready(handlers.Ready),
)

gr.Add(server.ListenAndServe, func(_ error) {
_ = server.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("notifications_debug", server))
}

// evs defines a list of events to subscribe to
Expand Down Expand Up @@ -118,11 +113,21 @@ func Server(cfg *config.Config) *cli.Command {
valueService := settingssvc.NewValueService("com.owncloud.api.settings", grpcClient)
svc := service.NewEventsNotifier(evts, channel, logger, gatewaySelector, valueService, cfg.ServiceAccount.ServiceAccountID, cfg.ServiceAccount.ServiceAccountSecret, cfg.Notifications.EmailTemplatePath, cfg.Notifications.DefaultLanguage, cfg.WebUIURL)

gr.Add(svc.Run, func(error) {
cancel()
})
gr.Add(runner.New("notifications_svc", func() error {
return svc.Run()
}, func() {
svc.Close()
}))

return gr.Run()
grResults := gr.Run(ctx)

// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}
40 changes: 31 additions & 9 deletions services/notifications/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"errors"
"fmt"
"net/url"
"os"
"os/signal"
"path"
"strings"
"syscall"
"sync"
"sync/atomic"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
Expand Down Expand Up @@ -47,22 +46,22 @@ func NewEventsNotifier(
logger: logger,
channel: channel,
events: events,
signals: make(chan os.Signal, 1),
gatewaySelector: gatewaySelector,
valueService: valueService,
serviceAccountID: serviceAccountID,
serviceAccountSecret: serviceAccountSecret,
emailTemplatePath: emailTemplatePath,
defaultLanguage: defaultLanguage,
ocisURL: ocisURL,
stopCh: make(chan struct{}, 1),
stopped: new(atomic.Bool),
}
}

type eventsNotifier struct {
logger log.Logger
channel channels.Channel
events <-chan events.Event
signals chan os.Signal
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
valueService settingssvc.ValueService
emailTemplatePath string
Expand All @@ -71,16 +70,27 @@ type eventsNotifier struct {
ocisURL string
serviceAccountID string
serviceAccountSecret string
stopCh chan struct{}
stopped *atomic.Bool
}

func (s eventsNotifier) Run() error {
signal.Notify(s.signals, syscall.SIGINT, syscall.SIGTERM)
var wg sync.WaitGroup

s.logger.Debug().
Msg("eventsNotifier started")
EventLoop:
for {
select {
case evt := <-s.events:
case evt, ok := <-s.events:
if !ok {
break EventLoop
}
// TODO: needs to be replaced with a worker pool
wg.Add(1)
go func() {
defer wg.Done()

switch e := evt.Event.(type) {
case events.SpaceShared:
s.handleSpaceShared(e)
Expand All @@ -94,12 +104,24 @@ func (s eventsNotifier) Run() error {
s.handleShareExpired(e)
}
}()
case <-s.signals:

if s.stopped.Load() {
break EventLoop
}
case <-s.stopCh:
s.logger.Debug().
Msg("eventsNotifier stopped")
return nil
break EventLoop
}
}
// wait until all the goroutines processing events have finished
wg.Wait()
return nil
}

func (s eventsNotifier) Close() {
s.stopped.Store(true)
close(s.stopCh)
}

func (s eventsNotifier) render(ctx context.Context, template email.MessageTemplate,
Expand Down
47 changes: 25 additions & 22 deletions services/policies/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"fmt"
"io"
"net/http"
"os/signal"

"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
Expand All @@ -33,16 +34,12 @@ func Server(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
var (
gr = run.Group{}
ctx, cancel = func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()
)
defer cancel()
var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}

logger := log.NewLogger(
log.Name(cfg.Service.Name),
Expand All @@ -62,6 +59,7 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr := runner.NewGroup()
{
grpcClient, err := grpc.NewClient(
append(
Expand Down Expand Up @@ -104,9 +102,7 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr.Add(svc.Run, func(_ error) {
cancel()
})
gr.Add(runner.NewGoMicroGrpcServerRunner("policies_grpc", svc))
}

{
Expand All @@ -121,9 +117,11 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr.Add(eventSvc.Run, func(_ error) {
cancel()
})
gr.Add(runner.New("policies_svc", func() error {
return eventSvc.Run()
}, func() {
eventSvc.Close()
}))
}

{
Expand Down Expand Up @@ -165,13 +163,18 @@ func Server(cfg *config.Config) *cli.Command {
),
)

gr.Add(server.ListenAndServe, func(_ error) {
_ = server.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("policies_debug", server))
}

return gr.Run()
grResults := gr.Run(ctx)

// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}
62 changes: 46 additions & 16 deletions services/policies/pkg/service/event/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eventSVC

import (
"context"
"sync/atomic"

"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
Expand All @@ -11,23 +12,27 @@ import (

// Service defines the service handlers.
type Service struct {
ctx context.Context
query string
log log.Logger
stream events.Stream
engine engine.Engine
tp trace.TracerProvider
ctx context.Context
query string
log log.Logger
stream events.Stream
engine engine.Engine
tp trace.TracerProvider
stopCh chan struct{}
stopped *atomic.Bool
}

// New returns a service implementation for Service.
func New(ctx context.Context, stream events.Stream, logger log.Logger, tp trace.TracerProvider, engine engine.Engine, query string) (Service, error) {
svc := Service{
ctx: ctx,
log: logger,
query: query,
tp: tp,
engine: engine,
stream: stream,
ctx: ctx,
log: logger,
query: query,
tp: tp,
engine: engine,
stream: stream,
stopCh: make(chan struct{}, 1),
stopped: new(atomic.Bool),
}

return svc, nil
Expand All @@ -40,16 +45,41 @@ func (s Service) Run() error {
return err
}

for e := range ch {
err := s.processEvent(e)
if err != nil {
return err
EventLoop:
for {
select {
case <-s.stopCh:
break EventLoop
case e, ok := <-ch:
if !ok {
break EventLoop
}

err := s.processEvent(e)
if err != nil {
return err
}

if s.stopped.Load() {
break EventLoop
}
}
}

return nil
}

// Close will make the policies service to stop processing, so the `Run`
// method can finish.
// TODO: Underlying services can't be stopped. This means that some goroutines
// will get stuck trying to push events through a channel nobody is reading
// from, so resources won't be freed and there will be memory leaks. For now,
// if the service is stopped, you should close the app soon after.
func (s Service) Close() {
s.stopped.Store(true)
close(s.stopCh)
}

func (s Service) processEvent(e events.Event) error {
ctx := e.GetTraceContext(s.ctx)
ctx, span := s.tp.Tracer("policies").Start(ctx, "processEvent")
Expand Down

0 comments on commit f692c7c

Please sign in to comment.