From aab4d048dce85a28fb09340af5c28f69b09c8515 Mon Sep 17 00:00:00 2001 From: Roman Tkachenko Date: Tue, 26 Apr 2022 12:11:54 -0700 Subject: [PATCH] Revert "Throw startup error if `TeleportReadyEvent` is not emitted (#11725)" This reverts commit 933e247287542205c96d3c7a24d824b4d8bcfdcf. --- integration/helpers.go | 15 +++++- integration/integration_test.go | 11 ---- lib/service/desktop.go | 4 -- lib/service/kubernetes.go | 2 - lib/service/service.go | 90 +++++++++++++-------------------- lib/service/signals.go | 2 +- lib/service/supervisor.go | 45 ++++++++++------- 7 files changed, 75 insertions(+), 94 deletions(-) diff --git a/integration/helpers.go b/integration/helpers.go index fc535a966949b..2e8f388e194ad 100644 --- a/integration/helpers.go +++ b/integration/helpers.go @@ -1187,15 +1187,26 @@ func (i *TeleInstance) Start() error { // Build a list of expected events to wait for before unblocking based off // the configuration passed in. expectedEvents := []string{} - // Always wait for TeleportReadyEvent. - expectedEvents = append(expectedEvents, service.TeleportReadyEvent) + if i.Config.Auth.Enabled { + expectedEvents = append(expectedEvents, service.AuthTLSReady) + } if i.Config.Proxy.Enabled { expectedEvents = append(expectedEvents, service.ProxyReverseTunnelReady) + expectedEvents = append(expectedEvents, service.ProxySSHReady) expectedEvents = append(expectedEvents, service.ProxyAgentPoolReady) if !i.Config.Proxy.DisableWebService { expectedEvents = append(expectedEvents, service.ProxyWebServerReady) } } + if i.Config.SSH.Enabled { + expectedEvents = append(expectedEvents, service.NodeSSHReady) + } + if i.Config.Apps.Enabled { + expectedEvents = append(expectedEvents, service.AppsReady) + } + if i.Config.Databases.Enabled { + expectedEvents = append(expectedEvents, service.DatabasesReady) + } // Start the process and block until the expected events have arrived. 
receivedEvents, err := startAndWait(i.Process, expectedEvents) diff --git a/integration/integration_test.go b/integration/integration_test.go index aa36b2c7e044f..e5c3afd34562f 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -4403,17 +4403,6 @@ func waitForProcessStart(serviceC chan *service.TeleportProcess) (*service.Telep dumpGoroutineProfile() return nil, trace.BadParameter("timeout waiting for service to start") } - - eventC := make(chan service.Event, 1) - svc.WaitForEvent(context.TODO(), service.TeleportReadyEvent, eventC) - select { - case <-eventC: - - case <-time.After(20 * time.Second): - dumpGoroutineProfile() - return nil, trace.BadParameter("timeout waiting for service to broadcast ready status") - } - return svc, nil } diff --git a/lib/service/desktop.go b/lib/service/desktop.go index 641d06bedcd07..33c8cb7de531f 100644 --- a/lib/service/desktop.go +++ b/lib/service/desktop.go @@ -248,10 +248,6 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus. 
"Windows desktop service %s:%s is starting on %v.", teleport.Version, teleport.Gitref, listener.Addr()) } - - // since srv.Serve is a blocking call, we emit this event right before - // the service has started - process.BroadcastEvent(Event{Name: WindowsDesktopReady, Payload: nil}) err := srv.Serve(listener) if err != nil { if err == http.ErrServerClosed { diff --git a/lib/service/kubernetes.go b/lib/service/kubernetes.go index 3c9ab885a4fdc..1073ad978e65a 100644 --- a/lib/service/kubernetes.go +++ b/lib/service/kubernetes.go @@ -271,8 +271,6 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C "Kubernetes service %s:%s is starting on %v.", teleport.Version, teleport.Gitref, listener.Addr()) } - // since kubeServer.Serve is a blocking call, we emit this event right before - // the service has started process.BroadcastEvent(Event{Name: KubernetesReady, Payload: nil}) err := kubeServer.Serve(listener) if err != nil { diff --git a/lib/service/service.go b/lib/service/service.go index f5d0a4d16b66b..24a81594a9f5b 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -172,6 +172,10 @@ const ( // is ready to start accepting connections. DatabasesReady = "DatabasesReady" + // MetricsReady is generated when the Teleport metrics service is ready to + // start accepting connections. + MetricsReady = "MetricsReady" + // WindowsDesktopReady is generated when the Teleport windows desktop // service is ready to start accepting connections. WindowsDesktopReady = "WindowsDesktopReady" @@ -508,20 +512,6 @@ func Run(ctx context.Context, cfg Config, newTeleport NewProcess) error { if err := srv.Start(); err != nil { return trace.Wrap(err, "startup failed") } - - // Wait for the service to report that it has started. 
- startTimeoutCtx, startCancel := context.WithTimeout(ctx, signalPipeTimeout) - defer startCancel() - eventC := make(chan Event, 1) - srv.WaitForEvent(startTimeoutCtx, TeleportReadyEvent, eventC) - select { - case <-eventC: - cfg.Log.Infof("Service has started successfully.") - case <-startTimeoutCtx.Done(): - warnOnErr(srv.Close(), cfg.Log) - return trace.BadParameter("service has failed to start") - } - // Wait and reload until called exit. for { srv, err = waitAndReload(ctx, cfg, srv, newTeleport) @@ -778,8 +768,36 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) { cfg.Keygen = native.New(process.ExitContext()) } - // Produce global TeleportReadyEvent when all components have started - process.registerTeleportReadyEvent(cfg) + // Produce global TeleportReadyEvent + // when all components have started + eventMapping := EventMapping{ + Out: TeleportReadyEvent, + } + if cfg.Auth.Enabled { + eventMapping.In = append(eventMapping.In, AuthTLSReady) + } + if cfg.SSH.Enabled { + eventMapping.In = append(eventMapping.In, NodeSSHReady) + } + if cfg.Proxy.Enabled { + eventMapping.In = append(eventMapping.In, ProxySSHReady) + } + if cfg.Kube.Enabled { + eventMapping.In = append(eventMapping.In, KubernetesReady) + } + if cfg.Apps.Enabled { + eventMapping.In = append(eventMapping.In, AppsReady) + } + if cfg.Databases.Enabled { + eventMapping.In = append(eventMapping.In, DatabasesReady) + } + if cfg.Metrics.Enabled { + eventMapping.In = append(eventMapping.In, MetricsReady) + } + if cfg.WindowsDesktop.Enabled { + eventMapping.In = append(eventMapping.In, WindowsDesktopReady) + } + process.RegisterEventMapping(eventMapping) if cfg.Auth.Enabled { if err := process.initAuthService(); err != nil { @@ -1368,7 +1386,7 @@ func (process *TeleportProcess) initAuthService() error { utils.Consolef(cfg.Console, log, teleport.ComponentAuth, "Auth service %s:%s is starting on %v.", teleport.Version, teleport.Gitref, authAddr) - // since tlsServer.Serve is a blocking call, we emit 
this event right before + // since tlsServer.Serve is a blocking call, we emit this event right before // the service has started process.BroadcastEvent(Event{Name: AuthTLSReady, Payload: nil}) err := tlsServer.Serve() @@ -3443,44 +3461,6 @@ func (process *TeleportProcess) waitForAppDepend() { } } -// registerTeleportReadyEvent ensures that a TeleportReadyEvent is produced -// when all components have started. -func (process *TeleportProcess) registerTeleportReadyEvent(cfg *Config) { - eventMapping := EventMapping{ - Out: TeleportReadyEvent, - } - - if cfg.Auth.Enabled { - eventMapping.In = append(eventMapping.In, AuthTLSReady) - } - - if cfg.SSH.Enabled { - eventMapping.In = append(eventMapping.In, NodeSSHReady) - } - - if cfg.Proxy.Enabled { - eventMapping.In = append(eventMapping.In, ProxySSHReady) - } - - if cfg.Kube.Enabled { - eventMapping.In = append(eventMapping.In, KubernetesReady) - } - - if cfg.Apps.Enabled { - eventMapping.In = append(eventMapping.In, AppsReady) - } - - if cfg.Databases.Enabled { - eventMapping.In = append(eventMapping.In, DatabasesReady) - } - - if cfg.WindowsDesktop.Enabled { - eventMapping.In = append(eventMapping.In, WindowsDesktopReady) - } - - process.RegisterEventMapping(eventMapping) -} - // appDependEvents is a list of events that the application service depends on. 
var appDependEvents = []string{ AuthTLSReady, diff --git a/lib/service/signals.go b/lib/service/signals.go index de8799e029acd..21fd8db5fe66e 100644 --- a/lib/service/signals.go +++ b/lib/service/signals.go @@ -153,7 +153,7 @@ func (process *TeleportProcess) WaitForSignals(ctx context.Context) error { } // ErrTeleportReloading is returned when signal waiter exits -// because the teleport process has initiated shutdown +// because the teleport process has initiated shutdown var ErrTeleportReloading = &trace.CompareFailedError{Message: "teleport process is reloading"} // ErrTeleportExited means that teleport has exited diff --git a/lib/service/supervisor.go b/lib/service/supervisor.go index f744c175491cf..a059f80766bb7 100644 --- a/lib/service/supervisor.go +++ b/lib/service/supervisor.go @@ -55,7 +55,7 @@ type Supervisor interface { Wait() error // Run starts and waits for the service to complete - // it's a combination Start() and Wait() + // it's a combination Start() and Wait() Run() error // Services returns list of running services @@ -362,26 +362,33 @@ func (s *LocalSupervisor) BroadcastEvent(event Event) { s.signalReload() } - sendEvent := func(e Event) { - select { - case s.eventsC <- e: - case <-s.closeContext.Done(): - } - } - s.events[event.Name] = event - go sendEvent(event) + // Log all events other than recovered events to prevent the logs from // being flooded. 
if event.String() != TeleportOKEvent { s.log.WithField("event", event.String()).Debug("Broadcasting event.") } + go func() { + select { + case s.eventsC <- event: + case <-s.closeContext.Done(): + return + } + }() + for _, m := range s.eventMappings { if m.matches(event.Name, s.events) { mappedEvent := Event{Name: m.Out} s.events[mappedEvent.Name] = mappedEvent - go sendEvent(mappedEvent) + go func(e Event) { + select { + case s.eventsC <- e: + case <-s.closeContext.Done(): + return + } + }(mappedEvent) s.log.WithFields(logrus.Fields{ "in": event.String(), "out": m.String(), @@ -409,7 +416,7 @@ func (s *LocalSupervisor) WaitForEvent(ctx context.Context, name string, eventC waiter := &waiter{eventC: eventC, context: ctx} event, ok := s.events[name] if ok { - go waiter.notify(event) + go s.notifyWaiter(waiter, event) return } s.eventWaiters[name] = append(s.eventWaiters[name], waiter) @@ -425,13 +432,20 @@ func (s *LocalSupervisor) getWaiters(name string) []*waiter { return out } +func (s *LocalSupervisor) notifyWaiter(w *waiter, event Event) { + select { + case w.eventC <- event: + case <-w.context.Done(): + } +} + func (s *LocalSupervisor) fanOut() { for { select { case event := <-s.eventsC: waiters := s.getWaiters(event.Name) for _, waiter := range waiters { - go waiter.notify(event) + go s.notifyWaiter(waiter, event) } case <-s.closeContext.Done(): return @@ -444,13 +458,6 @@ type waiter struct { context context.Context } -func (w *waiter) notify(event Event) { - select { - case w.eventC <- event: - case <-w.context.Done(): - } -} - // Service is a running teleport service function type Service interface { // Serve starts the function