Skip to content

Commit

Permalink
implementing timeouts for servers
Browse files Browse the repository at this point in the history
  • Loading branch information
so-sahu committed Mar 19, 2024
1 parent 89c914c commit 1027550
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 17 deletions.
51 changes: 38 additions & 13 deletions provider/cmd/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ func init() {
}

type Options struct {
Address string
StreamingAddress string
BaseURL string
BaseURL string

RootDir string

Expand All @@ -83,11 +81,29 @@ type Options struct {

Libvirt LibvirtOptions
NicPlugin *networkinterfaceplugin.Options
Servers ServersOptions

GCVMGracefulShutdownTimeout time.Duration
ResyncIntervalGarbageCollector time.Duration
}

type HTTPServerOptions struct {
Addr string
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
}

type GRPCServerOptions struct {
Addr string
ConnectionTimeout time.Duration
}

type ServersOptions struct {
Streaming HTTPServerOptions
GRPC GRPCServerOptions
}

type LibvirtOptions struct {
Socket string
Address string
Expand All @@ -100,15 +116,20 @@ type LibvirtOptions struct {
}

func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Address, "address", "/var/run/iri-machinebroker.sock", "Address to listen on.")
fs.StringVar(&o.RootDir, "libvirt-provider-dir", filepath.Join(homeDir, ".libvirt-provider"), "Path to the directory libvirt-provider manages its content at.")
// ServerOptions
fs.StringVar(&o.Servers.GRPC.Addr, "servers-grpc-address", "/var/run/iri-machinebroker.sock", "Address to listen on.")
fs.DurationVar(&o.Servers.GRPC.ConnectionTimeout, "servers-grpc-connectiontimeout", 3*time.Second, "Connection timeout for GRPC server.")
fs.StringVar(&o.Servers.Streaming.Addr, "servers-streaming-address", "127.0.0.1:20251", "Address to run the streaming server on")
fs.DurationVar(&o.Servers.Streaming.ReadTimeout, "servers-streaming-readtimeout", 200*time.Millisecond, "Read timeout for streaming server.")
fs.DurationVar(&o.Servers.Streaming.WriteTimeout, "servers-streaming-writetimeout", 200*time.Millisecond, "Write timeout for streaming server.")
fs.DurationVar(&o.Servers.Streaming.IdleTimeout, "server-streaming-idletimeout", 1*time.Second, "Idle timeout for connections to streaming server.")

fs.StringVar(&o.RootDir, "libvirt-provider-dir", filepath.Join(homeDir, ".libvirt-provider"), "Path to the directory libvirt-provider manages its content at.")
fs.StringVar(&o.PathSupportedMachineClasses, "supported-machine-classes", o.PathSupportedMachineClasses, "File containing supported machine classes.")
fs.DurationVar(&o.ResyncIntervalVolumeSize, "volume-size-resync-interval", 1*time.Minute, "Interval to determine volume size changes.")

fs.StringVar(&o.ApinetKubeconfig, "apinet-kubeconfig", "", "Path to the kubeconfig file for the apinet-cluster.")

fs.StringVar(&o.StreamingAddress, "streaming-address", "127.0.0.1:20251", "Address to run the streaming server on")
fs.StringVar(&o.BaseURL, "base-url", "", "The base url to construct urls for streaming from. If empty it will be "+
"constructed from the streaming-address")

Expand Down Expand Up @@ -189,7 +210,7 @@ func Run(ctx context.Context, opts Options) error {
if baseURL == "" {
u := &url.URL{
Scheme: "http",
Host: opts.StreamingAddress,
Host: opts.Servers.Streaming.Addr,
}
baseURL = u.String()
}
Expand Down Expand Up @@ -398,7 +419,7 @@ func Run(ctx context.Context, opts Options) error {

func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, srv *server.Server, opts Options) error {
setupLog.V(1).Info("Cleaning up any previous socket")
if err := common.CleanupSocketIfExists(opts.Address); err != nil {
if err := common.CleanupSocketIfExists(opts.Servers.GRPC.Addr); err != nil {
return fmt.Errorf("error cleaning up socket: %w", err)
}

Expand All @@ -407,11 +428,12 @@ func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, s
commongrpc.InjectLogger(log.WithName("iri-server")),
commongrpc.LogRequest,
),
grpc.ConnectionTimeout(opts.Servers.GRPC.ConnectionTimeout),
)
iri.RegisterMachineRuntimeServer(grpcSrv, srv)

setupLog.V(1).Info("Start listening on unix socket", "Address", opts.Address)
l, err := net.Listen("unix", opts.Address)
setupLog.V(1).Info("Start listening on unix socket", "Address", opts.Servers.GRPC.Addr)
l, err := net.Listen("unix", opts.Servers.GRPC.Addr)
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}
Expand All @@ -435,8 +457,11 @@ func runStreamingServer(ctx context.Context, setupLog, log logr.Logger, srv *ser
})

httpSrv := &http.Server{
Addr: opts.StreamingAddress,
Handler: httpHandler,
Addr: opts.Servers.Streaming.Addr,
Handler: httpHandler,
ReadTimeout: opts.Servers.Streaming.ReadTimeout,
WriteTimeout: opts.Servers.Streaming.WriteTimeout,
IdleTimeout: opts.Servers.Streaming.IdleTimeout,
}

go func() {
Expand All @@ -446,7 +471,7 @@ func runStreamingServer(ctx context.Context, setupLog, log logr.Logger, srv *ser
setupLog.Info("Shut down streaming server")
}()

setupLog.V(1).Info("Starting streaming server", "Address", opts.StreamingAddress)
setupLog.V(1).Info("Starting streaming server", "Address", opts.Servers.Streaming.Addr)
if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("error listening / serving streaming server: %w", err)
}
Expand Down
18 changes: 14 additions & 4 deletions provider/server/server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,9 @@ var _ = BeforeSuite(func() {
Expect(os.Chmod(tempDir, 0730)).Should(Succeed())

opts := app.Options{
Address: filepath.Join(tempDir, "test.sock"),
BaseURL: baseURL,
PathSupportedMachineClasses: machineClassesFile.Name(),
RootDir: filepath.Join(tempDir, "libvirt-provider"),
StreamingAddress: streamingAddress,
Libvirt: app.LibvirtOptions{
Socket: "/var/run/libvirt/libvirt-sock",
URI: "qemu:///system",
Expand All @@ -118,6 +116,18 @@ var _ = BeforeSuite(func() {
ResyncIntervalGarbageCollector: resyncGarbageCollectorInterval,
ResyncIntervalVolumeSize: resyncVolumeSizeInterval,
GuestAgent: app.GuestAgentOption(api.GuestAgentNone),
Servers: app.ServersOptions{
GRPC: app.GRPCServerOptions{
Addr: filepath.Join(tempDir, "test.sock"),
ConnectionTimeout: 3 * time.Second,
},
Streaming: app.HTTPServerOptions{
Addr: streamingAddress,
ReadTimeout: 200 * time.Millisecond,
WriteTimeout: 200 * time.Millisecond,
IdleTimeout: 1 * time.Second,
},
},
}

srvCtx, cancel := context.WithCancel(context.Background())
Expand All @@ -129,10 +139,10 @@ var _ = BeforeSuite(func() {
}()

Eventually(func() error {
return isSocketAvailable(opts.Address)
return isSocketAvailable(opts.Servers.GRPC.Addr)
}).WithTimeout(30 * time.Second).WithPolling(500 * time.Millisecond).Should(Succeed())

address, err := machine.GetAddressWithTimeout(3*time.Second, fmt.Sprintf("unix://%s", opts.Address))
address, err := machine.GetAddressWithTimeout(3*time.Second, fmt.Sprintf("unix://%s", opts.Servers.GRPC.Addr))
Expect(err).NotTo(HaveOccurred())

gconn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down

0 comments on commit 1027550

Please sign in to comment.