From 4663de8738636bf7c6edd3f0ae605528e83078e8 Mon Sep 17 00:00:00 2001 From: Sourav Sohan Sahu Date: Wed, 6 Mar 2024 05:55:03 +0000 Subject: [PATCH 1/2] implementing timeouts for servers --- provider/cmd/app/app.go | 51 +++++++++++++++++++++------- provider/server/server_suite_test.go | 18 +++++++--- 2 files changed, 52 insertions(+), 17 deletions(-) diff --git a/provider/cmd/app/app.go b/provider/cmd/app/app.go index bf698aea..6ab28a74 100644 --- a/provider/cmd/app/app.go +++ b/provider/cmd/app/app.go @@ -66,9 +66,7 @@ func init() { } type Options struct { - Address string - StreamingAddress string - BaseURL string + BaseURL string RootDir string @@ -83,11 +81,29 @@ type Options struct { Libvirt LibvirtOptions NicPlugin *networkinterfaceplugin.Options + Servers ServersOptions GCVMGracefulShutdownTimeout time.Duration ResyncIntervalGarbageCollector time.Duration } +type HTTPServerOptions struct { + Addr string + ReadTimeout time.Duration + WriteTimeout time.Duration + IdleTimeout time.Duration +} + +type GRPCServerOptions struct { + Addr string + ConnectionTimeout time.Duration +} + +type ServersOptions struct { + Streaming HTTPServerOptions + GRPC GRPCServerOptions +} + type LibvirtOptions struct { Socket string Address string @@ -100,15 +116,20 @@ type LibvirtOptions struct { } func (o *Options) AddFlags(fs *pflag.FlagSet) { - fs.StringVar(&o.Address, "address", "/var/run/iri-machinebroker.sock", "Address to listen on.") - fs.StringVar(&o.RootDir, "libvirt-provider-dir", filepath.Join(homeDir, ".libvirt-provider"), "Path to the directory libvirt-provider manages its content at.") + // ServerOptions + fs.StringVar(&o.Servers.GRPC.Addr, "servers-grpc-address", "/var/run/iri-machinebroker.sock", "Address to listen on.") + fs.DurationVar(&o.Servers.GRPC.ConnectionTimeout, "servers-grpc-connectiontimeout", 3*time.Second, "Connection timeout for GRPC server.") + fs.StringVar(&o.Servers.Streaming.Addr, "servers-streaming-address", "127.0.0.1:20251", "Address to run the streaming server on") + fs.DurationVar(&o.Servers.Streaming.ReadTimeout, "servers-streaming-readtimeout", 200*time.Millisecond, "Read timeout for streaming server.") + fs.DurationVar(&o.Servers.Streaming.WriteTimeout, "servers-streaming-writetimeout", 200*time.Millisecond, "Write timeout for streaming server.") + fs.DurationVar(&o.Servers.Streaming.IdleTimeout, "server-streaming-idletimeout", 1*time.Second, "Idle timeout for connections to streaming server.") + fs.StringVar(&o.RootDir, "libvirt-provider-dir", filepath.Join(homeDir, ".libvirt-provider"), "Path to the directory libvirt-provider manages its content at.") fs.StringVar(&o.PathSupportedMachineClasses, "supported-machine-classes", o.PathSupportedMachineClasses, "File containing supported machine classes.") fs.DurationVar(&o.ResyncIntervalVolumeSize, "volume-size-resync-interval", 1*time.Minute, "Interval to determine volume size changes.") fs.StringVar(&o.ApinetKubeconfig, "apinet-kubeconfig", "", "Path to the kubeconfig file for the apinet-cluster.") - fs.StringVar(&o.StreamingAddress, "streaming-address", "127.0.0.1:20251", "Address to run the streaming server on") fs.StringVar(&o.BaseURL, "base-url", "", "The base url to construct urls for streaming from. If empty it will be "+ "constructed from the streaming-address") @@ -189,7 +210,7 @@ func Run(ctx context.Context, opts Options) error { if baseURL == "" { u := &url.URL{ Scheme: "http", - Host: opts.StreamingAddress, + Host: opts.Servers.Streaming.Addr, } baseURL = u.String() } @@ -398,7 +419,7 @@ func Run(ctx context.Context, opts Options) error { func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, srv *server.Server, opts Options) error { setupLog.V(1).Info("Cleaning up any previous socket") - if err := common.CleanupSocketIfExists(opts.Address); err != nil { + if err := common.CleanupSocketIfExists(opts.Servers.GRPC.Addr); err != nil { return fmt.Errorf("error cleaning up socket: %w", err) } @@ -407,11 +428,12 @@ func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, s commongrpc.InjectLogger(log.WithName("iri-server")), commongrpc.LogRequest, ), + grpc.ConnectionTimeout(opts.Servers.GRPC.ConnectionTimeout), ) iri.RegisterMachineRuntimeServer(grpcSrv, srv) - setupLog.V(1).Info("Start listening on unix socket", "Address", opts.Address) - l, err := net.Listen("unix", opts.Address) + setupLog.V(1).Info("Start listening on unix socket", "Address", opts.Servers.GRPC.Addr) + l, err := net.Listen("unix", opts.Servers.GRPC.Addr) if err != nil { return fmt.Errorf("failed to listen: %w", err) } @@ -435,8 +457,11 @@ func runStreamingServer(ctx context.Context, setupLog, log logr.Logger, srv *ser }) httpSrv := &http.Server{ - Addr: opts.StreamingAddress, - Handler: httpHandler, + Addr: opts.Servers.Streaming.Addr, + Handler: httpHandler, + ReadTimeout: opts.Servers.Streaming.ReadTimeout, + WriteTimeout: opts.Servers.Streaming.WriteTimeout, + IdleTimeout: opts.Servers.Streaming.IdleTimeout, } go func() { @@ -446,7 +471,7 @@ func runStreamingServer(ctx context.Context, setupLog, log logr.Logger, srv *ser setupLog.Info("Shut down streaming server") }() - setupLog.V(1).Info("Starting streaming server", "Address", opts.StreamingAddress) + setupLog.V(1).Info("Starting streaming server", "Address", opts.Servers.Streaming.Addr) if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { return fmt.Errorf("error listening / serving streaming server: %w", err) } diff --git a/provider/server/server_suite_test.go b/provider/server/server_suite_test.go index 89f91181..5ef5df7f 100644 --- a/provider/server/server_suite_test.go +++ b/provider/server/server_suite_test.go @@ -101,11 +101,9 @@ var _ = BeforeSuite(func() { Expect(os.Chmod(tempDir, 0730)).Should(Succeed()) opts := app.Options{ - Address: filepath.Join(tempDir, "test.sock"), BaseURL: baseURL, PathSupportedMachineClasses: machineClassesFile.Name(), RootDir: filepath.Join(tempDir, "libvirt-provider"), - StreamingAddress: streamingAddress, Libvirt: app.LibvirtOptions{ Socket: "/var/run/libvirt/libvirt-sock", URI: "qemu:///system", @@ -118,6 +116,18 @@ var _ = BeforeSuite(func() { ResyncIntervalGarbageCollector: resyncGarbageCollectorInterval, ResyncIntervalVolumeSize: resyncVolumeSizeInterval, GuestAgent: app.GuestAgentOption(api.GuestAgentNone), + Servers: app.ServersOptions{ + GRPC: app.GRPCServerOptions{ + Addr: filepath.Join(tempDir, "test.sock"), + ConnectionTimeout: 3 * time.Second, + }, + Streaming: app.HTTPServerOptions{ + Addr: streamingAddress, + ReadTimeout: 200 * time.Millisecond, + WriteTimeout: 200 * time.Millisecond, + IdleTimeout: 1 * time.Second, + }, + }, } srvCtx, cancel := context.WithCancel(context.Background()) @@ -129,10 +139,10 @@ var _ = BeforeSuite(func() { }() Eventually(func() error { - return isSocketAvailable(opts.Address) + return isSocketAvailable(opts.Servers.GRPC.Addr) }).WithTimeout(30 * time.Second).WithPolling(500 * time.Millisecond).Should(Succeed()) - address, err := machine.GetAddressWithTimeout(3*time.Second, fmt.Sprintf("unix://%s", opts.Address)) + address, err := machine.GetAddressWithTimeout(3*time.Second, fmt.Sprintf("unix://%s", opts.Servers.GRPC.Addr)) Expect(err).NotTo(HaveOccurred()) gconn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials())) From 0fbee504a780c65eeff9552fed2173c15609cb17 Mon Sep 17 00:00:00 2001 From: Sourav Sohan Sahu Date: Wed, 13 Mar 2024 12:24:08 +0000 Subject: [PATCH 2/2] adding streaming server graceful shutdown --- provider/cmd/app/app.go | 73 +++++++++++++++++++--------- provider/server/server_suite_test.go | 9 ++-- 2 files changed, 55 insertions(+), 27 deletions(-) diff --git a/provider/cmd/app/app.go b/provider/cmd/app/app.go index 6ab28a74..5a10bd1c 100644 --- a/provider/cmd/app/app.go +++ b/provider/cmd/app/app.go @@ -13,6 +13,7 @@ import ( "net/url" "os" "path/filepath" + "sync" "time" "github.com/go-logr/logr" @@ -88,10 +89,11 @@ type Options struct { } type HTTPServerOptions struct { - Addr string - ReadTimeout time.Duration - WriteTimeout time.Duration - IdleTimeout time.Duration + Addr string + ReadTimeout time.Duration + WriteTimeout time.Duration + IdleTimeout time.Duration + GracefulTimeout time.Duration } type GRPCServerOptions struct { @@ -119,10 +121,11 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { // ServerOptions fs.StringVar(&o.Servers.GRPC.Addr, "servers-grpc-address", "/var/run/iri-machinebroker.sock", "Address to listen on.") fs.DurationVar(&o.Servers.GRPC.ConnectionTimeout, "servers-grpc-connectiontimeout", 3*time.Second, "Connection timeout for GRPC server.") - fs.StringVar(&o.Servers.Streaming.Addr, "servers-streaming-address", "127.0.0.1:20251", "Address to run the streaming server on") + fs.StringVar(&o.Servers.Streaming.Addr, "servers-streaming-address", "127.0.0.1:20251", "Address at which the stream server will listen") fs.DurationVar(&o.Servers.Streaming.ReadTimeout, "servers-streaming-readtimeout", 200*time.Millisecond, "Read timeout for streaming server.") fs.DurationVar(&o.Servers.Streaming.WriteTimeout, "servers-streaming-writetimeout", 200*time.Millisecond, "Write timeout for streaming server.") - fs.DurationVar(&o.Servers.Streaming.IdleTimeout, "server-streaming-idletimeout", 1*time.Second, "Idle timeout for connections to streaming server.") + fs.DurationVar(&o.Servers.Streaming.IdleTimeout, "servers-streaming-idletimeout", 1*time.Second, "Idle timeout for connections to streaming server.") + fs.DurationVar(&o.Servers.Streaming.GracefulTimeout, "servers-streaming-gracefultimeout", 2*time.Second, "Graceful timeout to shutdown streaming server. Ideally set it little longer than idletimeout.") fs.StringVar(&o.RootDir, "libvirt-provider-dir", filepath.Join(homeDir, ".libvirt-provider"), "Path to the directory libvirt-provider manages its content at.") fs.StringVar(&o.PathSupportedMachineClasses, "supported-machine-classes", o.PathSupportedMachineClasses, "File containing supported machine classes.") @@ -398,7 +401,7 @@ func Run(ctx context.Context, opts Options) error { g.Go(func() error { setupLog.Info("Starting grpc server") - if err := runGRPCServer(ctx, setupLog, log, srv, opts); err != nil { + if err := runGRPCServer(ctx, setupLog, log, srv, opts.Servers.GRPC); err != nil { setupLog.Error(err, "failed to start grpc server") return err } @@ -407,7 +410,7 @@ func Run(ctx context.Context, opts Options) error { g.Go(func() error { setupLog.Info("Starting streaming server") - if err := runStreamingServer(ctx, setupLog, log, srv, opts); err != nil { + if err := runStreamingServer(ctx, setupLog, log, srv, opts.Servers.Streaming); err != nil { setupLog.Error(err, "failed to start streaming server") return err } @@ -417,9 +420,9 @@ func Run(ctx context.Context, opts Options) error { return g.Wait() } -func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, srv *server.Server, opts Options) error { +func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, srv *server.Server, opts GRPCServerOptions) error { setupLog.V(1).Info("Cleaning up any previous socket") - if err := common.CleanupSocketIfExists(opts.Servers.GRPC.Addr); err != nil { + if err := common.CleanupSocketIfExists(opts.Addr); err != nil { return fmt.Errorf("error cleaning up socket: %w", err) } @@ -428,52 +431,76 @@ func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, s commongrpc.InjectLogger(log.WithName("iri-server")), commongrpc.LogRequest, ), - grpc.ConnectionTimeout(opts.Servers.GRPC.ConnectionTimeout), + grpc.ConnectionTimeout(opts.ConnectionTimeout), ) iri.RegisterMachineRuntimeServer(grpcSrv, srv) - setupLog.V(1).Info("Start listening on unix socket", "Address", opts.Servers.GRPC.Addr) - l, err := net.Listen("unix", opts.Servers.GRPC.Addr) + setupLog.V(1).Info("Start listening on unix socket", "Address", opts.Addr) + l, err := net.Listen("unix", opts.Addr) if err != nil { return fmt.Errorf("failed to listen: %w", err) } - setupLog.Info("Starting grpc server", "Address", l.Addr().String()) + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() <-ctx.Done() setupLog.Info("Shutting down grpc server") grpcSrv.GracefulStop() - setupLog.Info("Shut down grpc server") + setupLog.Info("GRPC server is shutdown") }() + + setupLog.Info("Starting grpc server", "Address", l.Addr().String()) if err := grpcSrv.Serve(l); err != nil { return fmt.Errorf("error serving grpc: %w", err) } + + setupLog.Info("GRPC server stopped serving requests") + + wg.Wait() + return nil } -func runStreamingServer(ctx context.Context, setupLog, log logr.Logger, srv *server.Server, opts Options) error { +func runStreamingServer(ctx context.Context, setupLog, log logr.Logger, srv *server.Server, opts HTTPServerOptions) error { httpHandler := providerhttp.NewHandler(srv, providerhttp.HandlerOptions{ Log: log.WithName("streaming-server"), }) httpSrv := &http.Server{ - Addr: opts.Servers.Streaming.Addr, + Addr: opts.Addr, Handler: httpHandler, - ReadTimeout: opts.Servers.Streaming.ReadTimeout, - WriteTimeout: opts.Servers.Streaming.WriteTimeout, - IdleTimeout: opts.Servers.Streaming.IdleTimeout, + ReadTimeout: opts.ReadTimeout, + WriteTimeout: opts.WriteTimeout, + IdleTimeout: opts.IdleTimeout, } + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() <-ctx.Done() setupLog.Info("Shutting down streaming server") - _ = httpSrv.Close() - setupLog.Info("Shut down streaming server") + shutdownCtx, cancel := context.WithTimeout(context.Background(), opts.GracefulTimeout) + defer cancel() + + locErr := httpSrv.Shutdown(shutdownCtx) + if locErr != nil { + setupLog.Error(locErr, "streaming server wasn't shutdown properly") + } else { + setupLog.Info("Streaming server is shutdown") + } }() - setupLog.V(1).Info("Starting streaming server", "Address", opts.Servers.Streaming.Addr) + setupLog.V(1).Info("Starting streaming server", "Address", opts.Addr) if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { return fmt.Errorf("error listening / serving streaming server: %w", err) } + + setupLog.Info("Streaming server stopped serving requests") + + wg.Wait() + return nil } diff --git a/provider/server/server_suite_test.go b/provider/server/server_suite_test.go index 5ef5df7f..887572db 100644 --- a/provider/server/server_suite_test.go +++ b/provider/server/server_suite_test.go @@ -122,10 +122,11 @@ var _ = BeforeSuite(func() { ConnectionTimeout: 3 * time.Second, }, Streaming: app.HTTPServerOptions{ - Addr: streamingAddress, - ReadTimeout: 200 * time.Millisecond, - WriteTimeout: 200 * time.Millisecond, - IdleTimeout: 1 * time.Second, + Addr: streamingAddress, + ReadTimeout: 200 * time.Millisecond, + WriteTimeout: 200 * time.Millisecond, + IdleTimeout: 1 * time.Second, + GracefulTimeout: 2 * time.Second, }, }, }