Skip to content

Commit

Permalink
* Push window size changes to clients instead of polling.
Browse files Browse the repository at this point in the history
* Cache services.ClusterConfig within srv.ServerContext for the duration
  of a connection.
* Create a single websocket between the browser and the proxy for all
* terminal bytes and events.
  • Loading branch information
russjones committed May 4, 2018
1 parent a819018 commit 876e04a
Show file tree
Hide file tree
Showing 20 changed files with 1,612 additions and 1,214 deletions.
6 changes: 6 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,3 +345,9 @@ const (
// SharedDirMode is a mode for a directory shared with group
SharedDirMode = 0750
)

const (
// SessionEvent is sent by servers to clients when an audit event occurs on
// the session.
SessionEvent = "x-teleport-event"
)
114 changes: 114 additions & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2525,6 +2525,120 @@ func runAndMatch(tc *client.TeleportClient, attempts int, command []string, patt
return err
}

// TestWindowChange checks if custom Teleport window change requests are sent
// when the server side PTY changes its size.
func (s *IntSuite) TestWindowChange(c *check.C) {
t := s.newTeleport(c, nil, true)
defer t.Stop(true)

site := t.GetSiteAPI(Site)
c.Assert(site, check.NotNil)

personA := NewTerminal(250)
personB := NewTerminal(250)

// openSession will open a new session on a server.
openSession := func() {
cl, err := t.NewClient(ClientConfig{
Login: s.me.Username,
Cluster: Site,
Host: Host,
Port: t.GetPortSSHInt(),
})
c.Assert(err, check.IsNil)

cl.Stdout = &personA
cl.Stdin = &personA

err = cl.SSH(context.TODO(), []string{}, false)
c.Assert(err, check.IsNil)
}

// joinSession will join the existing session on a server.
joinSession := func() {
// Find the existing session in the backend.
var sessionID string
for {
time.Sleep(time.Millisecond)
sessions, _ := site.GetSessions(defaults.Namespace)
if len(sessions) == 0 {
continue
}
sessionID = string(sessions[0].ID)
break
}

cl, err := t.NewClient(ClientConfig{
Login: s.me.Username,
Cluster: Site,
Host: Host,
Port: t.GetPortSSHInt(),
})
c.Assert(err, check.IsNil)

cl.Stdout = &personB
cl.Stdin = &personB

// Change the size of the window immediately after it is created.
cl.OnShellCreated = func(s *ssh.Session, c *ssh.Client, terminal io.ReadWriteCloser) (exit bool, err error) {
err = s.WindowChange(48, 160)
if err != nil {
return true, trace.Wrap(err)
}
return false, nil
}

for i := 0; i < 10; i++ {
err = cl.Join(context.TODO(), defaults.Namespace, session.ID(sessionID), &personB)
if err == nil {
break
}
}
c.Assert(err, check.IsNil)
}

// waitForOutput checks the output of the passed in terminal of a string until
// some timeout has occured.
waitForOutput := func(t Terminal, s string) error {
tickerCh := time.Tick(500 * time.Millisecond)
timeoutCh := time.After(30 * time.Second)
for {
select {
case <-tickerCh:
if strings.Contains(t.Output(500), s) {
return nil
}
case <-timeoutCh:
return trace.BadParameter("timed out waiting for output")
}
}

}

// Open session, the initial size will be 80x24.
go openSession()

// Use the "printf" command to print the terminal size on the screen and
// make sure it is 80x25.
personA.Type("\aprintf '%s %s\n' $(tput cols) $(tput lines)\n\r\a")
err := waitForOutput(personA, "80 25")
c.Assert(err, check.IsNil)

// As soon as person B joins the session, the terminal is resized to 160x48.
// Have another user join the session. As soon as the second shell is
// created, the window is resized to 160x48 (see joinSession implementation).
go joinSession()

// Use the "printf" command to print the window size again and make sure it's
// 160x48.
personA.Type("\aprintf '%s %s\n' $(tput cols) $(tput lines)\n\r\a")
err = waitForOutput(personA, "160 48")
c.Assert(err, check.IsNil)

// Close the session.
personA.Type("\aexit\r\n\a")
}

// runCommand is a shortcut for running SSH command, it creates a client
// connected to proxy of the passed in instance, runs the command, and returns
// the result. If multiple attempts are requested, a 250 millisecond delay is
Expand Down
30 changes: 29 additions & 1 deletion lib/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,12 @@ type TeleportClient struct {
localAgent *LocalKeyAgent

// OnShellCreated gets called when the shell is created. It's
// safe to keep it nil
// safe to keep it nil.
OnShellCreated ShellCreatedCallback

// eventsCh is a channel used to inform clients about events have that
// occured during the session.
eventsCh chan events.EventFields
}

// ShellCreatedCallback can be supplied for every teleport client. It will
Expand Down Expand Up @@ -568,6 +572,12 @@ func NewClient(c *Config) (tc *TeleportClient, err error) {
tc.Stdin = os.Stdin
}

// Create a buffered channel to hold events that occured during this session.
// This channel must be buffered because the SSH connection directly feeds
// into it. Delays in pulling messages off the global SSH request channel
// could lead to the connection hanging.
tc.eventsCh = make(chan events.EventFields, 1024)

// sometimes we need to use external auth without using local auth
// methods, e.g. in automation daemons
if c.SkipLocalAuth {
Expand Down Expand Up @@ -1500,6 +1510,24 @@ func (tc *TeleportClient) u2fLogin(pub []byte) (*auth.SSHLoginResponse, error) {
return response, trace.Wrap(err)
}

// SendEvent adds a events.EventFields to the channel.
func (tc *TeleportClient) SendEvent(ctx context.Context, e events.EventFields) error {
// Try and send the event to the eventsCh. If blocking, keep blocking until
// the passed in context in canceled.
select {
case tc.eventsCh <- e:
return nil
case <-ctx.Done():
return trace.Wrap(ctx.Err())
}
}

// EventsChannel returns a channel that can be used to listen for events that
// occur for this session.
func (tc *TeleportClient) EventsChannel() <-chan events.EventFields {
return tc.eventsCh
}

// loopbackPool reads trusted CAs if it finds it in a predefined location
// and will work only if target proxy address is loopback
func loopbackPool(proxyAddr string) *x509.CertPool {
Expand Down
69 changes: 62 additions & 7 deletions lib/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/sshutils"
"github.com/gravitational/teleport/lib/sshutils/scp"
Expand Down Expand Up @@ -63,6 +64,7 @@ type NodeClient struct {
Namespace string
Client *ssh.Client
Proxy *ProxyClient
TC *TeleportClient
}

// GetSites returns list of the "sites" (AKA teleport clusters) connected to the proxy
Expand Down Expand Up @@ -420,9 +422,62 @@ func (proxy *ProxyClient) ConnectToNode(ctx context.Context, nodeAddress string,
return nil, trace.Wrap(err)
}

client := ssh.NewClient(conn, chans, reqs)
// We pass an empty channel which we close right away to ssh.NewClient
// because the client need to handle requests itself.
emptyCh := make(chan *ssh.Request)
close(emptyCh)

return &NodeClient{Client: client, Proxy: proxy, Namespace: defaults.Namespace}, nil
client := ssh.NewClient(conn, chans, emptyCh)

nc := &NodeClient{
Client: client,
Proxy: proxy,
Namespace: defaults.Namespace,
TC: proxy.teleportClient,
}

// Start a goroutine that will run for the duration of the client to process
// global requests from the client. Teleport clients will use this to update
// terminal sizes when the remote PTY size has changed.
go nc.handleGlobalRequests(ctx, reqs)

return nc, nil
}

func (c *NodeClient) handleGlobalRequests(ctx context.Context, requestCh <-chan *ssh.Request) {
for {
select {
case r := <-requestCh:
// When the channel is closing, nil is returned.
if r == nil {
return
}

switch r.Type {
case teleport.SessionEvent:
// Parse event and create events.EventFields that can be consumed directly
// by caller.
var e events.EventFields
err := json.Unmarshal(r.Payload, &e)
if err != nil {
log.Warnf("Unable to parse event: %v: %v.", string(r.Payload), err)
continue
}

// Send event to event channel.
err = c.TC.SendEvent(ctx, e)
if err != nil {
log.Warnf("Unable to send event %v: %v.", string(r.Payload), err)
continue
}
default:
// This handles keepalive messages and matches the behaviour of OpenSSH.
r.Reply(false, nil)
}
case <-ctx.Done():
return
}
}
}

// newClientConn is a wrapper around ssh.NewClientConn
Expand Down Expand Up @@ -504,18 +559,18 @@ func (client *NodeClient) Download(remoteSourcePath, localDestinationPath string
// scp runs remote scp command(shellCmd) on the remote server and
// runs local scp handler using scpConf
func (client *NodeClient) scp(scpCommand scp.Command, shellCmd string, errWriter io.Writer) error {
session, err := client.Client.NewSession()
s, err := client.Client.NewSession()
if err != nil {
return trace.Wrap(err)
}
defer session.Close()
defer s.Close()

stdin, err := session.StdinPipe()
stdin, err := s.StdinPipe()
if err != nil {
return trace.Wrap(err)
}

stdout, err := session.StdoutPipe()
stdout, err := s.StdoutPipe()
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -537,7 +592,7 @@ func (client *NodeClient) scp(scpCommand scp.Command, shellCmd string, errWriter
close(closeC)
}()

runErr := session.Run(shellCmd)
runErr := s.Run(shellCmd)
if runErr != nil && err == nil {
err = runErr
}
Expand Down
Loading

0 comments on commit 876e04a

Please sign in to comment.