Skip to content

Commit

Permalink
fix: Selenium Grid scaler exposes sum of pending and ongoing sessions…
Browse files Browse the repository at this point in the history
… to KDEA (#6368)
  • Loading branch information
VietND96 authored Dec 4, 2024
1 parent 7e5c523 commit c43af59
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 124 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Here is an overview of all new **experimental** features:
- **General**: Centralize and improve automaxprocs configuration with proper structured logging ([#5970](https://github.com/kedacore/keda/issues/5970))
- **General**: Paused ScaledObject count is reported correctly after operator restart ([#6321](https://github.com/kedacore/keda/issues/6321))
- **General**: ScaledJobs ready status set to true when recoverred problem ([#6329](https://github.com/kedacore/keda/pull/6329))
- **Selenium Grid Scaler**: Exposes sum of pending and ongoing sessions to KDEA ([#6368](https://github.com/kedacore/keda/pull/6368))

### Deprecations

Expand Down
95 changes: 47 additions & 48 deletions pkg/scalers/selenium_grid_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ type seleniumGridScalerMetadata struct {
BrowserName string `keda:"name=browserName, order=triggerMetadata"`
SessionBrowserName string `keda:"name=sessionBrowserName, order=triggerMetadata, optional"`
ActivationThreshold int64 `keda:"name=activationThreshold, order=triggerMetadata, optional"`
BrowserVersion string `keda:"name=browserVersion, order=triggerMetadata, optional, default=latest"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional, default=false"`
PlatformName string `keda:"name=platformName, order=triggerMetadata, optional, default=linux"`
NodeMaxSessions int `keda:"name=nodeMaxSessions, order=triggerMetadata, optional, default=1"`
BrowserVersion string `keda:"name=browserVersion, order=triggerMetadata, default=latest"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
PlatformName string `keda:"name=platformName, order=triggerMetadata, default=linux"`
NodeMaxSessions int64 `keda:"name=nodeMaxSessions, order=triggerMetadata, default=1"`

TargetValue int64
}
Expand All @@ -55,9 +55,9 @@ type Data struct {
}

type Grid struct {
SessionCount int `json:"sessionCount"`
MaxSession int `json:"maxSession"`
TotalSlots int `json:"totalSlots"`
SessionCount int64 `json:"sessionCount"`
MaxSession int64 `json:"maxSession"`
TotalSlots int64 `json:"totalSlots"`
}

type NodesInfo struct {
Expand All @@ -71,17 +71,17 @@ type SessionsInfo struct {
type Nodes []struct {
ID string `json:"id"`
Status string `json:"status"`
SessionCount int `json:"sessionCount"`
MaxSession int `json:"maxSession"`
SlotCount int `json:"slotCount"`
SessionCount int64 `json:"sessionCount"`
MaxSession int64 `json:"maxSession"`
SlotCount int64 `json:"slotCount"`
Stereotypes string `json:"stereotypes"`
Sessions Sessions `json:"sessions"`
}

type ReservedNodes struct {
ID string `json:"id"`
MaxSession int `json:"maxSession"`
SlotCount int `json:"slotCount"`
MaxSession int64 `json:"maxSession"`
SlotCount int64 `json:"slotCount"`
}

type Sessions []struct {
Expand All @@ -102,7 +102,7 @@ type Capability struct {
}

type Stereotypes []struct {
Slots int `json:"slots"`
Slots int64 `json:"slots"`
Stereotype Capability `json:"stereotype"`
}

Expand Down Expand Up @@ -148,6 +148,7 @@ func parseSeleniumGridScalerMetadata(config *scalersconfig.ScalerConfig) (*selen
if meta.SessionBrowserName == "" {
meta.SessionBrowserName = meta.BrowserName
}

return meta, nil
}

Expand All @@ -160,18 +161,18 @@ func (s *seleniumGridScaler) Close(context.Context) error {
}

func (s *seleniumGridScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
sessions, err := s.getSessionsCount(ctx, s.logger)
newRequestNodes, onGoingSessions, err := s.getSessionsQueueLength(ctx, s.logger)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error requesting selenium grid endpoint: %w", err)
}

metric := GenerateMetricInMili(metricName, float64(sessions))
metric := GenerateMetricInMili(metricName, float64(newRequestNodes+onGoingSessions))

return []external_metrics.ExternalMetricValue{metric}, sessions > s.metadata.ActivationThreshold, nil
return []external_metrics.ExternalMetricValue{metric}, (newRequestNodes + onGoingSessions) > s.metadata.ActivationThreshold, nil
}

func (s *seleniumGridScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("seleniumgrid-%s", s.metadata.BrowserName))
metricName := kedautil.NormalizeString(fmt.Sprintf("selenium-grid-%s-%s-%s", s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.PlatformName))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
Expand All @@ -184,18 +185,18 @@ func (s *seleniumGridScaler) GetMetricSpecForScaling(context.Context) []v2.Metri
return []v2.MetricSpec{metricSpec}
}

func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.Logger) (int64, error) {
func (s *seleniumGridScaler) getSessionsQueueLength(ctx context.Context, logger logr.Logger) (int64, int64, error) {
body, err := json.Marshal(map[string]string{
"query": "{ grid { sessionCount, maxSession, totalSlots }, nodesInfo { nodes { id, status, sessionCount, maxSession, slotCount, stereotypes, sessions { id, capabilities, slot { id, stereotype } } } }, sessionsInfo { sessionQueueRequests } }",
})

if err != nil {
return -1, err
return -1, -1, err
}

req, err := http.NewRequestWithContext(ctx, "POST", s.metadata.URL, bytes.NewBuffer(body))
if err != nil {
return -1, err
return -1, -1, err
}

if (s.metadata.AuthType == "" || strings.EqualFold(s.metadata.AuthType, "Basic")) && s.metadata.Username != "" && s.metadata.Password != "" {
Expand All @@ -206,28 +207,28 @@ func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.L

res, err := s.httpClient.Do(req)
if err != nil {
return -1, err
return -1, -1, err
}

if res.StatusCode != http.StatusOK {
msg := fmt.Sprintf("selenium grid returned %d", res.StatusCode)
return -1, errors.New(msg)
return -1, -1, errors.New(msg)
}

defer res.Body.Close()
b, err := io.ReadAll(res.Body)
if err != nil {
return -1, err
return -1, -1, err
}
v, err := getCountFromSeleniumResponse(b, s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.SessionBrowserName, s.metadata.PlatformName, s.metadata.NodeMaxSessions, logger)
newRequestNodes, onGoingSession, err := getCountFromSeleniumResponse(b, s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.SessionBrowserName, s.metadata.PlatformName, s.metadata.NodeMaxSessions, logger)
if err != nil {
return -1, err
return -1, -1, err
}
return v, nil
return newRequestNodes, onGoingSession, nil
}

func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string) int {
var matchingSlots int
func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string) int64 {
var matchingSlots int64
for _, stereotype := range stereotypes {
if checkCapabilitiesMatch(stereotype.Stereotype, request, browserName, browserVersion, sessionBrowserName, platformName) {
matchingSlots += stereotype.Slots
Expand All @@ -236,8 +237,8 @@ func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability,
return matchingSlots
}

func countMatchingSessions(sessions Sessions, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string, logger logr.Logger) int {
var matchingSessions int
func countMatchingSessions(sessions Sessions, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string, logger logr.Logger) int64 {
var matchingSessions int64
for _, session := range sessions {
var capability = Capability{}
if err := json.Unmarshal([]byte(session.Capabilities), &capability); err == nil {
Expand Down Expand Up @@ -274,7 +275,7 @@ func checkCapabilitiesMatch(capability Capability, requestCapability Capability,
return browserNameMatches && browserVersionMatches && platformNameMatches
}

func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availableSlots int) int {
func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availableSlots int64) int64 {
for _, reservedNode := range reservedNodes {
if strings.EqualFold(reservedNode.ID, nodeID) {
return reservedNode.SlotCount
Expand All @@ -283,7 +284,7 @@ func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availa
return availableSlots
}

func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotCount int, maxSession int) []ReservedNodes {
func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotCount int64, maxSession int64) []ReservedNodes {
for i, reservedNode := range reservedNodes {
if strings.EqualFold(reservedNode.ID, nodeID) {
// Update remaining available slots for the reserved node
Expand All @@ -295,17 +296,15 @@ func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotC
return append(reservedNodes, ReservedNodes{ID: nodeID, SlotCount: slotCount, MaxSession: maxSession})
}

func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion string, sessionBrowserName string, platformName string, nodeMaxSessions int, logger logr.Logger) (int64, error) {
// The returned count of the number of new Nodes will be scaled up
var count int64
func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion string, sessionBrowserName string, platformName string, nodeMaxSessions int64, logger logr.Logger) (int64, int64, error) {
// Track number of available slots of existing Nodes in the Grid can be reserved for the matched requests
var availableSlots int
var availableSlots int64
// Track number of matched requests in the sessions queue will be served by this scaler
var queueSlots int
var queueSlots int64

var seleniumResponse = SeleniumResponse{}
if err := json.Unmarshal(b, &seleniumResponse); err != nil {
return 0, err
return 0, 0, err
}

var sessionQueueRequests = seleniumResponse.Data.SessionsInfo.SessionQueueRequests
Expand All @@ -314,6 +313,7 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s
var reservedNodes []ReservedNodes
// Track list of new Nodes will be scaled up with number of available slots following scaler parameter `nodeMaxSessions`
var newRequestNodes []ReservedNodes
var onGoingSessions int64
for requestIndex, sessionQueueRequest := range sessionQueueRequests {
var isRequestMatched bool
var requestCapability = Capability{}
Expand All @@ -332,20 +332,22 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s
}

var isRequestReserved bool
var sumOfCurrentSessionsMatch int64
// Check if the matched request can be assigned to available slots of existing Nodes in the Grid
for _, node := range nodes {
// Count ongoing sessions that match the request capability and scaler metadata
var currentSessionsMatch = countMatchingSessions(node.Sessions, requestCapability, browserName, browserVersion, sessionBrowserName, platformName, logger)
sumOfCurrentSessionsMatch += currentSessionsMatch
// Check if node is UP and has available slots (maxSession > sessionCount)
if strings.EqualFold(node.Status, "UP") && checkNodeReservedSlots(reservedNodes, node.ID, node.MaxSession-node.SessionCount) > 0 {
var stereotypes = Stereotypes{}
var availableSlotsMatch int
var availableSlotsMatch int64
if err := json.Unmarshal([]byte(node.Stereotypes), &stereotypes); err == nil {
// Count available slots that match the request capability and scaler metadata
availableSlotsMatch += countMatchingSlotsStereotypes(stereotypes, requestCapability, browserName, browserVersion, sessionBrowserName, platformName)
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling node stereotypes: %s", err))
}
// Count ongoing sessions that match the request capability and scaler metadata
var currentSessionsMatch = countMatchingSessions(node.Sessions, requestCapability, browserName, browserVersion, sessionBrowserName, platformName, logger)
// Count remaining available slots can be reserved for this request
var availableSlotsCanBeReserved = checkNodeReservedSlots(reservedNodes, node.ID, node.MaxSession-node.SessionCount)
// Reserve one available slot for the request if available slots match is greater than current sessions match
Expand All @@ -357,6 +359,9 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s
}
}
}
if sumOfCurrentSessionsMatch > onGoingSessions {
onGoingSessions = sumOfCurrentSessionsMatch
}
// Check if the matched request can be assigned to available slots of new Nodes will be scaled up, since the scaler parameter `nodeMaxSessions` can be greater than 1
if !isRequestReserved {
for _, newRequestNode := range newRequestNodes {
Expand All @@ -373,11 +378,5 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s
}
}

if queueSlots > availableSlots {
count = int64(len(newRequestNodes))
} else {
count = 0
}

return count, nil
return int64(len(newRequestNodes)), onGoingSessions, nil
}
Loading

0 comments on commit c43af59

Please sign in to comment.