From 64fa50bc3ac26d0f475983975db4ad81da71b046 Mon Sep 17 00:00:00 2001 From: Gabriel Corado Date: Wed, 2 Mar 2022 12:01:43 -0300 Subject: [PATCH 1/2] feat(app): consider reverse tunnel errors in apps HA mechanism --- integration/app_integration_test.go | 152 +++++++++++++++++++--------- lib/web/app/transport.go | 11 +- 2 files changed, 113 insertions(+), 50 deletions(-) diff --git a/integration/app_integration_test.go b/integration/app_integration_test.go index 8ec91f3866f8a..e7f0eefe5d3b3 100644 --- a/integration/app_integration_test.go +++ b/integration/app_integration_test.go @@ -635,18 +635,53 @@ func TestAppAuditEvents(t *testing.T) { func TestAppServersHA(t *testing.T) { testCases := map[string]struct { - publicAddr func(pack *pack) string - makeRequest func(pack *pack, inCookie string) (status int, err error) + packInfo func(pack *pack) (cluterName, publicAddr string, appServers []*service.TeleportProcess) + startAppServers func(pack *pack, count int) []*service.TeleportProcess + makeRequest func(pack *pack, inCookie string) (status int, err error) }{ - "HTTPApp": { - publicAddr: func(pack *pack) string { return pack.rootAppPublicAddr }, + "RootHTTPApp": { + packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) { + return pack.rootAppClusterName, pack.rootAppPublicAddr, pack.rootAppServers + }, + startAppServers: func(pack *pack, count int) []*service.TeleportProcess { + return pack.startRootAppServers(t, count, []service.App{}) + }, + makeRequest: func(pack *pack, inCookie string) (int, error) { + status, _, err := pack.makeRequest(inCookie, http.MethodGet, "/") + return status, err + }, + }, + "RootWebSocketApp": { + packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) { + return pack.rootAppClusterName, pack.rootWSPublicAddr, pack.rootAppServers + }, + startAppServers: func(pack *pack, count int) []*service.TeleportProcess { + return pack.startRootAppServers(t, count, []service.App{}) + }, + makeRequest: func(pack *pack, inCookie string) (int, error) { + _, err := pack.makeWebsocketRequest(inCookie, "/") + return 0, err + }, + }, + "LeafHTTPApp": { + packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) { + return pack.leafAppClusterName, pack.leafAppPublicAddr, pack.leafAppServers + }, + startAppServers: func(pack *pack, count int) []*service.TeleportProcess { + return pack.startLeafAppServers(t, count, []service.App{}) + }, makeRequest: func(pack *pack, inCookie string) (int, error) { status, _, err := pack.makeRequest(inCookie, http.MethodGet, "/") return status, err }, }, - "WebSocketApp": { - publicAddr: func(pack *pack) string { return pack.rootWSPublicAddr }, + "LeafWebSocketApp": { + packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) { + return pack.leafAppClusterName, pack.leafWSPublicAddr, pack.leafAppServers + }, + startAppServers: func(pack *pack, count int) []*service.TeleportProcess { + return pack.startLeafAppServers(t, count, []service.App{}) + }, makeRequest: func(pack *pack, inCookie string) (int, error) { _, err := pack.makeWebsocketRequest(inCookie, "/") return 0, err @@ -678,18 +713,19 @@ func TestAppServersHA(t *testing.T) { for name, test := range testCases { t.Run(name, func(t *testing.T) { pack := setupWithOptions(t, appTestOptions{rootAppServersCount: 3}) - inCookie := pack.createAppSession(t, test.publicAddr(pack), pack.rootAppClusterName) + clusterName, publicAddr, appServers := test.packInfo(pack) + inCookie := pack.createAppSession(t, publicAddr, clusterName) status, err := test.makeRequest(pack, inCookie) responseWithoutError(t, status, err) // Stop all root app servers. - for i, appServer := range pack.rootAppServers { + for i, appServer := range appServers { appServer.Close() // issue a request right after a server is gone. status, err = test.makeRequest(pack, inCookie) - if i == len(pack.rootAppServers)-1 { + if i == len(appServers)-1 { // fails only when the last one is closed. responseWithError(t, status, err) } else { @@ -699,13 +735,13 @@ func TestAppServersHA(t *testing.T) { } } - servers := pack.startRootAppServers(t, 3, []service.App{}) + servers := test.startAppServers(pack, 3) status, err = test.makeRequest(pack, inCookie) responseWithoutError(t, status, err) // Start an additional app server and stop all current running // ones. - pack.startRootAppServers(t, 1, []service.App{}) + test.startAppServers(pack, 1) for _, appServer := range servers { appServer.Close() @@ -756,8 +792,8 @@ type pack struct { jwtAppClusterName string jwtAppURI string - leafCluster *TeleInstance - leafAppServer *service.TeleportProcess + leafCluster *TeleInstance + leafAppServers []*service.TeleportProcess leafAppName string leafAppPublicAddr string @@ -794,6 +830,7 @@ type appTestOptions struct { rootClusterPorts *InstancePorts leafClusterPorts *InstancePorts rootAppServersCount int + leafAppServersCount int rootConfig func(config *service.Config) leafConfig func(config *service.Config) @@ -1023,42 +1060,12 @@ func setupWithOptions(t *testing.T, opts appTestOptions) *pack { } p.rootAppServers = p.startRootAppServers(t, rootAppServersCount, opts.extraRootApps) - laConf := service.MakeDefaultConfig() - laConf.Console = nil - laConf.Log = log - laConf.DataDir = t.TempDir() - t.Cleanup(func() { os.RemoveAll(laConf.DataDir) }) - laConf.Token = "static-token-value" - laConf.AuthServers = []utils.NetAddr{ - { - AddrNetwork: "tcp", - Addr: net.JoinHostPort(Loopback, p.leafCluster.GetPortWeb()), - }, + // At least one leafAppServer should start during the setup + leafAppServersCount := 1 + if opts.leafAppServersCount > 0 { + leafAppServersCount = opts.leafAppServersCount } - laConf.Auth.Enabled = false - laConf.Proxy.Enabled = false - laConf.SSH.Enabled = false - laConf.Apps.Enabled = true - laConf.Apps.Apps = append([]service.App{ - { - Name: p.leafAppName, - URI: leafServer.URL, - PublicAddr: p.leafAppPublicAddr, - }, - { - Name: p.leafWSAppName, - URI: leafWSServer.URL, - PublicAddr: p.leafWSPublicAddr, - }, - { - Name: p.leafWSSAppName, - URI: leafWSSServer.URL, - PublicAddr: p.leafWSSPublicAddr, - }, - }, opts.extraLeafApps...) - p.leafAppServer, err = p.leafCluster.StartApp(laConf) - require.NoError(t, err) - t.Cleanup(func() { p.leafAppServer.Close() }) + p.leafAppServers = p.startLeafAppServers(t, leafAppServersCount, opts.extraRootApps) // Create user for tests. p.initUser(t, opts) @@ -1511,6 +1518,55 @@ func (p *pack) startRootAppServers(t *testing.T, count int, extraApps []service. return servers } +func (p *pack) startLeafAppServers(t *testing.T, count int, extraApps []service.App) []*service.TeleportProcess { + log := utils.NewLoggerForTests() + servers := make([]*service.TeleportProcess, count) + + for i := 0; i < count; i++ { + laConf := service.MakeDefaultConfig() + laConf.Console = nil + laConf.Log = log + laConf.DataDir = t.TempDir() + t.Cleanup(func() { os.RemoveAll(laConf.DataDir) }) + laConf.Token = "static-token-value" + laConf.AuthServers = []utils.NetAddr{ + { + AddrNetwork: "tcp", + Addr: net.JoinHostPort(Loopback, p.leafCluster.GetPortWeb()), + }, + } + laConf.Auth.Enabled = false + laConf.Proxy.Enabled = false + laConf.SSH.Enabled = false + laConf.Apps.Enabled = true + laConf.Apps.Apps = append([]service.App{ + { + Name: p.leafAppName, + URI: p.leafAppURI, + PublicAddr: p.leafAppPublicAddr, + }, + { + Name: p.leafWSAppName, + URI: p.leafWSAppURI, + PublicAddr: p.leafWSPublicAddr, + }, + { + Name: p.leafWSSAppName, + URI: p.leafWSSAppURI, + PublicAddr: p.leafWSSPublicAddr, + }, + }, extraApps...) + + appServer, err := p.leafCluster.StartApp(laConf) + require.NoError(t, err) + t.Cleanup(func() { appServer.Close() }) + + servers[i] = appServer + } + + return servers +} + var forwardedHeaderNames = []string{ teleport.AppJWTHeader, teleport.AppCFHeader, diff --git a/lib/web/app/transport.go b/lib/web/app/transport.go index b8a63e00d5d72..0055db3ae1ede 100644 --- a/lib/web/app/transport.go +++ b/lib/web/app/transport.go @@ -22,6 +22,7 @@ import ( "fmt" "net" "net/http" + "strings" "sync" "github.com/gravitational/teleport/api/constants" @@ -189,8 +190,7 @@ func (t *transport) DialContext(ctx context.Context, _, _ string) (net.Conn, err var dialErr error conn, dialErr = dialAppServer(t.c.proxyClient, t.c.identity, appServer) if dialErr != nil { - // Connection problem with the server. - if trace.IsConnectionProblem(dialErr) { + if isReverseTunnelDownError(dialErr) { t.c.log.Warnf("Failed to connect to application server %q: %v.", serverID, dialErr) t.servers.Delete(serverID) // Only goes for the next server if the error returned is a @@ -283,3 +283,10 @@ func configureTLS(c *transportConfig) (*tls.Config, error) { return tlsConfig, nil } + +// isReverseTunnelDownError returns true if the provided error indicates that +// the reverse tunnel connection is down e.g. because the agent is down. +func isReverseTunnelDownError(err error) bool { + return trace.IsConnectionProblem(err) || + strings.Contains(err.Error(), reversetunnel.NoApplicationTunnel) +} From e30e8e8e83d9ee2caef393721a66a5ac64a14d25 Mon Sep 17 00:00:00 2001 From: Gabriel Corado Date: Fri, 4 Mar 2022 12:54:30 -0300 Subject: [PATCH 2/2] test(integration): fix extra apps leaf cluster --- integration/app_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/app_integration_test.go b/integration/app_integration_test.go index e7f0eefe5d3b3..2255212373b35 100644 --- a/integration/app_integration_test.go +++ b/integration/app_integration_test.go @@ -1065,7 +1065,7 @@ func setupWithOptions(t *testing.T, opts appTestOptions) *pack { if opts.leafAppServersCount > 0 { leafAppServersCount = opts.leafAppServersCount } - p.leafAppServers = p.startLeafAppServers(t, leafAppServersCount, opts.extraRootApps) + p.leafAppServers = p.startLeafAppServers(t, leafAppServersCount, opts.extraLeafApps) // Create user for tests. p.initUser(t, opts)