Skip to content

Commit

Permalink
feat: use runners to startup the services
Browse files Browse the repository at this point in the history
  • Loading branch information
jvillafanez committed May 6, 2024
1 parent 045d545 commit ae1e8bb
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 100 deletions.
134 changes: 134 additions & 0 deletions ocis-pkg/runner/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package runner

import (
"context"
"errors"
"net"
"net/http"
"time"

ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
ohttp "github.com/owncloud/ocis/v2/ocis-pkg/service/http"
"google.golang.org/grpc"
)

// NewGoMicroGrpcServerRunner creates a new runner based on the provided go-micro's
// GRPC service. The service is expected to be created via
// "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc".NewService(...) function
//
// The runner will behave as described:
// * The task is to start a server and listen for connections. If the server
// can't start, the task will finish with that error.
// * The stopper will call the server's stop method and send the result to
// the task.
// * The stopper will run asynchronously because the stop method could take a
// while and we don't want to block
func NewGoMicroGrpcServerRunner(name string, server ogrpc.Service, opts ...Option) *Runner {
httpCh := make(chan error, 1)
r := New(name, func() error {
// start the server and return if it fails
if err := server.Server().Start(); err != nil {
return err
}
return <-httpCh // wait for the result
}, func() {
// stop implies deregistering and waiting for request to finish,
// so don't block
go func() {
httpCh <- server.Server().Stop() // stop and send result through channel
close(httpCh)
}()
}, opts...)
return r
}

// NewGoMicroHttpServerRunner creates a new runner based on the provided go-micro's
// HTTP service. The service is expected to be created via
// "github.com/owncloud/ocis/v2/ocis-pkg/service/http".NewService(...) function
//
// The runner will behave as described:
// * The task is to start a server and listen for connections. If the server
// can't start, the task will finish with that error.
// * The stopper will call the server's stop method and send the result to
// the task.
// * The stopper will run asynchronously because the stop method could take a
// while and we don't want to block
func NewGoMicroHttpServerRunner(name string, server ohttp.Service, opts ...Option) *Runner {
httpCh := make(chan error, 1)
r := New(name, func() error {
// start the server and return if it fails
if err := server.Server().Start(); err != nil {
return err
}
return <-httpCh // wait for the result
}, func() {
// stop implies deregistering and waiting for request to finish,
// so don't block
go func() {
httpCh <- server.Server().Stop() // stop and send result through channel
close(httpCh)
}()
}, opts...)
return r
}

// NewGolangHttpServerRunner creates a new runner based on the provided HTTP server.
// The HTTP server is expected to be created via
// "github.com/owncloud/ocis/v2/ocis-pkg/service/debug".NewService(...) function
// and it's expected to be a regular golang HTTP server
//
// The runner will behave as described:
// * The task starts a server and listen for connections. If the server
// can't start, the task will finish with that error. If the server is shutdown
// the task will wait for the shutdown to return that result (task won't finish
// immediately, but wait until shutdown returns)
// * The stopper will call the server's shutdown method and send the result to
// the task. The stopper will wait up to 5 secs for the shutdown.
// * The stopper will run asynchronously because the shutdown could take a
// while and we don't want to block
func NewGolangHttpServerRunner(name string, server *http.Server, opts ...Option) *Runner {
debugCh := make(chan error, 1)
r := New(name, func() error {
// start listening and return if the error is NOT ErrServerClosed.
// ListenAndServe will always return a non-nil error.
// We need to wait and get the result of the Shutdown call.
// App shouldn't exit until Shutdown has returned.
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}
// wait for the shutdown and return the result
return <-debugCh
}, func() {
// Since Shutdown might take some time, don't block
go func() {
// give 5 secs for the shutdown to finish
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

debugCh <- server.Shutdown(shutdownCtx)
close(debugCh)
}()
}, opts...)

return r
}

// NewGolangGrpcServerRunner creates a new runner based on the provided GRPC
// server. The GRPC server is expected to be a regular golang GRPC server,
// created via "google.golang.org/grpc".NewServer(...)
// A listener also needs to be provided for the server to listen there.
//
// The runner will just start the GRPC server in the listener, and the server
// will be gracefully stopped when interrupted
func NewGolangGrpcServerRunner(name string, server *grpc.Server, listener net.Listener, opts ...Option) *Runner {
r := New(name, func() error {
return server.Serve(listener)
}, func() {
// Since GracefulStop might take some time, don't block
go func() {
server.GracefulStop()
}()
}, opts...)

return r
}
6 changes: 6 additions & 0 deletions ocis-pkg/runner/types.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package runner

import (
"os"
"strings"
"syscall"
"time"
)

var (
StopSignals = []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL}
)

// Runable represent a task that can be executed by the Runner.
// It expected to be a long running task with an indefinite execution time,
// so it's suitable for servers or services.
Expand Down
58 changes: 26 additions & 32 deletions services/collaboration/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"fmt"
"net"
"os/signal"

"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/services/collaboration/pkg/config"
"github.com/owncloud/ocis/v2/services/collaboration/pkg/config/parser"
Expand Down Expand Up @@ -35,14 +36,12 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr := run.Group{}
ctx, cancel := func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()
defer cancel()
var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}

// prepare components
if err := helpers.RegisterOcisService(ctx, cfg, logger); err != nil {
Expand All @@ -63,6 +62,8 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr := runner.NewGroup()

// start GRPC server
grpcServer, teardown, err := grpc.Server(
grpc.AppURLs(appUrls),
Expand All @@ -78,20 +79,11 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr.Add(func() error {
l, err := net.Listen("tcp", cfg.GRPC.Addr)
if err != nil {
return err
}
return grpcServer.Serve(l)
},
func(_ error) {
logger.Error().
Err(err).
Str("server", "grpc").
Msg("shutting down server")
cancel()
})
l, err := net.Listen("tcp", cfg.GRPC.Addr)
if err != nil {
return err
}
gr.Add(runner.NewGolangGrpcServerRunner("collaboration_grpc", grpcServer, l))

// start debug server
debugServer, err := debug.Server(
Expand All @@ -103,11 +95,7 @@ func Server(cfg *config.Config) *cli.Command {
logger.Info().Err(err).Str("transport", "debug").Msg("Failed to initialize server")
return err
}

gr.Add(debugServer.ListenAndServe, func(_ error) {
_ = debugServer.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("collaboration_debug", debugServer))

// start HTTP server
httpServer, err := http.Server(
Expand All @@ -117,11 +105,17 @@ func Server(cfg *config.Config) *cli.Command {
http.Context(ctx),
http.TracerProvider(traceProvider),
)
gr.Add(httpServer.Run, func(_ error) {
cancel()
})
gr.Add(runner.NewGoMicroHttpServerRunner("collaboration_http", httpServer))

grResults := gr.Run(ctx)

return gr.Run()
// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}
59 changes: 22 additions & 37 deletions services/thumbnails/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package command
import (
"context"
"fmt"
"os"
"os/signal"

"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
Expand Down Expand Up @@ -41,20 +41,17 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

var (
gr = run.Group{}
ctx, cancel = func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()
m = metrics.New()
)
var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}

defer cancel()
metrics := metrics.New()
metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1)

m.BuildInfo.WithLabelValues(version.GetString()).Set(1)
gr := runner.NewGroup()

service := grpc.NewService(
grpc.Logger(logger),
Expand All @@ -66,16 +63,7 @@ func Server(cfg *config.Config) *cli.Command {
grpc.Metrics(m),
grpc.TraceProvider(traceProvider),
)

gr.Add(service.Run, func(_ error) {
logger.Error().
Err(err).
Str("server", "grpc").
Msg("Shutting down server")

cancel()
os.Exit(1)
})
gr.Add(runner.NewGoMicroGrpcServerRunner("thumbnails_grpc", service))

server, err := debug.Server(
debug.Logger(logger),
Expand All @@ -85,11 +73,7 @@ func Server(cfg *config.Config) *cli.Command {
logger.Info().Err(err).Str("transport", "debug").Msg("Failed to initialize server")
return err
}

gr.Add(server.ListenAndServe, func(_ error) {
_ = server.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("thumbnails_debug", server))

httpServer, err := http.Server(
http.Logger(logger),
Expand All @@ -107,16 +91,17 @@ func Server(cfg *config.Config) *cli.Command {

return err
}
gr.Add(runner.NewGoMicroHttpServerRunner("thumbnails_http", httpServer))

gr.Add(httpServer.Run, func(_ error) {
logger.Error().
Err(err).
Str("server", "http").
Msg("Shutting down server")
cancel()
})
grResults := gr.Run(ctx)

return gr.Run()
// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}
Loading

0 comments on commit ae1e8bb

Please sign in to comment.