Skip to content

Commit

Permalink
feat: include idm and nats services to the runners
Browse files Browse the repository at this point in the history
  • Loading branch information
jvillafanez committed May 17, 2024
1 parent f692c7c commit 4dc128d
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 74 deletions.
62 changes: 29 additions & 33 deletions services/idm/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ import (
"fmt"
"html/template"
"os"
"os/signal"
"strings"

"github.com/go-ldap/ldif"
"github.com/libregraph/idm/pkg/ldappassword"
"github.com/libregraph/idm/pkg/ldbbolt"
"github.com/libregraph/idm/server"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
pkgcrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
"github.com/owncloud/ocis/v2/services/idm"
Expand All @@ -37,19 +38,16 @@ func Server(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
var (
gr = run.Group{}
logger = logging.Configure(cfg.Service.Name, cfg.Log)
ctx, cancel = func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()
)
var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}

defer cancel()
logger := logging.Configure(cfg.Service.Name, cfg.Log)

gr := runner.NewGroup()
{
servercfg := server.Config{
Logger: log.LogrusWrap(logger.Logger),
Expand Down Expand Up @@ -81,22 +79,16 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr.Add(func() error {
err := make(chan error)
select {
case <-ctx.Done():
return nil
// we need an additional context for the idm server in order to
// cancel it anytime
svcCtx, svcCancel := context.WithCancel(ctx)
defer svcCancel()

case err <- svc.Serve(ctx):
return <-err
}
}, func(err error) {
logger.Error().
Err(err).
Msg("Shutting down server")
cancel()
os.Exit(1)
})
gr.Add(runner.New("idm_svc", func() error {
return svc.Serve(svcCtx)
}, func() {
svcCancel()
}))
}

{
Expand All @@ -112,14 +104,18 @@ func Server(cfg *config.Config) *cli.Command {
debug.Ready(handlers.Ready),
)

gr.Add(server.ListenAndServe, func(_ error) {
_ = server.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("idm_debug", server))
}

return gr.Run()
//return start(ctx, logger, cfg)
grResults := gr.Run(ctx)

// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}
Expand Down
58 changes: 23 additions & 35 deletions services/nats/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ import (
"context"
"crypto/tls"
"fmt"
"os"

"github.com/oklog/run"
"os/signal"

"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
pkgcrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
"github.com/owncloud/ocis/v2/services/nats/pkg/config"
Expand All @@ -32,16 +31,14 @@ func Server(cfg *config.Config) *cli.Command {
Action: func(c *cli.Context) error {
logger := logging.Configure(cfg.Service.Name, cfg.Log)

gr := run.Group{}
ctx, cancel := func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()

defer cancel()
var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}

gr := runner.NewGroup()
{
server := debug.NewService(
debug.Logger(logger),
Expand All @@ -55,10 +52,7 @@ func Server(cfg *config.Config) *cli.Command {
debug.Ready(handlers.Ready),
)

gr.Add(server.ListenAndServe, func(_ error) {
_ = server.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("nats_debug", server))
}

var tlsConf *tls.Config
Expand All @@ -85,7 +79,6 @@ func Server(cfg *config.Config) *cli.Command {
}
}
natsServer, err := nats.NewNATSServer(
ctx,
logging.NewLogWrapper(logger),
nats.Host(cfg.Nats.Host),
nats.Port(cfg.Nats.Port),
Expand All @@ -98,26 +91,21 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr.Add(func() error {
err := make(chan error)
select {
case <-ctx.Done():
return nil
case err <- natsServer.ListenAndServe():
return <-err
}

}, func(err error) {
logger.Error().
Err(err).
Msg("Shutting down server")

gr.Add(runner.New("nats_svc", func() error {
return natsServer.ListenAndServe()
}, func() {
natsServer.Shutdown()
cancel()
os.Exit(1)
})
}))

grResults := gr.Run(ctx)

return gr.Run()
// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}
9 changes: 3 additions & 6 deletions services/nats/pkg/server/nats/nats.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package nats

import (
"context"
"time"

nserver "github.com/nats-io/nats-server/v2/server"
Expand All @@ -10,11 +9,10 @@ import (
var NATSListenAndServeLoopTimer = 1 * time.Second

type NATSServer struct {
ctx context.Context
server *nserver.Server
}

func NewNATSServer(ctx context.Context, logger nserver.Logger, opts ...NatsOption) (*NATSServer, error) {
func NewNATSServer(logger nserver.Logger, opts ...NatsOption) (*NATSServer, error) {
natsOpts := &nserver.Options{}

for _, o := range opts {
Expand All @@ -32,15 +30,14 @@ func NewNATSServer(ctx context.Context, logger nserver.Logger, opts ...NatsOptio
server.SetLoggerV2(logger, true, true, false)

return &NATSServer{
ctx: ctx,
server: server,
}, nil
}

// ListenAndServe runs the NATSServer in a blocking way until the server is shutdown or an error occurs
func (n *NATSServer) ListenAndServe() (err error) {
go n.server.Start()
<-n.ctx.Done()
n.server.Start() // it won't block
n.server.WaitForShutdown() // block until the server is fully shutdown
return nil
}

Expand Down
1 change: 1 addition & 0 deletions services/notifications/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
// Service should be named `Runner`
type Service interface {
Run() error
Close()
}

// NewEventsNotifier provides a new eventsNotifier
Expand Down

0 comments on commit 4dc128d

Please sign in to comment.