Skip to content

Commit

Permalink
Revert "Backport #11725 #11249 #11799 to branch/v9 (#11795)" (#12243)
Browse files Browse the repository at this point in the history
* Revert "Backport #11725 #11249 #11799 to branch/v9 (#11795)"

This reverts commit ea8ee94.

* Revert "Fix ProxyKube not reporting its readiness (#12152)"

This reverts commit ce301f0.
  • Loading branch information
r0mant authored Apr 26, 2022
1 parent b07fc5c commit 03bd197
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 297 deletions.
15 changes: 13 additions & 2 deletions integration/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,15 +1103,26 @@ func (i *TeleInstance) Start() error {
// Build a list of expected events to wait for before unblocking based off
// the configuration passed in.
expectedEvents := []string{}
// Always wait for TeleportReadyEvent.
expectedEvents = append(expectedEvents, service.TeleportReadyEvent)
if i.Config.Auth.Enabled {
expectedEvents = append(expectedEvents, service.AuthTLSReady)
}
if i.Config.Proxy.Enabled {
expectedEvents = append(expectedEvents, service.ProxyReverseTunnelReady)
expectedEvents = append(expectedEvents, service.ProxySSHReady)
expectedEvents = append(expectedEvents, service.ProxyAgentPoolReady)
if !i.Config.Proxy.DisableWebService {
expectedEvents = append(expectedEvents, service.ProxyWebServerReady)
}
}
if i.Config.SSH.Enabled {
expectedEvents = append(expectedEvents, service.NodeSSHReady)
}
if i.Config.Apps.Enabled {
expectedEvents = append(expectedEvents, service.AppsReady)
}
if i.Config.Databases.Enabled {
expectedEvents = append(expectedEvents, service.DatabasesReady)
}

// Start the process and block until the expected events have arrived.
receivedEvents, err := startAndWait(i.Process, expectedEvents)
Expand Down
51 changes: 0 additions & 51 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ func TestIntegrations(t *testing.T) {
t.Run("PortForwarding", suite.bind(testPortForwarding))
t.Run("ProxyHostKeyCheck", suite.bind(testProxyHostKeyCheck))
t.Run("ReverseTunnelCollapse", suite.bind(testReverseTunnelCollapse))
t.Run("Readyz", suite.bind(testReadyz))
t.Run("RotateChangeSigningAlg", suite.bind(testRotateChangeSigningAlg))
t.Run("RotateRollback", suite.bind(testRotateRollback))
t.Run("RotateSuccess", suite.bind(testRotateSuccess))
Expand Down Expand Up @@ -3666,45 +3665,6 @@ func testPAM(t *testing.T, suite *integrationTestSuite) {
}
}

func testReadyz(t *testing.T, suite *integrationTestSuite) {
// TODO: test more service combinations

recConfig, err := types.NewSessionRecordingConfigFromConfigFile(types.SessionRecordingConfigSpecV2{
Mode: types.RecordOff,
})
require.NoError(t, err)

tconf := suite.defaultServiceConfig()
tconf.Auth.Enabled = true
tconf.Auth.SessionRecordingConfig = recConfig
tconf.Proxy.Enabled = true
tconf.Proxy.DisableWebInterface = true
tconf.Proxy.Kube.Enabled = true
// fire up the proxy kube service
tconf.Proxy.Kube.ListenAddr = utils.NetAddr{
AddrNetwork: "tcp",
Addr: "127.0.0.1:0",
}
tconf.SSH.Enabled = false
tconf.DiagnosticAddr = utils.NetAddr{
AddrNetwork: "tcp",
Addr: "127.0.0.1:0",
}

teleport := suite.newTeleportWithConfig(t, nil, nil, tconf)
t.Cleanup(func() { require.NoError(t, teleport.StopAll()) })

diagAddr, err := teleport.Process.DiagnosticAddr()
require.NoError(t, err)

require.Eventually(t, func() bool {
resp, err := http.Get(fmt.Sprintf("http://%s/readyz", diagAddr))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
return resp.StatusCode == http.StatusOK
}, 5*time.Second, 500*time.Millisecond)
}

// testRotateSuccess tests full cycle cert authority rotation
func testRotateSuccess(t *testing.T, suite *integrationTestSuite) {
tr := utils.NewTracer(utils.ThisFunction()).Start()
Expand Down Expand Up @@ -4405,17 +4365,6 @@ func waitForProcessStart(serviceC chan *service.TeleportProcess) (*service.Telep
dumpGoroutineProfile()
return nil, trace.BadParameter("timeout waiting for service to start")
}

eventC := make(chan service.Event, 1)
svc.WaitForEvent(context.TODO(), service.TeleportReadyEvent, eventC)
select {
case <-eventC:

case <-time.After(20 * time.Second):
dumpGoroutineProfile()
return nil, trace.BadParameter("timeout waiting for service to broadcast ready status")
}

return svc, nil
}

Expand Down
2 changes: 0 additions & 2 deletions lib/kube/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@ func NewTLSServer(cfg TLSServerConfig) (*TLSServer, error) {
}
} else {
log.Debug("No local kube credentials on proxy, will not start kubernetes_service heartbeats")
// Report the component as being ready.
cfg.OnHeartbeat(nil)
}

return server, nil
Expand Down
4 changes: 0 additions & 4 deletions lib/service/desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,6 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus.
"Windows desktop service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, listener.Addr())
}

// since srv.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: WindowsDesktopReady, Payload: nil})
err := srv.Serve(listener)
if err != nil {
if err == http.ErrServerClosed {
Expand Down
2 changes: 0 additions & 2 deletions lib/service/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,6 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C
"Kubernetes service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, listener.Addr())
}
// since kubeServer.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: KubernetesReady, Payload: nil})
err := kubeServer.Serve(listener)
if err != nil {
Expand Down
113 changes: 40 additions & 73 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,6 @@ const (
// and is ready to start accepting connections.
ProxySSHReady = "ProxySSHReady"

// ProxyKubeReady is generated when the kubernetes proxy service has been initialized.
ProxyKubeReady = "ProxyKubeReady"

// NodeSSHReady is generated when the Teleport node has initialized a SSH server
// and is ready to start accepting SSH connections.
NodeSSHReady = "NodeReady"
Expand All @@ -176,6 +173,10 @@ const (
// is ready to start accepting connections.
DatabasesReady = "DatabasesReady"

// MetricsReady is generated when the Teleport metrics service is ready to
// start accepting connections.
MetricsReady = "MetricsReady"

// WindowsDesktopReady is generated when the Teleport windows desktop
// service is ready to start accepting connections.
WindowsDesktopReady = "WindowsDesktopReady"
Expand All @@ -189,7 +190,7 @@ const (
// in a graceful way.
TeleportReloadEvent = "TeleportReload"

// TeleportPhaseChangeEvent is generated to indicate that teleport
// TeleportPhaseChangeEvent is generated to indidate that teleport
// CA rotation phase has been updated, used in tests
TeleportPhaseChangeEvent = "TeleportPhaseChange"

Expand Down Expand Up @@ -512,20 +513,6 @@ func Run(ctx context.Context, cfg Config, newTeleport NewProcess) error {
if err := srv.Start(); err != nil {
return trace.Wrap(err, "startup failed")
}

// Wait for the service to report that it has started.
startTimeoutCtx, startCancel := context.WithTimeout(ctx, signalPipeTimeout)
defer startCancel()
eventC := make(chan Event, 1)
srv.WaitForEvent(startTimeoutCtx, TeleportReadyEvent, eventC)
select {
case <-eventC:
cfg.Log.Infof("Service has started successfully.")
case <-startTimeoutCtx.Done():
warnOnErr(srv.Close(), cfg.Log)
return trace.BadParameter("service has failed to start")
}

// Wait and reload until called exit.
for {
srv, err = waitAndReload(ctx, cfg, srv, newTeleport)
Expand Down Expand Up @@ -759,17 +746,14 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) {

process.registerAppDepend()

// Produce global TeleportReadyEvent when all components have started
componentCount := process.registerTeleportReadyEvent(cfg)

process.log = cfg.Log.WithFields(logrus.Fields{
trace.Component: teleport.Component(teleport.ComponentProcess, process.id),
})

serviceStarted := false

if !cfg.DiagnosticAddr.IsEmpty() {
if err := process.initDiagnosticService(componentCount); err != nil {
if err := process.initDiagnosticService(); err != nil {
return nil, trace.Wrap(err)
}
} else {
Expand All @@ -788,6 +772,37 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) {
cfg.Keygen = native.New(process.ExitContext(), native.PrecomputeKeys(precomputeCount))
}

// Produce global TeleportReadyEvent
// when all components have started
eventMapping := EventMapping{
Out: TeleportReadyEvent,
}
if cfg.Auth.Enabled {
eventMapping.In = append(eventMapping.In, AuthTLSReady)
}
if cfg.SSH.Enabled {
eventMapping.In = append(eventMapping.In, NodeSSHReady)
}
if cfg.Proxy.Enabled {
eventMapping.In = append(eventMapping.In, ProxySSHReady)
}
if cfg.Kube.Enabled {
eventMapping.In = append(eventMapping.In, KubernetesReady)
}
if cfg.Apps.Enabled {
eventMapping.In = append(eventMapping.In, AppsReady)
}
if cfg.Databases.Enabled {
eventMapping.In = append(eventMapping.In, DatabasesReady)
}
if cfg.Metrics.Enabled {
eventMapping.In = append(eventMapping.In, MetricsReady)
}
if cfg.WindowsDesktop.Enabled {
eventMapping.In = append(eventMapping.In, WindowsDesktopReady)
}
process.RegisterEventMapping(eventMapping)

if cfg.Auth.Enabled {
if err := process.initAuthService(); err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -1379,7 +1394,7 @@ func (process *TeleportProcess) initAuthService() error {
utils.Consolef(cfg.Console, log, teleport.ComponentAuth, "Auth service %s:%s is starting on %v.",
teleport.Version, teleport.Gitref, authAddr)

// since tlsServer.Serve is a blocking call, we emit this event right before
// since tlsServer.Serve is a blocking call, we emit this even right before
// the service has started
process.BroadcastEvent(Event{Name: AuthTLSReady, Payload: nil})
err := tlsServer.Serve()
Expand Down Expand Up @@ -2221,7 +2236,7 @@ func (process *TeleportProcess) initMetricsService() error {

// initDiagnosticService starts diagnostic service currently serving healthz
// and prometheus endpoints
func (process *TeleportProcess) initDiagnosticService(componentCount int) error {
func (process *TeleportProcess) initDiagnosticService() error {
mux := http.NewServeMux()

// support legacy metrics collection in the diagnostic service.
Expand Down Expand Up @@ -2252,7 +2267,7 @@ func (process *TeleportProcess) initDiagnosticService(componentCount int) error
// Create a state machine that will process and update the internal state of
// Teleport based off Events. Use this state machine to return return the
// status from the /readyz endpoint.
ps, err := newProcessState(process, componentCount)
ps, err := newProcessState(process)
if err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -3069,9 +3084,6 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
})

log.Infof("Starting Kube proxy on %v.", cfg.Proxy.Kube.ListenAddr.Addr)
// since kubeServer.Serve is a blocking call, we emit this event right before
// the service has started
process.BroadcastEvent(Event{Name: ProxyKubeReady, Payload: nil})
err := kubeServer.Serve(listeners.kube)
if err != nil && err != http.ErrServerClosed {
log.Warningf("Kube TLS server exited with error: %v.", err)
Expand Down Expand Up @@ -3463,51 +3475,6 @@ func (process *TeleportProcess) waitForAppDepend() {
}
}

// registerTeleportReadyEvent ensures that a TeleportReadyEvent is produced
// when all components enabled (based on the configuration) have started.
// It returns the number of components enabled.
func (process *TeleportProcess) registerTeleportReadyEvent(cfg *Config) int {
eventMapping := EventMapping{
Out: TeleportReadyEvent,
}

if cfg.Auth.Enabled {
eventMapping.In = append(eventMapping.In, AuthTLSReady)
}

if cfg.SSH.Enabled {
eventMapping.In = append(eventMapping.In, NodeSSHReady)
}

proxyConfig := cfg.Proxy
if proxyConfig.Enabled {
eventMapping.In = append(eventMapping.In, ProxySSHReady)
}
if proxyConfig.Kube.Enabled && !proxyConfig.Kube.ListenAddr.IsEmpty() && !proxyConfig.DisableReverseTunnel {
eventMapping.In = append(eventMapping.In, ProxyKubeReady)
}

if cfg.Kube.Enabled {
eventMapping.In = append(eventMapping.In, KubernetesReady)
}

if cfg.Apps.Enabled {
eventMapping.In = append(eventMapping.In, AppsReady)
}

if cfg.Databases.Enabled {
eventMapping.In = append(eventMapping.In, DatabasesReady)
}

if cfg.WindowsDesktop.Enabled {
eventMapping.In = append(eventMapping.In, WindowsDesktopReady)
}

componentCount := len(eventMapping.In)
process.RegisterEventMapping(eventMapping)
return componentCount
}

// appDependEvents is a list of events that the application service depends on.
var appDependEvents = []string{
AuthTLSReady,
Expand Down
Loading

0 comments on commit 03bd197

Please sign in to comment.