Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: memory optimizations and prom metrics on terminal session exposed #4909

Merged
merged 3 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/events/EventClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func (impl *EventRESTClientImpl) sendEvent(event Event) (bool, error) {
impl.logger.Errorw("error while UpdateJiraTransition request ", "err", err)
return false, err
}
defer resp.Body.Close()
impl.logger.Debugw("event completed", "event resp", resp)
return true, err
}
Expand Down
18 changes: 18 additions & 0 deletions internal/middleware/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,16 @@ var DeploymentStatusCronDuration = promauto.NewHistogramVec(prometheus.Histogram
Name: "deployment_status_cron_process_time",
}, []string{"cronName"})

var TerminalSessionRequestCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "initiate_terminal_session_request_counter",
Help: "count of requests for initiated, established and closed terminal sessions",
}, []string{"sessionAction", "isError"})

var TerminalSessionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "terminal_session_duration",
Help: "duration of each terminal session",
}, []string{"podName", "namespace", "clusterId"})

// prometheusMiddleware implements mux.MiddlewareFunc.
func PrometheusMiddleware(next http.Handler) http.Handler {
// prometheus.MustRegister(requestCounter)
Expand All @@ -134,3 +144,11 @@ func PrometheusMiddleware(next http.Handler) http.Handler {
requestCounter.WithLabelValues(path, method, strconv.Itoa(d.Status())).Inc()
})
}

func IncTerminalSessionRequestCounter(sessionAction string, isError string) {
TerminalSessionRequestCounter.WithLabelValues(sessionAction, isError).Inc()
}

func RecordTerminalSessionDurationMetrics(podName, namespace, clusterId string, sessionDuration float64) {
TerminalSessionDuration.WithLabelValues(podName, namespace, clusterId).Observe(sessionDuration)
}
2 changes: 1 addition & 1 deletion pkg/clusterTerminalAccess/UserTerminalAccessService.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (impl *UserTerminalAccessServiceImpl) closeAndCleanTerminalSession(accessSe
}

func (impl *UserTerminalAccessServiceImpl) closeSession(sessionId string) {
impl.terminalSessionHandler.Close(sessionId, 1, "Process exited")
impl.terminalSessionHandler.Close(sessionId, 1, terminal.ProcessExitedMsg)
}

func (impl *UserTerminalAccessServiceImpl) extractMetadataString(request *models.UserTerminalSessionRequest) string {
Expand Down
7 changes: 6 additions & 1 deletion pkg/k8s/K8sCommonService.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,12 @@ func (impl *K8sCommonServiceImpl) GetManifestsByBatch(ctx context.Context, reque
defer cancel()
go func() {
ans := impl.getManifestsByBatch(ctx, requests)
ch <- ans
select {
case <-ctx.Done():
return
default:
ch <- ans
}
}()
select {
case ans := <-ch:
Expand Down
6 changes: 6 additions & 0 deletions pkg/terminal/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package terminal

const (
SessionTerminated = "SessionTerminated"
SessionInitiating = "SessionInitiating"
)
88 changes: 67 additions & 21 deletions pkg/terminal/terminalSesion.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (
"crypto/rand"
"encoding/hex"
"encoding/json"
errors2 "errors"
"fmt"
"github.com/caarlos0/env"
"github.com/devtron-labs/common-lib/utils/k8s"
"github.com/devtron-labs/devtron/internal/middleware"
"github.com/devtron-labs/devtron/pkg/argoApplication"
"github.com/devtron-labs/devtron/pkg/cluster"
"github.com/devtron-labs/devtron/pkg/cluster/repository"
Expand All @@ -31,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"log"
"net/http"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -44,6 +47,7 @@ import (
)

const END_OF_TRANSMISSION = "\u0004"
const ProcessExitedMsg = "Process exited"

// PtyHandler is what remotecommand expects from a pty
type PtyHandler interface {
Expand All @@ -54,11 +58,17 @@ type PtyHandler interface {

// TerminalSession implements PtyHandler (using a SockJS connection)
type TerminalSession struct {
id string
bound chan error
sockJSSession sockjs.Session
sizeChan chan remotecommand.TerminalSize
doneChan chan struct{}
id string
bound chan error
sockJSSession sockjs.Session
sizeChan chan remotecommand.TerminalSize
doneChan chan struct{}
context context.Context
contextCancelFunc context.CancelFunc
podName string
namespace string
clusterId string
startedOn time.Time
}

// TerminalMessage is the messaging protocol between ShellController and TerminalSession.
Expand Down Expand Up @@ -166,6 +176,15 @@ func (sm *SessionMap) Set(sessionId string, session TerminalSession) {
sm.Sessions[sessionId] = session
}

func (sm *SessionMap) SetTerminalSessionStartTime(sessionId string) {
sm.Lock.Lock()
defer sm.Lock.Unlock()
if session, ok := sm.Sessions[sessionId]; ok {
session.startedOn = time.Now()
sm.Sessions[sessionId] = session
}
}

// Close shuts down the SockJS connection and sends the status code and reason to the client
// Can happen if the process exits or if there is an error starting up the process
// For now the status code is unused and reason is shown to the user (unless "")
Expand All @@ -178,11 +197,23 @@ func (sm *SessionMap) Close(sessionId string, status uint32, reason string) {
if err != nil {
log.Println(err)
}
isErroredConnectionTermination := isConnectionClosedByError(status)
middleware.IncTerminalSessionRequestCounter(SessionTerminated, strconv.FormatBool(isErroredConnectionTermination))
middleware.RecordTerminalSessionDurationMetrics(terminalSession.podName, terminalSession.namespace, terminalSession.clusterId, time.Since(terminalSession.startedOn).Seconds())
close(terminalSession.doneChan)
terminalSession.contextCancelFunc()
delete(sm.Sessions, sessionId)
}

}

func isConnectionClosedByError(status uint32) bool {
if status == 2 {
return true
}
return false
}

var terminalSessions = SessionMap{Sessions: make(map[string]TerminalSession)}

// handleTerminalSession is Called by net/http for any new /api/sockjs connections
Expand Down Expand Up @@ -243,7 +274,7 @@ func CreateAttachHandler(path string) http.Handler {

// startProcess is called by handleAttach
// Executed cmd in the container specified in request and connects it up with the ptyHandler (a session)
func startProcess(k8sClient kubernetes.Interface, cfg *rest.Config,
func startProcess(ctx context.Context, k8sClient kubernetes.Interface, cfg *rest.Config,
cmd []string, ptyHandler PtyHandler, sessionRequest *TerminalSessionRequest) error {
namespace := sessionRequest.Namespace
podName := sessionRequest.PodName
Expand All @@ -262,17 +293,18 @@ func startProcess(k8sClient kubernetes.Interface, cfg *rest.Config,
TerminalSizeQueue: ptyHandler,
Tty: true,
}

err = execWithStreamOptions(exec, streamOptions)
isErroredConnectionTermination := false
middleware.IncTerminalSessionRequestCounter(SessionInitiating, strconv.FormatBool(isErroredConnectionTermination))
terminalSessions.SetTerminalSessionStartTime(sessionRequest.SessionId)
err = execWithStreamOptions(ctx, exec, streamOptions)
if err != nil {
return err
}

return nil
}

func execWithStreamOptions(exec remotecommand.Executor, streamOptions remotecommand.StreamOptions) error {
return exec.Stream(streamOptions)
func execWithStreamOptions(ctx context.Context, exec remotecommand.Executor, streamOptions remotecommand.StreamOptions) error {
return exec.StreamWithContext(ctx, streamOptions)
}

func getExecutor(k8sClient kubernetes.Interface, cfg *rest.Config, podName, namespace, containerName string, cmd []string, stdin bool, tty bool) (remotecommand.Executor, error) {
Expand Down Expand Up @@ -344,32 +376,39 @@ var validShells = []string{"bash", "sh", "powershell", "cmd"}
// Waits for the SockJS connection to be opened by the client the session to be bound in handleTerminalSession
func WaitForTerminal(k8sClient kubernetes.Interface, cfg *rest.Config, request *TerminalSessionRequest) {

session := terminalSessions.Get(request.SessionId)
sessionCtx := session.context
timedCtx, _ := context.WithTimeout(sessionCtx, 60*time.Second)
select {
case <-terminalSessions.Get(request.SessionId).bound:
close(terminalSessions.Get(request.SessionId).bound)
case <-session.bound:
close(session.bound)

var err error
if isValidShell(validShells, request.Shell) {
cmd := []string{request.Shell}

err = startProcess(k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request)
err = startProcess(sessionCtx, k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request)
} else {
// No Shell given or it was not valid: try some shells until one succeeds or all fail
// FIXME: if the first Shell fails then the first keyboard event is lost
for _, testShell := range validShells {
cmd := []string{testShell}
if err = startProcess(k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request); err == nil {
if err = startProcess(sessionCtx, k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request); err == nil || errors2.Is(err, context.Canceled) {
break
}
}
}

if err != nil {
if err != nil && !errors2.Is(err, context.Canceled) {
terminalSessions.Close(request.SessionId, 2, err.Error())
return
}

terminalSessions.Close(request.SessionId, 1, "Process exited")
terminalSessions.Close(request.SessionId, 1, ProcessExitedMsg)
case <-timedCtx.Done():
// handle case when connection has not been initiated from FE side within particular time
close(session.bound)
terminalSessions.Close(request.SessionId, 1, ProcessExitedMsg)
}
}

Expand Down Expand Up @@ -432,10 +471,17 @@ func (impl *TerminalSessionHandlerImpl) GetTerminalSession(req *TerminalSessionR
return statusCode, nil, err
}
req.SessionId = sessionID
sessionCtx, cancelFunc := context.WithCancel(context.Background())
terminalSessions.Set(sessionID, TerminalSession{
id: sessionID,
bound: make(chan error),
sizeChan: make(chan remotecommand.TerminalSize),
id: sessionID,
bound: make(chan error),
sizeChan: make(chan remotecommand.TerminalSize),
doneChan: make(chan struct{}),
context: sessionCtx,
contextCancelFunc: cancelFunc,
podName: req.PodName,
namespace: req.Namespace,
clusterId: strconv.Itoa(req.ClusterId),
})
config, client, err := impl.getClientConfig(req)

Expand Down Expand Up @@ -559,7 +605,7 @@ func (impl *TerminalSessionHandlerImpl) RunCmdInRemotePod(req *TerminalSessionRe
buf := &bytes.Buffer{}
errBuf := &bytes.Buffer{}
impl.logger.Debug("reached execWithStreamOptions method call")
err = execWithStreamOptions(exec, remotecommand.StreamOptions{
err = execWithStreamOptions(context.Background(), exec, remotecommand.StreamOptions{
Stdout: buf,
Stderr: errBuf,
})
Expand Down
Loading