Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Gracefully stop servers on interrupt #551

Merged
merged 3 commits into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions cmd/cleanup-export/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,50 @@ package main

import (
"context"
"log"
"fmt"
"net/http"

"go.opencensus.io/plugin/ochttp"

"github.com/google/exposure-notifications-server/internal/cleanup"
"github.com/google/exposure-notifications-server/internal/interrupt"
"github.com/google/exposure-notifications-server/internal/logging"
_ "github.com/google/exposure-notifications-server/internal/observability"
"github.com/google/exposure-notifications-server/internal/server"
"github.com/google/exposure-notifications-server/internal/setup"
)

func main() {
ctx := context.Background()
ctx, done := interrupt.Context()
defer done()

if err := realMain(ctx); err != nil {
logger := logging.FromContext(ctx)
logger.Fatal(err)
}
}

func realMain(ctx context.Context) error {
logger := logging.FromContext(ctx)

var config cleanup.Config
env, err := setup.Setup(ctx, &config)
if err != nil {
logger.Fatalf("setup.Setup: %v", err)
return fmt.Errorf("setup.Setup: %w", err)
}
defer env.Close(ctx)

handler, err := cleanup.NewExportHandler(&config, env)
if err != nil {
logger.Fatalf("cleanup.NewExportHandler: %v", err)
return fmt.Errorf("cleanup.NewExportHandler: %w", err)
}

mux := http.NewServeMux()
mux.Handle("/", handler)
logger.Infof("starting export cleanup server on :%s", config.Port)
instrumentedHandler := &ochttp.Handler{Handler: mux}
log.Fatal(http.ListenAndServe(":"+config.Port, instrumentedHandler))

srv, err := server.New(config.Port)
if err != nil {
return fmt.Errorf("server.New: %w", err)
}
logger.Infof("listening on :%s", config.Port)

return srv.ServeHTTPHandler(ctx, mux)
}
33 changes: 24 additions & 9 deletions cmd/cleanup-exposure/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,50 @@ package main

import (
"context"
"log"
"fmt"
"net/http"

"go.opencensus.io/plugin/ochttp"

"github.com/google/exposure-notifications-server/internal/cleanup"
"github.com/google/exposure-notifications-server/internal/interrupt"
"github.com/google/exposure-notifications-server/internal/logging"
_ "github.com/google/exposure-notifications-server/internal/observability"
"github.com/google/exposure-notifications-server/internal/server"
"github.com/google/exposure-notifications-server/internal/setup"
)

func main() {
ctx := context.Background()
ctx, done := interrupt.Context()
defer done()

if err := realMain(ctx); err != nil {
logger := logging.FromContext(ctx)
logger.Fatal(err)
}
}

func realMain(ctx context.Context) error {
logger := logging.FromContext(ctx)

var config cleanup.Config
env, err := setup.Setup(ctx, &config)
if err != nil {
logger.Fatalf("setup.Setup: %v", err)
return fmt.Errorf("setup.Setup: %w", err)
}
defer env.Close(ctx)

handler, err := cleanup.NewExposureHandler(&config, env)
if err != nil {
logger.Fatalf("cleanup.NewExposureHandler: %v", err)
return fmt.Errorf("cleanup.NewExposureHandler: %w", err)
}

mux := http.NewServeMux()
mux.Handle("/", handler)
logger.Infof("starting cleanup server on :%s", config.Port)
instrumentedHandler := &ochttp.Handler{Handler: mux}
log.Fatal(http.ListenAndServe(":"+config.Port, instrumentedHandler))

srv, err := server.New(config.Port)
if err != nil {
return fmt.Errorf("server.New: %w", err)
}
logger.Infof("listening on :%s", config.Port)

return srv.ServeHTTPHandler(ctx, mux)
}
36 changes: 25 additions & 11 deletions cmd/export/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,51 @@ package main

import (
"context"
"log"
"fmt"
"net/http"

"go.opencensus.io/plugin/ochttp"

"github.com/google/exposure-notifications-server/internal/export"
"github.com/google/exposure-notifications-server/internal/interrupt"
"github.com/google/exposure-notifications-server/internal/logging"
_ "github.com/google/exposure-notifications-server/internal/observability"
"github.com/google/exposure-notifications-server/internal/server"
"github.com/google/exposure-notifications-server/internal/setup"
)

func main() {
ctx := context.Background()
ctx, done := interrupt.Context()
defer done()

if err := realMain(ctx); err != nil {
logger := logging.FromContext(ctx)
logger.Fatal(err)
}
}

func realMain(ctx context.Context) error {
logger := logging.FromContext(ctx)

var config export.Config
env, err := setup.Setup(ctx, &config)
if err != nil {
logger.Fatalf("setup.Setup: %v", err)
return fmt.Errorf("setup.Setup: %w", err)
}
defer env.Close(ctx)

batchServer, err := export.NewServer(&config, env)
if err != nil {
logger.Fatalf("unable to create server: %v", err)
return fmt.Errorf("export.NewServer: %w", err)
}

mux := http.NewServeMux()
mux.HandleFunc("/create-batches", batchServer.CreateBatchesHandler) // controller that creates work items
mux.HandleFunc("/do-work", batchServer.WorkerHandler) // worker that executes work
mux.HandleFunc("/create-batches", batchServer.CreateBatchesHandler)
mux.HandleFunc("/do-work", batchServer.WorkerHandler)

srv, err := server.New(config.Port)
if err != nil {
return fmt.Errorf("server.New: %w", err)
}
logger.Infof("listening on :%s", config.Port)

logger.Infof("starting exposure export server on :%s", config.Port)
instrumentedHandler := &ochttp.Handler{Handler: mux}
log.Fatal(http.ListenAndServe(":"+config.Port, instrumentedHandler))
return srv.ServeHTTPHandler(ctx, mux)
}
33 changes: 24 additions & 9 deletions cmd/exposure/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,51 @@ package main

import (
"context"
"log"
"fmt"
"net/http"

"go.opencensus.io/plugin/ochttp"

"github.com/google/exposure-notifications-server/internal/handlers"
"github.com/google/exposure-notifications-server/internal/interrupt"
"github.com/google/exposure-notifications-server/internal/logging"
_ "github.com/google/exposure-notifications-server/internal/observability"
"github.com/google/exposure-notifications-server/internal/publish"
"github.com/google/exposure-notifications-server/internal/server"
"github.com/google/exposure-notifications-server/internal/setup"
)

func main() {
ctx := context.Background()
ctx, done := interrupt.Context()
defer done()

if err := realMain(ctx); err != nil {
logger := logging.FromContext(ctx)
logger.Fatal(err)
}
}

func realMain(ctx context.Context) error {
logger := logging.FromContext(ctx)

var config publish.Config
env, err := setup.Setup(ctx, &config)
if err != nil {
logger.Fatalf("setup.Setup: %v", err)
return fmt.Errorf("setup.Setup: %w", err)
}
defer env.Close(ctx)

handler, err := publish.NewHandler(ctx, &config, env)
if err != nil {
logger.Fatalf("unable to create publish handler: %v", err)
return fmt.Errorf("publish.NewHandler: %w", err)
}

mux := http.NewServeMux()
mux.Handle("/", handlers.WithMinimumLatency(config.MinRequestDuration, handler))
logger.Infof("starting exposure server on :%s", config.Port)
instrumentedHandler := &ochttp.Handler{Handler: mux}
log.Fatal(http.ListenAndServe(":"+config.Port, instrumentedHandler))

srv, err := server.New(config.Port)
if err != nil {
return fmt.Errorf("server.New: %w", err)
}
logger.Infof("listening on :%s", config.Port)

return srv.ServeHTTPHandler(ctx, mux)
}
34 changes: 25 additions & 9 deletions cmd/federationin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,47 @@ package main

import (
"context"
"log"
"fmt"
"net/http"

"go.opencensus.io/plugin/ochttp"

"github.com/google/exposure-notifications-server/internal/federationin"
"github.com/google/exposure-notifications-server/internal/interrupt"
"github.com/google/exposure-notifications-server/internal/logging"
_ "github.com/google/exposure-notifications-server/internal/observability"
"github.com/google/exposure-notifications-server/internal/server"
"github.com/google/exposure-notifications-server/internal/setup"
)

func main() {
ctx := context.Background()
ctx, done := interrupt.Context()
defer done()

if err := realMain(ctx); err != nil {
logger := logging.FromContext(ctx)
logger.Fatal(err)
}
}

func realMain(ctx context.Context) error {
logger := logging.FromContext(ctx)

var config federationin.Config
env, err := setup.Setup(ctx, &config)
if err != nil {
logger.Fatalf("setup.Setup: %v", err)
return fmt.Errorf("setup.Setup: %w", err)
}
defer env.Close(ctx)

handler := federationin.NewHandler(env, &config)

mux := http.NewServeMux()
mux.Handle("/", federationin.NewHandler(env, &config))
logger.Infof("Starting federationin server on port %s", config.Port)
instrumentedHandler := &ochttp.Handler{Handler: mux}
log.Fatal(http.ListenAndServe(":"+config.Port, instrumentedHandler))
mux.Handle("/", handler)

srv, err := server.New(config.Port)
if err != nil {
return fmt.Errorf("server.New: %w", err)
}
logger.Infof("listening on :%s", config.Port)

return srv.ServeHTTPHandler(ctx, mux)
}
36 changes: 23 additions & 13 deletions cmd/federationout/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,56 +17,66 @@ package main

import (
"context"
"log"
"net"
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"go.opencensus.io/plugin/ocgrpc"

"github.com/google/exposure-notifications-server/internal/federationout"
"github.com/google/exposure-notifications-server/internal/interrupt"
"github.com/google/exposure-notifications-server/internal/logging"
_ "github.com/google/exposure-notifications-server/internal/observability"
"github.com/google/exposure-notifications-server/internal/pb"
"github.com/google/exposure-notifications-server/internal/server"
"github.com/google/exposure-notifications-server/internal/setup"
)

func main() {
ctx := context.Background()
ctx, done := interrupt.Context()
defer done()

if err := realMain(ctx); err != nil {
logger := logging.FromContext(ctx)
logger.Fatal(err)
}
}

func realMain(ctx context.Context) error {
logger := logging.FromContext(ctx)

var config federationout.Config
env, err := setup.Setup(ctx, &config)
if err != nil {
logger.Fatalf("setup.Setup: %v", err)
return fmt.Errorf("setup.Setup: %w", err)
}
defer env.Close(ctx)

server := federationout.NewServer(env, &config)
federationServer := federationout.NewServer(env, &config)

var sopts []grpc.ServerOption
if config.TLSCertFile != "" && config.TLSKeyFile != "" {
creds, err := credentials.NewServerTLSFromFile(config.TLSCertFile, config.TLSKeyFile)
if err != nil {
log.Fatalf("Failed to generate credentials: %v", err)
return fmt.Errorf("failed to create credentials: %w", err)
}
sopts = append(sopts, grpc.Creds(creds))
}

if !config.AllowAnyClient {
sopts = append(sopts, grpc.UnaryInterceptor(server.(*federationout.Server).AuthInterceptor))
sopts = append(sopts, grpc.UnaryInterceptor(federationServer.(*federationout.Server).AuthInterceptor))
}

sopts = append(sopts, grpc.StatsHandler(&ocgrpc.ServerHandler{}))
grpcServer := grpc.NewServer(sopts...)
pb.RegisterFederationServer(grpcServer, server)
pb.RegisterFederationServer(grpcServer, federationServer)

grpcEndpoint := ":" + config.Port
listen, err := net.Listen("tcp", grpcEndpoint)
srv, err := server.New(config.Port)
if err != nil {
logger.Fatalf("Failed to start server: %v", err)
return fmt.Errorf("server.New: %w", err)
}
logger.Infof("Starting federationout gRPC listener [%s]", grpcEndpoint)
log.Fatal(grpcServer.Serve(listen))
logger.Infof("listening on :%s", config.Port)

return srv.ServeGRPC(ctx, grpcServer)
}
Loading