Skip to content

Commit

Permalink
Anonymize the host in case of HTTP failure (RabbitMQ Scaler) (#2041)
Browse files Browse the repository at this point in the history
* Anonimize the host in case of HTTP failure

Signed-off-by: jorturfer <[email protected]>

* Update CHANGELOG and add one extra test case

Signed-off-by: jorturfer <[email protected]>

* Move the regex from the method to a static var

Signed-off-by: jorturfer <[email protected]>

* Update regex var name
Co-authored-by: Zbynek Roubalik <[email protected]>

Signed-off-by: Jorge Turrado <[email protected]>

* Fix typo and improve test
Co-authored-by: Aaron Schlesinger <[email protected]>

Signed-off-by: jorturfer <[email protected]>

Co-authored-by: Ahmed ElSayed <[email protected]>
  • Loading branch information
Jorge Turrado Ferrero and ahmelsayed authored Aug 27, 2021
1 parent 0e4a01f commit ff19979
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- Improve validation in Cron scaler in case start & end input is same.([#2032](https://github.com/kedacore/keda/pull/2032))
- Improve the cron validation in Cron Scaler ([#2038](https://github.com/kedacore/keda/pull/2038))
- Add Bearer auth for Metrics API scaler ([#2028](https://github.com/kedacore/keda/pull/2028))
- Anonymize the host in case of HTTP failure (RabbitMQ Scaler) ([#2041](https://github.com/kedacore/keda/pull/2041))
- Escape `queueName` and `vhostName` in RabbitMQ Scaler before use them in query string (bug fix) ([#2055](https://github.com/kedacore/keda/pull/2055))

### Breaking Changes
Expand Down
17 changes: 15 additions & 2 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"regexp"
"strconv"

"github.com/streadway/amqp"
Expand All @@ -20,6 +21,12 @@ import (
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

var rabbitMQAnonymizePattern *regexp.Regexp

func init() {
rabbitMQAnonymizePattern = regexp.MustCompile(`([^ \/:]+):([^\/:]+)\@`)
}

const (
rabbitQueueLengthMetricName = "queueLength"
rabbitModeTriggerConfigName = "mode"
Expand Down Expand Up @@ -306,7 +313,7 @@ func (s *rabbitMQScaler) Close() error {
func (s *rabbitMQScaler) IsActive(ctx context.Context) (bool, error) {
messages, publishRate, err := s.getQueueStatus()
if err != nil {
return false, fmt.Errorf("error inspecting rabbitMQ: %s", err)
return false, s.anonimizeRabbitMQError(err)
}

if s.metadata.mode == rabbitModeQueueLength {
Expand Down Expand Up @@ -421,7 +428,7 @@ func (s *rabbitMQScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
func (s *rabbitMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
messages, publishRate, err := s.getQueueStatus()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting rabbitMQ: %s", err)
return []external_metrics.ExternalMetricValue{}, s.anonimizeRabbitMQError(err)
}

var metricValue resource.Quantity
Expand Down Expand Up @@ -498,3 +505,9 @@ func getMaximum(q []queueInfo) (int, float64) {
}
return maxMessages, maxRate
}

// Mask host for log purposes
func (s *rabbitMQScaler) anonimizeRabbitMQError(err error) error {
errorMessage := fmt.Sprintf("error inspecting rabbitMQ: %s", err)
return fmt.Errorf(rabbitMQAnonymizePattern.ReplaceAllString(errorMessage, "user:password@"))
}
41 changes: 41 additions & 0 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

const (
Expand Down Expand Up @@ -388,3 +390,42 @@ func TestRabbitMQGetMetricSpecForScaling(t *testing.T) {
}
}
}

type rabbitMQErrorTestData struct {
err error
message string
}

var anonimizeRabbitMQErrorTestData = []rabbitMQErrorTestData{
{fmt.Errorf("https://user1:[email protected]"), "error inspecting rabbitMQ: https://user:[email protected]"},
{fmt.Errorf("https://fdasr345_-:[email protected]"), "error inspecting rabbitMQ: https://user:[email protected]"},
{fmt.Errorf("https://user1:[email protected]"), "error inspecting rabbitMQ: https://user:[email protected]"},
{fmt.Errorf("https://fdakls_dsa:[email protected]"), "error inspecting rabbitMQ: https://user:[email protected]"},
{fmt.Errorf("fdasr345_-:[email protected]"), "error inspecting rabbitMQ: user:[email protected]"},
{fmt.Errorf("this user1:[email protected] fails"), "error inspecting rabbitMQ: this user:[email protected] fails"},
{fmt.Errorf("this https://user1:[email protected] fails also"), "error inspecting rabbitMQ: this https://user:[email protected] fails also"},
{fmt.Errorf("nothing to replace here"), "error inspecting rabbitMQ: nothing to replace here"},
{fmt.Errorf("the queue https://user1:[email protected]/api/virtual is unavailable"), "error inspecting rabbitMQ: the queue https://user:[email protected]/api/virtual is unavailable"},
}

func TestRabbitMQAnonimizeRabbitMQError(t *testing.T) {
metadata := map[string]string{
"queueName": "evaluate_trials",
"hostFromEnv": host,
"protocol": "http",
}
meta, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: sampleRabbitMqResolvedEnv, TriggerMetadata: metadata, AuthParams: nil})

if err != nil {
t.Fatalf("Error parsing metadata (%s)", err)
}

s := &rabbitMQScaler{
metadata: meta,
httpClient: nil,
}
for _, testData := range anonimizeRabbitMQErrorTestData {
err := s.anonimizeRabbitMQError(testData.err)
assert.Equal(t, fmt.Sprint(err), testData.message)
}
}

0 comments on commit ff19979

Please sign in to comment.