Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throw startup error if TeleportReadyEvent is not emitted #11725

Merged
merged 9 commits into from
Apr 6, 2022
4 changes: 4 additions & 0 deletions lib/service/desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus.
"Windows desktop service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, listener.Addr())
}

// since srv.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: WindowsDesktopReady, Payload: nil})
err := srv.Serve(listener)
if err != nil {
if err == http.ErrServerClosed {
Expand Down
2 changes: 2 additions & 0 deletions lib/service/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C
"Kubernetes service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, listener.Addr())
}
// since kubeServer.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: KubernetesReady, Payload: nil})
err := kubeServer.Serve(listener)
if err != nil {
Expand Down
37 changes: 15 additions & 22 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,6 @@ const (
// is ready to start accepting connections.
DatabasesReady = "DatabasesReady"

// MetricsReady is generated when the Teleport metrics service is ready to
// start accepting connections.
MetricsReady = "MetricsReady"

// WindowsDesktopReady is generated when the Teleport windows desktop
// service is ready to start accepting connections.
WindowsDesktopReady = "WindowsDesktopReady"
Expand Down Expand Up @@ -514,6 +510,19 @@ func Run(ctx context.Context, cfg Config, newTeleport NewProcess) error {
}
// Wait and reload until called exit.
for {
// Wait for the service to report that it has started.
startTimeoutCtx, startCancel := context.WithTimeout(ctx, signalPipeTimeout)
defer startCancel()
eventC := make(chan Event, 1)
srv.WaitForEvent(startTimeoutCtx, TeleportReadyEvent, eventC)
select {
case <-eventC:
cfg.Log.Infof("Service has started successfully.")
case <-startTimeoutCtx.Done():
warnOnErr(srv.Close(), cfg.Log)
return trace.BadParameter("service has failed to start")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks that waitAndReload logic already waits for TeleportReadyEvent so after exiting waitAndReload the logic triggers second wait for TeleportReadyEvent event but it the TeleportReadyEvent event was ready broadcasted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good point. In a way this is ok since an already broadcast event will still be emitted if waited on, but there might be a better alternative.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this PR remove that WaitForEvent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I think that a similar situation would already occur if two process reloads were triggered.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this PR remove that WaitForEvent?

You're right. Sorry @smallinsky @espadolini, I misunderstood what waitAndReload was doing.

I pushed a new change (f73af18) to ensure that we always for the the TeleportReadyEvent after calling LocalSupervisor.Start() (both on the initial startup, and after a process reload), reverting the changes to waitAndReload.


srv, err = waitAndReload(ctx, cfg, srv, newTeleport)
if err != nil {
// This error means that was a clean shutdown
Expand Down Expand Up @@ -552,20 +561,7 @@ func waitAndReload(ctx context.Context, cfg Config, srv Process, newTeleport New
warnOnErr(srv.Close(), cfg.Log)
return nil, trace.Wrap(err, "failed to start a new service")
}
// Wait for the new server to report that it has started
// before shutting down the old one.
startTimeoutCtx, startCancel := context.WithTimeout(ctx, signalPipeTimeout)
defer startCancel()
eventC := make(chan Event, 1)
newSrv.WaitForEvent(startTimeoutCtx, TeleportReadyEvent, eventC)
select {
case <-eventC:
cfg.Log.Infof("New service has started successfully.")
case <-startTimeoutCtx.Done():
warnOnErr(newSrv.Close(), cfg.Log)
warnOnErr(srv.Close(), cfg.Log)
return nil, trace.BadParameter("the new service has failed to start")
}
// Shut down the old service.
shutdownTimeout := cfg.ShutdownTimeout
if shutdownTimeout == 0 {
// The default shutdown timeout is very generous to avoid disrupting
Expand Down Expand Up @@ -786,9 +782,6 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) {
if cfg.Databases.Enabled {
eventMapping.In = append(eventMapping.In, DatabasesReady)
}
if cfg.Metrics.Enabled {
eventMapping.In = append(eventMapping.In, MetricsReady)
}
if cfg.WindowsDesktop.Enabled {
eventMapping.In = append(eventMapping.In, WindowsDesktopReady)
}
Expand Down Expand Up @@ -1381,7 +1374,7 @@ func (process *TeleportProcess) initAuthService() error {
utils.Consolef(cfg.Console, log, teleport.ComponentAuth, "Auth service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, authAddr)

// since tlsServer.Serve is a blocking call, we emit this even right before
// since tlsServer.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: AuthTLSReady, Payload: nil})
err := tlsServer.Serve()
Expand Down
2 changes: 1 addition & 1 deletion lib/service/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (process *TeleportProcess) WaitForSignals(ctx context.Context) error {
}

// ErrTeleportReloading is returned when signal waiter exits
// because the teleport process has initiaded shutdown
// because the teleport process has initiated shutdown
var ErrTeleportReloading = &trace.CompareFailedError{Message: "teleport process is reloading"}

// ErrTeleportExited means that teleport has exited
Expand Down
34 changes: 14 additions & 20 deletions lib/service/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Supervisor interface {
Wait() error

// Run starts and waits for the service to complete
// it's a combinatioin Start() and Wait()
// it's a combination Start() and Wait()
Run() error

// Services returns list of running services
Expand Down Expand Up @@ -362,33 +362,27 @@ func (s *LocalSupervisor) BroadcastEvent(event Event) {
s.signalReload()
}

s.events[event.Name] = event
sendEvent := func(e Event) {
select {
case s.eventsC <- e:
case <-s.closeContext.Done():
return
vitorenesduarte marked this conversation as resolved.
Show resolved Hide resolved
}
}

s.events[event.Name] = event
go sendEvent(event)
// Log all events other than recovered events to prevent the logs from
// being flooded.
if event.String() != TeleportOKEvent {
s.log.WithField("event", event.String()).Debug("Broadcasting event.")
}

go func() {
select {
case s.eventsC <- event:
case <-s.closeContext.Done():
return
}
}()

for _, m := range s.eventMappings {
if m.matches(event.Name, s.events) {
mappedEvent := Event{Name: m.Out}
s.events[mappedEvent.Name] = mappedEvent
go func(e Event) {
select {
case s.eventsC <- e:
case <-s.closeContext.Done():
return
}
}(mappedEvent)
go sendEvent(mappedEvent)
s.log.WithFields(logrus.Fields{
"in": event.String(),
"out": m.String(),
Expand Down Expand Up @@ -416,7 +410,7 @@ func (s *LocalSupervisor) WaitForEvent(ctx context.Context, name string, eventC
waiter := &waiter{eventC: eventC, context: ctx}
event, ok := s.events[name]
if ok {
go s.notifyWaiter(waiter, event)
go waiter.notify(event)
return
}
s.eventWaiters[name] = append(s.eventWaiters[name], waiter)
Expand All @@ -432,7 +426,7 @@ func (s *LocalSupervisor) getWaiters(name string) []*waiter {
return out
}

func (s *LocalSupervisor) notifyWaiter(w *waiter, event Event) {
func (w *waiter) notify(event Event) {
vitorenesduarte marked this conversation as resolved.
Show resolved Hide resolved
select {
case w.eventC <- event:
case <-w.context.Done():
Expand All @@ -445,7 +439,7 @@ func (s *LocalSupervisor) fanOut() {
case event := <-s.eventsC:
waiters := s.getWaiters(event.Name)
for _, waiter := range waiters {
go s.notifyWaiter(waiter, event)
go waiter.notify(event)
}
case <-s.closeContext.Done():
return
Expand Down