Skip to content

Commit

Permalink
Apache Pulsar: Add support for partitioned topics
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Marshall <[email protected]>
  • Loading branch information
michaeljmarshall committed Nov 10, 2022
1 parent 710acf6 commit 3364691
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 271 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310))
- **Event Hubs Scaler:** Support Azure Active Direcotry Pod & Workload Identity for Storage Blobs ([#3569](https://github.com/kedacore/keda/issues/3569))
- **Metrics API Scaler:** Add unsafeSsl paramater to skip certificate validation when connecting over HTTPS ([#3728](https://github.com/kedacore/keda/discussions/3728))
- **Pulsar Scaler:** Add support for partitioned topics ([#3833](https://github.com/kedacore/keda/issues/3833))

### Fixes

Expand Down
10 changes: 9 additions & 1 deletion pkg/scalers/pulsar_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type pulsarScaler struct {
type pulsarMetadata struct {
adminURL string
topic string
isPartitionedTopic bool
subscription string
msgBacklogThreshold int64
activationMsgBacklogThreshold int64
Expand All @@ -48,6 +49,7 @@ const (
pulsarMetricType = "External"
defaultMsgBacklogThreshold = 10
enable = "enable"
stringTrue = "true"
)

type pulsarSubscription struct {
Expand Down Expand Up @@ -140,7 +142,13 @@ func parsePulsarMetadata(config *ScalerConfig) (pulsarMetadata, error) {
}

topic := strings.ReplaceAll(meta.topic, "persistent://", "")
meta.statsURL = meta.adminURL + "/admin/v2/persistent/" + topic + "/stats"
if config.TriggerMetadata["isPartitionedTopic"] == stringTrue {
meta.isPartitionedTopic = true
meta.statsURL = meta.adminURL + "/admin/v2/persistent/" + topic + "/partitioned-stats"
} else {
meta.isPartitionedTopic = false
meta.statsURL = meta.adminURL + "/admin/v2/persistent/" + topic + "/stats"
}

switch {
case config.TriggerMetadata["subscriptionFromEnv"] != "":
Expand Down
54 changes: 38 additions & 16 deletions pkg/scalers/pulsar_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package scalers
import (
"context"
"fmt"
"strings"
"testing"

"github.com/go-logr/logr"
)

type parsePulsarMetadataTestData struct {
metadata map[string]string
isError bool
isActive bool
adminURL string
topic string
subscription string
metadata map[string]string
isError bool
isActive bool
isPartitionedTopic bool
adminURL string
topic string
subscription string
}

type parsePulsarAuthParamsTestData struct {
Expand Down Expand Up @@ -51,18 +53,20 @@ var validPulsarWithoutAuthParams = map[string]string{}

var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{
// failure, no adminURL
{map[string]string{}, true, false, "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, "http://172.20.0.151:80", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, "http://172.20.0.151:80", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, "http://172.20.0.151:80", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic"}, true, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub1"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub2"}, false, true, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub2"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub3"}, false, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub3"},
{map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"},
{map[string]string{}, true, false, false, "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic"}, true, false, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub1"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub2"}, false, true, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub2"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub3"}, false, false, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub3"},
{map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"},
{map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"},
{map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "isPartitionedTopic": "true", "subscription": "sub1"}, false, false, true, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"},

// tls
{map[string]string{"adminURL": "https://localhost:8443", "tls": "enable", "cert": "certdata", "key": "keydata", "ca": "cadata", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, "https://localhost:8443", "persistent://public/default/my-topic", "sub1"},
{map[string]string{"adminURL": "https://localhost:8443", "tls": "enable", "cert": "certdata", "key": "keydata", "ca": "cadata", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "https://localhost:8443", "persistent://public/default/my-topic", "sub1"},
}

var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{
Expand All @@ -89,6 +93,24 @@ func TestParsePulsarMetadata(t *testing.T) {
t.Errorf("Expected adminURL %s but got %s\n", testData.adminURL, meta.adminURL)
}

if !testData.isError {
if testData.isPartitionedTopic {
if !meta.isPartitionedTopic {
t.Errorf("Expected isPartitionedTopic %t but got %t\n", testData.isPartitionedTopic, meta.isPartitionedTopic)
}
if !strings.HasSuffix(meta.statsURL, "/partitioned-stats") {
t.Errorf("Expected statsURL to end with /partitioned-stats but got %s\n", meta.statsURL)
}
} else {
if meta.isPartitionedTopic {
t.Errorf("Expected isPartitionedTopic %t but got %t\n", testData.isPartitionedTopic, meta.isPartitionedTopic)
}
if !strings.HasSuffix(meta.statsURL, "/stats") {
t.Errorf("Expected statsURL to end with /stats but got %s\n", meta.statsURL)
}
}
}

if meta.topic != testData.topic {
t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic)
}
Expand Down
247 changes: 247 additions & 0 deletions tests/scalers/pulsar/helper/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
//go:build e2e
// +build e2e

package helper

import (
"fmt"
"testing"

"github.com/joho/godotenv"
"github.com/stretchr/testify/assert"
"k8s.io/client-go/kubernetes"

"github.com/kedacore/keda/v2/tests/helper"
)

// Load environment variables from .env file
var _ = godotenv.Load("../../../.env")

const (
apachePulsarVersion = "2.10.2"
messageCount = 3
minReplicaCount = 0
maxReplicaCount = 5
msgBacklog = 10
)

type templateData struct {
ApachePulsarVersion string
TestName string // Used for most resource names
NumPartitions int // Use 0 to create a non-partitioned topic
MessageCount int
MinReplicaCount int
MaxReplicaCount int
MsgBacklog int
}

const pulsarStatefulsetTemplate = `
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: {{.TestName}}
namespace: {{.TestName}}
labels:
app: pulsar
spec:
selector:
matchLabels:
app: pulsar
replicas: 1
serviceName: {{.TestName}}
template:
metadata:
labels:
app: pulsar
spec:
containers:
- name: pulsar
image: apachepulsar/pulsar:{{.ApachePulsarVersion}}
imagePullPolicy: IfNotPresent
readinessProbe:
tcpSocket:
port: 8080
ports:
- name: pulsar
containerPort: 6650
protocol: TCP
- name: admin
containerPort: 8080
protocol: TCP
env:
- name: PULSAR_PREFIX_tlsRequireTrustedClientCertOnConnect
value: "true"
command:
- sh
- -c
args: ["bin/apply-config-from-env.py conf/client.conf && bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss"]
`

const pulsarServiceTemplate = `
apiVersion: v1
kind: Service
metadata:
name: {{.TestName}}
namespace: {{.TestName}}
spec:
type: ClusterIP
ports:
- name: http
port: 8080
targetPort: 8080
protocol: TCP
- name: pulsar
port: 6650
targetPort: 6650
protocol: TCP
selector:
app: pulsar
`

const consumerTemplate = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{.TestName}}-consumer
namespace: {{.TestName}}
labels:
app: pulsar-consumer
spec:
selector:
matchLabels:
app: pulsar-consumer
template:
metadata:
labels:
app: pulsar-consumer
spec:
containers:
- name: pulsar-consumer
image: ghcr.io/pulsar-sigs/pulsar-client:v0.3.1
imagePullPolicy: IfNotPresent
readinessProbe:
tcpSocket:
port: 9494
args: ["consumer","--broker","pulsar://{{.TestName}}.{{.TestName}}:6650","--topic","persistent://public/default/keda","--subscription-name","keda","--consume-time","200"]
`

const scaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.TestName}}
namespace: {{.TestName}}
spec:
scaleTargetRef:
name: {{.TestName}}-consumer
pollingInterval: 5 # Optional. Default: 30 seconds
cooldownPeriod: 30 # Optional. Default: 300 seconds
maxReplicaCount: {{.MaxReplicaCount}}
minReplicaCount: {{.MinReplicaCount}}
isPartitionedTopic: {{ if .NumPartitions }} "true" {{else}} "false" {{end}}
triggers:
- type: pulsar
metadata:
msgBacklog: "{{.MsgBacklog}}"
activationMsgBacklogThreshold: "5"
adminURL: http://{{.TestName}}.{{.TestName}}:8080
topic: persistent://public/default/keda
subscription: keda
`

const topicPublishJobTemplate = `
apiVersion: batch/v1
kind: Job
metadata:
name: {{.TestName}}-producer
namespace: {{.TestName}}
spec:
template:
spec:
containers:
- name: pulsar-producer
image: apachepulsar/pulsar:{{.ApachePulsarVersion}}
imagePullPolicy: IfNotPresent
args: ["bin/pulsar-perf produce --service-url pulsar://{{.TestName}}.{{.TestName}}:6650 --num-messages {{.MessageCount}} {{ if .NumPartitions }} "--partitions {{.NumPartitions}}" {{ end }} persistent://public/default/keda"]
restartPolicy: Never
backoffLimit: 4
`

func TestScalerWithConfig(t *testing.T, testName string, numPartitions int) {
// setup
t.Log("--- setting up ---")

// Create kubernetes resources
kc := helper.GetKubernetesClient(t)
data, templates := getTemplateData(testName, numPartitions)

helper.CreateKubernetesResources(t, kc, testName, data, templates)

assert.True(t, helper.WaitForStatefulsetReplicaReadyCount(t, kc, testName, testName, 1, 300, 1),
"replica count should be 1 after 5 minute")

helper.KubectlApplyWithTemplate(t, data, "consumerTemplate", consumerTemplate)

// run consumer for create subscription
assert.True(t, helper.WaitForDeploymentReplicaReadyCount(t, kc, getConsumerDeploymentName(testName), testName, 1, 300, 1),
"replica count should be 1 after 5 minute")

helper.KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)

assert.True(t, helper.WaitForDeploymentReplicaReadyCount(t, kc, getConsumerDeploymentName(testName), testName, 0, 60, 1),
"replica count should be 0 after a minute")

testActivation(t, kc, data)
// scale up
testScaleUp(t, kc, data)
// scale down
testScaleDown(t, kc, testName)

// cleanup
helper.KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
helper.KubectlDeleteWithTemplate(t, data, "publishJobTemplate", topicPublishJobTemplate)

helper.DeleteKubernetesResources(t, kc, testName, data, templates)
}

func getTemplateData(testName string, numPartitions int) (templateData, []helper.Template) {
return templateData{
ApachePulsarVersion: apachePulsarVersion,
TestName: testName,
NumPartitions: numPartitions,
MessageCount: messageCount,
MinReplicaCount: minReplicaCount,
MaxReplicaCount: maxReplicaCount,
MsgBacklog: msgBacklog,
}, []helper.Template{
{Name: "statefulsetTemplate", Config: pulsarStatefulsetTemplate},
{Name: "serviceTemplate", Config: pulsarServiceTemplate},
}
}

func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing activation ---")
// publish message and less than MsgBacklog
helper.KubectlApplyWithTemplate(t, data, "publishJobTemplate", topicPublishJobTemplate)
helper.AssertReplicaCountNotChangeDuringTimePeriod(t, kc, getConsumerDeploymentName(data.TestName), data.TestName, data.MinReplicaCount, 60)
helper.KubectlDeleteWithTemplate(t, data, "publishJobTemplate", topicPublishJobTemplate)
}

func testScaleUp(t *testing.T, kc *kubernetes.Clientset, data templateData) {
data.MessageCount = 100
helper.KubectlApplyWithTemplate(t, data, "publishJobTemplate", topicPublishJobTemplate)
assert.True(t, helper.WaitForDeploymentReplicaReadyCount(t, kc, getConsumerDeploymentName(data.TestName), data.TestName, 5, 300, 1),
"replica count should be 5 after 5 minute")
}

func testScaleDown(t *testing.T, kc *kubernetes.Clientset, testName string) {
t.Log("--- testing scale down ---")
// Check if deployment scale down to 0 after 5 minutes
assert.True(t, helper.WaitForDeploymentReplicaReadyCount(t, kc, getConsumerDeploymentName(testName), testName, 0, 300, 1),
"Replica count should be 0 after 5 minutes")
}

func getConsumerDeploymentName(testName string) string {
return fmt.Sprintf("%s-consumer", testName)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
//go:build e2e
// +build e2e

package pulsar_non_partitioned_topic_test

import (
"testing"

pulsar "github.com/kedacore/keda/v2/tests/scalers/pulsar/helper"
)

const (
testName = "pulsar-non-partitioned-topic-test"
numPartitions = 0
)

func TestScaler(t *testing.T) {
pulsar.TestScalerWithConfig(t, testName, numPartitions)
}
Loading

0 comments on commit 3364691

Please sign in to comment.