Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Escape queueName and vhostName in RabbitMQ Scaler before use them in query string (bug fix) #2055

Merged
merged 10 commits into from
Aug 26, 2021
6 changes: 3 additions & 3 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {

// Override vhost if requested.
if s.metadata.vhostName != nil {
vhost = "/" + *s.metadata.vhostName
vhost = "/" + url.QueryEscape(*s.metadata.vhostName)
}

if vhost == "" || vhost == "/" || vhost == "//" {
Expand All @@ -378,9 +378,9 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
parsedURL.Path = ""
var getQueueInfoManagementURI string
if s.metadata.useRegex {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s", parsedURL.String(), "api/queues?use_regex=true&pagination=false&name=", s.metadata.queueName)
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s", parsedURL.String(), "api/queues?page=1&use_regex=true&pagination=false&name=", url.QueryEscape(s.metadata.queueName))
} else {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, s.metadata.queueName)
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, url.QueryEscape(s.metadata.queueName))
}

var info queueInfo
Expand Down
4 changes: 2 additions & 2 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ var testRegexQueueInfoTestData = []getQueueInfoTestData{
func TestGetQueueInfoWithRegex(t *testing.T) {
for _, testData := range testRegexQueueInfoTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := "/api/queues?use_regex=true&pagination=false&name=evaluate_trials"
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=%5Eevaluate_trials%24"
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}
Expand All @@ -326,7 +326,7 @@ func TestGetQueueInfoWithRegex(t *testing.T) {
resolvedEnv := map[string]string{host: fmt.Sprintf("%s%s", apiStub.URL, testData.vhostPath), "plainHost": apiStub.URL}

metadata := map[string]string{
"queueName": "evaluate_trials",
"queueName": "^evaluate_trials$",
"hostFromEnv": host,
"protocol": "http",
}
Expand Down
13 changes: 8 additions & 5 deletions tests/scalers/rabbitmq-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ export class RabbitMQHelper {
)
}

static publishMessages(t, namespace: string, connectionString: string, messageCount: number) {
static publishMessages(t, namespace: string, connectionString: string, messageCount: number, queueName: string) {
// publish messages
const tmpFile = tmp.fileSync()
fs.writeFileSync(tmpFile.name, publishYaml.replace('{{CONNECTION_STRING}}', connectionString)
.replace('{{MESSAGE_COUNT}}', messageCount.toString()))
.replace('{{MESSAGE_COUNT}}', messageCount.toString())
.replace('{{QUEUE_NAME}}', queueName)
.replace('{{QUEUE_NAME}}', queueName))

t.is(
0,
sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${namespace}`).code,
Expand All @@ -52,15 +55,15 @@ export class RabbitMQHelper {
const publishYaml = `apiVersion: batch/v1
kind: Job
metadata:
name: rabbitmq-publish
name: rabbitmq-publish-{{QUEUE_NAME}}
spec:
template:
spec:
containers:
- name: rabbitmq-client
image: jeffhollan/rabbitmq-client:dev
image: jorturfer/tests-rabbitmq
ahmelsayed marked this conversation as resolved.
Show resolved Hide resolved
imagePullPolicy: Always
command: ["send", "{{CONNECTION_STRING}}", "{{MESSAGE_COUNT}}"]
command: ["send", "{{CONNECTION_STRING}}", "{{MESSAGE_COUNT}}", "{{QUEUE_NAME}}"]
restartPolicy: Never`

const rabbitmqDeployYaml = `apiVersion: v1
Expand Down
4 changes: 2 additions & 2 deletions tests/scalers/rabbitmq-queue-amqp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ test.serial('Deployment should have 0 replicas on start', t => {
})

test.serial(`Deployment should scale to 4 with ${messageCount} messages on the queue then back to 0`, async t => {
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, queueName)

// with messages published, the consumer deployment should start receiving the messages
t.true(await waitForDeploymentReplicaCount(4, 'test-deployment', testNamespace, 20, 5000), 'Replica count should be 4 after 10 seconds')
Expand Down Expand Up @@ -79,7 +79,7 @@ spec:
spec:
containers:
- name: rabbitmq-consumer
image: jeffhollan/rabbitmq-client:dev
image: jorturfer/tests-rabbitmq
imagePullPolicy: Always
command:
- receive
Expand Down
12 changes: 8 additions & 4 deletions tests/scalers/rabbitmq-queue-http-regex.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {waitForDeploymentReplicaCount} from "./helpers";
const testNamespace = 'rabbitmq-queue-http-regex-test'
const rabbitmqNamespace = 'rabbitmq-http-regex-test'
const queueName = 'hello'
const dummyQueueName1 = 'hello-1'
const dummyQueueName2 = 'hellohellohello'
const username = "test-user"
const password = "test-password"
const vhost = "test-vh-regex"
Expand All @@ -20,7 +22,7 @@ test.before(t => {

sh.config.silent = true
// create deployment
const httpConnectionString = `http://${username}:${password}@rabbitmq.${rabbitmqNamespace}.svc.cluster.local/${vhost}`
const httpConnectionString = `http://${username}:${password}@rabbitmq.${rabbitmqNamespace}.svc.cluster.local`

RabbitMQHelper.createDeployment(t, testNamespace, deployYaml, connectionString, httpConnectionString, queueName)
})
Expand All @@ -33,7 +35,9 @@ test.serial('Deployment should have 0 replicas on start', t => {
})

test.serial(`Deployment should scale to 4 with ${messageCount} messages on the queue then back to 0`, async t => {
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, dummyQueueName1)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, dummyQueueName2)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, queueName)

// with messages published, the consumer deployment should start receiving the messages
t.true(await waitForDeploymentReplicaCount(4, 'test-deployment', testNamespace, 20, 5000), 'Replica count should be 4 after 10 seconds')
Expand Down Expand Up @@ -81,7 +85,7 @@ spec:
spec:
containers:
- name: rabbitmq-consumer
image: jeffhollan/rabbitmq-client:dev
image: jorturfer/tests-rabbitmq
imagePullPolicy: Always
command:
- receive
Expand All @@ -105,7 +109,7 @@ spec:
triggers:
- type: rabbitmq
metadata:
queueName: {{QUEUE_NAME}}
queueName: "^hell.{1}$"
hostFromEnv: RabbitApiHost
protocol: http
useRegex: 'true'
Expand Down
4 changes: 2 additions & 2 deletions tests/scalers/rabbitmq-queue-http.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ test.serial('Deployment should have 0 replicas on start', async t => {
})

test.serial(`Deployment should scale to 4 with ${messageCount} messages on the queue then back to 0`, async t => {
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, queueName)

// with messages published, the consumer deployment should start receiving the messages
t.true(await waitForDeploymentReplicaCount(4, 'test-deployment', testNamespace, 30, 5000))
Expand Down Expand Up @@ -78,7 +78,7 @@ spec:
spec:
containers:
- name: rabbitmq-consumer
image: jeffhollan/rabbitmq-client:dev
image: jorturfer/tests-rabbitmq
imagePullPolicy: Always
command:
- receive
Expand Down
4 changes: 2 additions & 2 deletions tests/scalers/rabbitmq-queue-trigger-auth.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ test.serial('Deployment should have 0 replicas on start', t => {
})

test.serial(`Deployment should scale to 4 with ${messageCount} messages on the queue then back to 0`, t => {
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, queueName)

// with messages published, the consumer deployment should start receiving the messages
let replicaCount = '0'
Expand Down Expand Up @@ -102,7 +102,7 @@ spec:
spec:
containers:
- name: rabbitmq-consumer
image: jeffhollan/rabbitmq-client:dev
image: jorturfer/tests-rabbitmq
imagePullPolicy: Always
command:
- receive
Expand Down