Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Microsoft SQL Server (MSSQL) scaler implementation #1591

Merged
merged 6 commits into from
Feb 23, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Initial implementation and tests for mssql scaler
Signed-off-by: Chris Gillum <[email protected]>
cgillum committed Feb 10, 2021

Verified

This commit was signed with the committer’s verified signature. The key has expired.
chrismaddalena Christopher Maddalena
commit b467b198245540e14bf1479cde47835d01f3b9ac
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ require (
github.com/Huawei/gophercloud v1.0.21
github.com/Shopify/sarama v1.27.2
github.com/aws/aws-sdk-go v1.36.19
github.com/denisenkom/go-mssqldb v0.9.0 // indirect
github.com/go-logr/logr v0.3.0
github.com/go-logr/zapr v0.3.0 // indirect
github.com/go-openapi/spec v0.20.0
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -381,6 +381,8 @@ github.com/deepmap/oapi-codegen v1.3.13/go.mod h1:WAmG5dWY8/PYHt4vKxlt90NsbHMAOC
github.com/deislabs/oras v0.8.1/go.mod h1:Mx0rMSbBNaNfY9hjpccEnxkOqJL6KGjtxNHPLC4G4As=
github.com/denisenkom/go-mssqldb v0.0.0-20190111225525-2fea367d496d/go.mod h1:xN/JuLBIz4bjkxNmByTiV1IbhfnYb6oo99phBn4Eqhc=
github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/denisenkom/go-mssqldb v0.9.0 h1:RSohk2RsiZqLZ0zCjtfn3S4Gp4exhpBWHyQ7D0yGjAk=
github.com/denisenkom/go-mssqldb v0.9.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0=
github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
@@ -631,6 +633,7 @@ github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5
github.com/gogo/protobuf v1.2.2-0.20190730201129-28a6bbf47e48/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang/gddo v0.0.0-20190419222130-af0f2af80721/go.mod h1:xEhNfoBDX1hzLm2Nf80qUvZ2sVwoMZ8d6IE2SrsQfh4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
280 changes: 280 additions & 0 deletions pkg/scalers/mssql_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
package scalers

import (
"context"
"crypto/sha256"
"database/sql"
"fmt"
"net/url"
"strconv"

kedautil "github.com/kedacore/keda/v2/pkg/util"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"

logf "sigs.k8s.io/controller-runtime/pkg/log"
)

// mssqlScaler exposes a data pointer to mssqlMetadata and sql.DB connection
type mssqlScaler struct {
metadata *mssqlMetadata
connection *sql.DB
}

// mssqlMetadata defines metadata used by KEDA to query a Microsoft SQL database
type mssqlMetadata struct {
// The connection string used to connect to the MSSQL database.
// Both URL syntax (sqlserver://host?database=dbName) and OLEDB syntax is supported.
// +optional
connectionString string
// The username credential for connecting to the SQL database server, if not specified in the connection string.
// +optional
username string
// The password credential for connecting to the SQL database server, if not specified in the connection string.
// +optional
password string
// The hostname of the database server to connect to query, if not specified in the connection string.
// +optional
host string
// The port number of the database server endpoint to connect to, if not specified in the connection string.
// +optional
port int
// The name of the database to query, if not specified in the connection string.
// +optional
database string
// The T-SQL query to run against the target database - e.g. SELECT COUNT(*) FROM table.
// +required
query string
// The threshold that is used as targetAverageValue in the Horizontal Pod Autoscaler.
// +required
targetValue int
// The name of the metric to use in the Horizontal Pod Autoscaler. This value will be prefixed with "mssql-".
// +optional
metricName string
}

var mssqlLog = logf.Log.WithName("mssql_scaler")

// NewMSSQLScaler creates a new mssql scaler
func NewMSSQLScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseMSSQLMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing mssql metadata: %s", err)
}

conn, err := newMSSQLConnection(meta)
if err != nil {
return nil, fmt.Errorf("error establishing mssql connection: %s", err)
}

return &mssqlScaler{
metadata: meta,
connection: conn,
}, nil
}

// parseMSSQLMetadata takes a ScalerConfig and returns a mssqlMetadata or an error if the config is invalid
func parseMSSQLMetadata(config *ScalerConfig) (*mssqlMetadata, error) {
meta := mssqlMetadata{}

// Query
if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, fmt.Errorf("no query given")
}

// Target query value
if val, ok := config.TriggerMetadata["targetValue"]; ok {
targetValue, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("targetValue parsing error %s", err.Error())
}
meta.targetValue = targetValue
} else {
return nil, fmt.Errorf("no targetValue given")
}

// Connection string, which can either be provided explicitly or via the helper fields
switch {
case config.AuthParams["connectionString"] != "":
meta.connectionString = config.AuthParams["connectionString"]
case config.TriggerMetadata["connectionStringFromEnv"] != "":
meta.connectionString = config.ResolvedEnv[config.TriggerMetadata["connectionStringFromEnv"]]
default:
meta.connectionString = ""
if val, ok := config.TriggerMetadata["host"]; ok {
meta.host = val
} else {
return nil, fmt.Errorf("no host given")
}

if val, ok := config.TriggerMetadata["port"]; ok {
port, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("port parsing error %s", err.Error())
}

meta.port = port
}

if val, ok := config.TriggerMetadata["username"]; ok {
meta.username = val
}

// database is optional in SQL s
if val, ok := config.TriggerMetadata["database"]; ok {
meta.database = val
}

if config.AuthParams["password"] != "" {
meta.password = config.AuthParams["password"]
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}
}

// get the metricName, which can be explicit or from the (masked) connection string
if val, ok := config.TriggerMetadata["metricName"]; ok {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("mssql-%s", val))
} else {
if meta.database != "" {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("mssql-%s", meta.database))
} else if meta.host != "" {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("mssql-%s", meta.host))
} else if meta.connectionString != "" {
// The mssql provider supports of a variety of connection string formats. Instead of trying to parse
// the connection string and mask out sensitive data, play it safe and just hash the whole thing.
connectionStringHash := sha256.Sum256([]byte(meta.connectionString))
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("mssql-%x", connectionStringHash))
} else {
meta.metricName = "mssql"
}
cgillum marked this conversation as resolved.
Show resolved Hide resolved
}

return &meta, nil
}

// newMSSQLConnection returns a new, opened SQL connection for the provided mssqlMetadata
func newMSSQLConnection(meta *mssqlMetadata) (*sql.DB, error) {
connStr := getMSSQLConnectionString(meta)

db, err := sql.Open("sqlserver", connStr)
if err != nil {
mssqlLog.Error(err, fmt.Sprintf("Found error opening mssql: %s", err))
return nil, err
}

err = db.Ping()
if err != nil {
mssqlLog.Error(err, fmt.Sprintf("Found error pinging mssql: %s", err))
return nil, err
}

return db, nil
}

// getMSSQLConnectionString returns a connection string from a mssqlMetadata
func getMSSQLConnectionString(meta *mssqlMetadata) string {
var connStr string

if meta.connectionString != "" {
connStr = meta.connectionString
} else {
query := url.Values{}
if meta.database != "" {
query.Add("database", meta.database)
}

connectionUrl := &url.URL{Scheme: "sqlserver", RawQuery: query.Encode()}
cgillum marked this conversation as resolved.
Show resolved Hide resolved
if meta.username != "" {
if meta.password != "" {
connectionUrl.User = url.UserPassword(meta.username, meta.password)
} else {
connectionUrl.User = url.User(meta.username)
}
}

if meta.port > 0 {
connectionUrl.Host = fmt.Sprintf("%s:%d", meta.host, meta.port)
} else {
connectionUrl.Host = meta.host
}

connStr = connectionUrl.String()
}

return connStr
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *mssqlScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetQueryValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetQueryValue,
},
}

metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}

return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns a value for a supported metric or an error if there is a problem getting the metric
func (s *mssqlScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.getQueryResult()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting mssql: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// getQueryResult returns the result of the scaler query
func (s *mssqlScaler) getQueryResult() (int, error) {
var value int
err := s.connection.QueryRow(s.metadata.query).Scan(&value)
if err != nil {
mssqlLog.Error(err, fmt.Sprintf("Could not query mssql database: %s", err))
return 0, err
}

return value, nil
}

// IsActive returns true if there are pending events to be processed
func (s *mssqlScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.getQueryResult()
if err != nil {
return false, fmt.Errorf("error inspecting mssql: %s", err)
}

return messages > 0, nil
}

// Close closes the mssql database connections
func (s *mssqlScaler) Close() error {
err := s.connection.Close()
if err != nil {
mssqlLog.Error(err, "Error closing mssql connection")
return err
}

return nil
}
Loading