Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VTGateProxy CI #414

Merged
merged 2 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 43 additions & 76 deletions go/test/endtoend/vtgateproxy/rebalance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ package vtgateproxy

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strconv"
"testing"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
Expand All @@ -47,8 +44,7 @@ func testVtgateProxyRebalance(t *testing.T, loadBalancer string) {

const targetAffinity = "use1-az1"
const targetPool = "pool1"
const vtgateCount = 10
const vtgatesInAffinity = 5
const vtgateCount = 5
const vtgateproxyConnections = 4

vtgates, err := startAdditionalVtgates(vtgateCount)
Expand All @@ -61,28 +57,20 @@ func testVtgateProxyRebalance(t *testing.T, loadBalancer string) {
var config []map[string]string

for i, vtgate := range vtgates {
affinity := targetAffinity
if i >= vtgatesInAffinity {
affinity = "use1-az2"
}
config = append(config, map[string]string{
"host": fmt.Sprintf("vtgate%v", i),
"address": clusterInstance.Hostname,
"grpc": strconv.Itoa(vtgate.GrpcPort),
"az_id": affinity,
"az_id": targetAffinity,
"type": targetPool,
})
}

vtgateIdx := vtgateproxyConnections
b, err := json.Marshal(config[:vtgateIdx])
if err != nil {
t.Fatal(err)
}
if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil {
if err := writeConfig(t, vtgateHostsFile, config, nil); err != nil {
t.Fatal(err)
}

// Spin up proxy
vtgateproxyHTTPPort := clusterInstance.GetAndReservePort()
vtgateproxyGrpcPort := clusterInstance.GetAndReservePort()
vtgateproxyMySQLPort := clusterInstance.GetAndReservePort()
Expand Down Expand Up @@ -114,27 +102,32 @@ func testVtgateProxyRebalance(t *testing.T, loadBalancer string) {

log.Info("Reading test value while adding vtgates")

const totalQueries = 1000
addVtgateEveryN := totalQueries / len(vtgates)
// Scale up
for i := 1; i <= vtgateCount; i++ {
if err := writeConfig(t, vtgateHostsFile, config[:i], vtgateproxyProcInstance); err != nil {
t.Fatal(err)
}

for i := 0; i < totalQueries; i++ {
if i%(addVtgateEveryN) == 0 && vtgateIdx <= len(vtgates) {
log.Infof("Adding vtgate %v", vtgateIdx-1)
b, err = json.Marshal(config[:vtgateIdx])
// Run queries at each configuration
for j := 0; j < 100; j++ {
result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer")
if err != nil {
t.Fatal(err)
}
if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil {
t.Fatal(err)
}

if err := vtgateproxyProcInstance.WaitForConfig(config[:vtgateIdx], 5*time.Second); err != nil {
t.Fatal(err)
}

vtgateIdx++
assert.Equal(t, []customerEntry{{1, "email1"}}, result)
}
}

log.Info("Removing first vtgates")

// Pop the first 2 vtgates off to force first_ready to pick a new target
if err := writeConfig(t, vtgateHostsFile, config[2:], vtgateproxyProcInstance); err != nil {
t.Fatal(err)
}

// Run queries in the last configuration
for j := 0; j < 100; j++ {
result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer")
if err != nil {
t.Fatal(err)
Expand All @@ -143,59 +136,33 @@ func testVtgateProxyRebalance(t *testing.T, loadBalancer string) {
assert.Equal(t, []customerEntry{{1, "email1"}}, result)
}

// No queries should be sent to vtgates outside target affinity
const expectMaxQueryCountNonAffinity = 0
var expectVtgatesWithQueries int

switch loadBalancer {
case "round_robin":
// At least 1 query should be sent to every vtgate matching target
// affinity
const expectMinQueryCountAffinity = 1

for i, vtgate := range vtgates {
queryCount, err := getVtgateQueryCount(vtgate)
if err != nil {
t.Fatal(err)
}

affinity := config[i]["az_id"]

log.Infof("vtgate %v (%v) query counts: %+v", i, affinity, queryCount)

if affinity == targetAffinity {
assert.GreaterOrEqual(t, queryCount.Sum(), expectMinQueryCountAffinity, "vtgate %v did not recieve the expected number of queries", i)
} else {
assert.LessOrEqual(t, queryCount.Sum(), expectMaxQueryCountNonAffinity, "vtgate %v recieved more than the expected number of queries", i)
}
}
// Every vtgate should get some queries. We went from 1 vtgates to
// NumConnections+1 vtgates, and then removed the first vtgate.
expectVtgatesWithQueries = len(vtgates)
case "first_ready":
// A single vtgate should become the target, and it should recieve all
// queries
targetVtgate := -1

for i, vtgate := range vtgates {
queryCount, err := getVtgateQueryCount(vtgate)
if err != nil {
t.Fatal(err)
}

affinity := config[i]["az_id"]
// Only 2 vtgates should have queries. The first vtgate should get all
// queries until it is removed, and then a new vtgate should be picked
// to get all subsequent queries.
expectVtgatesWithQueries = 2
}

log.Infof("vtgate %v (%v) query counts: %+v", i, affinity, queryCount)
var vtgatesWithQueries int
for i, vtgate := range vtgates {
queryCount, err := getVtgateQueryCount(vtgate)
if err != nil {
t.Fatal(err)
}

sum := queryCount.Sum()
if sum == 0 {
continue
}
log.Infof("vtgate %v query counts: %+v", i, queryCount)

if targetVtgate != -1 {
t.Logf("only vtgate %v should have received queries; vtgate %v got %v", targetVtgate, i, sum)
t.Fail()
} else if affinity == targetAffinity {
targetVtgate = i
} else {
assert.LessOrEqual(t, queryCount.Sum(), expectMaxQueryCountNonAffinity, "vtgate %v recieved more than the expected number of queries", i)
}
if queryCount.Sum() > 0 {
vtgatesWithQueries++
}
}

assert.Equal(t, expectVtgatesWithQueries, vtgatesWithQueries)
}
96 changes: 54 additions & 42 deletions go/test/endtoend/vtgateproxy/scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func testVtgateProxyScale(t *testing.T, loadBalancer string) {
vtgateHostsFile := filepath.Join(clusterInstance.TmpDirectory, "hosts")
var config []map[string]string

// Spin up vtgates
for i, vtgate := range vtgates {
pool := targetPool
if i == 0 {
Expand All @@ -76,14 +77,12 @@ func testVtgateProxyScale(t *testing.T, loadBalancer string) {
"type": pool,
})
}
b, err := json.Marshal(config[:1])
if err != nil {
t.Fatal(err)
}
if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil {

if err := writeConfig(t, vtgateHostsFile, config[:1], nil); err != nil {
t.Fatal(err)
}

// Spin up proxy
vtgateproxyHTTPPort := clusterInstance.GetAndReservePort()
vtgateproxyGrpcPort := clusterInstance.GetAndReservePort()
vtgateproxyMySQLPort := clusterInstance.GetAndReservePort()
Expand Down Expand Up @@ -113,35 +112,19 @@ func testVtgateProxyScale(t *testing.T, loadBalancer string) {
t.Fatal("no vtgates in the pool, ping should have failed")
}

log.Info("Reading test value while scaling vtgates")

// Start with an empty list of vtgates, then scale up, then scale back to
// 0. We should expect to see immediate failure when there are no vtgates,
// then success at each scale, until we hit 0 vtgates again, at which point
// we should fail fast again.
i := 0
scaleUp := true
for {
t.Logf("writing config file with %v vtgates", i)
b, err = json.Marshal(config[:i])
if err != nil {
t.Fatal(err)
}
if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil {
t.Fatal(err)
}

if err := vtgateproxyProcInstance.WaitForConfig(config[:i], 5*time.Second); err != nil {
testQuery := func(i int) {
if err := writeConfig(t, vtgateHostsFile, config[:i], vtgateproxyProcInstance); err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

result, err := selectHelper[customerEntry](ctx, conn, "select id, email from customer")
// 0 vtgates should fail
// First vtgate is in the wrong pool, so it should also fail
if i <= 1 {

switch i {
case 0:
// If there are 0 vtgates, expect a failure
if err == nil {
t.Fatal("query should have failed with no vtgates")
}
Expand All @@ -150,25 +133,54 @@ func testVtgateProxyScale(t *testing.T, loadBalancer string) {
if loadBalancer == "first_ready" && errors.Is(err, context.DeadlineExceeded) {
t.Fatal("query timed out but it should have failed fast")
}
} else if err != nil {
t.Fatalf("%v vtgates were present, but the query still failed: %v", i, err)
} else {
assert.Equal(t, []customerEntry{{1, "email1"}}, result)
}
case 1:
// If there is 1 vtgate, expect a failure since it's in the wrong pool
if err == nil {
t.Fatal("query should have failed with no vtgates in the pool")
}

if scaleUp {
i++
if i >= len(config) {
scaleUp = false
i -= 2
// In first_ready mode, we expect to fail fast and not time out.
if loadBalancer == "first_ready" && errors.Is(err, context.DeadlineExceeded) {
t.Fatal("query timed out but it should have failed fast")
}
default:
if err != nil {
t.Fatalf("%v vtgates were present, but the query still failed: %v", i, err)
}

continue
assert.Equal(t, []customerEntry{{1, "email1"}}, result)
}
}

i--
if i < 0 {
break
}
log.Info("Reading test value while scaling vtgates")

// Start with an empty list of vtgates, then scale up, then scale back to
// 0. We should expect to see immediate failure when there are no vtgates,
// then success at each scale, until we hit 0 vtgates again, at which point
// we should fail fast again.
for i := 0; i <= len(config); i++ {
testQuery(i)
}

for i := len(config) - 1; i >= 0; i-- {
testQuery(i)
}
}

func writeConfig(t *testing.T, target string, config []map[string]string, proc *VtgateProxyProcess) error {
t.Logf("writing config with %v vtgates", len(config))

b, err := json.Marshal(config)
if err != nil {
return err
}
if err := os.WriteFile(target, b, 0644); err != nil {
return err
}

if proc != nil {
return proc.WaitForConfig(config, 5*time.Second)
}

return nil
}
9 changes: 9 additions & 0 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,15 @@
"Shard": "topo_connection_cache",
"RetryMax": 1,
"Tags": []
},
"vtgateproxy": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vtgateproxy"],
"Command": [],
"Manual": false,
"Shard": "vtgate_general_heavy",
"RetryMax": 1,
"Tags": []
}
}
}
Loading