Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve HA behavior of application agents in leaf clusters #10734

Merged
merged 5 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 104 additions & 48 deletions integration/app_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,18 +635,53 @@ func TestAppAuditEvents(t *testing.T) {

func TestAppServersHA(t *testing.T) {
testCases := map[string]struct {
publicAddr func(pack *pack) string
makeRequest func(pack *pack, inCookie string) (status int, err error)
packInfo func(pack *pack) (cluterName, publicAddr string, appServers []*service.TeleportProcess)
startAppServers func(pack *pack, count int) []*service.TeleportProcess
makeRequest func(pack *pack, inCookie string) (status int, err error)
}{
"HTTPApp": {
publicAddr: func(pack *pack) string { return pack.rootAppPublicAddr },
"RootHTTPApp": {
packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) {
return pack.rootAppClusterName, pack.rootAppPublicAddr, pack.rootAppServers
},
startAppServers: func(pack *pack, count int) []*service.TeleportProcess {
return pack.startRootAppServers(t, count, []service.App{})
},
makeRequest: func(pack *pack, inCookie string) (int, error) {
status, _, err := pack.makeRequest(inCookie, http.MethodGet, "/")
return status, err
},
},
"RootWebSocketApp": {
packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) {
return pack.rootAppClusterName, pack.rootWSPublicAddr, pack.rootAppServers
},
startAppServers: func(pack *pack, count int) []*service.TeleportProcess {
return pack.startRootAppServers(t, count, []service.App{})
},
makeRequest: func(pack *pack, inCookie string) (int, error) {
_, err := pack.makeWebsocketRequest(inCookie, "/")
return 0, err
},
},
"LeafHTTPApp": {
packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) {
return pack.leafAppClusterName, pack.leafAppPublicAddr, pack.leafAppServers
},
startAppServers: func(pack *pack, count int) []*service.TeleportProcess {
return pack.startLeafAppServers(t, count, []service.App{})
},
makeRequest: func(pack *pack, inCookie string) (int, error) {
status, _, err := pack.makeRequest(inCookie, http.MethodGet, "/")
return status, err
},
},
"WebSocketApp": {
publicAddr: func(pack *pack) string { return pack.rootWSPublicAddr },
"LeafWebSocketApp": {
packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) {
return pack.leafAppClusterName, pack.leafWSPublicAddr, pack.leafAppServers
},
startAppServers: func(pack *pack, count int) []*service.TeleportProcess {
return pack.startLeafAppServers(t, count, []service.App{})
},
makeRequest: func(pack *pack, inCookie string) (int, error) {
_, err := pack.makeWebsocketRequest(inCookie, "/")
return 0, err
Expand Down Expand Up @@ -678,18 +713,19 @@ func TestAppServersHA(t *testing.T) {
for name, test := range testCases {
t.Run(name, func(t *testing.T) {
pack := setupWithOptions(t, appTestOptions{rootAppServersCount: 3})
inCookie := pack.createAppSession(t, test.publicAddr(pack), pack.rootAppClusterName)
clusterName, publicAddr, appServers := test.packInfo(pack)

inCookie := pack.createAppSession(t, publicAddr, clusterName)
status, err := test.makeRequest(pack, inCookie)
responseWithoutError(t, status, err)

// Stop all root app servers.
for i, appServer := range pack.rootAppServers {
for i, appServer := range appServers {
appServer.Close()

// issue a request right after a server is gone.
status, err = test.makeRequest(pack, inCookie)
if i == len(pack.rootAppServers)-1 {
if i == len(appServers)-1 {
// fails only when the last one is closed.
responseWithError(t, status, err)
} else {
Expand All @@ -699,13 +735,13 @@ func TestAppServersHA(t *testing.T) {
}
}

servers := pack.startRootAppServers(t, 3, []service.App{})
servers := test.startAppServers(pack, 3)
status, err = test.makeRequest(pack, inCookie)
responseWithoutError(t, status, err)

// Start an additional app server and stop all current running
// ones.
pack.startRootAppServers(t, 1, []service.App{})
test.startAppServers(pack, 1)
for _, appServer := range servers {
appServer.Close()

Expand Down Expand Up @@ -756,8 +792,8 @@ type pack struct {
jwtAppClusterName string
jwtAppURI string

leafCluster *TeleInstance
leafAppServer *service.TeleportProcess
leafCluster *TeleInstance
leafAppServers []*service.TeleportProcess

leafAppName string
leafAppPublicAddr string
Expand Down Expand Up @@ -794,6 +830,7 @@ type appTestOptions struct {
rootClusterPorts *InstancePorts
leafClusterPorts *InstancePorts
rootAppServersCount int
leafAppServersCount int

rootConfig func(config *service.Config)
leafConfig func(config *service.Config)
Expand Down Expand Up @@ -1023,42 +1060,12 @@ func setupWithOptions(t *testing.T, opts appTestOptions) *pack {
}
p.rootAppServers = p.startRootAppServers(t, rootAppServersCount, opts.extraRootApps)

laConf := service.MakeDefaultConfig()
laConf.Console = nil
laConf.Log = log
laConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(laConf.DataDir) })
laConf.Token = "static-token-value"
laConf.AuthServers = []utils.NetAddr{
{
AddrNetwork: "tcp",
Addr: net.JoinHostPort(Loopback, p.leafCluster.GetPortWeb()),
},
// At least one leafAppServer should start during the setup
leafAppServersCount := 1
if opts.leafAppServersCount > 0 {
leafAppServersCount = opts.leafAppServersCount
}
laConf.Auth.Enabled = false
laConf.Proxy.Enabled = false
laConf.SSH.Enabled = false
laConf.Apps.Enabled = true
laConf.Apps.Apps = append([]service.App{
{
Name: p.leafAppName,
URI: leafServer.URL,
PublicAddr: p.leafAppPublicAddr,
},
{
Name: p.leafWSAppName,
URI: leafWSServer.URL,
PublicAddr: p.leafWSPublicAddr,
},
{
Name: p.leafWSSAppName,
URI: leafWSSServer.URL,
PublicAddr: p.leafWSSPublicAddr,
},
}, opts.extraLeafApps...)
p.leafAppServer, err = p.leafCluster.StartApp(laConf)
require.NoError(t, err)
t.Cleanup(func() { p.leafAppServer.Close() })
p.leafAppServers = p.startLeafAppServers(t, leafAppServersCount, opts.extraRootApps)

// Create user for tests.
p.initUser(t, opts)
Expand Down Expand Up @@ -1511,6 +1518,55 @@ func (p *pack) startRootAppServers(t *testing.T, count int, extraApps []service.
return servers
}

func (p *pack) startLeafAppServers(t *testing.T, count int, extraApps []service.App) []*service.TeleportProcess {
log := utils.NewLoggerForTests()
servers := make([]*service.TeleportProcess, count)

for i := 0; i < count; i++ {
laConf := service.MakeDefaultConfig()
laConf.Console = nil
laConf.Log = log
laConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(laConf.DataDir) })
laConf.Token = "static-token-value"
laConf.AuthServers = []utils.NetAddr{
{
AddrNetwork: "tcp",
Addr: net.JoinHostPort(Loopback, p.leafCluster.GetPortWeb()),
},
}
laConf.Auth.Enabled = false
laConf.Proxy.Enabled = false
laConf.SSH.Enabled = false
laConf.Apps.Enabled = true
laConf.Apps.Apps = append([]service.App{
{
Name: p.leafAppName,
URI: p.leafAppURI,
PublicAddr: p.leafAppPublicAddr,
},
{
Name: p.leafWSAppName,
URI: p.leafWSAppURI,
PublicAddr: p.leafWSPublicAddr,
},
{
Name: p.leafWSSAppName,
URI: p.leafWSSAppURI,
PublicAddr: p.leafWSSPublicAddr,
},
}, extraApps...)

appServer, err := p.leafCluster.StartApp(laConf)
require.NoError(t, err)
t.Cleanup(func() { appServer.Close() })

servers[i] = appServer
}

return servers
}

var forwardedHeaderNames = []string{
teleport.AppJWTHeader,
teleport.AppCFHeader,
Expand Down
11 changes: 9 additions & 2 deletions lib/web/app/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net"
"net/http"
"strings"
"sync"

"github.com/gravitational/teleport/api/constants"
Expand Down Expand Up @@ -189,8 +190,7 @@ func (t *transport) DialContext(ctx context.Context, _, _ string) (net.Conn, err
var dialErr error
conn, dialErr = dialAppServer(t.c.proxyClient, t.c.identity, appServer)
if dialErr != nil {
// Connection problem with the server.
if trace.IsConnectionProblem(dialErr) {
if isReverseTunnelDownError(dialErr) {
t.c.log.Warnf("Failed to connect to application server %q: %v.", serverID, dialErr)
t.servers.Delete(serverID)
// Only goes for the next server if the error returned is a
Expand Down Expand Up @@ -283,3 +283,10 @@ func configureTLS(c *transportConfig) (*tls.Config, error) {

return tlsConfig, nil
}

// isReverseTunnelDownError returns true if the provided error indicates that
// the reverse tunnel connection is down e.g. because the agent is down.
func isReverseTunnelDownError(err error) bool {
return trace.IsConnectionProblem(err) ||
strings.Contains(err.Error(), reversetunnel.NoApplicationTunnel)
}