Counterpart of ent PR #17618

Merged · 2 commits · Jun 22, 2023
1 change: 1 addition & 0 deletions GNUmakefile
@@ -194,6 +194,7 @@ dev-docker: linux dev-build
 		--label version=$(CONSUL_VERSION) \
 		--load \
 		-f $(CURDIR)/build-support/docker/Consul-Dev-Multiarch.dockerfile $(CURDIR)/pkg/bin/
+	docker tag 'consul:local' '$(CONSUL_COMPAT_TEST_IMAGE):local'
 
 check-remote-dev-image-env:
 ifndef REMOTE_DEV_IMAGE
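Note on the GNUmakefile change: `dev-docker` already builds and loads `consul:local`; the added `docker tag` recipe line re-tags that image as `$(CONSUL_COMPAT_TEST_IMAGE):local` so the compatibility test suites can reference it by a stable name. As a rough sketch only — assuming the Make variable is also surfaced to the test process as an environment variable of the same name; the helper below is hypothetical and not part of this PR:

package main

import (
	"fmt"
	"os"
)

// compatTestImage resolves the image reference an integration test could use.
// Only CONSUL_COMPAT_TEST_IMAGE and consul:local appear in the diff; the
// fallback logic here is illustrative.
func compatTestImage() string {
	if repo := os.Getenv("CONSUL_COMPAT_TEST_IMAGE"); repo != "" {
		return repo + ":local" // mirrors: docker tag 'consul:local' '$(CONSUL_COMPAT_TEST_IMAGE):local'
	}
	return "consul:local" // the tag dev-docker always produces
}

func main() {
	fmt.Println(compatTestImage())
}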
107 changes: 60 additions & 47 deletions test/integration/consul-container/test/ratelimit/ratelimit_test.go
@@ -32,8 +32,6 @@ const (
 	// - logs for exceeding
 
 func TestServerRequestRateLimit(t *testing.T) {
-	t.Parallel()
-
 	type action struct {
 		function           func(client *api.Client) error
 		rateLimitOperation string
@@ -52,6 +50,7 @@ func TestServerRequestRateLimit(t *testing.T) {
 		mode string
 	}
 
+	// getKV and putKV are net/RPC calls
 	getKV := action{
 		function: func(client *api.Client) error {
			_, _, err := client.KV().Get("foo", &api.QueryOptions{})
@@ -99,13 +98,13 @@ func TestServerRequestRateLimit(t *testing.T) {
 					action:            putKV,
 					expectedErrorMsg:  "",
 					expectExceededLog: true,
-					expectMetric:      false,
+					expectMetric:      true,
 				},
 				{
 					action:            getKV,
 					expectedErrorMsg:  "",
 					expectExceededLog: true,
-					expectMetric:      false,
+					expectMetric:      true,
 				},
 			},
 		},
@@ -127,10 +126,13 @@ func TestServerRequestRateLimit(t *testing.T) {
 					expectMetric:      true,
 				},
 			},
-	}}
+		},
+	}
 
 	for _, tc := range testCases {
+		tc := tc
 		t.Run(tc.description, func(t *testing.T) {
+			t.Parallel()
 			clusterConfig := &libtopology.ClusterConfig{
 				NumServers: 1,
 				NumClients: 0,
@@ -144,12 +146,9 @@ func TestServerRequestRateLimit(t *testing.T) {
 				ApplyDefaultProxySettings: false,
 			}
 
-			cluster, _, _ := libtopology.NewCluster(t, clusterConfig)
+			cluster, client := setupClusterAndClient(t, clusterConfig, true)
 			defer terminate(t, cluster)
 
-			client, err := cluster.GetClient(nil, true)
-			require.NoError(t, err)
-
 			// perform actions and validate returned errors to client
 			for _, op := range tc.operations {
 				err := op.action.function(client)
@@ -165,22 +164,14 @@ func TestServerRequestRateLimit(t *testing.T) {
 			// doing this in a separate loop so we can perform actions, allow metrics
 			// and logs to collect and then assert on each.
 			for _, op := range tc.operations {
-				timer := &retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}
+				timer := &retry.Timer{Timeout: 15 * time.Second, Wait: 500 * time.Millisecond}
 				retry.RunWith(timer, t, func(r *retry.R) {
-					// validate metrics
-					metricsInfo, err := client.Agent().Metrics()
-					// TODO(NET-1978): currently returns NaN error
-					// require.NoError(t, err)
-					if metricsInfo != nil && err == nil {
-						if op.expectMetric {
-							checkForMetric(r, metricsInfo, op.action.rateLimitOperation, op.action.rateLimitType, tc.mode)
-						}
-					}
+					checkForMetric(t, cluster, op.action.rateLimitOperation, op.action.rateLimitType, tc.mode, op.expectMetric)
 
 					// validate logs
 					// putting this last as there are cases where logs
 					// were not present in consumer when assertion was made.
-					checkLogsForMessage(r, clusterConfig.LogConsumer.Msgs,
+					checkLogsForMessage(t, clusterConfig.LogConsumer.Msgs,
 						fmt.Sprintf("[DEBUG] agent.server.rpc-rate-limit: RPC exceeded allowed rate limit: rpc=%s", op.action.rateLimitOperation),
 						op.action.rateLimitOperation, "exceeded", op.expectExceededLog)
 
@@ -190,43 +181,65 @@ func TestServerRequestRateLimit(t *testing.T) {
 	}
 }
 
-func checkForMetric(t *retry.R, metricsInfo *api.MetricsInfo, operationName string, expectedLimitType string, expectedMode string) {
-	const counterName = "consul.rpc.rate_limit.exceeded"
+func setupClusterAndClient(t *testing.T, config *libtopology.ClusterConfig, isServer bool) (*libcluster.Cluster, *api.Client) {
+	cluster, _, _ := libtopology.NewCluster(t, config)
 
-	var counter api.SampledValue
-	for _, c := range metricsInfo.Counters {
-		if c.Name == counterName {
-			counter = c
-			break
-		}
-	}
-	require.NotEmptyf(t, counter.Name, "counter not found: %s", counterName)
+	client, err := cluster.GetClient(nil, isServer)
+	require.NoError(t, err)
 
+	return cluster, client
+}
+
+func checkForMetric(t *testing.T, cluster *libcluster.Cluster, operationName string, expectedLimitType string, expectedMode string, expectMetric bool) {
+	// validate metrics
+	server, err := cluster.GetClient(nil, true)
+	require.NoError(t, err)
+	metricsInfo, err := server.Agent().Metrics()
+	// TODO(NET-1978): currently returns NaN error
+	// require.NoError(t, err)
+	if metricsInfo != nil && err == nil {
+		if expectMetric {
+			const counterName = "consul.rpc.rate_limit.exceeded"
+
+			var counter api.SampledValue
+			for _, c := range metricsInfo.Counters {
+				if c.Name == counterName {
+					counter = c
+					break
+				}
+			}
+			require.NotEmptyf(t, counter.Name, "counter not found: %s", counterName)
+
-	operation, ok := counter.Labels["op"]
-	require.True(t, ok)
+			operation, ok := counter.Labels["op"]
+			require.True(t, ok)
 
-	limitType, ok := counter.Labels["limit_type"]
-	require.True(t, ok)
+			limitType, ok := counter.Labels["limit_type"]
+			require.True(t, ok)
 
-	mode, ok := counter.Labels["mode"]
-	require.True(t, ok)
+			mode, ok := counter.Labels["mode"]
+			require.True(t, ok)
 
-	if operation == operationName {
-		require.GreaterOrEqual(t, counter.Count, 1)
-		require.Equal(t, expectedLimitType, limitType)
-		require.Equal(t, expectedMode, mode)
+			if operation == operationName {
+				require.GreaterOrEqual(t, counter.Count, 1)
+				require.Equal(t, expectedLimitType, limitType)
+				require.Equal(t, expectedMode, mode)
+			}
+		}
 	}
 }
 
-func checkLogsForMessage(t *retry.R, logs []string, msg string, operationName string, logType string, logShouldExist bool) {
-	found := false
-	for _, log := range logs {
-		if strings.Contains(log, msg) {
-			found = true
-			break
+func checkLogsForMessage(t *testing.T, logs []string, msg string, operationName string, logType string, logShouldExist bool) {
+	if logShouldExist {
+		found := false
+		for _, log := range logs {
+			if strings.Contains(log, msg) {
+				found = true
+				break
+			}
 		}
+		expectedLog := fmt.Sprintf("%s log check failed for: %s. Log expected: %t", logType, operationName, logShouldExist)
+		require.Equal(t, logShouldExist, found, expectedLog)
 	}
-	require.Equal(t, logShouldExist, found, fmt.Sprintf("%s log check failed for: %s. Log expected: %t", logType, operationName, logShouldExist))
 }
 
 func terminate(t *testing.T, cluster *libcluster.Cluster) {
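Note on the test restructuring: the top-level t.Parallel() is removed and each subtest now opts into parallelism itself, with the range variable captured first (tc := tc). A minimal, runnable sketch of that pattern, with illustrative case names (this is the general Go idiom, not code from this PR):

package pattern

import "testing"

func TestPattern(t *testing.T) {
	testCases := []struct{ description string }{
		{"disabled"}, {"permissive"}, {"enforcing"},
	}
	for _, tc := range testCases {
		tc := tc // capture a per-iteration copy for the closure below
		t.Run(tc.description, func(t *testing.T) {
			t.Parallel()       // this subtest runs concurrently with its siblings
			_ = tc.description // use the captured copy, never the loop variable
		})
	}
}

Before Go 1.22 the loop variable is a single variable reused across iterations, so without the copy every parallel subtest closure could observe the last test case. Calling t.Parallel() inside t.Run also means the loop returns before the subtests finish, which is why per-case resources (the cluster and client from setupClusterAndClient) are created inside the closure.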