Skip to content

Commit

Permalink
feat: use runners in multiple services
Browse files Browse the repository at this point in the history
  • Loading branch information
jvillafanez committed May 7, 2024
1 parent ae1e8bb commit 868a07b
Show file tree
Hide file tree
Showing 17 changed files with 297 additions and 382 deletions.
2 changes: 1 addition & 1 deletion ocis-pkg/runner/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

var (
StopSignals = []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL}
StopSignals = []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT}
)

// Runable represent a task that can be executed by the Runner.
Expand Down
4 changes: 4 additions & 0 deletions services/collaboration/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func Server(cfg *config.Config) *cli.Command {
http.Context(ctx),
http.TracerProvider(traceProvider),
)
if err != nil {
logger.Info().Err(err).Str("transport", "http").Msg("Failed to initialize server")
return err
}
gr.Add(runner.NewGoMicroHttpServerRunner("collaboration_http", httpServer))

grResults := gr.Run(ctx)
Expand Down
48 changes: 22 additions & 26 deletions services/eventhistory/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package command
import (
"context"
"fmt"
"os/signal"

"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/store"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
Expand Down Expand Up @@ -45,21 +46,18 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

var (
gr = run.Group{}
ctx, cancel = func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()
m = metrics.New()
)

defer cancel()
var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}

m := metrics.New()
m.BuildInfo.WithLabelValues(version.GetString()).Set(1)

gr := runner.NewGroup()

consumer, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
Expand Down Expand Up @@ -88,14 +86,7 @@ func Server(cfg *config.Config) *cli.Command {
grpc.TraceProvider(traceProvider),
)

gr.Add(service.Run, func(err error) {
logger.Error().
Err(err).
Str("server", "grpc").
Msg("Shutting Down server")

cancel()
})
gr.Add(runner.NewGoMicroGrpcServerRunner("eventhistory_grpc", service))

{
server := debug.NewService(
Expand All @@ -110,13 +101,18 @@ func Server(cfg *config.Config) *cli.Command {
debug.Ready(handlers.Ready),
)

gr.Add(server.ListenAndServe, func(_ error) {
_ = server.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("eventhistory_debug", server))
}

return gr.Run()
grResults := gr.Run(ctx)

// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}
49 changes: 21 additions & 28 deletions services/graph/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package command
import (
"context"
"fmt"
"os"
"os/signal"

"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
"github.com/owncloud/ocis/v2/services/graph/pkg/config"
Expand Down Expand Up @@ -34,19 +34,17 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr := run.Group{}
ctx, cancel := func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()
mtrcs := metrics.New()

defer cancel()
var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}

mtrcs := metrics.New()
mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1)

gr := runner.NewGroup()
{
server, err := http.Server(
http.Logger(logger),
Expand All @@ -60,17 +58,7 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr.Add(func() error {
return server.Run()
}, func(err error) {
logger.Error().
Str("transport", "http").
Err(err).
Msg("Shutting down server")

cancel()
os.Exit(1)
})
gr.Add(runner.NewGoMicroHttpServerRunner("graph_http", server))
}

{
Expand All @@ -84,13 +72,18 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr.Add(server.ListenAndServe, func(_ error) {
_ = server.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("graph_debug", server))
}

return gr.Run()
grResults := gr.Run(ctx)

// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}
51 changes: 22 additions & 29 deletions services/idp/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (
"io"
"io/fs"
"os"
"os/signal"
"path/filepath"

"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
"github.com/owncloud/ocis/v2/services/idp/pkg/config"
Expand Down Expand Up @@ -54,21 +55,18 @@ func Server(cfg *config.Config) *cli.Command {
if err != nil {
return err
}
var (
gr = run.Group{}
ctx, cancel = func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()
metrics = metrics.New()
)

defer cancel()

var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}

metrics := metrics.New()
metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1)

gr := runner.NewGroup()
{
server, err := http.Server(
http.Logger(logger),
Expand All @@ -86,17 +84,7 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr.Add(func() error {
return server.Run()
}, func(err error) {
logger.Error().
Str("transport", "http").
Err(err).
Msg("Shutting down server")

cancel()
os.Exit(1)
})
gr.Add(runner.NewGoMicroHttpServerRunner("idp_http", server))
}

{
Expand All @@ -110,13 +98,18 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr.Add(server.ListenAndServe, func(_ error) {
_ = server.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("idp_debug", server))
}

return gr.Run()
grResults := gr.Run(ctx)

// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}
Expand Down
52 changes: 21 additions & 31 deletions services/invitations/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package command
import (
"context"
"fmt"
"os"
"os/signal"

"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
"github.com/owncloud/ocis/v2/services/invitations/pkg/config"
Expand Down Expand Up @@ -35,21 +35,17 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

var (
gr = run.Group{}
ctx, cancel = func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()
metrics = metrics.New(metrics.Logger(logger))
)

defer cancel()
var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}

metrics := metrics.New(metrics.Logger(logger))
metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1)

gr := runner.NewGroup()
{

svc, err := service.New(
Expand Down Expand Up @@ -80,17 +76,7 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr.Add(func() error {
return server.Run()
}, func(err error) {
logger.Error().
Err(err).
Str("transport", "http").
Msg("Shutting down server")

cancel()
os.Exit(1)
})
gr.Add(runner.NewGoMicroHttpServerRunner("invitations_http", server))
}

{
Expand All @@ -104,14 +90,18 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr.Add(server.ListenAndServe, func(err error) {
logger.Error().Err(err)
_ = server.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("invitations_debug", server))
}

return gr.Run()
grResults := gr.Run(ctx)

// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}
Loading

0 comments on commit 868a07b

Please sign in to comment.