Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
Gracefully stop servers on interrupt (#551)
Browse files Browse the repository at this point in the history
* Gracefully stop servers on interrupt

* Server handles stop

* Remove unused error
  • Loading branch information
sethvargo authored Jun 5, 2020
1 parent c190ce1 commit 2161d09
Show file tree
Hide file tree
Showing 12 changed files with 389 additions and 85 deletions.
33 changes: 24 additions & 9 deletions cmd/cleanup-export/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,50 @@ package main

import (
"context"
"log"
"fmt"
"net/http"

"go.opencensus.io/plugin/ochttp"

"github.com/google/exposure-notifications-server/internal/cleanup"
"github.com/google/exposure-notifications-server/internal/interrupt"
"github.com/google/exposure-notifications-server/internal/logging"
_ "github.com/google/exposure-notifications-server/internal/observability"
"github.com/google/exposure-notifications-server/internal/server"
"github.com/google/exposure-notifications-server/internal/setup"
)

func main() {
ctx := context.Background()
ctx, done := interrupt.Context()
defer done()

if err := realMain(ctx); err != nil {
logger := logging.FromContext(ctx)
logger.Fatal(err)
}
}

func realMain(ctx context.Context) error {
logger := logging.FromContext(ctx)

var config cleanup.Config
env, err := setup.Setup(ctx, &config)
if err != nil {
logger.Fatalf("setup.Setup: %v", err)
return fmt.Errorf("setup.Setup: %w", err)
}
defer env.Close(ctx)

handler, err := cleanup.NewExportHandler(&config, env)
if err != nil {
logger.Fatalf("cleanup.NewExportHandler: %v", err)
return fmt.Errorf("cleanup.NewExportHandler: %w", err)
}

mux := http.NewServeMux()
mux.Handle("/", handler)
logger.Infof("starting export cleanup server on :%s", config.Port)
instrumentedHandler := &ochttp.Handler{Handler: mux}
log.Fatal(http.ListenAndServe(":"+config.Port, instrumentedHandler))

srv, err := server.New(config.Port)
if err != nil {
return fmt.Errorf("server.New: %w", err)
}
logger.Infof("listening on :%s", config.Port)

return srv.ServeHTTPHandler(ctx, mux)
}
33 changes: 24 additions & 9 deletions cmd/cleanup-exposure/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,50 @@ package main

import (
"context"
"log"
"fmt"
"net/http"

"go.opencensus.io/plugin/ochttp"

"github.com/google/exposure-notifications-server/internal/cleanup"
"github.com/google/exposure-notifications-server/internal/interrupt"
"github.com/google/exposure-notifications-server/internal/logging"
_ "github.com/google/exposure-notifications-server/internal/observability"
"github.com/google/exposure-notifications-server/internal/server"
"github.com/google/exposure-notifications-server/internal/setup"
)

func main() {
ctx := context.Background()
ctx, done := interrupt.Context()
defer done()

if err := realMain(ctx); err != nil {
logger := logging.FromContext(ctx)
logger.Fatal(err)
}
}

func realMain(ctx context.Context) error {
logger := logging.FromContext(ctx)

var config cleanup.Config
env, err := setup.Setup(ctx, &config)
if err != nil {
logger.Fatalf("setup.Setup: %v", err)
return fmt.Errorf("setup.Setup: %w", err)
}
defer env.Close(ctx)

handler, err := cleanup.NewExposureHandler(&config, env)
if err != nil {
logger.Fatalf("cleanup.NewExposureHandler: %v", err)
return fmt.Errorf("cleanup.NewExposureHandler: %w", err)
}

mux := http.NewServeMux()
mux.Handle("/", handler)
logger.Infof("starting cleanup server on :%s", config.Port)
instrumentedHandler := &ochttp.Handler{Handler: mux}
log.Fatal(http.ListenAndServe(":"+config.Port, instrumentedHandler))

srv, err := server.New(config.Port)
if err != nil {
return fmt.Errorf("server.New: %w", err)
}
logger.Infof("listening on :%s", config.Port)

return srv.ServeHTTPHandler(ctx, mux)
}
36 changes: 25 additions & 11 deletions cmd/export/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,51 @@ package main

import (
"context"
"log"
"fmt"
"net/http"

"go.opencensus.io/plugin/ochttp"

"github.com/google/exposure-notifications-server/internal/export"
"github.com/google/exposure-notifications-server/internal/interrupt"
"github.com/google/exposure-notifications-server/internal/logging"
_ "github.com/google/exposure-notifications-server/internal/observability"
"github.com/google/exposure-notifications-server/internal/server"
"github.com/google/exposure-notifications-server/internal/setup"
)

func main() {
ctx := context.Background()
ctx, done := interrupt.Context()
defer done()

if err := realMain(ctx); err != nil {
logger := logging.FromContext(ctx)
logger.Fatal(err)
}
}

func realMain(ctx context.Context) error {
logger := logging.FromContext(ctx)

var config export.Config
env, err := setup.Setup(ctx, &config)
if err != nil {
logger.Fatalf("setup.Setup: %v", err)
return fmt.Errorf("setup.Setup: %w", err)
}
defer env.Close(ctx)

batchServer, err := export.NewServer(&config, env)
if err != nil {
logger.Fatalf("unable to create server: %v", err)
return fmt.Errorf("export.NewServer: %w", err)
}

mux := http.NewServeMux()
mux.HandleFunc("/create-batches", batchServer.CreateBatchesHandler) // controller that creates work items
mux.HandleFunc("/do-work", batchServer.WorkerHandler) // worker that executes work
mux.HandleFunc("/create-batches", batchServer.CreateBatchesHandler)
mux.HandleFunc("/do-work", batchServer.WorkerHandler)

srv, err := server.New(config.Port)
if err != nil {
return fmt.Errorf("server.New: %w", err)
}
logger.Infof("listening on :%s", config.Port)

logger.Infof("starting exposure export server on :%s", config.Port)
instrumentedHandler := &ochttp.Handler{Handler: mux}
log.Fatal(http.ListenAndServe(":"+config.Port, instrumentedHandler))
return srv.ServeHTTPHandler(ctx, mux)
}
33 changes: 24 additions & 9 deletions cmd/exposure/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,51 @@ package main

import (
"context"
"log"
"fmt"
"net/http"

"go.opencensus.io/plugin/ochttp"

"github.com/google/exposure-notifications-server/internal/handlers"
"github.com/google/exposure-notifications-server/internal/interrupt"
"github.com/google/exposure-notifications-server/internal/logging"
_ "github.com/google/exposure-notifications-server/internal/observability"
"github.com/google/exposure-notifications-server/internal/publish"
"github.com/google/exposure-notifications-server/internal/server"
"github.com/google/exposure-notifications-server/internal/setup"
)

func main() {
ctx := context.Background()
ctx, done := interrupt.Context()
defer done()

if err := realMain(ctx); err != nil {
logger := logging.FromContext(ctx)
logger.Fatal(err)
}
}

func realMain(ctx context.Context) error {
logger := logging.FromContext(ctx)

var config publish.Config
env, err := setup.Setup(ctx, &config)
if err != nil {
logger.Fatalf("setup.Setup: %v", err)
return fmt.Errorf("setup.Setup: %w", err)
}
defer env.Close(ctx)

handler, err := publish.NewHandler(ctx, &config, env)
if err != nil {
logger.Fatalf("unable to create publish handler: %v", err)
return fmt.Errorf("publish.NewHandler: %w", err)
}

mux := http.NewServeMux()
mux.Handle("/", handlers.WithMinimumLatency(config.MinRequestDuration, handler))
logger.Infof("starting exposure server on :%s", config.Port)
instrumentedHandler := &ochttp.Handler{Handler: mux}
log.Fatal(http.ListenAndServe(":"+config.Port, instrumentedHandler))

srv, err := server.New(config.Port)
if err != nil {
return fmt.Errorf("server.New: %w", err)
}
logger.Infof("listening on :%s", config.Port)

return srv.ServeHTTPHandler(ctx, mux)
}
34 changes: 25 additions & 9 deletions cmd/federationin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,47 @@ package main

import (
"context"
"log"
"fmt"
"net/http"

"go.opencensus.io/plugin/ochttp"

"github.com/google/exposure-notifications-server/internal/federationin"
"github.com/google/exposure-notifications-server/internal/interrupt"
"github.com/google/exposure-notifications-server/internal/logging"
_ "github.com/google/exposure-notifications-server/internal/observability"
"github.com/google/exposure-notifications-server/internal/server"
"github.com/google/exposure-notifications-server/internal/setup"
)

func main() {
ctx := context.Background()
ctx, done := interrupt.Context()
defer done()

if err := realMain(ctx); err != nil {
logger := logging.FromContext(ctx)
logger.Fatal(err)
}
}

func realMain(ctx context.Context) error {
logger := logging.FromContext(ctx)

var config federationin.Config
env, err := setup.Setup(ctx, &config)
if err != nil {
logger.Fatalf("setup.Setup: %v", err)
return fmt.Errorf("setup.Setup: %w", err)
}
defer env.Close(ctx)

handler := federationin.NewHandler(env, &config)

mux := http.NewServeMux()
mux.Handle("/", federationin.NewHandler(env, &config))
logger.Infof("Starting federationin server on port %s", config.Port)
instrumentedHandler := &ochttp.Handler{Handler: mux}
log.Fatal(http.ListenAndServe(":"+config.Port, instrumentedHandler))
mux.Handle("/", handler)

srv, err := server.New(config.Port)
if err != nil {
return fmt.Errorf("server.New: %w", err)
}
logger.Infof("listening on :%s", config.Port)

return srv.ServeHTTPHandler(ctx, mux)
}
36 changes: 23 additions & 13 deletions cmd/federationout/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,56 +17,66 @@ package main

import (
"context"
"log"
"net"
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"go.opencensus.io/plugin/ocgrpc"

"github.com/google/exposure-notifications-server/internal/federationout"
"github.com/google/exposure-notifications-server/internal/interrupt"
"github.com/google/exposure-notifications-server/internal/logging"
_ "github.com/google/exposure-notifications-server/internal/observability"
"github.com/google/exposure-notifications-server/internal/pb"
"github.com/google/exposure-notifications-server/internal/server"
"github.com/google/exposure-notifications-server/internal/setup"
)

func main() {
ctx := context.Background()
ctx, done := interrupt.Context()
defer done()

if err := realMain(ctx); err != nil {
logger := logging.FromContext(ctx)
logger.Fatal(err)
}
}

func realMain(ctx context.Context) error {
logger := logging.FromContext(ctx)

var config federationout.Config
env, err := setup.Setup(ctx, &config)
if err != nil {
logger.Fatalf("setup.Setup: %v", err)
return fmt.Errorf("setup.Setup: %w", err)
}
defer env.Close(ctx)

server := federationout.NewServer(env, &config)
federationServer := federationout.NewServer(env, &config)

var sopts []grpc.ServerOption
if config.TLSCertFile != "" && config.TLSKeyFile != "" {
creds, err := credentials.NewServerTLSFromFile(config.TLSCertFile, config.TLSKeyFile)
if err != nil {
log.Fatalf("Failed to generate credentials: %v", err)
return fmt.Errorf("failed to create credentials: %w", err)
}
sopts = append(sopts, grpc.Creds(creds))
}

if !config.AllowAnyClient {
sopts = append(sopts, grpc.UnaryInterceptor(server.(*federationout.Server).AuthInterceptor))
sopts = append(sopts, grpc.UnaryInterceptor(federationServer.(*federationout.Server).AuthInterceptor))
}

sopts = append(sopts, grpc.StatsHandler(&ocgrpc.ServerHandler{}))
grpcServer := grpc.NewServer(sopts...)
pb.RegisterFederationServer(grpcServer, server)
pb.RegisterFederationServer(grpcServer, federationServer)

grpcEndpoint := ":" + config.Port
listen, err := net.Listen("tcp", grpcEndpoint)
srv, err := server.New(config.Port)
if err != nil {
logger.Fatalf("Failed to start server: %v", err)
return fmt.Errorf("server.New: %w", err)
}
logger.Infof("Starting federationout gRPC listener [%s]", grpcEndpoint)
log.Fatal(grpcServer.Serve(listen))
logger.Infof("listening on :%s", config.Port)

return srv.ServeGRPC(ctx, grpcServer)
}
Loading

0 comments on commit 2161d09

Please sign in to comment.