Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vstreamer to throttle on source endpoint #7324

Merged
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
492ac3c
VStreamer throttles on source based on check-self
shlomi-noach Jan 19, 2021
b2ee511
vstreamer.Engine now gets lagThrottler reference, to be used by vstre…
shlomi-noach Jan 20, 2021
2907746
removed debug messages
shlomi-noach Jan 20, 2021
0eea641
Engine owns throttling. Streamers of all types cal on Engine's thrott…
shlomi-noach Jan 20, 2021
f99fa1b
only throttle the events channel, not other channels. We do this by w…
shlomi-noach Jan 20, 2021
542fb91
name of vreplication_basic workflow
shlomi-noach Jan 20, 2021
dd14a08
name of workflow file
shlomi-noach Jan 20, 2021
1e781d4
vreplication_basic on its own shard
shlomi-noach Jan 20, 2021
138bfd5
API endpoint for throttle-app and unthrottle-app
shlomi-noach Jan 20, 2021
603838c
simplify
shlomi-noach Jan 20, 2021
5239392
rename
shlomi-noach Jan 20, 2021
67b5889
test materialization with source throttling
shlomi-noach Jan 20, 2021
209e96e
some comments, a minor Sleep
shlomi-noach Jan 21, 2021
9b3e65a
vreplication_basic moved to a new shard
shlomi-noach Jan 21, 2021
694e1f1
Merge remote-tracking branch 'upstream/master' into vstreamer-throttl…
shlomi-noach Jan 21, 2021
d101493
sub test sections
shlomi-noach Jan 21, 2021
4bf07d3
enable throttler in vreplication test tablets
shlomi-noach Jan 21, 2021
7fa0763
fix shard name
shlomi-noach Jan 21, 2021
78cb68b
exit endless loop
shlomi-noach Jan 21, 2021
ace94f6
fix replication lag query: apparently 'lag' is now introduced asa new…
shlomi-noach Jan 21, 2021
498ccf3
more granular throttling
shlomi-noach Jan 21, 2021
ac494c2
fix vstreamer app name in test, plus validate it
shlomi-noach Jan 21, 2021
6d6e06b
fix throttler path
shlomi-noach Jan 21, 2021
e4a00b3
minor refactor
shlomi-noach Jan 21, 2021
2c32c91
update api path
shlomi-noach Jan 21, 2021
f8822eb
throttle on source tablets
shlomi-noach Jan 21, 2021
fdad85c
throttle product, count customer
shlomi-noach Jan 21, 2021
a55bf2f
code comments
shlomi-noach Jan 21, 2021
c9d5447
comments
shlomi-noach Jan 21, 2021
48279f5
disable lag throttler in irrlelevant test
shlomi-noach Jan 21, 2021
0de0eb7
fix vreplication test: add required sleep just after throttling, vali…
shlomi-noach Jan 24, 2021
7c49e2d
run self checks in throttler
shlomi-noach Jan 24, 2021
bdbcf1d
run self checks in throttler
shlomi-noach Jan 24, 2021
5e24918
Merge remote-tracking branch 'upstream/master' into vstreamer-throttl…
shlomi-noach Jan 24, 2021
dcb6915
fixed dependencies in tabletserver
shlomi-noach Jan 24, 2021
166d939
comment
shlomi-noach Jan 24, 2021
0bc14fa
support 'TickNow()'
shlomi-noach Jan 25, 2021
db121c1
speed up of throttler refresh tick upon opening and upon becoming leader
shlomi-noach Jan 25, 2021
bb328fa
restoring lost endtoend test(24). Whoops
shlomi-noach Jan 25, 2021
13ca3db
merge master, resolve conflicts
shlomi-noach Jan 26, 2021
2b4c5a2
workflow endtoend tests: named shards rather than numbered shards
shlomi-noach Jan 26, 2021
7f44ebc
convert shard numbers to strings
shlomi-noach Jan 26, 2021
22088b4
shard -1 --> shard ""
shlomi-noach Jan 26, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions .github/workflows/cluster_endtoend_vreplication_basic.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
name: Cluster (vreplication_basic)
on: [push, pull_request]
jobs:

build:
name: Run endtoend tests on Cluster (29)
runs-on: ubuntu-latest

steps:
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: 1.15

- name: Check out code
uses: actions/checkout@v2

- name: Get dependencies
run: |
sudo apt-get update
sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata
sudo service mysql stop
sudo service etcd stop
sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld
go mod download

wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
sudo apt-get install -y gnupg2
sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
sudo apt-get update
sudo apt-get install percona-xtrabackup-24

- name: Installing zookeeper and consul
run: |
make tools

- name: Run cluster endtoend test
timeout-minutes: 30
run: |
source build.env
eatmydata -- go run test.go -docker=false -print-log -follow -shard 29
16 changes: 16 additions & 0 deletions go/sqltypes/named_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ func (r RowNamedValues) AsUint64(fieldName string, def uint64) uint64 {
return def
}

// AsFloat64 returns the named field as float64, or default value if nonexistent/error
func (r RowNamedValues) AsFloat64(fieldName string, def float64) float64 {
if v, err := r.ToFloat64(fieldName); err == nil {
return v
}
return def
}

// ToFloat64 returns the named field as float64
func (r RowNamedValues) ToFloat64(fieldName string) (float64, error) {
if v, ok := r[fieldName]; ok {
return v.ToFloat64()
}
return 0, ErrNoSuchField
}

// ToBool returns the named field as bool
func (r RowNamedValues) ToBool(fieldName string) (bool, error) {
if v, ok := r[fieldName]; ok {
Expand Down
88 changes: 66 additions & 22 deletions go/test/endtoend/tabletmanager/throttler/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (

var (
clusterInstance *cluster.LocalProcessCluster
masterTablet cluster.Vttablet
replicaTablet cluster.Vttablet
primaryTablet *cluster.Vttablet
replicaTablet *cluster.Vttablet
hostname = "localhost"
keyspaceName = "ks"
cell = "zone1"
Expand Down Expand Up @@ -65,8 +65,9 @@ var (
}
}`

httpClient = base.SetupHTTPClient(time.Second)
checkAPIPath = "throttler/check"
httpClient = base.SetupHTTPClient(time.Second)
checkAPIPath = "throttler/check"
checkSelfAPIPath = "throttler/check-self"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -110,9 +111,9 @@ func TestMain(m *testing.M) {
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
for _, tablet := range tablets {
if tablet.Type == "master" {
masterTablet = *tablet
primaryTablet = tablet
} else if tablet.Type != "rdonly" {
replicaTablet = *tablet
replicaTablet = tablet
}
}

Expand All @@ -121,18 +122,24 @@ func TestMain(m *testing.M) {
os.Exit(exitCode)
}

func throttleCheck() (*http.Response, error) {
return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", masterTablet.HTTPPort, checkAPIPath))
func throttleCheck(tablet *cluster.Vttablet) (*http.Response, error) {
return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkAPIPath))
}

func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) {
return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath))
}

func TestThrottlerBeforeMetricsCollected(t *testing.T) {
defer cluster.PanicHandler(t)

// Immediately after startup, we expect this response:
// {"StatusCode":404,"Value":0,"Threshold":0,"Message":"No such metric"}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}
}

func TestThrottlerAfterMetricsCollected(t *testing.T) {
Expand All @@ -141,9 +148,21 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) {
time.Sleep(10 * time.Second)
// By this time metrics will have been collected. We expect no lag, and something like:
// {"StatusCode":200,"Value":0.282278,"Threshold":1,"Message":""}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(replicaTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
}

func TestLag(t *testing.T) {
Expand All @@ -156,19 +175,44 @@ func TestLag(t *testing.T) {
time.Sleep(2 * time.Second)
// Lag will have accumulated
// {"StatusCode":429,"Value":4.864921,"Threshold":1,"Message":"Threshold exceeded"}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(primaryTablet)
assert.NoError(t, err)
// self (on primary) is unaffected by replication lag
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(replicaTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
}
}
{
err := clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias)
assert.NoError(t, err)

time.Sleep(5 * time.Second)
// Restore
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
{
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
resp, err := throttleCheckSelf(replicaTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
}
}

Expand All @@ -181,7 +225,7 @@ func TestNoReplicas(t *testing.T) {
time.Sleep(10 * time.Second)
// This makes no REPLICA servers available. We expect something like:
// {"StatusCode":200,"Value":0,"Threshold":1,"Message":""}
resp, err := throttleCheck()
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
Expand All @@ -191,7 +235,7 @@ func TestNoReplicas(t *testing.T) {

time.Sleep(10 * time.Second)
// Restore valid replica
resp, err := throttleCheck()
resp, err := throttleCheck(primaryTablet)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
Expand Down
8 changes: 7 additions & 1 deletion go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,14 @@ func (vc *VitessCluster) AddTablet(t *testing.T, cell *Cell, keyspace *Keyspace,
vc.Topo.Port,
globalConfig.hostname,
globalConfig.tmpDir,
[]string{"-queryserver-config-schema-reload-time", "5"}, //FIXME: for multi-cell initial schema doesn't seem to load without this
[]string{
"-queryserver-config-schema-reload-time", "5",
"-enable-lag-throttler",
"-heartbeat_enable",
"-heartbeat_interval", "250ms",
}, //FIXME: for multi-cell initial schema doesn't seem to load without "-queryserver-config-schema-reload-time"
false)

require.NotNil(t, vttablet)
vttablet.SupportsBackup = false

Expand Down
Loading