Skip to content

Commit

Permalink
feat(app): consider reverse tunnel errors in apps HA mechanism (#10734)…
Browse files Browse the repository at this point in the history
… (#10906)
  • Loading branch information
gabrielcorado authored Mar 8, 2022
1 parent dbdaa7f commit 61f8af2
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 50 deletions.
152 changes: 104 additions & 48 deletions integration/app_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,18 +635,53 @@ func TestAppAuditEvents(t *testing.T) {

func TestAppServersHA(t *testing.T) {
testCases := map[string]struct {
publicAddr func(pack *pack) string
makeRequest func(pack *pack, inCookie string) (status int, err error)
packInfo func(pack *pack) (cluterName, publicAddr string, appServers []*service.TeleportProcess)
startAppServers func(pack *pack, count int) []*service.TeleportProcess
makeRequest func(pack *pack, inCookie string) (status int, err error)
}{
"HTTPApp": {
publicAddr: func(pack *pack) string { return pack.rootAppPublicAddr },
"RootHTTPApp": {
packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) {
return pack.rootAppClusterName, pack.rootAppPublicAddr, pack.rootAppServers
},
startAppServers: func(pack *pack, count int) []*service.TeleportProcess {
return pack.startRootAppServers(t, count, []service.App{})
},
makeRequest: func(pack *pack, inCookie string) (int, error) {
status, _, err := pack.makeRequest(inCookie, http.MethodGet, "/")
return status, err
},
},
"RootWebSocketApp": {
packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) {
return pack.rootAppClusterName, pack.rootWSPublicAddr, pack.rootAppServers
},
startAppServers: func(pack *pack, count int) []*service.TeleportProcess {
return pack.startRootAppServers(t, count, []service.App{})
},
makeRequest: func(pack *pack, inCookie string) (int, error) {
_, err := pack.makeWebsocketRequest(inCookie, "/")
return 0, err
},
},
"LeafHTTPApp": {
packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) {
return pack.leafAppClusterName, pack.leafAppPublicAddr, pack.leafAppServers
},
startAppServers: func(pack *pack, count int) []*service.TeleportProcess {
return pack.startLeafAppServers(t, count, []service.App{})
},
makeRequest: func(pack *pack, inCookie string) (int, error) {
status, _, err := pack.makeRequest(inCookie, http.MethodGet, "/")
return status, err
},
},
"WebSocketApp": {
publicAddr: func(pack *pack) string { return pack.rootWSPublicAddr },
"LeafWebSocketApp": {
packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) {
return pack.leafAppClusterName, pack.leafWSPublicAddr, pack.leafAppServers
},
startAppServers: func(pack *pack, count int) []*service.TeleportProcess {
return pack.startLeafAppServers(t, count, []service.App{})
},
makeRequest: func(pack *pack, inCookie string) (int, error) {
_, err := pack.makeWebsocketRequest(inCookie, "/")
return 0, err
Expand Down Expand Up @@ -678,18 +713,19 @@ func TestAppServersHA(t *testing.T) {
for name, test := range testCases {
t.Run(name, func(t *testing.T) {
pack := setupWithOptions(t, appTestOptions{rootAppServersCount: 3})
inCookie := pack.createAppSession(t, test.publicAddr(pack), pack.rootAppClusterName)
clusterName, publicAddr, appServers := test.packInfo(pack)

inCookie := pack.createAppSession(t, publicAddr, clusterName)
status, err := test.makeRequest(pack, inCookie)
responseWithoutError(t, status, err)

// Stop all root app servers.
for i, appServer := range pack.rootAppServers {
for i, appServer := range appServers {
appServer.Close()

// issue a request right after a server is gone.
status, err = test.makeRequest(pack, inCookie)
if i == len(pack.rootAppServers)-1 {
if i == len(appServers)-1 {
// fails only when the last one is closed.
responseWithError(t, status, err)
} else {
Expand All @@ -699,13 +735,13 @@ func TestAppServersHA(t *testing.T) {
}
}

servers := pack.startRootAppServers(t, 3, []service.App{})
servers := test.startAppServers(pack, 3)
status, err = test.makeRequest(pack, inCookie)
responseWithoutError(t, status, err)

// Start an additional app server and stop all current running
// ones.
pack.startRootAppServers(t, 1, []service.App{})
test.startAppServers(pack, 1)
for _, appServer := range servers {
appServer.Close()

Expand Down Expand Up @@ -756,8 +792,8 @@ type pack struct {
jwtAppClusterName string
jwtAppURI string

leafCluster *TeleInstance
leafAppServer *service.TeleportProcess
leafCluster *TeleInstance
leafAppServers []*service.TeleportProcess

leafAppName string
leafAppPublicAddr string
Expand Down Expand Up @@ -794,6 +830,7 @@ type appTestOptions struct {
rootClusterPorts *InstancePorts
leafClusterPorts *InstancePorts
rootAppServersCount int
leafAppServersCount int

rootConfig func(config *service.Config)
leafConfig func(config *service.Config)
Expand Down Expand Up @@ -1023,42 +1060,12 @@ func setupWithOptions(t *testing.T, opts appTestOptions) *pack {
}
p.rootAppServers = p.startRootAppServers(t, rootAppServersCount, opts.extraRootApps)

laConf := service.MakeDefaultConfig()
laConf.Console = nil
laConf.Log = log
laConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(laConf.DataDir) })
laConf.Token = "static-token-value"
laConf.AuthServers = []utils.NetAddr{
{
AddrNetwork: "tcp",
Addr: net.JoinHostPort(Loopback, p.leafCluster.GetPortWeb()),
},
// At least one leafAppServer should start during the setup
leafAppServersCount := 1
if opts.leafAppServersCount > 0 {
leafAppServersCount = opts.leafAppServersCount
}
laConf.Auth.Enabled = false
laConf.Proxy.Enabled = false
laConf.SSH.Enabled = false
laConf.Apps.Enabled = true
laConf.Apps.Apps = append([]service.App{
{
Name: p.leafAppName,
URI: leafServer.URL,
PublicAddr: p.leafAppPublicAddr,
},
{
Name: p.leafWSAppName,
URI: leafWSServer.URL,
PublicAddr: p.leafWSPublicAddr,
},
{
Name: p.leafWSSAppName,
URI: leafWSSServer.URL,
PublicAddr: p.leafWSSPublicAddr,
},
}, opts.extraLeafApps...)
p.leafAppServer, err = p.leafCluster.StartApp(laConf)
require.NoError(t, err)
t.Cleanup(func() { p.leafAppServer.Close() })
p.leafAppServers = p.startLeafAppServers(t, leafAppServersCount, opts.extraLeafApps)

// Create user for tests.
p.initUser(t, opts)
Expand Down Expand Up @@ -1511,6 +1518,55 @@ func (p *pack) startRootAppServers(t *testing.T, count int, extraApps []service.
return servers
}

func (p *pack) startLeafAppServers(t *testing.T, count int, extraApps []service.App) []*service.TeleportProcess {
log := utils.NewLoggerForTests()
servers := make([]*service.TeleportProcess, count)

for i := 0; i < count; i++ {
laConf := service.MakeDefaultConfig()
laConf.Console = nil
laConf.Log = log
laConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(laConf.DataDir) })
laConf.Token = "static-token-value"
laConf.AuthServers = []utils.NetAddr{
{
AddrNetwork: "tcp",
Addr: net.JoinHostPort(Loopback, p.leafCluster.GetPortWeb()),
},
}
laConf.Auth.Enabled = false
laConf.Proxy.Enabled = false
laConf.SSH.Enabled = false
laConf.Apps.Enabled = true
laConf.Apps.Apps = append([]service.App{
{
Name: p.leafAppName,
URI: p.leafAppURI,
PublicAddr: p.leafAppPublicAddr,
},
{
Name: p.leafWSAppName,
URI: p.leafWSAppURI,
PublicAddr: p.leafWSPublicAddr,
},
{
Name: p.leafWSSAppName,
URI: p.leafWSSAppURI,
PublicAddr: p.leafWSSPublicAddr,
},
}, extraApps...)

appServer, err := p.leafCluster.StartApp(laConf)
require.NoError(t, err)
t.Cleanup(func() { appServer.Close() })

servers[i] = appServer
}

return servers
}

var forwardedHeaderNames = []string{
teleport.AppJWTHeader,
teleport.AppCFHeader,
Expand Down
11 changes: 9 additions & 2 deletions lib/web/app/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net"
"net/http"
"strings"
"sync"

"github.com/gravitational/teleport/api/constants"
Expand Down Expand Up @@ -189,8 +190,7 @@ func (t *transport) DialContext(ctx context.Context, _, _ string) (net.Conn, err
var dialErr error
conn, dialErr = dialAppServer(t.c.proxyClient, t.c.identity, appServer)
if dialErr != nil {
// Connection problem with the server.
if trace.IsConnectionProblem(dialErr) {
if isReverseTunnelDownError(dialErr) {
t.c.log.Warnf("Failed to connect to application server %q: %v.", serverID, dialErr)
t.servers.Delete(serverID)
// Only goes for the next server if the error returned is a
Expand Down Expand Up @@ -283,3 +283,10 @@ func configureTLS(c *transportConfig) (*tls.Config, error) {

return tlsConfig, nil
}

// isReverseTunnelDownError returns true if the provided error indicates that
// the reverse tunnel connection is down e.g. because the agent is down.
func isReverseTunnelDownError(err error) bool {
return trace.IsConnectionProblem(err) ||
strings.Contains(err.Error(), reversetunnel.NoApplicationTunnel)
}

0 comments on commit 61f8af2

Please sign in to comment.