From 0588b69a62705fcd3c9367f1000eee77a5f7a336 Mon Sep 17 00:00:00 2001 From: Roman Tkachenko Date: Tue, 26 Apr 2022 12:04:19 -0700 Subject: [PATCH] Revert "Backport #11725 #11249 #11799 to branch/v8 (#11794)" This reverts commit 48eedcd69e3cf86037ba90f5180b167f666cc574. --- integration/helpers.go | 15 +++- integration/integration_test.go | 11 --- lib/service/desktop.go | 4 - lib/service/kubernetes.go | 2 - lib/service/service.go | 113 +++++++++---------------- lib/service/service_test.go | 143 +++++++++++++------------------- lib/service/signals.go | 2 +- lib/service/state.go | 29 +++---- lib/service/state_test.go | 31 +++---- lib/service/supervisor.go | 45 +++++----- lib/srv/heartbeat.go | 2 +- lib/utils/utils.go | 31 ++++--- 12 files changed, 173 insertions(+), 255 deletions(-) diff --git a/integration/helpers.go b/integration/helpers.go index ded20b3ac668d..9af6c95c0e5f0 100644 --- a/integration/helpers.go +++ b/integration/helpers.go @@ -1100,15 +1100,26 @@ func (i *TeleInstance) Start() error { // Build a list of expected events to wait for before unblocking based off // the configuration passed in. expectedEvents := []string{} - // Always wait for TeleportReadyEvent. - expectedEvents = append(expectedEvents, service.TeleportReadyEvent) + if i.Config.Auth.Enabled { + expectedEvents = append(expectedEvents, service.AuthTLSReady) + } if i.Config.Proxy.Enabled { expectedEvents = append(expectedEvents, service.ProxyReverseTunnelReady) + expectedEvents = append(expectedEvents, service.ProxySSHReady) expectedEvents = append(expectedEvents, service.ProxyAgentPoolReady) if !i.Config.Proxy.DisableWebService { expectedEvents = append(expectedEvents, service.ProxyWebServerReady) } } + if i.Config.SSH.Enabled { + expectedEvents = append(expectedEvents, service.NodeSSHReady) + } + if i.Config.Apps.Enabled { + expectedEvents = append(expectedEvents, service.AppsReady) + } + if i.Config.Databases.Enabled { + expectedEvents = append(expectedEvents, service.DatabasesReady) + } // Start the process and block until the expected events have arrived. receivedEvents, err := startAndWait(i.Process, expectedEvents) diff --git a/integration/integration_test.go b/integration/integration_test.go index f3f267b008eb9..6bd904d7c7469 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -4358,17 +4358,6 @@ func waitForProcessStart(serviceC chan *service.TeleportProcess) (*service.Telep dumpGoroutineProfile() return nil, trace.BadParameter("timeout waiting for service to start") } - - eventC := make(chan service.Event, 1) - svc.WaitForEvent(context.TODO(), service.TeleportReadyEvent, eventC) - select { - case <-eventC: - - case <-time.After(20 * time.Second): - dumpGoroutineProfile() - return nil, trace.BadParameter("timeout waiting for service to broadcast ready status") - } - return svc, nil } diff --git a/lib/service/desktop.go b/lib/service/desktop.go index b4e1c503cf1fa..5b3be1ac3528a 100644 --- a/lib/service/desktop.go +++ b/lib/service/desktop.go @@ -246,10 +246,6 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus. "Windows desktop service %s:%s is starting on %v.", teleport.Version, teleport.Gitref, listener.Addr()) } - - // since srv.Serve is a blocking call, we emit this event right before - // the service has started - process.BroadcastEvent(Event{Name: WindowsDesktopReady, Payload: nil}) err := srv.Serve(listener) if err != nil { if err == http.ErrServerClosed { diff --git a/lib/service/kubernetes.go b/lib/service/kubernetes.go index c5c4aea33c0c9..70d4cca899d50 100644 --- a/lib/service/kubernetes.go +++ b/lib/service/kubernetes.go @@ -271,8 +271,6 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C "Kubernetes service %s:%s is starting on %v.", teleport.Version, teleport.Gitref, listener.Addr()) } - // since kubeServer.Serve is a blocking call, we emit this event right before - // the service has started process.BroadcastEvent(Event{Name: KubernetesReady, Payload: nil}) err := kubeServer.Serve(listener) if err != nil { diff --git a/lib/service/service.go b/lib/service/service.go index d9b8441ba8583..2d87714ded8fe 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -155,9 +155,6 @@ const ( // and is ready to start accepting connections. ProxySSHReady = "ProxySSHReady" - // ProxyKubeReady is generated when the kubernetes proxy service has been initialized. - ProxyKubeReady = "ProxyKubeReady" - // NodeSSHReady is generated when the Teleport node has initialized a SSH server // and is ready to start accepting SSH connections. NodeSSHReady = "NodeReady" @@ -173,6 +170,10 @@ const ( // is ready to start accepting connections. DatabasesReady = "DatabasesReady" + // MetricsReady is generated when the Teleport metrics service is ready to + // start accepting connections. + MetricsReady = "MetricsReady" + // WindowsDesktopReady is generated when the Teleport windows desktop // service is ready to start accepting connections. WindowsDesktopReady = "WindowsDesktopReady" @@ -186,7 +187,7 @@ const ( // in a graceful way. TeleportReloadEvent = "TeleportReload" - // TeleportPhaseChangeEvent is generated to indicate that teleport + // TeleportPhaseChangeEvent is generated to indidate that teleport // CA rotation phase has been updated, used in tests TeleportPhaseChangeEvent = "TeleportPhaseChange" @@ -509,20 +510,6 @@ func Run(ctx context.Context, cfg Config, newTeleport NewProcess) error { if err := srv.Start(); err != nil { return trace.Wrap(err, "startup failed") } - - // Wait for the service to report that it has started. - startTimeoutCtx, startCancel := context.WithTimeout(ctx, signalPipeTimeout) - defer startCancel() - eventC := make(chan Event, 1) - srv.WaitForEvent(startTimeoutCtx, TeleportReadyEvent, eventC) - select { - case <-eventC: - cfg.Log.Infof("Service has started successfully.") - case <-startTimeoutCtx.Done(): - warnOnErr(srv.Close(), cfg.Log) - return trace.BadParameter("service has failed to start") - } - // Wait and reload until called exit. for { srv, err = waitAndReload(ctx, cfg, srv, newTeleport) @@ -746,9 +733,6 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) { process.registerAppDepend() - // Produce global TeleportReadyEvent when all components have started - componentCount := process.registerTeleportReadyEvent(cfg) - process.log = cfg.Log.WithFields(logrus.Fields{ trace.Component: teleport.Component(teleport.ComponentProcess, process.id), }) @@ -756,7 +740,7 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) { serviceStarted := false if !cfg.DiagnosticAddr.IsEmpty() { - if err := process.initDiagnosticService(componentCount); err != nil { + if err := process.initDiagnosticService(); err != nil { return nil, trace.Wrap(err) } } else { @@ -775,6 +759,37 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) { cfg.Keygen = native.New(process.ExitContext(), native.PrecomputeKeys(precomputeCount)) } + // Produce global TeleportReadyEvent + // when all components have started + eventMapping := EventMapping{ + Out: TeleportReadyEvent, + } + if cfg.Auth.Enabled { + eventMapping.In = append(eventMapping.In, AuthTLSReady) + } + if cfg.SSH.Enabled { + eventMapping.In = append(eventMapping.In, NodeSSHReady) + } + if cfg.Proxy.Enabled { + eventMapping.In = append(eventMapping.In, ProxySSHReady) + } + if cfg.Kube.Enabled { + eventMapping.In = append(eventMapping.In, KubernetesReady) + } + if cfg.Apps.Enabled { + eventMapping.In = append(eventMapping.In, AppsReady) + } + if cfg.Databases.Enabled { + eventMapping.In = append(eventMapping.In, DatabasesReady) + } + if cfg.Metrics.Enabled { + eventMapping.In = append(eventMapping.In, MetricsReady) + } + if cfg.WindowsDesktop.Enabled { + eventMapping.In = append(eventMapping.In, WindowsDesktopReady) + } + process.RegisterEventMapping(eventMapping) + if cfg.Auth.Enabled { if err := process.initAuthService(); err != nil { return nil, trace.Wrap(err) @@ -1368,7 +1383,7 @@ func (process *TeleportProcess) initAuthService() error { utils.Consolef(cfg.Console, log, teleport.ComponentAuth, "Auth service %s:%s is starting on %v.", teleport.Version, teleport.Gitref, authAddr) - // since tlsServer.Serve is a blocking call, we emit this event right before + // since tlsServer.Serve is a blocking call, we emit this even right before // the service has started process.BroadcastEvent(Event{Name: AuthTLSReady, Payload: nil}) err := tlsServer.Serve() @@ -2210,7 +2225,7 @@ func (process *TeleportProcess) initMetricsService() error { // initDiagnosticService starts diagnostic service currently serving healthz // and prometheus endpoints -func (process *TeleportProcess) initDiagnosticService(componentCount int) error { +func (process *TeleportProcess) initDiagnosticService() error { mux := http.NewServeMux() // support legacy metrics collection in the diagnostic service. @@ -2241,7 +2256,7 @@ func (process *TeleportProcess) initDiagnosticService(componentCount int) error // Create a state machine that will process and update the internal state of // Teleport based off Events. Use this state machine to return return the // status from the /readyz endpoint. - ps, err := newProcessState(process, componentCount) + ps, err := newProcessState(process) if err != nil { return trace.Wrap(err) } @@ -3062,9 +3077,6 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { }) log.Infof("Starting Kube proxy on %v.", cfg.Proxy.Kube.ListenAddr.Addr) - // since kubeServer.Serve is a blocking call, we emit this event right before - // the service has started - process.BroadcastEvent(Event{Name: ProxyKubeReady, Payload: nil}) err := kubeServer.Serve(listeners.kube) if err != nil && err != http.ErrServerClosed { log.Warningf("Kube TLS server exited with error: %v.", err) @@ -3447,51 +3459,6 @@ func (process *TeleportProcess) waitForAppDepend() { } } -// registerTeleportReadyEvent ensures that a TeleportReadyEvent is produced -// when all components enabled (based on the configuration) have started. -// It returns the number of components enabled. -func (process *TeleportProcess) registerTeleportReadyEvent(cfg *Config) int { - eventMapping := EventMapping{ - Out: TeleportReadyEvent, - } - - if cfg.Auth.Enabled { - eventMapping.In = append(eventMapping.In, AuthTLSReady) - } - - if cfg.SSH.Enabled { - eventMapping.In = append(eventMapping.In, NodeSSHReady) - } - - proxyConfig := cfg.Proxy - if proxyConfig.Enabled { - eventMapping.In = append(eventMapping.In, ProxySSHReady) - } - if proxyConfig.Kube.Enabled && !proxyConfig.Kube.ListenAddr.IsEmpty() && !proxyConfig.DisableReverseTunnel { - eventMapping.In = append(eventMapping.In, ProxyKubeReady) - } - - if cfg.Kube.Enabled { - eventMapping.In = append(eventMapping.In, KubernetesReady) - } - - if cfg.Apps.Enabled { - eventMapping.In = append(eventMapping.In, AppsReady) - } - - if cfg.Databases.Enabled { - eventMapping.In = append(eventMapping.In, DatabasesReady) - } - - if cfg.WindowsDesktop.Enabled { - eventMapping.In = append(eventMapping.In, WindowsDesktopReady) - } - - componentCount := len(eventMapping.In) - process.RegisterEventMapping(eventMapping) - return componentCount -} - // appDependEvents is a list of events that the application service depends on. var appDependEvents = []string{ AuthTLSReady, diff --git a/lib/service/service_test.go b/lib/service/service_test.go index b6b3664b87a3a..8778cb0da1c97 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -18,7 +18,7 @@ package service import ( "context" "fmt" - "net" + "io/ioutil" "net/http" "os" "strings" @@ -45,16 +45,6 @@ import ( "github.com/stretchr/testify/require" ) -var ports utils.PortList - -func init() { - var err error - ports, err = utils.GetFreeTCPPorts(5, utils.PortStartingNumber) - if err != nil { - panic(fmt.Sprintf("failed to allocate tcp ports for tests: %v", err)) - } -} - func TestMain(m *testing.M) { utils.InitLoggerForTests() os.Exit(m.Run()) @@ -91,29 +81,25 @@ func TestServiceSelfSignedHTTPS(t *testing.T) { require.FileExists(t, cfg.Proxy.KeyPairs[0].PrivateKey) } -type monitorTest struct { - desc string - event *Event - advanceClock time.Duration - wantStatus int -} - -func testMonitor(t *testing.T, sshEnabled bool, tests []monitorTest) { +func TestMonitor(t *testing.T) { + t.Parallel() fakeClock := clockwork.NewFakeClock() + cfg := MakeDefaultConfig() cfg.Clock = fakeClock var err error - cfg.DataDir = t.TempDir() - cfg.DiagnosticAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: net.JoinHostPort("127.0.0.1", ports.Pop())} + cfg.DataDir, err = ioutil.TempDir("", "teleport") + require.NoError(t, err) + defer os.RemoveAll(cfg.DataDir) + cfg.DiagnosticAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"} + cfg.AuthServers = []utils.NetAddr{{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}} cfg.Auth.Enabled = true - cfg.Auth.SSHAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: net.JoinHostPort("127.0.0.1", ports.Pop())} - cfg.AuthServers = []utils.NetAddr{cfg.Auth.SSHAddr} - cfg.Auth.StorageConfig.Params["path"] = t.TempDir() - if sshEnabled { - cfg.SSH.Enabled = true - cfg.SSH.Addr = utils.NetAddr{AddrNetwork: "tcp", Addr: net.JoinHostPort("127.0.0.1", ports.Pop())} - } + cfg.Auth.StorageConfig.Params["path"], err = ioutil.TempDir("", "teleport") + require.NoError(t, err) + defer os.RemoveAll(cfg.DataDir) + cfg.Auth.SSHAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"} cfg.Proxy.Enabled = false + cfg.SSH.Enabled = false process, err := NewTeleport(cfg) require.NoError(t, err) @@ -130,84 +116,65 @@ func testMonitor(t *testing.T, sshEnabled bool, tests []monitorTest) { err = waitForStatus(endpoint, http.StatusOK) require.NoError(t, err) - for _, tt := range tests { - t.Run(tt.desc, func(t *testing.T) { - fakeClock.Advance(tt.advanceClock) - if tt.event != nil { - process.BroadcastEvent(*tt.event) - } - err := waitForStatus(endpoint, tt.wantStatus) - require.NoError(t, err) - }) - } -} - -func TestMonitorOneComponent(t *testing.T) { - t.Parallel() - sshEnabled := false - tests := []monitorTest{ - { - desc: "it starts with OK state", - event: nil, - wantStatus: http.StatusOK, - }, + tests := []struct { + desc string + event Event + advanceClock time.Duration + wantStatus []int + }{ { desc: "degraded event causes degraded state", - event: &Event{Name: TeleportDegradedEvent, Payload: teleport.ComponentAuth}, - wantStatus: http.StatusServiceUnavailable, + event: Event{Name: TeleportDegradedEvent, Payload: teleport.ComponentAuth}, + wantStatus: []int{http.StatusServiceUnavailable, http.StatusBadRequest}, }, { desc: "ok event causes recovering state", - event: &Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth}, - wantStatus: http.StatusBadRequest, + event: Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth}, + wantStatus: []int{http.StatusBadRequest}, }, { desc: "ok event remains in recovering state because not enough time passed", - event: &Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth}, - wantStatus: http.StatusBadRequest, + event: Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth}, + wantStatus: []int{http.StatusBadRequest}, }, { desc: "ok event after enough time causes OK state", - event: &Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth}, + event: Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth}, advanceClock: defaults.HeartbeatCheckPeriod*2 + 1, - wantStatus: http.StatusOK, + wantStatus: []int{http.StatusOK}, }, - } - testMonitor(t, sshEnabled, tests) -} - -func TestMonitorTwoComponents(t *testing.T) { - t.Parallel() - sshEnabled := true - tests := []monitorTest{ { - desc: "it starts with OK state", - event: nil, - wantStatus: http.StatusOK, + desc: "degraded event in a new component causes degraded state", + event: Event{Name: TeleportDegradedEvent, Payload: teleport.ComponentNode}, + wantStatus: []int{http.StatusServiceUnavailable, http.StatusBadRequest}, }, { - desc: "degraded event in one component causes degraded state", - event: &Event{Name: TeleportDegradedEvent, Payload: teleport.ComponentNode}, - wantStatus: http.StatusServiceUnavailable, - }, - { - desc: "ok event in ok component keeps overall status degraded due to degraded component", - event: &Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth}, - wantStatus: http.StatusServiceUnavailable, + desc: "ok event in one component keeps overall status degraded due to other component", + advanceClock: defaults.HeartbeatCheckPeriod*2 + 1, + event: Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth}, + wantStatus: []int{http.StatusServiceUnavailable, http.StatusBadRequest}, }, { - desc: "ok event in degraded component causes overall recovering state", - event: &Event{Name: TeleportOKEvent, Payload: teleport.ComponentNode}, - wantStatus: http.StatusBadRequest, + desc: "ok event in new component causes overall recovering state", + advanceClock: defaults.HeartbeatCheckPeriod*2 + 1, + event: Event{Name: TeleportOKEvent, Payload: teleport.ComponentNode}, + wantStatus: []int{http.StatusBadRequest}, }, { - desc: "ok event after enough time causes overall OK state", + desc: "ok event in new component causes overall OK state", advanceClock: defaults.HeartbeatCheckPeriod*2 + 1, - event: &Event{Name: TeleportOKEvent, Payload: teleport.ComponentNode}, - wantStatus: http.StatusOK, + event: Event{Name: TeleportOKEvent, Payload: teleport.ComponentNode}, + wantStatus: []int{http.StatusOK}, }, } - testMonitor(t, sshEnabled, tests) + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + fakeClock.Advance(tt.advanceClock) + process.BroadcastEvent(tt.event) + err = waitForStatus(endpoint, tt.wantStatus...) + require.NoError(t, err) + }) + } } // TestServiceCheckPrincipals checks certificates regeneration only requests @@ -490,7 +457,7 @@ func TestDesktopAccessFIPS(t *testing.T) { require.Error(t, err) } -func waitForStatus(diagAddr string, statusCode int) error { +func waitForStatus(diagAddr string, statusCodes ...int) error { tickCh := time.Tick(100 * time.Millisecond) timeoutCh := time.After(10 * time.Second) var lastStatus int @@ -503,11 +470,13 @@ func waitForStatus(diagAddr string, statusCode int) error { } resp.Body.Close() lastStatus = resp.StatusCode - if resp.StatusCode == statusCode { - return nil + for _, statusCode := range statusCodes { + if resp.StatusCode == statusCode { + return nil + } } case <-timeoutCh: - return trace.BadParameter("timeout waiting for status: %v; last status: %v", statusCode, lastStatus) + return trace.BadParameter("timeout waiting for status: %v; last status: %v", statusCodes, lastStatus) } } } diff --git a/lib/service/signals.go b/lib/service/signals.go index de8799e029acd..21fd8db5fe66e 100644 --- a/lib/service/signals.go +++ b/lib/service/signals.go @@ -153,7 +153,7 @@ func (process *TeleportProcess) WaitForSignals(ctx context.Context) error { } // ErrTeleportReloading is returned when signal waiter exits -// because the teleport process has initiated shutdown +// because the teleport process has initiaded shutdown var ErrTeleportReloading = &trace.CompareFailedError{Message: "teleport process is reloading"} // ErrTeleportExited means that teleport has exited diff --git a/lib/service/state.go b/lib/service/state.go index d42cf3e730dae..bc6e18685a87b 100644 --- a/lib/service/state.go +++ b/lib/service/state.go @@ -57,10 +57,9 @@ func init() { // processState tracks the state of the Teleport process. type processState struct { - process *TeleportProcess - mu sync.Mutex - states map[string]*componentState - totalComponentCount int // number of components that will send updates + process *TeleportProcess + mu sync.Mutex + states map[string]*componentState } type componentState struct { @@ -69,16 +68,15 @@ type componentState struct { } // newProcessState returns a new FSM that tracks the state of the Teleport process. -func newProcessState(process *TeleportProcess, componentCount int) (*processState, error) { +func newProcessState(process *TeleportProcess) (*processState, error) { err := utils.RegisterPrometheusCollectors(stateGauge) if err != nil { return nil, trace.Wrap(err) } return &processState{ - process: process, - states: make(map[string]*componentState), - totalComponentCount: componentCount, + process: process, + states: make(map[string]*componentState), }, nil } @@ -129,7 +127,7 @@ func (f *processState) update(event Event) { } // getStateLocked returns the overall process state based on the state of -// individual components. If not all components have sent updates yet, returns +// individual components. If no components sent updates yet, returns // stateStarting. // // Order of importance: @@ -140,13 +138,8 @@ func (f *processState) update(event Event) { // // Note: f.mu must be locked by the caller! func (f *processState) getStateLocked() componentStateEnum { - // Return stateStarting if not all components have sent updates yet. - if len(f.states) < f.totalComponentCount { - return stateStarting - } - state := stateStarting - numOK := 0 + numNotOK := len(f.states) for _, s := range f.states { switch s.state { case stateDegraded: @@ -154,14 +147,12 @@ func (f *processState) getStateLocked() componentStateEnum { case stateRecovering: state = stateRecovering case stateOK: - numOK++ + numNotOK-- } } // Only return stateOK if *all* components are in stateOK. - if numOK == f.totalComponentCount { + if numNotOK == 0 && len(f.states) > 0 { state = stateOK - } else if numOK > f.totalComponentCount { - f.process.log.Errorf("incorrect count of components (found: %d; expected: %d), this is a bug!", numOK, f.totalComponentCount) } return state } diff --git a/lib/service/state_test.go b/lib/service/state_test.go index 32bbf2511b1cd..078a8354f0b54 100644 --- a/lib/service/state_test.go +++ b/lib/service/state_test.go @@ -24,24 +24,21 @@ func TestProcessStateGetState(t *testing.T) { t.Parallel() tests := []struct { - desc string - states map[string]*componentState - totalComponentCount int - want componentStateEnum + desc string + states map[string]*componentState + want componentStateEnum }{ { - desc: "no components", - states: map[string]*componentState{}, - totalComponentCount: 1, - want: stateStarting, + desc: "no components", + states: map[string]*componentState{}, + want: stateStarting, }, { desc: "one component in stateOK", states: map[string]*componentState{ "one": {state: stateOK}, }, - totalComponentCount: 1, - want: stateOK, + want: stateOK, }, { desc: "multiple components in stateOK", @@ -50,8 +47,7 @@ func TestProcessStateGetState(t *testing.T) { "two": {state: stateOK}, "three": {state: stateOK}, }, - totalComponentCount: 3, - want: stateOK, + want: stateOK, }, { desc: "multiple components, one is degraded", @@ -60,8 +56,7 @@ func TestProcessStateGetState(t *testing.T) { "two": {state: stateDegraded}, "three": {state: stateOK}, }, - totalComponentCount: 3, - want: stateDegraded, + want: stateDegraded, }, { desc: "multiple components, one is recovering", @@ -70,8 +65,7 @@ func TestProcessStateGetState(t *testing.T) { "two": {state: stateRecovering}, "three": {state: stateOK}, }, - totalComponentCount: 3, - want: stateRecovering, + want: stateRecovering, }, { desc: "multiple components, one is starting", @@ -80,14 +74,13 @@ func TestProcessStateGetState(t *testing.T) { "two": {state: stateStarting}, "three": {state: stateOK}, }, - totalComponentCount: 3, - want: stateStarting, + want: stateStarting, }, } for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { - ps := &processState{states: tt.states, totalComponentCount: tt.totalComponentCount} + ps := &processState{states: tt.states} got := ps.getState() require.Equal(t, got, tt.want) }) diff --git a/lib/service/supervisor.go b/lib/service/supervisor.go index f744c175491cf..a059f80766bb7 100644 --- a/lib/service/supervisor.go +++ b/lib/service/supervisor.go @@ -55,7 +55,7 @@ type Supervisor interface { Wait() error // Run starts and waits for the service to complete - // it's a combination Start() and Wait() + // it's a combinatioin Start() and Wait() Run() error // Services returns list of running services @@ -362,26 +362,33 @@ func (s *LocalSupervisor) BroadcastEvent(event Event) { s.signalReload() } - sendEvent := func(e Event) { - select { - case s.eventsC <- e: - case <-s.closeContext.Done(): - } - } - s.events[event.Name] = event - go sendEvent(event) + // Log all events other than recovered events to prevent the logs from // being flooded. if event.String() != TeleportOKEvent { s.log.WithField("event", event.String()).Debug("Broadcasting event.") } + go func() { + select { + case s.eventsC <- event: + case <-s.closeContext.Done(): + return + } + }() + for _, m := range s.eventMappings { if m.matches(event.Name, s.events) { mappedEvent := Event{Name: m.Out} s.events[mappedEvent.Name] = mappedEvent - go sendEvent(mappedEvent) + go func(e Event) { + select { + case s.eventsC <- e: + case <-s.closeContext.Done(): + return + } + }(mappedEvent) s.log.WithFields(logrus.Fields{ "in": event.String(), "out": m.String(), @@ -409,7 +416,7 @@ func (s *LocalSupervisor) WaitForEvent(ctx context.Context, name string, eventC waiter := &waiter{eventC: eventC, context: ctx} event, ok := s.events[name] if ok { - go waiter.notify(event) + go s.notifyWaiter(waiter, event) return } s.eventWaiters[name] = append(s.eventWaiters[name], waiter) @@ -425,13 +432,20 @@ func (s *LocalSupervisor) getWaiters(name string) []*waiter { return out } +func (s *LocalSupervisor) notifyWaiter(w *waiter, event Event) { + select { + case w.eventC <- event: + case <-w.context.Done(): + } +} + func (s *LocalSupervisor) fanOut() { for { select { case event := <-s.eventsC: waiters := s.getWaiters(event.Name) for _, waiter := range waiters { - go waiter.notify(event) + go s.notifyWaiter(waiter, event) } case <-s.closeContext.Done(): return @@ -444,13 +458,6 @@ type waiter struct { context context.Context } -func (w *waiter) notify(event Event) { - select { - case w.eventC <- event: - case <-w.context.Done(): - } -} - // Service is a running teleport service function type Service interface { // Serve starts the function diff --git a/lib/srv/heartbeat.go b/lib/srv/heartbeat.go index a9e8c7ba0dabf..a63c444ea42c9 100644 --- a/lib/srv/heartbeat.go +++ b/lib/srv/heartbeat.go @@ -149,7 +149,7 @@ func NewHeartbeat(cfg HeartbeatConfig) (*Heartbeat, error) { announceC: make(chan struct{}, 1), sendC: make(chan struct{}, 1), } - h.Debugf("Starting %v heartbeat with announce period: %v, keep-alive period %v, poll period: %v", cfg.Mode, cfg.AnnouncePeriod, cfg.KeepAlivePeriod, cfg.CheckPeriod) + h.Debugf("Starting %v heartbeat with announce period: %v, keep-alive period %v, poll period: %v", cfg.Mode, cfg.KeepAlivePeriod, cfg.AnnouncePeriod, cfg.CheckPeriod) return h, nil } diff --git a/lib/utils/utils.go b/lib/utils/utils.go index 92f6b27a517b3..6fcde74685263 100644 --- a/lib/utils/utils.go +++ b/lib/utils/utils.go @@ -380,29 +380,26 @@ func OpaqueAccessDenied(err error) error { } // PortList is a list of TCP port -type PortList struct { - ports []int - mu sync.Mutex -} +type PortList []string // Pop returns a value from the list, it panics if the value is not there func (p *PortList) Pop() string { - return strconv.Itoa(p.PopInt()) + if len(*p) == 0 { + panic("list is empty") + } + val := (*p)[len(*p)-1] + *p = (*p)[:len(*p)-1] + return val } // PopInt returns a value from the list, it panics if not enough values // were allocated func (p *PortList) PopInt() int { - p.mu.Lock() - defer p.mu.Unlock() - - l := len(p.ports) - if l == 0 { - panic("list is empty") + i, err := strconv.Atoi(p.Pop()) + if err != nil { + panic(err) } - val := p.ports[l-1] - p.ports = p.ports[:l-1] - return val + return i } // PopIntSlice returns a slice of values from the list, it panics if not enough @@ -420,15 +417,15 @@ const PortStartingNumber = 20000 // GetFreeTCPPorts returns n ports starting from port 20000. func GetFreeTCPPorts(n int, offset ...int) (PortList, error) { - list := make([]int, 0, n) + list := make(PortList, 0, n) start := PortStartingNumber if len(offset) != 0 { start = offset[0] } for i := start; i < start+n; i++ { - list = append(list, i) + list = append(list, strconv.Itoa(i)) } - return PortList{ports: list}, nil + return list, nil } // ReadHostUUID reads host UUID from the file in the data dir