Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert readyz changes #12244

Merged
merged 6 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions integration/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1187,15 +1187,26 @@ func (i *TeleInstance) Start() error {
// Build a list of expected events to wait for before unblocking based off
// the configuration passed in.
expectedEvents := []string{}
// Always wait for TeleportReadyEvent.
expectedEvents = append(expectedEvents, service.TeleportReadyEvent)
if i.Config.Auth.Enabled {
expectedEvents = append(expectedEvents, service.AuthTLSReady)
}
if i.Config.Proxy.Enabled {
expectedEvents = append(expectedEvents, service.ProxyReverseTunnelReady)
expectedEvents = append(expectedEvents, service.ProxySSHReady)
expectedEvents = append(expectedEvents, service.ProxyAgentPoolReady)
if !i.Config.Proxy.DisableWebService {
expectedEvents = append(expectedEvents, service.ProxyWebServerReady)
}
}
if i.Config.SSH.Enabled {
expectedEvents = append(expectedEvents, service.NodeSSHReady)
}
if i.Config.Apps.Enabled {
expectedEvents = append(expectedEvents, service.AppsReady)
}
if i.Config.Databases.Enabled {
expectedEvents = append(expectedEvents, service.DatabasesReady)
}

// Start the process and block until the expected events have arrived.
receivedEvents, err := startAndWait(i.Process, expectedEvents)
Expand Down
51 changes: 0 additions & 51 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ func TestIntegrations(t *testing.T) {
t.Run("PortForwarding", suite.bind(testPortForwarding))
t.Run("ProxyHostKeyCheck", suite.bind(testProxyHostKeyCheck))
t.Run("ReverseTunnelCollapse", suite.bind(testReverseTunnelCollapse))
t.Run("Readyz", suite.bind(testReadyz))
t.Run("RotateChangeSigningAlg", suite.bind(testRotateChangeSigningAlg))
t.Run("RotateRollback", suite.bind(testRotateRollback))
t.Run("RotateSuccess", suite.bind(testRotateSuccess))
Expand Down Expand Up @@ -3664,45 +3663,6 @@ func testPAM(t *testing.T, suite *integrationTestSuite) {
}
}

func testReadyz(t *testing.T, suite *integrationTestSuite) {
// TODO: test more service combinations

recConfig, err := types.NewSessionRecordingConfigFromConfigFile(types.SessionRecordingConfigSpecV2{
Mode: types.RecordOff,
})
require.NoError(t, err)

tconf := suite.defaultServiceConfig()
tconf.Auth.Enabled = true
tconf.Auth.SessionRecordingConfig = recConfig
tconf.Proxy.Enabled = true
tconf.Proxy.DisableWebInterface = true
tconf.Proxy.Kube.Enabled = true
// fire up the proxy kube service
tconf.Proxy.Kube.ListenAddr = utils.NetAddr{
AddrNetwork: "tcp",
Addr: "127.0.0.1:0",
}
tconf.SSH.Enabled = false
tconf.DiagnosticAddr = utils.NetAddr{
AddrNetwork: "tcp",
Addr: "127.0.0.1:0",
}

teleport := suite.newTeleportWithConfig(t, nil, nil, tconf)
t.Cleanup(func() { require.NoError(t, teleport.StopAll()) })

diagAddr, err := teleport.Process.DiagnosticAddr()
require.NoError(t, err)

require.Eventually(t, func() bool {
resp, err := http.Get(fmt.Sprintf("http://%s/readyz", diagAddr))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
return resp.StatusCode == http.StatusOK
}, 5*time.Second, 500*time.Millisecond)
}

// testRotateSuccess tests full cycle cert authority rotation
func testRotateSuccess(t *testing.T, suite *integrationTestSuite) {
tr := utils.NewTracer(utils.ThisFunction()).Start()
Expand Down Expand Up @@ -4403,17 +4363,6 @@ func waitForProcessStart(serviceC chan *service.TeleportProcess) (*service.Telep
dumpGoroutineProfile()
return nil, trace.BadParameter("timeout waiting for service to start")
}

eventC := make(chan service.Event, 1)
svc.WaitForEvent(context.TODO(), service.TeleportReadyEvent, eventC)
select {
case <-eventC:

case <-time.After(20 * time.Second):
dumpGoroutineProfile()
return nil, trace.BadParameter("timeout waiting for service to broadcast ready status")
}

return svc, nil
}

Expand Down
2 changes: 0 additions & 2 deletions lib/kube/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@ func NewTLSServer(cfg TLSServerConfig) (*TLSServer, error) {
}
} else {
log.Debug("No local kube credentials on proxy, will not start kubernetes_service heartbeats")
// Report the component as being ready.
cfg.OnHeartbeat(nil)
}

return server, nil
Expand Down
4 changes: 0 additions & 4 deletions lib/service/desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,6 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus.
"Windows desktop service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, listener.Addr())
}

// since srv.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: WindowsDesktopReady, Payload: nil})
err := srv.Serve(listener)
if err != nil {
if err == http.ErrServerClosed {
Expand Down
2 changes: 0 additions & 2 deletions lib/service/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,6 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C
"Kubernetes service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, listener.Addr())
}
// since kubeServer.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: KubernetesReady, Payload: nil})
err := kubeServer.Serve(listener)
if err != nil {
Expand Down
113 changes: 40 additions & 73 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,6 @@ const (
// and is ready to start accepting connections.
ProxySSHReady = "ProxySSHReady"

// ProxyKubeReady is generated when the kubernetes proxy service has been initialized.
ProxyKubeReady = "ProxyKubeReady"

// NodeSSHReady is generated when the Teleport node has initialized a SSH server
// and is ready to start accepting SSH connections.
NodeSSHReady = "NodeReady"
Expand All @@ -175,6 +172,10 @@ const (
// is ready to start accepting connections.
DatabasesReady = "DatabasesReady"

// MetricsReady is generated when the Teleport metrics service is ready to
// start accepting connections.
MetricsReady = "MetricsReady"

// WindowsDesktopReady is generated when the Teleport windows desktop
// service is ready to start accepting connections.
WindowsDesktopReady = "WindowsDesktopReady"
Expand All @@ -188,7 +189,7 @@ const (
// in a graceful way.
TeleportReloadEvent = "TeleportReload"

// TeleportPhaseChangeEvent is generated to indicate that teleport
// TeleportPhaseChangeEvent is generated to indidate that teleport
// CA rotation phase has been updated, used in tests
TeleportPhaseChangeEvent = "TeleportPhaseChange"

Expand Down Expand Up @@ -511,20 +512,6 @@ func Run(ctx context.Context, cfg Config, newTeleport NewProcess) error {
if err := srv.Start(); err != nil {
return trace.Wrap(err, "startup failed")
}

// Wait for the service to report that it has started.
startTimeoutCtx, startCancel := context.WithTimeout(ctx, signalPipeTimeout)
defer startCancel()
eventC := make(chan Event, 1)
srv.WaitForEvent(startTimeoutCtx, TeleportReadyEvent, eventC)
select {
case <-eventC:
cfg.Log.Infof("Service has started successfully.")
case <-startTimeoutCtx.Done():
warnOnErr(srv.Close(), cfg.Log)
return trace.BadParameter("service has failed to start")
}

// Wait and reload until called exit.
for {
srv, err = waitAndReload(ctx, cfg, srv, newTeleport)
Expand Down Expand Up @@ -761,17 +748,14 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) {

process.registerAppDepend()

// Produce global TeleportReadyEvent when all components have started
componentCount := process.registerTeleportReadyEvent(cfg)

process.log = cfg.Log.WithFields(logrus.Fields{
trace.Component: teleport.Component(teleport.ComponentProcess, process.id),
})

serviceStarted := false

if !cfg.DiagnosticAddr.IsEmpty() {
if err := process.initDiagnosticService(componentCount); err != nil {
if err := process.initDiagnosticService(); err != nil {
return nil, trace.Wrap(err)
}
} else {
Expand All @@ -784,6 +768,37 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) {
cfg.Keygen = native.New(process.ExitContext())
}

// Produce global TeleportReadyEvent
// when all components have started
eventMapping := EventMapping{
Out: TeleportReadyEvent,
}
if cfg.Auth.Enabled {
eventMapping.In = append(eventMapping.In, AuthTLSReady)
}
if cfg.SSH.Enabled {
eventMapping.In = append(eventMapping.In, NodeSSHReady)
}
if cfg.Proxy.Enabled {
eventMapping.In = append(eventMapping.In, ProxySSHReady)
}
if cfg.Kube.Enabled {
eventMapping.In = append(eventMapping.In, KubernetesReady)
}
if cfg.Apps.Enabled {
eventMapping.In = append(eventMapping.In, AppsReady)
}
if cfg.Databases.Enabled {
eventMapping.In = append(eventMapping.In, DatabasesReady)
}
if cfg.Metrics.Enabled {
eventMapping.In = append(eventMapping.In, MetricsReady)
}
if cfg.WindowsDesktop.Enabled {
eventMapping.In = append(eventMapping.In, WindowsDesktopReady)
}
process.RegisterEventMapping(eventMapping)

if cfg.Auth.Enabled {
if err := process.initAuthService(); err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -1371,7 +1386,7 @@ func (process *TeleportProcess) initAuthService() error {
utils.Consolef(cfg.Console, log, teleport.ComponentAuth, "Auth service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, authAddr)

// since tlsServer.Serve is a blocking call, we emit this event right before
// since tlsServer.Serve is a blocking call, we emit this even right before
// the service has started
process.BroadcastEvent(Event{Name: AuthTLSReady, Payload: nil})
err := tlsServer.Serve()
Expand Down Expand Up @@ -2207,7 +2222,7 @@ func (process *TeleportProcess) initMetricsService() error {

// initDiagnosticService starts diagnostic service currently serving healthz
// and prometheus endpoints
func (process *TeleportProcess) initDiagnosticService(componentCount int) error {
func (process *TeleportProcess) initDiagnosticService() error {
mux := http.NewServeMux()

// support legacy metrics collection in the diagnostic service.
Expand Down Expand Up @@ -2238,7 +2253,7 @@ func (process *TeleportProcess) initDiagnosticService(componentCount int) error
// Create a state machine that will process and update the internal state of
// Teleport based off Events. Use this state machine to return return the
// status from the /readyz endpoint.
ps, err := newProcessState(process, componentCount)
ps, err := newProcessState(process)
if err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -3055,9 +3070,6 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
})

log.Infof("Starting Kube proxy on %v.", cfg.Proxy.Kube.ListenAddr.Addr)
// since kubeServer.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: ProxyKubeReady, Payload: nil})
err := kubeServer.Serve(listeners.kube)
if err != nil && err != http.ErrServerClosed {
log.Warningf("Kube TLS server exited with error: %v.", err)
Expand Down Expand Up @@ -3449,51 +3461,6 @@ func (process *TeleportProcess) waitForAppDepend() {
}
}

// registerTeleportReadyEvent ensures that a TeleportReadyEvent is produced
// when all components enabled (based on the configuration) have started.
// It returns the number of components enabled.
func (process *TeleportProcess) registerTeleportReadyEvent(cfg *Config) int {
eventMapping := EventMapping{
Out: TeleportReadyEvent,
}

if cfg.Auth.Enabled {
eventMapping.In = append(eventMapping.In, AuthTLSReady)
}

if cfg.SSH.Enabled {
eventMapping.In = append(eventMapping.In, NodeSSHReady)
}

proxyConfig := cfg.Proxy
if proxyConfig.Enabled {
eventMapping.In = append(eventMapping.In, ProxySSHReady)
}
if proxyConfig.Kube.Enabled && !proxyConfig.Kube.ListenAddr.IsEmpty() && !proxyConfig.DisableReverseTunnel {
eventMapping.In = append(eventMapping.In, ProxyKubeReady)
}

if cfg.Kube.Enabled {
eventMapping.In = append(eventMapping.In, KubernetesReady)
}

if cfg.Apps.Enabled {
eventMapping.In = append(eventMapping.In, AppsReady)
}

if cfg.Databases.Enabled {
eventMapping.In = append(eventMapping.In, DatabasesReady)
}

if cfg.WindowsDesktop.Enabled {
eventMapping.In = append(eventMapping.In, WindowsDesktopReady)
}

componentCount := len(eventMapping.In)
process.RegisterEventMapping(eventMapping)
return componentCount
}

// appDependEvents is a list of events that the application service depends on.
var appDependEvents = []string{
AuthTLSReady,
Expand Down
Loading