Skip to content

Commit

Permalink
perf(ssh): record session using writer adpter, and queue processing
Browse files Browse the repository at this point in the history
  • Loading branch information
henrybarreto committed Dec 18, 2024
1 parent 0c293db commit 829b3b6
Showing 1 changed file with 65 additions and 53 deletions.
118 changes: 65 additions & 53 deletions ssh/server/channels/utils.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package channels

import (
"bytes"
"io"
"sync"

Expand All @@ -14,6 +13,69 @@ import (
gossh "golang.org/x/crypto/ssh"
)

type Recorder struct {
queue chan string
channel gossh.Channel
}

func NewRecorder(sess *session.Session, camera *session.Camera, channel gossh.Channel) io.WriteCloser {
queue := make(chan string, 100)
go func() {
recording := true

for {
msg, ok := <-queue
if !ok {
return
}

if !recording {
continue
}

if err := camera.WriteFrame(&models.SessionRecorded{ //nolint:errcheck
UID: sess.UID,
Namespace: sess.Lookup["domain"],
Message: msg,
Width: int(sess.Pty.Columns),
Height: int(sess.Pty.Rows),
}); err != nil {
log.WithError(err).
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
Warning("failed to send the session frame to record")

recording = false
}
}
}()

return &Recorder{
queue: queue,
channel: channel,
}
}

func (c *Recorder) record(msg string) {
c.queue <- msg
}

func (c *Recorder) Write(data []byte) (int, error) {
read, err := c.channel.Write(data)
if err != nil {
return read, err
}

go c.record(string(data))

return read, nil
}

func (c *Recorder) Close() error {
close(c.queue)

return c.channel.CloseWrite()
}

// pipe pipes data between client and agent, and vise versa, recoding each frame when ShellHub instance are Cloud or
// Enterprise.
func pipe(ctx gliderssh.Context, sess *session.Session, client gossh.Channel, agent gossh.Channel) {
Expand Down Expand Up @@ -42,69 +104,19 @@ func pipe(ctx gliderssh.Context, sess *session.Session, client gossh.Channel, ag
defer wg.Done()
defer client.CloseWrite() //nolint:errcheck

// NOTE: As the copy required to record the session seem to be inefficient, if we don't have a record URL
// defined, we use an [io.Copy] for the data piping between agent and client.
recordURL := ctx.Value("RECORD_URL").(string)
if (envs.IsEnterprise() || envs.IsCloud()) && recordURL != "" {
// NOTE: Recoding variable is used to control if the frames will be recorded. If something wrong happens in
// this process, to spare resources, we don't send frames anymore for this session.
recording := true

camera, err := sess.Record(ctx, recordURL)
if err != nil {
log.WithError(err).
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID, "record_url": recordURL}).
Warning("failed to connect to session record endpoint")

recording = false
}

defer camera.Close()

buffer := make([]byte, 1024)
for {
read, err := a.Read(buffer)
// The occurrence of io.EOF is expected when the connection ends.
// This indicates that we have reached the end of the input stream, and we need
// to break out of the loop to handle the termination of the connection
if err == io.EOF {
break
}
// Unlike io.EOF, when 'err' is simply not nil, it signifies an unexpected error,
// and we need to log to handle it appropriately.
if err != nil {
log.WithError(err).
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
Warning("failed to read from stdout in pty client")

break
}

if _, err = io.Copy(client, bytes.NewReader(buffer[:read])); err != nil && err != io.EOF {
log.WithError(err).
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
Warning("failed to copy from stdout in pty client")

break
}

if recording {
if err := camera.WriteFrame(&models.SessionRecorded{ //nolint:errcheck
UID: sess.UID,
Namespace: sess.Lookup["domain"],
Message: string(buffer[:read]),
Width: int(sess.Pty.Columns),
Height: int(sess.Pty.Rows),
}); err != nil {
log.WithError(err).
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
Warning("failed to send the session frame to record")

recording = false

continue
}
}
if _, err := io.Copy(NewRecorder(sess, camera, client), a); err != nil && err != io.EOF {
log.WithError(err).Error("failed on coping data from client to agent")
}
} else {
if _, err := io.Copy(client, a); err != nil && err != io.EOF {
Expand Down

0 comments on commit 829b3b6

Please sign in to comment.