Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throw startup error if TeleportReadyEvent is not emitted #11725

Merged
merged 9 commits into from
Apr 6, 2022
4 changes: 4 additions & 0 deletions lib/service/desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus.
"Windows desktop service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, listener.Addr())
}

// since srv.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: WindowsDesktopReady, Payload: nil})
err := srv.Serve(listener)
if err != nil {
if err == http.ErrServerClosed {
Expand Down
2 changes: 2 additions & 0 deletions lib/service/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C
"Kubernetes service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, listener.Addr())
}
// since kubeServer.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: KubernetesReady, Payload: nil})
err := kubeServer.Serve(listener)
if err != nil {
Expand Down
23 changes: 15 additions & 8 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,6 @@ const (
// is ready to start accepting connections.
DatabasesReady = "DatabasesReady"

// MetricsReady is generated when the Teleport metrics service is ready to
// start accepting connections.
MetricsReady = "MetricsReady"

// WindowsDesktopReady is generated when the Teleport windows desktop
// service is ready to start accepting connections.
WindowsDesktopReady = "WindowsDesktopReady"
Expand Down Expand Up @@ -512,6 +508,20 @@ func Run(ctx context.Context, cfg Config, newTeleport NewProcess) error {
if err := srv.Start(); err != nil {
return trace.Wrap(err, "startup failed")
}

// Wait for the service to report that it has started.
startTimeoutCtx, startCancel := context.WithTimeout(ctx, signalPipeTimeout)
defer startCancel()
eventC := make(chan Event, 1)
srv.WaitForEvent(startTimeoutCtx, TeleportReadyEvent, eventC)
select {
case <-eventC:
cfg.Log.Infof("Service has started successfully.")
case <-startTimeoutCtx.Done():
warnOnErr(srv.Close(), cfg.Log)
return trace.BadParameter("service has failed to start")
}

// Wait and reload until called exit.
for {
srv, err = waitAndReload(ctx, cfg, srv, newTeleport)
Expand Down Expand Up @@ -786,9 +796,6 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) {
if cfg.Databases.Enabled {
eventMapping.In = append(eventMapping.In, DatabasesReady)
}
if cfg.Metrics.Enabled {
eventMapping.In = append(eventMapping.In, MetricsReady)
}
if cfg.WindowsDesktop.Enabled {
eventMapping.In = append(eventMapping.In, WindowsDesktopReady)
}
Expand Down Expand Up @@ -1381,7 +1388,7 @@ func (process *TeleportProcess) initAuthService() error {
utils.Consolef(cfg.Console, log, teleport.ComponentAuth, "Auth service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, authAddr)

// since tlsServer.Serve is a blocking call, we emit this even right before
// since tlsServer.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: AuthTLSReady, Payload: nil})
err := tlsServer.Serve()
Expand Down
2 changes: 1 addition & 1 deletion lib/service/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (process *TeleportProcess) WaitForSignals(ctx context.Context) error {
}

// ErrTeleportReloading is returned when signal waiter exits
// because the teleport process has initiaded shutdown
// because the teleport process has initiated shutdown
var ErrTeleportReloading = &trace.CompareFailedError{Message: "teleport process is reloading"}

// ErrTeleportExited means that teleport has exited
Expand Down
45 changes: 19 additions & 26 deletions lib/service/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Supervisor interface {
Wait() error

// Run starts and waits for the service to complete
// it's a combinatioin Start() and Wait()
// it's a combination of Start() and Wait()
Run() error

// Services returns list of running services
Expand Down Expand Up @@ -362,33 +362,26 @@ func (s *LocalSupervisor) BroadcastEvent(event Event) {
s.signalReload()
}

s.events[event.Name] = event
sendEvent := func(e Event) {
select {
case s.eventsC <- e:
case <-s.closeContext.Done():
}
}

s.events[event.Name] = event
go sendEvent(event)
// Log all events other than recovered events to prevent the logs from
// being flooded.
if event.String() != TeleportOKEvent {
s.log.WithField("event", event.String()).Debug("Broadcasting event.")
}

go func() {
select {
case s.eventsC <- event:
case <-s.closeContext.Done():
return
}
}()

for _, m := range s.eventMappings {
if m.matches(event.Name, s.events) {
mappedEvent := Event{Name: m.Out}
s.events[mappedEvent.Name] = mappedEvent
go func(e Event) {
select {
case s.eventsC <- e:
case <-s.closeContext.Done():
return
}
}(mappedEvent)
go sendEvent(mappedEvent)
s.log.WithFields(logrus.Fields{
"in": event.String(),
"out": m.String(),
Expand Down Expand Up @@ -416,7 +409,7 @@ func (s *LocalSupervisor) WaitForEvent(ctx context.Context, name string, eventC
waiter := &waiter{eventC: eventC, context: ctx}
event, ok := s.events[name]
if ok {
go s.notifyWaiter(waiter, event)
go waiter.notify(event)
return
}
s.eventWaiters[name] = append(s.eventWaiters[name], waiter)
Expand All @@ -432,20 +425,13 @@ func (s *LocalSupervisor) getWaiters(name string) []*waiter {
return out
}

// notifyWaiter sends event on w.eventC, or returns without sending once
// w.context is done. The select has no default case, so the call blocks until
// one of those two things happens. It does not read or modify any state of
// the supervisor s.
func (s *LocalSupervisor) notifyWaiter(w *waiter, event Event) {
select {
case w.eventC <- event:
case <-w.context.Done():
// The waiter's context is done: give up on delivering the event.
}
}

func (s *LocalSupervisor) fanOut() {
for {
select {
case event := <-s.eventsC:
waiters := s.getWaiters(event.Name)
for _, waiter := range waiters {
go s.notifyWaiter(waiter, event)
go waiter.notify(event)
}
case <-s.closeContext.Done():
return
Expand All @@ -458,6 +444,13 @@ type waiter struct {
context context.Context
}

// notify sends event on the waiter's eventC channel, or returns without
// sending once the waiter's context is done. The select has no default case,
// so the call blocks until one of those two things happens.
func (w *waiter) notify(event Event) {
select {
case w.eventC <- event:
case <-w.context.Done():
// The waiter's context is done: give up on delivering the event.
}
}

// Service is a running teleport service function
type Service interface {
// Serve starts the function
Expand Down