Skip to content

Commit

Permalink
Revert "Throw startup error if TeleportReadyEvent is not emitted (#…
Browse files Browse the repository at this point in the history
…11725)"

This reverts commit 933e247.
  • Loading branch information
r0mant committed Apr 26, 2022
1 parent c4bc651 commit aab4d04
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 94 deletions.
15 changes: 13 additions & 2 deletions integration/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1187,15 +1187,26 @@ func (i *TeleInstance) Start() error {
// Build a list of expected events to wait for before unblocking based off
// the configuration passed in.
expectedEvents := []string{}
// Always wait for TeleportReadyEvent.
expectedEvents = append(expectedEvents, service.TeleportReadyEvent)
if i.Config.Auth.Enabled {
expectedEvents = append(expectedEvents, service.AuthTLSReady)
}
if i.Config.Proxy.Enabled {
expectedEvents = append(expectedEvents, service.ProxyReverseTunnelReady)
expectedEvents = append(expectedEvents, service.ProxySSHReady)
expectedEvents = append(expectedEvents, service.ProxyAgentPoolReady)
if !i.Config.Proxy.DisableWebService {
expectedEvents = append(expectedEvents, service.ProxyWebServerReady)
}
}
if i.Config.SSH.Enabled {
expectedEvents = append(expectedEvents, service.NodeSSHReady)
}
if i.Config.Apps.Enabled {
expectedEvents = append(expectedEvents, service.AppsReady)
}
if i.Config.Databases.Enabled {
expectedEvents = append(expectedEvents, service.DatabasesReady)
}

// Start the process and block until the expected events have arrived.
receivedEvents, err := startAndWait(i.Process, expectedEvents)
Expand Down
11 changes: 0 additions & 11 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4403,17 +4403,6 @@ func waitForProcessStart(serviceC chan *service.TeleportProcess) (*service.Telep
dumpGoroutineProfile()
return nil, trace.BadParameter("timeout waiting for service to start")
}

eventC := make(chan service.Event, 1)
svc.WaitForEvent(context.TODO(), service.TeleportReadyEvent, eventC)
select {
case <-eventC:

case <-time.After(20 * time.Second):
dumpGoroutineProfile()
return nil, trace.BadParameter("timeout waiting for service to broadcast ready status")
}

return svc, nil
}

Expand Down
4 changes: 0 additions & 4 deletions lib/service/desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,6 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus.
"Windows desktop service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, listener.Addr())
}

// since srv.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: WindowsDesktopReady, Payload: nil})
err := srv.Serve(listener)
if err != nil {
if err == http.ErrServerClosed {
Expand Down
2 changes: 0 additions & 2 deletions lib/service/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,6 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C
"Kubernetes service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, listener.Addr())
}
// since kubeServer.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: KubernetesReady, Payload: nil})
err := kubeServer.Serve(listener)
if err != nil {
Expand Down
90 changes: 35 additions & 55 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ const (
// is ready to start accepting connections.
DatabasesReady = "DatabasesReady"

// MetricsReady is generated when the Teleport metrics service is ready to
// start accepting connections.
MetricsReady = "MetricsReady"

// WindowsDesktopReady is generated when the Teleport windows desktop
// service is ready to start accepting connections.
WindowsDesktopReady = "WindowsDesktopReady"
Expand Down Expand Up @@ -508,20 +512,6 @@ func Run(ctx context.Context, cfg Config, newTeleport NewProcess) error {
if err := srv.Start(); err != nil {
return trace.Wrap(err, "startup failed")
}

// Wait for the service to report that it has started.
startTimeoutCtx, startCancel := context.WithTimeout(ctx, signalPipeTimeout)
defer startCancel()
eventC := make(chan Event, 1)
srv.WaitForEvent(startTimeoutCtx, TeleportReadyEvent, eventC)
select {
case <-eventC:
cfg.Log.Infof("Service has started successfully.")
case <-startTimeoutCtx.Done():
warnOnErr(srv.Close(), cfg.Log)
return trace.BadParameter("service has failed to start")
}

// Wait and reload until called exit.
for {
srv, err = waitAndReload(ctx, cfg, srv, newTeleport)
Expand Down Expand Up @@ -778,8 +768,36 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) {
cfg.Keygen = native.New(process.ExitContext())
}

// Produce global TeleportReadyEvent when all components have started
process.registerTeleportReadyEvent(cfg)
// Produce global TeleportReadyEvent
// when all components have started
eventMapping := EventMapping{
Out: TeleportReadyEvent,
}
if cfg.Auth.Enabled {
eventMapping.In = append(eventMapping.In, AuthTLSReady)
}
if cfg.SSH.Enabled {
eventMapping.In = append(eventMapping.In, NodeSSHReady)
}
if cfg.Proxy.Enabled {
eventMapping.In = append(eventMapping.In, ProxySSHReady)
}
if cfg.Kube.Enabled {
eventMapping.In = append(eventMapping.In, KubernetesReady)
}
if cfg.Apps.Enabled {
eventMapping.In = append(eventMapping.In, AppsReady)
}
if cfg.Databases.Enabled {
eventMapping.In = append(eventMapping.In, DatabasesReady)
}
if cfg.Metrics.Enabled {
eventMapping.In = append(eventMapping.In, MetricsReady)
}
if cfg.WindowsDesktop.Enabled {
eventMapping.In = append(eventMapping.In, WindowsDesktopReady)
}
process.RegisterEventMapping(eventMapping)

if cfg.Auth.Enabled {
if err := process.initAuthService(); err != nil {
Expand Down Expand Up @@ -1368,7 +1386,7 @@ func (process *TeleportProcess) initAuthService() error {
utils.Consolef(cfg.Console, log, teleport.ComponentAuth, "Auth service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, authAddr)

// since tlsServer.Serve is a blocking call, we emit this event right before
// since tlsServer.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: AuthTLSReady, Payload: nil})
err := tlsServer.Serve()
Expand Down Expand Up @@ -3443,44 +3461,6 @@ func (process *TeleportProcess) waitForAppDepend() {
}
}

// registerTeleportReadyEvent registers an event mapping whose output is
// TeleportReadyEvent and whose inputs are the ready events of the components
// enabled in cfg, so that a TeleportReadyEvent is produced when all
// components have started.
func (process *TeleportProcess) registerTeleportReadyEvent(cfg *Config) {
	// Pair each component's enabled flag with the event it emits when ready.
	// The order of this table determines the order of the mapping's inputs.
	components := []struct {
		enabled bool
		ready   string
	}{
		{cfg.Auth.Enabled, AuthTLSReady},
		{cfg.SSH.Enabled, NodeSSHReady},
		{cfg.Proxy.Enabled, ProxySSHReady},
		{cfg.Kube.Enabled, KubernetesReady},
		{cfg.Apps.Enabled, AppsReady},
		{cfg.Databases.Enabled, DatabasesReady},
		{cfg.WindowsDesktop.Enabled, WindowsDesktopReady},
	}

	mapping := EventMapping{Out: TeleportReadyEvent}
	for _, c := range components {
		if !c.enabled {
			continue
		}
		mapping.In = append(mapping.In, c.ready)
	}

	process.RegisterEventMapping(mapping)
}

// appDependEvents is a list of events that the application service depends on.
var appDependEvents = []string{
AuthTLSReady,
Expand Down
2 changes: 1 addition & 1 deletion lib/service/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (process *TeleportProcess) WaitForSignals(ctx context.Context) error {
}

// ErrTeleportReloading is returned when signal waiter exits
// because the teleport process has initiated shutdown
// because the teleport process has initiated shutdown
var ErrTeleportReloading = &trace.CompareFailedError{Message: "teleport process is reloading"}

// ErrTeleportExited means that teleport has exited
Expand Down
45 changes: 26 additions & 19 deletions lib/service/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Supervisor interface {
Wait() error

// Run starts and waits for the service to complete
// it's a combination Start() and Wait()
// it's a combination of Start() and Wait()
Run() error

// Services returns list of running services
Expand Down Expand Up @@ -362,26 +362,33 @@ func (s *LocalSupervisor) BroadcastEvent(event Event) {
s.signalReload()
}

sendEvent := func(e Event) {
select {
case s.eventsC <- e:
case <-s.closeContext.Done():
}
}

s.events[event.Name] = event
go sendEvent(event)

// Log all events other than recovered events to prevent the logs from
// being flooded.
if event.String() != TeleportOKEvent {
s.log.WithField("event", event.String()).Debug("Broadcasting event.")
}

go func() {
select {
case s.eventsC <- event:
case <-s.closeContext.Done():
return
}
}()

for _, m := range s.eventMappings {
if m.matches(event.Name, s.events) {
mappedEvent := Event{Name: m.Out}
s.events[mappedEvent.Name] = mappedEvent
go sendEvent(mappedEvent)
go func(e Event) {
select {
case s.eventsC <- e:
case <-s.closeContext.Done():
return
}
}(mappedEvent)
s.log.WithFields(logrus.Fields{
"in": event.String(),
"out": m.String(),
Expand Down Expand Up @@ -409,7 +416,7 @@ func (s *LocalSupervisor) WaitForEvent(ctx context.Context, name string, eventC
waiter := &waiter{eventC: eventC, context: ctx}
event, ok := s.events[name]
if ok {
go waiter.notify(event)
go s.notifyWaiter(waiter, event)
return
}
s.eventWaiters[name] = append(s.eventWaiters[name], waiter)
Expand All @@ -425,13 +432,20 @@ func (s *LocalSupervisor) getWaiters(name string) []*waiter {
return out
}

// notifyWaiter sends event on the waiter's channel. It blocks until either
// the send completes or the waiter's context is done, whichever happens
// first; in the latter case the event is not delivered.
func (s *LocalSupervisor) notifyWaiter(w *waiter, event Event) {
	select {
	case <-w.context.Done():
		// The waiter's context is done; give up on delivery.
	case w.eventC <- event:
		// Event handed off to the waiter.
	}
}

func (s *LocalSupervisor) fanOut() {
for {
select {
case event := <-s.eventsC:
waiters := s.getWaiters(event.Name)
for _, waiter := range waiters {
go waiter.notify(event)
go s.notifyWaiter(waiter, event)
}
case <-s.closeContext.Done():
return
Expand All @@ -444,13 +458,6 @@ type waiter struct {
context context.Context
}

// notify sends event on the waiter's channel. It blocks until either the
// send completes or the waiter's context is done, whichever happens first;
// in the latter case the event is not delivered.
func (w *waiter) notify(event Event) {
	select {
	case <-w.context.Done():
		// The waiter's context is done; give up on delivery.
	case w.eventC <- event:
		// Event handed off to the waiter.
	}
}

// Service is a running teleport service function
type Service interface {
// Serve starts the function
Expand Down

0 comments on commit aab4d04

Please sign in to comment.