From 74c402195a375de718ff6347327d01184f8a19cc Mon Sep 17 00:00:00 2001
From: Vitor Enes
Date: Wed, 6 Apr 2022 16:09:59 +0100
Subject: [PATCH 1/3] Throw startup error if `TeleportReadyEvent` is not
 emitted (#11725)

* Throw startup error if `TeleportReadyEvent` is not emitted

Before this commit, the `TeleportReadyEvent` was only waited for when a
process reload occurred. Thus, if a bug exists in the code that emits this
event (as is currently the case, since the `MetricsReady` and
`WindowsDesktopReady` events are never emitted), such a bug may go
unnoticed for a while.

This commit ensures that the `TeleportReadyEvent` is always waited for on
startup, and throws a startup error if the event is not emitted within
some timeout.

This commit also:
- removes the `MetricsReady` event (it is not produced by a component that
  sends heartbeats, unlike every other event required by the
  `TeleportReadyEvent` event mapping)
- ensures that the `WindowsDesktopReady` event is emitted
- refactors some of the code in `lib/service/supervisor.go`
- moves the event mapping registration to a new
  `registerTeleportReadyEvent` function
---
 integration/helpers.go          | 15 +-----
 integration/integration_test.go | 11 ++++
 lib/service/desktop.go          |  4 ++
 lib/service/kubernetes.go       |  2 +
 lib/service/service.go          | 90 ++++++++++++++++++++-------------
 lib/service/signals.go          |  2 +-
 lib/service/supervisor.go       | 45 +++++++----------
 7 files changed, 94 insertions(+), 75 deletions(-)

diff --git a/integration/helpers.go b/integration/helpers.go
index d21becf30b98a..eea278af97910 100644
--- a/integration/helpers.go
+++ b/integration/helpers.go
@@ -1103,26 +1103,15 @@ func (i *TeleInstance) Start() error {
 	// Build a list of expected events to wait for before unblocking based off
 	// the configuration passed in.
 	expectedEvents := []string{}
-	if i.Config.Auth.Enabled {
-		expectedEvents = append(expectedEvents, service.AuthTLSReady)
-	}
+	// Always wait for TeleportReadyEvent.
+	expectedEvents = append(expectedEvents, service.TeleportReadyEvent)
 	if i.Config.Proxy.Enabled {
 		expectedEvents = append(expectedEvents, service.ProxyReverseTunnelReady)
-		expectedEvents = append(expectedEvents, service.ProxySSHReady)
 		expectedEvents = append(expectedEvents, service.ProxyAgentPoolReady)
 		if !i.Config.Proxy.DisableWebService {
 			expectedEvents = append(expectedEvents, service.ProxyWebServerReady)
 		}
 	}
-	if i.Config.SSH.Enabled {
-		expectedEvents = append(expectedEvents, service.NodeSSHReady)
-	}
-	if i.Config.Apps.Enabled {
-		expectedEvents = append(expectedEvents, service.AppsReady)
-	}
-	if i.Config.Databases.Enabled {
-		expectedEvents = append(expectedEvents, service.DatabasesReady)
-	}

 	// Start the process and block until the expected events have arrived.
 	receivedEvents, err := startAndWait(i.Process, expectedEvents)

diff --git a/integration/integration_test.go b/integration/integration_test.go
index 1fe08ec18544c..a075b27fe501c 100644
--- a/integration/integration_test.go
+++ b/integration/integration_test.go
@@ -4359,6 +4359,17 @@ func waitForProcessStart(serviceC chan *service.TeleportProcess) (*service.Telep
 		dumpGoroutineProfile()
 		return nil, trace.BadParameter("timeout waiting for service to start")
 	}
+
+	eventC := make(chan service.Event, 1)
+	svc.WaitForEvent(context.TODO(), service.TeleportReadyEvent, eventC)
+	select {
+	case <-eventC:
+
+	case <-time.After(20 * time.Second):
+		dumpGoroutineProfile()
+		return nil, trace.BadParameter("timeout waiting for service to broadcast ready status")
+	}
+
 	return svc, nil
 }

diff --git a/lib/service/desktop.go b/lib/service/desktop.go
index 8263c7d9f9fed..ff0c9ae40de1d 100644
--- a/lib/service/desktop.go
+++ b/lib/service/desktop.go
@@ -248,6 +248,10 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus.
 			"Windows desktop service %s:%s is starting on %v.",
 			teleport.Version, teleport.Gitref, listener.Addr())
 	}
+
+	// since srv.Serve is a blocking call, we emit this event right before
+	// the service has started
+	process.BroadcastEvent(Event{Name: WindowsDesktopReady, Payload: nil})
 	err := srv.Serve(listener)
 	if err != nil {
 		if err == http.ErrServerClosed {

diff --git a/lib/service/kubernetes.go b/lib/service/kubernetes.go
index 70d4cca899d50..c5c4aea33c0c9 100644
--- a/lib/service/kubernetes.go
+++ b/lib/service/kubernetes.go
@@ -271,6 +271,8 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C
 			"Kubernetes service %s:%s is starting on %v.",
 			teleport.Version, teleport.Gitref, listener.Addr())
 	}
+	// since kubeServer.Serve is a blocking call, we emit this event right before
+	// the service has started
 	process.BroadcastEvent(Event{Name: KubernetesReady, Payload: nil})
 	err := kubeServer.Serve(listener)
 	if err != nil {

diff --git a/lib/service/service.go b/lib/service/service.go
index 6c66f8565016d..749f9fe4645d2 100644
--- a/lib/service/service.go
+++ b/lib/service/service.go
@@ -173,10 +173,6 @@ const (
 	// is ready to start accepting connections.
 	DatabasesReady = "DatabasesReady"

-	// MetricsReady is generated when the Teleport metrics service is ready to
-	// start accepting connections.
-	MetricsReady = "MetricsReady"
-
 	// WindowsDesktopReady is generated when the Teleport windows desktop
 	// service is ready to start accepting connections.
 	WindowsDesktopReady = "WindowsDesktopReady"
@@ -513,6 +509,20 @@ func Run(ctx context.Context, cfg Config, newTeleport NewProcess) error {
 	if err := srv.Start(); err != nil {
 		return trace.Wrap(err, "startup failed")
 	}
+
+	// Wait for the service to report that it has started.
+	startTimeoutCtx, startCancel := context.WithTimeout(ctx, signalPipeTimeout)
+	defer startCancel()
+	eventC := make(chan Event, 1)
+	srv.WaitForEvent(startTimeoutCtx, TeleportReadyEvent, eventC)
+	select {
+	case <-eventC:
+		cfg.Log.Infof("Service has started successfully.")
+	case <-startTimeoutCtx.Done():
+		warnOnErr(srv.Close(), cfg.Log)
+		return trace.BadParameter("service has failed to start")
+	}
+
 	// Wait and reload until called exit.
 	for {
 		srv, err = waitAndReload(ctx, cfg, srv, newTeleport)
@@ -761,36 +771,8 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) {
 		cfg.Keygen = native.New(process.ExitContext(), native.PrecomputeKeys(precomputeCount))
 	}

-	// Produce global TeleportReadyEvent
-	// when all components have started
-	eventMapping := EventMapping{
-		Out: TeleportReadyEvent,
-	}
-	if cfg.Auth.Enabled {
-		eventMapping.In = append(eventMapping.In, AuthTLSReady)
-	}
-	if cfg.SSH.Enabled {
-		eventMapping.In = append(eventMapping.In, NodeSSHReady)
-	}
-	if cfg.Proxy.Enabled {
-		eventMapping.In = append(eventMapping.In, ProxySSHReady)
-	}
-	if cfg.Kube.Enabled {
-		eventMapping.In = append(eventMapping.In, KubernetesReady)
-	}
-	if cfg.Apps.Enabled {
-		eventMapping.In = append(eventMapping.In, AppsReady)
-	}
-	if cfg.Databases.Enabled {
-		eventMapping.In = append(eventMapping.In, DatabasesReady)
-	}
-	if cfg.Metrics.Enabled {
-		eventMapping.In = append(eventMapping.In, MetricsReady)
-	}
-	if cfg.WindowsDesktop.Enabled {
-		eventMapping.In = append(eventMapping.In, WindowsDesktopReady)
-	}
-	process.RegisterEventMapping(eventMapping)
+	// Produce global TeleportReadyEvent when all components have started
+	process.registerTeleportReadyEvent(cfg)

 	if cfg.Auth.Enabled {
 		if err := process.initAuthService(); err != nil {
 			return nil, trace.Wrap(err)
 		}
@@ -1381,7 +1363,7 @@ func (process *TeleportProcess) initAuthService() error {
 		utils.Consolef(cfg.Console, log, teleport.ComponentAuth,
 			"Auth service %s:%s is starting on %v.",
 			teleport.Version, teleport.Gitref, authAddr)
-		// since tlsServer.Serve is a blocking call, we emit this even right before
+		// since tlsServer.Serve is a blocking call, we emit this event right before
 		// the service has started
 		process.BroadcastEvent(Event{Name: AuthTLSReady, Payload: nil})
 		err := tlsServer.Serve()
@@ -3464,6 +3446,44 @@ func (process *TeleportProcess) waitForAppDepend() {
 	}
 }

+// registerTeleportReadyEvent ensures that a TeleportReadyEvent is produced
+// when all components have started.
+func (process *TeleportProcess) registerTeleportReadyEvent(cfg *Config) {
+	eventMapping := EventMapping{
+		Out: TeleportReadyEvent,
+	}
+
+	if cfg.Auth.Enabled {
+		eventMapping.In = append(eventMapping.In, AuthTLSReady)
+	}
+
+	if cfg.SSH.Enabled {
+		eventMapping.In = append(eventMapping.In, NodeSSHReady)
+	}
+
+	if cfg.Proxy.Enabled {
+		eventMapping.In = append(eventMapping.In, ProxySSHReady)
+	}
+
+	if cfg.Kube.Enabled {
+		eventMapping.In = append(eventMapping.In, KubernetesReady)
+	}
+
+	if cfg.Apps.Enabled {
+		eventMapping.In = append(eventMapping.In, AppsReady)
+	}
+
+	if cfg.Databases.Enabled {
+		eventMapping.In = append(eventMapping.In, DatabasesReady)
+	}
+
+	if cfg.WindowsDesktop.Enabled {
+		eventMapping.In = append(eventMapping.In, WindowsDesktopReady)
+	}
+
+	process.RegisterEventMapping(eventMapping)
+}
+
 // appDependEvents is a list of events that the application service depends on.
 var appDependEvents = []string{
 	AuthTLSReady,

diff --git a/lib/service/signals.go b/lib/service/signals.go
index 21fd8db5fe66e..de8799e029acd 100644
--- a/lib/service/signals.go
+++ b/lib/service/signals.go
@@ -153,7 +153,7 @@ func (process *TeleportProcess) WaitForSignals(ctx context.Context) error {
 }

 // ErrTeleportReloading is returned when signal waiter exits
-// because the teleport process has initiaded shutdown
+// because the teleport process has initiated shutdown
 var ErrTeleportReloading = &trace.CompareFailedError{Message: "teleport process is reloading"}

 // ErrTeleportExited means that teleport has exited

diff --git a/lib/service/supervisor.go b/lib/service/supervisor.go
index a059f80766bb7..f744c175491cf 100644
--- a/lib/service/supervisor.go
+++ b/lib/service/supervisor.go
@@ -55,7 +55,7 @@ type Supervisor interface {
 	Wait() error

 	// Run starts and waits for the service to complete
-	// it's a combinatioin Start() and Wait()
+	// it's a combination Start() and Wait()
 	Run() error

 	// Services returns list of running services
@@ -362,33 +362,26 @@ func (s *LocalSupervisor) BroadcastEvent(event Event) {
 		s.signalReload()
 	}

-	s.events[event.Name] = event
+	sendEvent := func(e Event) {
+		select {
+		case s.eventsC <- e:
+		case <-s.closeContext.Done():
+		}
+	}
+
+	s.events[event.Name] = event
+	go sendEvent(event)
 	// Log all events other than recovered events to prevent the logs from
 	// being flooded.
 	if event.String() != TeleportOKEvent {
 		s.log.WithField("event", event.String()).Debug("Broadcasting event.")
 	}

-	go func() {
-		select {
-		case s.eventsC <- event:
-		case <-s.closeContext.Done():
-			return
-		}
-	}()
-
 	for _, m := range s.eventMappings {
 		if m.matches(event.Name, s.events) {
 			mappedEvent := Event{Name: m.Out}
 			s.events[mappedEvent.Name] = mappedEvent
-			go func(e Event) {
-				select {
-				case s.eventsC <- e:
-				case <-s.closeContext.Done():
-					return
-				}
-			}(mappedEvent)
+			go sendEvent(mappedEvent)
 			s.log.WithFields(logrus.Fields{
 				"in":  event.String(),
 				"out": m.String(),
@@ -416,7 +409,7 @@ func (s *LocalSupervisor) WaitForEvent(ctx context.Context, name string, eventC
 	waiter := &waiter{eventC: eventC, context: ctx}
 	event, ok := s.events[name]
 	if ok {
-		go s.notifyWaiter(waiter, event)
+		go waiter.notify(event)
 		return
 	}
 	s.eventWaiters[name] = append(s.eventWaiters[name], waiter)
@@ -432,20 +425,13 @@ func (s *LocalSupervisor) getWaiters(name string) []*waiter {
 	return out
 }

-func (s *LocalSupervisor) notifyWaiter(w *waiter, event Event) {
-	select {
-	case w.eventC <- event:
-	case <-w.context.Done():
-	}
-}
-
 func (s *LocalSupervisor) fanOut() {
 	for {
 		select {
 		case event := <-s.eventsC:
 			waiters := s.getWaiters(event.Name)
 			for _, waiter := range waiters {
-				go s.notifyWaiter(waiter, event)
+				go waiter.notify(event)
 			}
 		case <-s.closeContext.Done():
 			return
@@ -458,6 +444,13 @@ type waiter struct {
 	context context.Context
 }

+func (w *waiter) notify(event Event) {
+	select {
+	case w.eventC <- event:
+	case <-w.context.Done():
+	}
+}
+
 // Service is a running teleport service function
 type Service interface {
 	// Serve starts the function

From 01284597bf2517ba57ce5a0e50af71ac90940a33 Mon Sep 17 00:00:00 2001
From: Vitor Enes
Date: Thu, 7 Apr 2022 12:28:31 +0100
Subject: [PATCH 2/3] Ensure stateOK is reported only when all components have
 sent updates (#11249)

Fixes #11065.

This commit:
- ensures that `TeleportReadyEvent` is only produced when all components
  that send heartbeats (i.e.
  call [`process.onHeartbeat`](https://github.com/gravitational/teleport/blob/16bf416556f337b045b66dc9c3f5a3e16f8cc988/lib/service/service.go#L358-L366))
  are ready
- changes `TeleportProcess.registerTeleportReadyEvent` so that it returns
  a count of these components (let's call it `componentCount`)
- uses `componentCount` to also ensure that `stateOK` is only reported
  when all the components have sent their heartbeat, thus fixing #11065

Since it seems difficult to know when
`TeleportProcess.registerTeleportReadyEvent` should be updated, and with
the goal of quickly detecting such a bug when it is introduced:
1. if `componentCount` is lower than it should be, then the service fails
   to start (due to #11725)
2. if `componentCount` is higher than it should be, then an error is
   logged in `processState.getStateLocked`.
---
 lib/service/service.go      |  33 ++++++---
 lib/service/service_test.go | 143 ++++++++++++++++++++++--------------
 lib/service/state.go        |  29 +++++---
 lib/service/state_test.go   |  31 +++++---
 lib/srv/heartbeat.go        |   2 +-
 5 files changed, 149 insertions(+), 89 deletions(-)

diff --git a/lib/service/service.go b/lib/service/service.go
index 749f9fe4645d2..c092c3da05beb 100644
--- a/lib/service/service.go
+++ b/lib/service/service.go
@@ -158,6 +158,9 @@ const (
 	// and is ready to start accepting connections.
 	ProxySSHReady = "ProxySSHReady"

+	// ProxyKubeReady is generated when the kubernetes proxy service has been initialized.
+	ProxyKubeReady = "ProxyKubeReady"
+
 	// NodeSSHReady is generated when the Teleport node has initialized a SSH server
 	// and is ready to start accepting SSH connections.
 	NodeSSHReady = "NodeReady"
@@ -186,7 +189,7 @@ const (
 	// in a graceful way.
 	TeleportReloadEvent = "TeleportReload"

-	// TeleportPhaseChangeEvent is generated to indidate that teleport
+	// TeleportPhaseChangeEvent is generated to indicate that teleport
 	// CA rotation phase has been updated, used in tests
 	TeleportPhaseChangeEvent = "TeleportPhaseChange"

@@ -745,6 +748,9 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) {

 	process.registerAppDepend()

+	// Produce global TeleportReadyEvent when all components have started
+	componentCount := process.registerTeleportReadyEvent(cfg)
+
 	process.log = cfg.Log.WithFields(logrus.Fields{
 		trace.Component: teleport.Component(teleport.ComponentProcess, process.id),
 	})
@@ -752,7 +758,7 @@
 	serviceStarted := false

 	if !cfg.DiagnosticAddr.IsEmpty() {
-		if err := process.initDiagnosticService(); err != nil {
+		if err := process.initDiagnosticService(componentCount); err != nil {
 			return nil, trace.Wrap(err)
 		}
 	} else {
@@ -771,9 +777,6 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) {
 		cfg.Keygen = native.New(process.ExitContext(), native.PrecomputeKeys(precomputeCount))
 	}

-	// Produce global TeleportReadyEvent when all components have started
-	process.registerTeleportReadyEvent(cfg)
-
 	if cfg.Auth.Enabled {
 		if err := process.initAuthService(); err != nil {
 			return nil, trace.Wrap(err)
 		}
@@ -2208,7 +2211,7 @@ func (process *TeleportProcess) initMetricsService() error {

 // initDiagnosticService starts diagnostic service currently serving healthz
 // and prometheus endpoints
-func (process *TeleportProcess) initDiagnosticService() error {
+func (process *TeleportProcess) initDiagnosticService(componentCount int) error {
 	mux := http.NewServeMux()

 	// support legacy metrics collection in the diagnostic service.
@@ -2239,7 +2242,7 @@ func (process *TeleportProcess) initDiagnosticService() error {
 	// Create a state machine that will process and update the internal state of
 	// Teleport based off Events. Use this state machine to return return the
 	// status from the /readyz endpoint.
-	ps, err := newProcessState(process)
+	ps, err := newProcessState(process, componentCount)
 	if err != nil {
 		return trace.Wrap(err)
 	}
@@ -3056,6 +3059,9 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
 		})
 		log.Infof("Starting Kube proxy on %v.", cfg.Proxy.Kube.ListenAddr.Addr)
+		// since kubeServer.Serve is a blocking call, we emit this event right before
+		// the service has started
+		process.BroadcastEvent(Event{Name: ProxyKubeReady, Payload: nil})
 		err := kubeServer.Serve(listeners.kube)
 		if err != nil && err != http.ErrServerClosed {
 			log.Warningf("Kube TLS server exited with error: %v.", err)
@@ -3447,8 +3453,9 @@ func (process *TeleportProcess) waitForAppDepend() {
 	}
 }

 // registerTeleportReadyEvent ensures that a TeleportReadyEvent is produced
-// when all components have started.
-func (process *TeleportProcess) registerTeleportReadyEvent(cfg *Config) {
+// when all components enabled (based on the configuration) have started.
+// It returns the number of components enabled.
+func (process *TeleportProcess) registerTeleportReadyEvent(cfg *Config) int {
 	eventMapping := EventMapping{
 		Out: TeleportReadyEvent,
 	}
@@ -3461,9 +3468,13 @@ func (process *TeleportProcess) registerTeleportReadyEvent(cfg *Config) {
 		eventMapping.In = append(eventMapping.In, NodeSSHReady)
 	}

-	if cfg.Proxy.Enabled {
+	proxyConfig := cfg.Proxy
+	if proxyConfig.Enabled {
 		eventMapping.In = append(eventMapping.In, ProxySSHReady)
 	}
+	if proxyConfig.Kube.Enabled && !proxyConfig.Kube.ListenAddr.IsEmpty() && !proxyConfig.DisableReverseTunnel {
+		eventMapping.In = append(eventMapping.In, ProxyKubeReady)
+	}

 	if cfg.Kube.Enabled {
 		eventMapping.In = append(eventMapping.In, KubernetesReady)
@@ -3481,7 +3492,9 @@
 		eventMapping.In = append(eventMapping.In, WindowsDesktopReady)
 	}

+	componentCount := len(eventMapping.In)
 	process.RegisterEventMapping(eventMapping)
+	return componentCount
 }

 // appDependEvents is a list of events that the application service depends on.
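The mechanism the two patches above build on is easiest to see in isolation: `TeleportReadyEvent` fires only once every event listed in `EventMapping.In` has been broadcast, and `len(eventMapping.In)` doubles as the `totalComponentCount` that gates `stateOK`. The following sketch is a minimal, self-contained model of that pattern; the `EventMapping` and `matches` names mirror the patch, but the simplified types are illustrative stand-ins, not the real `lib/service` API:

    package main

    import "fmt"

    // EventMapping fires Out once every event in In has been observed.
    type EventMapping struct {
        In  []string
        Out string
    }

    // matches reports whether all required input events have been seen.
    func (m EventMapping) matches(seen map[string]bool) bool {
        for _, in := range m.In {
            if !seen[in] {
                return false
            }
        }
        return true
    }

    func main() {
        // Two enabled components, as if cfg.Auth and cfg.SSH were on.
        m := EventMapping{In: []string{"AuthTLSReady", "NodeReady"}, Out: "TeleportReady"}
        componentCount := len(m.In) // what registerTeleportReadyEvent returns

        seen := make(map[string]bool)
        for _, ev := range []string{"AuthTLSReady", "NodeReady"} {
            seen[ev] = true
            if m.matches(seen) {
                fmt.Printf("%s fired after %d/%d components\n", m.Out, len(seen), componentCount)
            }
        }
    }

Under this model, both failure modes called out in the commit message are visible: if the mapping lists an event that is never emitted, `matches` never returns true and startup times out (patch 1); if more components send heartbeats than were counted, `numOK` exceeds `totalComponentCount` and `getStateLocked` logs it as a bug.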
diff --git a/lib/service/service_test.go b/lib/service/service_test.go
index 0210c8b68a0b8..be2ac0aac7669 100644
--- a/lib/service/service_test.go
+++ b/lib/service/service_test.go
@@ -18,7 +18,7 @@ package service
 import (
 	"context"
 	"fmt"
-	"io/ioutil"
+	"net"
 	"net/http"
 	"os"
 	"strings"
@@ -45,6 +45,16 @@ import (
 	"github.com/stretchr/testify/require"
 )

+var ports utils.PortList
+
+func init() {
+	var err error
+	ports, err = utils.GetFreeTCPPorts(5, utils.PortStartingNumber)
+	if err != nil {
+		panic(fmt.Sprintf("failed to allocate tcp ports for tests: %v", err))
+	}
+}
+
 func TestMain(m *testing.M) {
 	utils.InitLoggerForTests()
 	os.Exit(m.Run())
 }
@@ -81,25 +91,29 @@ func TestServiceSelfSignedHTTPS(t *testing.T) {
 	require.FileExists(t, cfg.Proxy.KeyPairs[0].PrivateKey)
 }

-func TestMonitor(t *testing.T) {
-	t.Parallel()
-	fakeClock := clockwork.NewFakeClock()
+type monitorTest struct {
+	desc         string
+	event        *Event
+	advanceClock time.Duration
+	wantStatus   int
+}

+func testMonitor(t *testing.T, sshEnabled bool, tests []monitorTest) {
+	fakeClock := clockwork.NewFakeClock()
 	cfg := MakeDefaultConfig()
 	cfg.Clock = fakeClock
 	var err error
-	cfg.DataDir, err = ioutil.TempDir("", "teleport")
-	require.NoError(t, err)
-	defer os.RemoveAll(cfg.DataDir)
-	cfg.DiagnosticAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
-	cfg.AuthServers = []utils.NetAddr{{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}}
+	cfg.DataDir = t.TempDir()
+	cfg.DiagnosticAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: net.JoinHostPort("127.0.0.1", ports.Pop())}
 	cfg.Auth.Enabled = true
-	cfg.Auth.StorageConfig.Params["path"], err = ioutil.TempDir("", "teleport")
-	require.NoError(t, err)
-	defer os.RemoveAll(cfg.DataDir)
-	cfg.Auth.SSHAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
+	cfg.Auth.SSHAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: net.JoinHostPort("127.0.0.1", ports.Pop())}
+	cfg.AuthServers = []utils.NetAddr{cfg.Auth.SSHAddr}
+	cfg.Auth.StorageConfig.Params["path"] = t.TempDir()
+	if sshEnabled {
+		cfg.SSH.Enabled = true
+		cfg.SSH.Addr = utils.NetAddr{AddrNetwork: "tcp", Addr: net.JoinHostPort("127.0.0.1", ports.Pop())}
+	}
 	cfg.Proxy.Enabled = false
-	cfg.SSH.Enabled = false

 	process, err := NewTeleport(cfg)
 	require.NoError(t, err)
@@ -116,65 +130,84 @@
 	err = waitForStatus(endpoint, http.StatusOK)
 	require.NoError(t, err)

-	tests := []struct {
-		desc         string
-		event        Event
-		advanceClock time.Duration
-		wantStatus   []int
-	}{
+	for _, tt := range tests {
+		t.Run(tt.desc, func(t *testing.T) {
+			fakeClock.Advance(tt.advanceClock)
+			if tt.event != nil {
+				process.BroadcastEvent(*tt.event)
+			}
+			err := waitForStatus(endpoint, tt.wantStatus)
+			require.NoError(t, err)
+		})
+	}
+}
+
+func TestMonitorOneComponent(t *testing.T) {
+	t.Parallel()
+	sshEnabled := false
+	tests := []monitorTest{
+		{
+			desc:       "it starts with OK state",
+			event:      nil,
+			wantStatus: http.StatusOK,
+		},
 		{
 			desc:       "degraded event causes degraded state",
-			event:      Event{Name: TeleportDegradedEvent, Payload: teleport.ComponentAuth},
-			wantStatus: []int{http.StatusServiceUnavailable, http.StatusBadRequest},
+			event:      &Event{Name: TeleportDegradedEvent, Payload: teleport.ComponentAuth},
+			wantStatus: http.StatusServiceUnavailable,
 		},
 		{
 			desc:       "ok event causes recovering state",
-			event:      Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth},
-			wantStatus: []int{http.StatusBadRequest},
+			event:      &Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth},
+			wantStatus: http.StatusBadRequest,
 		},
 		{
 			desc:       "ok event remains in recovering state because not enough time passed",
-			event:      Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth},
-			wantStatus: []int{http.StatusBadRequest},
+			event:      &Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth},
+			wantStatus: http.StatusBadRequest,
 		},
 		{
 			desc:         "ok event after enough time causes OK state",
-			event:        Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth},
+			event:        &Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth},
 			advanceClock: defaults.HeartbeatCheckPeriod*2 + 1,
-			wantStatus:   []int{http.StatusOK},
+			wantStatus:   http.StatusOK,
 		},
+	}
+	testMonitor(t, sshEnabled, tests)
+}
+
+func TestMonitorTwoComponents(t *testing.T) {
+	t.Parallel()
+	sshEnabled := true
+	tests := []monitorTest{
 		{
-			desc:       "degraded event in a new component causes degraded state",
-			event:      Event{Name: TeleportDegradedEvent, Payload: teleport.ComponentNode},
-			wantStatus: []int{http.StatusServiceUnavailable, http.StatusBadRequest},
+			desc:       "it starts with OK state",
+			event:      nil,
+			wantStatus: http.StatusOK,
 		},
 		{
-			desc:         "ok event in one component keeps overall status degraded due to other component",
-			advanceClock: defaults.HeartbeatCheckPeriod*2 + 1,
-			event:        Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth},
-			wantStatus:   []int{http.StatusServiceUnavailable, http.StatusBadRequest},
+			desc:       "degraded event in one component causes degraded state",
+			event:      &Event{Name: TeleportDegradedEvent, Payload: teleport.ComponentNode},
+			wantStatus: http.StatusServiceUnavailable,
 		},
 		{
-			desc:         "ok event in new component causes overall recovering state",
-			advanceClock: defaults.HeartbeatCheckPeriod*2 + 1,
-			event:        Event{Name: TeleportOKEvent, Payload: teleport.ComponentNode},
-			wantStatus:   []int{http.StatusBadRequest},
+			desc:       "ok event in ok component keeps overall status degraded due to degraded component",
+			event:      &Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth},
+			wantStatus: http.StatusServiceUnavailable,
 		},
 		{
-			desc:         "ok event in new component causes overall OK state",
+			desc:       "ok event in degraded component causes overall recovering state",
+			event:      &Event{Name: TeleportOKEvent, Payload: teleport.ComponentNode},
+			wantStatus: http.StatusBadRequest,
+		},
+		{
+			desc:         "ok event after enough time causes overall OK state",
 			advanceClock: defaults.HeartbeatCheckPeriod*2 + 1,
-			event:        Event{Name: TeleportOKEvent, Payload: teleport.ComponentNode},
-			wantStatus:   []int{http.StatusOK},
+			event:        &Event{Name: TeleportOKEvent, Payload: teleport.ComponentNode},
+			wantStatus:   http.StatusOK,
 		},
 	}
-	for _, tt := range tests {
-		t.Run(tt.desc, func(t *testing.T) {
-			fakeClock.Advance(tt.advanceClock)
-			process.BroadcastEvent(tt.event)
-			err = waitForStatus(endpoint, tt.wantStatus...)
-			require.NoError(t, err)
-		})
-	}
+	testMonitor(t, sshEnabled, tests)
 }

 // TestServiceCheckPrincipals checks certificates regeneration only requests
@@ -456,7 +489,7 @@ func TestDesktopAccessFIPS(t *testing.T) {
 	require.Error(t, err)
 }

-func waitForStatus(diagAddr string, statusCodes ...int) error {
+func waitForStatus(diagAddr string, statusCode int) error {
 	tickCh := time.Tick(100 * time.Millisecond)
 	timeoutCh := time.After(10 * time.Second)
 	var lastStatus int
@@ -469,13 +502,11 @@
 			}
 			resp.Body.Close()
 			lastStatus = resp.StatusCode
-			for _, statusCode := range statusCodes {
-				if resp.StatusCode == statusCode {
-					return nil
-				}
+			if resp.StatusCode == statusCode {
+				return nil
 			}
 		case <-timeoutCh:
-			return trace.BadParameter("timeout waiting for status: %v; last status: %v", statusCodes, lastStatus)
+			return trace.BadParameter("timeout waiting for status: %v; last status: %v", statusCode, lastStatus)
 		}
 	}
 }

diff --git a/lib/service/state.go b/lib/service/state.go
index bc6e18685a87b..d42cf3e730dae 100644
--- a/lib/service/state.go
+++ b/lib/service/state.go
@@ -57,9 +57,10 @@ func init() {

 // processState tracks the state of the Teleport process.
 type processState struct {
-	process *TeleportProcess
-	mu      sync.Mutex
-	states  map[string]*componentState
+	process             *TeleportProcess
+	mu                  sync.Mutex
+	states              map[string]*componentState
+	totalComponentCount int // number of components that will send updates
 }

 type componentState struct {
@@ -68,15 +69,16 @@ type componentState struct {
 }

 // newProcessState returns a new FSM that tracks the state of the Teleport process.
-func newProcessState(process *TeleportProcess) (*processState, error) {
+func newProcessState(process *TeleportProcess, componentCount int) (*processState, error) {
 	err := utils.RegisterPrometheusCollectors(stateGauge)
 	if err != nil {
 		return nil, trace.Wrap(err)
 	}

 	return &processState{
-		process: process,
-		states:  make(map[string]*componentState),
+		process:             process,
+		states:              make(map[string]*componentState),
+		totalComponentCount: componentCount,
 	}, nil
 }

@@ -127,7 +129,7 @@ func (f *processState) update(event Event) {
 }

 // getStateLocked returns the overall process state based on the state of
-// individual components. If no components sent updates yet, returns
+// individual components. If not all components have sent updates yet, returns
 // stateStarting.
 //
 // Order of importance:
@@ -138,8 +140,13 @@ func (f *processState) getStateLocked() componentStateEnum {
+	// Return stateStarting if not all components have sent updates yet.
+	if len(f.states) < f.totalComponentCount {
+		return stateStarting
+	}
+
 	state := stateStarting
-	numNotOK := len(f.states)
+	numOK := 0
 	for _, s := range f.states {
 		switch s.state {
 		case stateDegraded:
			state = stateDegraded
 		case stateRecovering:
 			state = stateRecovering
 		case stateOK:
-			numNotOK--
+			numOK++
 		}
 	}

 	// Only return stateOK if *all* components are in stateOK.
-	if numNotOK == 0 && len(f.states) > 0 {
+	if numOK == f.totalComponentCount {
 		state = stateOK
+	} else if numOK > f.totalComponentCount {
+		f.process.log.Errorf("incorrect count of components (found: %d; expected: %d), this is a bug!", numOK, f.totalComponentCount)
 	}
 	return state
 }

diff --git a/lib/service/state_test.go b/lib/service/state_test.go
index 078a8354f0b54..32bbf2511b1cd 100644
--- a/lib/service/state_test.go
+++ b/lib/service/state_test.go
@@ -24,21 +24,24 @@ func TestProcessStateGetState(t *testing.T) {
 	t.Parallel()

 	tests := []struct {
-		desc   string
-		states map[string]*componentState
-		want   componentStateEnum
+		desc                string
+		states              map[string]*componentState
+		totalComponentCount int
+		want                componentStateEnum
 	}{
 		{
-			desc:   "no components",
-			states: map[string]*componentState{},
-			want:   stateStarting,
+			desc:                "no components",
+			states:              map[string]*componentState{},
+			totalComponentCount: 1,
+			want:                stateStarting,
 		},
 		{
 			desc: "one component in stateOK",
 			states: map[string]*componentState{
 				"one": {state: stateOK},
 			},
-			want: stateOK,
+			totalComponentCount: 1,
+			want:                stateOK,
 		},
 		{
 			desc: "multiple components in stateOK",
 			states: map[string]*componentState{
 				"one":   {state: stateOK},
 				"two":   {state: stateOK},
 				"three": {state: stateOK},
 			},
-			want: stateOK,
+			totalComponentCount: 3,
+			want:                stateOK,
 		},
 		{
 			desc: "multiple components, one is degraded",
 			states: map[string]*componentState{
 				"one":   {state: stateOK},
 				"two":   {state: stateDegraded},
 				"three": {state: stateOK},
 			},
-			want: stateDegraded,
+			totalComponentCount: 3,
+			want:                stateDegraded,
 		},
 		{
 			desc: "multiple components, one is recovering",
 			states: map[string]*componentState{
 				"one":   {state: stateOK},
 				"two":   {state: stateRecovering},
 				"three": {state: stateOK},
 			},
-			want: stateRecovering,
+			totalComponentCount: 3,
+			want:                stateRecovering,
 		},
 		{
 			desc: "multiple components, one is starting",
 			states: map[string]*componentState{
 				"one":   {state: stateOK},
 				"two":   {state: stateStarting},
 				"three": {state: stateOK},
 			},
-			want: stateStarting,
+			totalComponentCount: 3,
+			want:                stateStarting,
 		},
 	}

 	for _, tt := range tests {
 		t.Run(tt.desc, func(t *testing.T) {
-			ps := &processState{states: tt.states}
+			ps := &processState{states: tt.states, totalComponentCount: tt.totalComponentCount}
 			got := ps.getState()
 			require.Equal(t, got, tt.want)
 		})

diff --git a/lib/srv/heartbeat.go b/lib/srv/heartbeat.go
index be87e502ba8ff..97936f526a6e2 100644
--- a/lib/srv/heartbeat.go
+++ b/lib/srv/heartbeat.go
@@ -152,7 +152,7 @@ func NewHeartbeat(cfg HeartbeatConfig) (*Heartbeat, error) {
 		announceC: make(chan struct{}, 1),
 		sendC:     make(chan struct{}, 1),
 	}
-	h.Debugf("Starting %v heartbeat with announce period: %v, keep-alive period %v, poll period: %v", cfg.Mode, cfg.KeepAlivePeriod, cfg.AnnouncePeriod, cfg.CheckPeriod)
+	h.Debugf("Starting %v heartbeat with announce period: %v, keep-alive period %v, poll period: %v", cfg.Mode, cfg.AnnouncePeriod, cfg.KeepAlivePeriod, cfg.CheckPeriod)
 	return h, nil
 }

From afb7277dbc53b6127a984487cc06ea1cd38a6cc2 Mon Sep 17 00:00:00 2001
From: Vitor Enes
Date: Thu, 7 Apr 2022 17:49:17 +0100
Subject: [PATCH 3/3] Make `PortList.Pop()` thread-safe (#11799)

* Make `PortList.Pop()` thread-safe

* Store a `[]int` instead of `[]string` in `PortList`
---
 lib/utils/utils.go | 31 +++++++++++++++++--------------
 1 file changed, 17 insertions(+), 14 deletions(-)

diff --git a/lib/utils/utils.go b/lib/utils/utils.go
index f9a53095b3eb3..6f8a49009d905 100644
--- a/lib/utils/utils.go
+++ b/lib/utils/utils.go
@@ -388,26 +388,29 @@ func OpaqueAccessDenied(err error) error {
 }

 // PortList is a list of TCP port
-type PortList []string
+type PortList struct {
+	ports []int
+	mu    sync.Mutex
+}

 // Pop returns a value from the list, it panics if the value is not there
 func (p *PortList) Pop() string {
-	if len(*p) == 0 {
-		panic("list is empty")
-	}
-	val := (*p)[len(*p)-1]
-	*p = (*p)[:len(*p)-1]
-	return val
+	return strconv.Itoa(p.PopInt())
 }

 // PopInt returns a value from the list, it panics if not enough values
 // were allocated
 func (p *PortList) PopInt() int {
-	i, err := strconv.Atoi(p.Pop())
-	if err != nil {
-		panic(err)
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	l := len(p.ports)
+	if l == 0 {
+		panic("list is empty")
 	}
-	return i
+	val := p.ports[l-1]
+	p.ports = p.ports[:l-1]
+	return val
 }

 // PopIntSlice returns a slice of values from the list, it panics if not enough
@@ -425,15 +428,15 @@ const PortStartingNumber = 20000

 // GetFreeTCPPorts returns n ports starting from port 20000.
 func GetFreeTCPPorts(n int, offset ...int) (PortList, error) {
-	list := make(PortList, 0, n)
+	list := make([]int, 0, n)
 	start := PortStartingNumber
 	if len(offset) != 0 {
 		start = offset[0]
 	}
 	for i := start; i < start+n; i++ {
-		list = append(list, strconv.Itoa(i))
+		list = append(list, i)
 	}
-	return list, nil
+	return PortList{ports: list}, nil
 }

 // ReadHostUUID reads host UUID from the file in the data dir
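Patch 3 exists because the test helpers added in patch 2 call `ports.Pop()` from tests that run in parallel (`TestMonitorOneComponent` and `TestMonitorTwoComponents` both call `t.Parallel()`), and popping from a plain slice races without synchronization. The fix is the standard mutex-guarded pop; below is a minimal, self-contained model for reference (the names mirror the patch, but this is an illustrative sketch rather than the actual `lib/utils` code):

    package main

    import (
        "fmt"
        "strconv"
        "sync"
    )

    // PortList hands out pre-allocated ports; Pop is safe for concurrent use.
    type PortList struct {
        ports []int
        mu    sync.Mutex
    }

    // PopInt removes and returns the last port; it panics when the list is empty.
    func (p *PortList) PopInt() int {
        p.mu.Lock()
        defer p.mu.Unlock()

        l := len(p.ports)
        if l == 0 {
            panic("list is empty")
        }
        val := p.ports[l-1]
        p.ports = p.ports[:l-1]
        return val
    }

    // Pop returns the next port as a string, e.g. for net.JoinHostPort.
    func (p *PortList) Pop() string {
        return strconv.Itoa(p.PopInt())
    }

    func main() {
        list := PortList{ports: []int{20000, 20001, 20002}}
        var wg sync.WaitGroup
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                fmt.Println(list.Pop()) // concurrent Pops no longer race
            }()
        }
        wg.Wait()
    }

One design note: because `PortList` now contains a `sync.Mutex`, it must be shared by pointer rather than copied once in use, which is why `Pop` and `PopInt` keep pointer receivers.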