Skip to content

Commit

Permalink
perf: memory optimizations and prom metrics on terminal session expos…
Browse files Browse the repository at this point in the history
…ed (#4909)

* memory optimizations and prom metrics on terminal session exposed

* inc terminal counter before stream actually starts
  • Loading branch information
prakash100198 authored Apr 12, 2024
1 parent e7f34f5 commit b9d775b
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 23 deletions.
1 change: 1 addition & 0 deletions client/events/EventClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func (impl *EventRESTClientImpl) sendEvent(event Event) (bool, error) {
impl.logger.Errorw("error while UpdateJiraTransition request ", "err", err)
return false, err
}
defer resp.Body.Close()
impl.logger.Debugw("event completed", "event resp", resp)
return true, err
}
Expand Down
18 changes: 18 additions & 0 deletions internal/middleware/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,16 @@ var DeploymentStatusCronDuration = promauto.NewHistogramVec(prometheus.Histogram
Name: "deployment_status_cron_process_time",
}, []string{"cronName"})

var TerminalSessionRequestCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "initiate_terminal_session_request_counter",
Help: "count of requests for initiated, established and closed terminal sessions",
}, []string{"sessionAction", "isError"})

var TerminalSessionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "terminal_session_duration",
Help: "duration of each terminal session",
}, []string{"podName", "namespace", "clusterId"})

// prometheusMiddleware implements mux.MiddlewareFunc.
func PrometheusMiddleware(next http.Handler) http.Handler {
// prometheus.MustRegister(requestCounter)
Expand All @@ -134,3 +144,11 @@ func PrometheusMiddleware(next http.Handler) http.Handler {
requestCounter.WithLabelValues(path, method, strconv.Itoa(d.Status())).Inc()
})
}

func IncTerminalSessionRequestCounter(sessionAction string, isError string) {
TerminalSessionRequestCounter.WithLabelValues(sessionAction, isError).Inc()
}

func RecordTerminalSessionDurationMetrics(podName, namespace, clusterId string, sessionDuration float64) {
TerminalSessionDuration.WithLabelValues(podName, namespace, clusterId).Observe(sessionDuration)
}
2 changes: 1 addition & 1 deletion pkg/clusterTerminalAccess/UserTerminalAccessService.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (impl *UserTerminalAccessServiceImpl) closeAndCleanTerminalSession(accessSe
}

func (impl *UserTerminalAccessServiceImpl) closeSession(sessionId string) {
impl.terminalSessionHandler.Close(sessionId, 1, "Process exited")
impl.terminalSessionHandler.Close(sessionId, 1, terminal.ProcessExitedMsg)
}

func (impl *UserTerminalAccessServiceImpl) extractMetadataString(request *models.UserTerminalSessionRequest) string {
Expand Down
7 changes: 6 additions & 1 deletion pkg/k8s/K8sCommonService.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,12 @@ func (impl *K8sCommonServiceImpl) GetManifestsByBatch(ctx context.Context, reque
defer cancel()
go func() {
ans := impl.getManifestsByBatch(ctx, requests)
ch <- ans
select {
case <-ctx.Done():
return
default:
ch <- ans
}
}()
select {
case ans := <-ch:
Expand Down
6 changes: 6 additions & 0 deletions pkg/terminal/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package terminal

const (
SessionTerminated = "SessionTerminated"
SessionInitiating = "SessionInitiating"
)
88 changes: 67 additions & 21 deletions pkg/terminal/terminalSesion.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (
"crypto/rand"
"encoding/hex"
"encoding/json"
errors2 "errors"
"fmt"
"github.com/caarlos0/env"
"github.com/devtron-labs/common-lib/utils/k8s"
"github.com/devtron-labs/devtron/internal/middleware"
"github.com/devtron-labs/devtron/pkg/argoApplication"
"github.com/devtron-labs/devtron/pkg/cluster"
"github.com/devtron-labs/devtron/pkg/cluster/repository"
Expand All @@ -31,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"log"
"net/http"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -44,6 +47,7 @@ import (
)

const END_OF_TRANSMISSION = "\u0004"
const ProcessExitedMsg = "Process exited"

// PtyHandler is what remotecommand expects from a pty
type PtyHandler interface {
Expand All @@ -54,11 +58,17 @@ type PtyHandler interface {

// TerminalSession implements PtyHandler (using a SockJS connection)
type TerminalSession struct {
id string
bound chan error
sockJSSession sockjs.Session
sizeChan chan remotecommand.TerminalSize
doneChan chan struct{}
id string
bound chan error
sockJSSession sockjs.Session
sizeChan chan remotecommand.TerminalSize
doneChan chan struct{}
context context.Context
contextCancelFunc context.CancelFunc
podName string
namespace string
clusterId string
startedOn time.Time
}

// TerminalMessage is the messaging protocol between ShellController and TerminalSession.
Expand Down Expand Up @@ -166,6 +176,15 @@ func (sm *SessionMap) Set(sessionId string, session TerminalSession) {
sm.Sessions[sessionId] = session
}

func (sm *SessionMap) SetTerminalSessionStartTime(sessionId string) {
sm.Lock.Lock()
defer sm.Lock.Unlock()
if session, ok := sm.Sessions[sessionId]; ok {
session.startedOn = time.Now()
sm.Sessions[sessionId] = session
}
}

// Close shuts down the SockJS connection and sends the status code and reason to the client
// Can happen if the process exits or if there is an error starting up the process
// For now the status code is unused and reason is shown to the user (unless "")
Expand All @@ -178,11 +197,23 @@ func (sm *SessionMap) Close(sessionId string, status uint32, reason string) {
if err != nil {
log.Println(err)
}
isErroredConnectionTermination := isConnectionClosedByError(status)
middleware.IncTerminalSessionRequestCounter(SessionTerminated, strconv.FormatBool(isErroredConnectionTermination))
middleware.RecordTerminalSessionDurationMetrics(terminalSession.podName, terminalSession.namespace, terminalSession.clusterId, time.Since(terminalSession.startedOn).Seconds())
close(terminalSession.doneChan)
terminalSession.contextCancelFunc()
delete(sm.Sessions, sessionId)
}

}

func isConnectionClosedByError(status uint32) bool {
if status == 2 {
return true
}
return false
}

var terminalSessions = SessionMap{Sessions: make(map[string]TerminalSession)}

// handleTerminalSession is Called by net/http for any new /api/sockjs connections
Expand Down Expand Up @@ -243,7 +274,7 @@ func CreateAttachHandler(path string) http.Handler {

// startProcess is called by handleAttach
// Executed cmd in the container specified in request and connects it up with the ptyHandler (a session)
func startProcess(k8sClient kubernetes.Interface, cfg *rest.Config,
func startProcess(ctx context.Context, k8sClient kubernetes.Interface, cfg *rest.Config,
cmd []string, ptyHandler PtyHandler, sessionRequest *TerminalSessionRequest) error {
namespace := sessionRequest.Namespace
podName := sessionRequest.PodName
Expand All @@ -262,17 +293,18 @@ func startProcess(k8sClient kubernetes.Interface, cfg *rest.Config,
TerminalSizeQueue: ptyHandler,
Tty: true,
}

err = execWithStreamOptions(exec, streamOptions)
isErroredConnectionTermination := false
middleware.IncTerminalSessionRequestCounter(SessionInitiating, strconv.FormatBool(isErroredConnectionTermination))
terminalSessions.SetTerminalSessionStartTime(sessionRequest.SessionId)
err = execWithStreamOptions(ctx, exec, streamOptions)
if err != nil {
return err
}

return nil
}

func execWithStreamOptions(exec remotecommand.Executor, streamOptions remotecommand.StreamOptions) error {
return exec.Stream(streamOptions)
func execWithStreamOptions(ctx context.Context, exec remotecommand.Executor, streamOptions remotecommand.StreamOptions) error {
return exec.StreamWithContext(ctx, streamOptions)
}

func getExecutor(k8sClient kubernetes.Interface, cfg *rest.Config, podName, namespace, containerName string, cmd []string, stdin bool, tty bool) (remotecommand.Executor, error) {
Expand Down Expand Up @@ -344,32 +376,39 @@ var validShells = []string{"bash", "sh", "powershell", "cmd"}
// Waits for the SockJS connection to be opened by the client the session to be bound in handleTerminalSession
func WaitForTerminal(k8sClient kubernetes.Interface, cfg *rest.Config, request *TerminalSessionRequest) {

session := terminalSessions.Get(request.SessionId)
sessionCtx := session.context
timedCtx, _ := context.WithTimeout(sessionCtx, 60*time.Second)
select {
case <-terminalSessions.Get(request.SessionId).bound:
close(terminalSessions.Get(request.SessionId).bound)
case <-session.bound:
close(session.bound)

var err error
if isValidShell(validShells, request.Shell) {
cmd := []string{request.Shell}

err = startProcess(k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request)
err = startProcess(sessionCtx, k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request)
} else {
// No Shell given or it was not valid: try some shells until one succeeds or all fail
// FIXME: if the first Shell fails then the first keyboard event is lost
for _, testShell := range validShells {
cmd := []string{testShell}
if err = startProcess(k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request); err == nil {
if err = startProcess(sessionCtx, k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request); err == nil || errors2.Is(err, context.Canceled) {
break
}
}
}

if err != nil {
if err != nil && !errors2.Is(err, context.Canceled) {
terminalSessions.Close(request.SessionId, 2, err.Error())
return
}

terminalSessions.Close(request.SessionId, 1, "Process exited")
terminalSessions.Close(request.SessionId, 1, ProcessExitedMsg)
case <-timedCtx.Done():
// handle case when connection has not been initiated from FE side within particular time
close(session.bound)
terminalSessions.Close(request.SessionId, 1, ProcessExitedMsg)
}
}

Expand Down Expand Up @@ -432,10 +471,17 @@ func (impl *TerminalSessionHandlerImpl) GetTerminalSession(req *TerminalSessionR
return statusCode, nil, err
}
req.SessionId = sessionID
sessionCtx, cancelFunc := context.WithCancel(context.Background())
terminalSessions.Set(sessionID, TerminalSession{
id: sessionID,
bound: make(chan error),
sizeChan: make(chan remotecommand.TerminalSize),
id: sessionID,
bound: make(chan error),
sizeChan: make(chan remotecommand.TerminalSize),
doneChan: make(chan struct{}),
context: sessionCtx,
contextCancelFunc: cancelFunc,
podName: req.PodName,
namespace: req.Namespace,
clusterId: strconv.Itoa(req.ClusterId),
})
config, client, err := impl.getClientConfig(req)

Expand Down Expand Up @@ -559,7 +605,7 @@ func (impl *TerminalSessionHandlerImpl) RunCmdInRemotePod(req *TerminalSessionRe
buf := &bytes.Buffer{}
errBuf := &bytes.Buffer{}
impl.logger.Debug("reached execWithStreamOptions method call")
err = execWithStreamOptions(exec, remotecommand.StreamOptions{
err = execWithStreamOptions(context.Background(), exec, remotecommand.StreamOptions{
Stdout: buf,
Stderr: errBuf,
})
Expand Down

0 comments on commit b9d775b

Please sign in to comment.