Skip to content

Commit

Permalink
Revert "Backport #11725 #11249 #11799 to branch/v9 (#11795)"
Browse files Browse the repository at this point in the history
This reverts commit ea8ee94.
  • Loading branch information
r0mant committed Apr 26, 2022
1 parent d89371c commit c4376d0
Show file tree
Hide file tree
Showing 12 changed files with 173 additions and 255 deletions.
15 changes: 13 additions & 2 deletions integration/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,15 +1103,26 @@ func (i *TeleInstance) Start() error {
// Build a list of expected events to wait for before unblocking based off
// the configuration passed in.
expectedEvents := []string{}
// Always wait for TeleportReadyEvent.
expectedEvents = append(expectedEvents, service.TeleportReadyEvent)
if i.Config.Auth.Enabled {
expectedEvents = append(expectedEvents, service.AuthTLSReady)
}
if i.Config.Proxy.Enabled {
expectedEvents = append(expectedEvents, service.ProxyReverseTunnelReady)
expectedEvents = append(expectedEvents, service.ProxySSHReady)
expectedEvents = append(expectedEvents, service.ProxyAgentPoolReady)
if !i.Config.Proxy.DisableWebService {
expectedEvents = append(expectedEvents, service.ProxyWebServerReady)
}
}
if i.Config.SSH.Enabled {
expectedEvents = append(expectedEvents, service.NodeSSHReady)
}
if i.Config.Apps.Enabled {
expectedEvents = append(expectedEvents, service.AppsReady)
}
if i.Config.Databases.Enabled {
expectedEvents = append(expectedEvents, service.DatabasesReady)
}

// Start the process and block until the expected events have arrived.
receivedEvents, err := startAndWait(i.Process, expectedEvents)
Expand Down
11 changes: 0 additions & 11 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4405,17 +4405,6 @@ func waitForProcessStart(serviceC chan *service.TeleportProcess) (*service.Telep
dumpGoroutineProfile()
return nil, trace.BadParameter("timeout waiting for service to start")
}

eventC := make(chan service.Event, 1)
svc.WaitForEvent(context.TODO(), service.TeleportReadyEvent, eventC)
select {
case <-eventC:

case <-time.After(20 * time.Second):
dumpGoroutineProfile()
return nil, trace.BadParameter("timeout waiting for service to broadcast ready status")
}

return svc, nil
}

Expand Down
4 changes: 0 additions & 4 deletions lib/service/desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,6 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus.
"Windows desktop service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, listener.Addr())
}

// since srv.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: WindowsDesktopReady, Payload: nil})
err := srv.Serve(listener)
if err != nil {
if err == http.ErrServerClosed {
Expand Down
2 changes: 0 additions & 2 deletions lib/service/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,6 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C
"Kubernetes service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, listener.Addr())
}
// since kubeServer.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: KubernetesReady, Payload: nil})
err := kubeServer.Serve(listener)
if err != nil {
Expand Down
113 changes: 40 additions & 73 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,6 @@ const (
// and is ready to start accepting connections.
ProxySSHReady = "ProxySSHReady"

// ProxyKubeReady is generated when the kubernetes proxy service has been initialized.
ProxyKubeReady = "ProxyKubeReady"

// NodeSSHReady is generated when the Teleport node has initialized a SSH server
// and is ready to start accepting SSH connections.
NodeSSHReady = "NodeReady"
Expand All @@ -176,6 +173,10 @@ const (
// is ready to start accepting connections.
DatabasesReady = "DatabasesReady"

// MetricsReady is generated when the Teleport metrics service is ready to
// start accepting connections.
MetricsReady = "MetricsReady"

// WindowsDesktopReady is generated when the Teleport windows desktop
// service is ready to start accepting connections.
WindowsDesktopReady = "WindowsDesktopReady"
Expand All @@ -189,7 +190,7 @@ const (
// in a graceful way.
TeleportReloadEvent = "TeleportReload"

// TeleportPhaseChangeEvent is generated to indicate that teleport
// TeleportPhaseChangeEvent is generated to indidate that teleport
// CA rotation phase has been updated, used in tests
TeleportPhaseChangeEvent = "TeleportPhaseChange"

Expand Down Expand Up @@ -512,20 +513,6 @@ func Run(ctx context.Context, cfg Config, newTeleport NewProcess) error {
if err := srv.Start(); err != nil {
return trace.Wrap(err, "startup failed")
}

// Wait for the service to report that it has started.
startTimeoutCtx, startCancel := context.WithTimeout(ctx, signalPipeTimeout)
defer startCancel()
eventC := make(chan Event, 1)
srv.WaitForEvent(startTimeoutCtx, TeleportReadyEvent, eventC)
select {
case <-eventC:
cfg.Log.Infof("Service has started successfully.")
case <-startTimeoutCtx.Done():
warnOnErr(srv.Close(), cfg.Log)
return trace.BadParameter("service has failed to start")
}

// Wait and reload until called exit.
for {
srv, err = waitAndReload(ctx, cfg, srv, newTeleport)
Expand Down Expand Up @@ -759,17 +746,14 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) {

process.registerAppDepend()

// Produce global TeleportReadyEvent when all components have started
componentCount := process.registerTeleportReadyEvent(cfg)

process.log = cfg.Log.WithFields(logrus.Fields{
trace.Component: teleport.Component(teleport.ComponentProcess, process.id),
})

serviceStarted := false

if !cfg.DiagnosticAddr.IsEmpty() {
if err := process.initDiagnosticService(componentCount); err != nil {
if err := process.initDiagnosticService(); err != nil {
return nil, trace.Wrap(err)
}
} else {
Expand All @@ -788,6 +772,37 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) {
cfg.Keygen = native.New(process.ExitContext(), native.PrecomputeKeys(precomputeCount))
}

// Produce global TeleportReadyEvent
// when all components have started
eventMapping := EventMapping{
Out: TeleportReadyEvent,
}
if cfg.Auth.Enabled {
eventMapping.In = append(eventMapping.In, AuthTLSReady)
}
if cfg.SSH.Enabled {
eventMapping.In = append(eventMapping.In, NodeSSHReady)
}
if cfg.Proxy.Enabled {
eventMapping.In = append(eventMapping.In, ProxySSHReady)
}
if cfg.Kube.Enabled {
eventMapping.In = append(eventMapping.In, KubernetesReady)
}
if cfg.Apps.Enabled {
eventMapping.In = append(eventMapping.In, AppsReady)
}
if cfg.Databases.Enabled {
eventMapping.In = append(eventMapping.In, DatabasesReady)
}
if cfg.Metrics.Enabled {
eventMapping.In = append(eventMapping.In, MetricsReady)
}
if cfg.WindowsDesktop.Enabled {
eventMapping.In = append(eventMapping.In, WindowsDesktopReady)
}
process.RegisterEventMapping(eventMapping)

if cfg.Auth.Enabled {
if err := process.initAuthService(); err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -1379,7 +1394,7 @@ func (process *TeleportProcess) initAuthService() error {
utils.Consolef(cfg.Console, log, teleport.ComponentAuth, "Auth service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, authAddr)

// since tlsServer.Serve is a blocking call, we emit this event right before
// since tlsServer.Serve is a blocking call, we emit this even right before
// the service has started
process.BroadcastEvent(Event{Name: AuthTLSReady, Payload: nil})
err := tlsServer.Serve()
Expand Down Expand Up @@ -2221,7 +2236,7 @@ func (process *TeleportProcess) initMetricsService() error {

// initDiagnosticService starts diagnostic service currently serving healthz
// and prometheus endpoints
func (process *TeleportProcess) initDiagnosticService(componentCount int) error {
func (process *TeleportProcess) initDiagnosticService() error {
mux := http.NewServeMux()

// support legacy metrics collection in the diagnostic service.
Expand Down Expand Up @@ -2252,7 +2267,7 @@ func (process *TeleportProcess) initDiagnosticService(componentCount int) error
// Create a state machine that will process and update the internal state of
// Teleport based off Events. Use this state machine to return return the
// status from the /readyz endpoint.
ps, err := newProcessState(process, componentCount)
ps, err := newProcessState(process)
if err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -3069,9 +3084,6 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
})

log.Infof("Starting Kube proxy on %v.", cfg.Proxy.Kube.ListenAddr.Addr)
// since kubeServer.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: ProxyKubeReady, Payload: nil})
err := kubeServer.Serve(listeners.kube)
if err != nil && err != http.ErrServerClosed {
log.Warningf("Kube TLS server exited with error: %v.", err)
Expand Down Expand Up @@ -3463,51 +3475,6 @@ func (process *TeleportProcess) waitForAppDepend() {
}
}

// registerTeleportReadyEvent ensures that a TeleportReadyEvent is produced
// when all components enabled (based on the configuration) have started.
// It returns the number of components enabled.
func (process *TeleportProcess) registerTeleportReadyEvent(cfg *Config) int {
eventMapping := EventMapping{
Out: TeleportReadyEvent,
}

if cfg.Auth.Enabled {
eventMapping.In = append(eventMapping.In, AuthTLSReady)
}

if cfg.SSH.Enabled {
eventMapping.In = append(eventMapping.In, NodeSSHReady)
}

proxyConfig := cfg.Proxy
if proxyConfig.Enabled {
eventMapping.In = append(eventMapping.In, ProxySSHReady)
}
if proxyConfig.Kube.Enabled && !proxyConfig.Kube.ListenAddr.IsEmpty() && !proxyConfig.DisableReverseTunnel {
eventMapping.In = append(eventMapping.In, ProxyKubeReady)
}

if cfg.Kube.Enabled {
eventMapping.In = append(eventMapping.In, KubernetesReady)
}

if cfg.Apps.Enabled {
eventMapping.In = append(eventMapping.In, AppsReady)
}

if cfg.Databases.Enabled {
eventMapping.In = append(eventMapping.In, DatabasesReady)
}

if cfg.WindowsDesktop.Enabled {
eventMapping.In = append(eventMapping.In, WindowsDesktopReady)
}

componentCount := len(eventMapping.In)
process.RegisterEventMapping(eventMapping)
return componentCount
}

// appDependEvents is a list of events that the application service depends on.
var appDependEvents = []string{
AuthTLSReady,
Expand Down
Loading

0 comments on commit c4376d0

Please sign in to comment.