Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema Tracking Refactor: Merge schema-tracking in health-streamer into schema.Engine #13121

Merged
merged 42 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
4e75fe9
test: add test to verify the expectations of health stream
GuptaManan100 May 18, 2023
122c5e6
test: augment the test to also test the case where views are enabled …
GuptaManan100 May 18, 2023
714a975
feat: use the schema engine notifier to run the reload in health-stre…
GuptaManan100 May 19, 2023
36758f2
feat: deprecate the flag controlling the interval of health-streamer …
GuptaManan100 May 19, 2023
9923f76
test: augment test to verify schema tracking works for alters in view…
GuptaManan100 May 19, 2023
a950d15
feat: add a view table type and augment loadTable function to assign …
GuptaManan100 May 19, 2023
69c3554
feat: change health streamer to use the tables got from the schemaEng…
GuptaManan100 May 19, 2023
9a8202e
feat: make the schema engine capable of storing the table and view in…
GuptaManan100 May 22, 2023
b4a7f95
feat: remove unused functions
GuptaManan100 May 22, 2023
5643738
feat: fix health streamer tests
GuptaManan100 May 23, 2023
90e0b56
test: fix tests by also deleting the tablet record for the tablets th…
GuptaManan100 May 23, 2023
05b4b82
feat: fix deadlocks and incorrect writes from health-streamer
GuptaManan100 May 24, 2023
43056d7
feat: improve how we start health streamer and engine
GuptaManan100 May 24, 2023
5bb9291
feat: use the newly introduced field
GuptaManan100 May 24, 2023
a7e6159
feat: use sqlparser.string to get the escaped table name
GuptaManan100 May 24, 2023
41c5d2a
test: fix the default configuration test's expectation
GuptaManan100 May 24, 2023
c47b88b
test: add the new tables to the expected output of tests
GuptaManan100 May 24, 2023
7f56125
test: fix test expectation to match MySQL 8.0 behaviour
GuptaManan100 May 24, 2023
e5272fb
Merge remote-tracking branch 'upstream/main' into get-schema-rpc
GuptaManan100 May 24, 2023
25814d1
feat: fix schema change test
GuptaManan100 May 25, 2023
0d2ac63
feat: fix state manager test
GuptaManan100 May 25, 2023
7699de2
Merge remote-tracking branch 'upstream/main' into get-schema-rpc
GuptaManan100 May 25, 2023
42e8ca4
feat: use the same flag for the context in schema-engine
GuptaManan100 May 25, 2023
32ab02a
test: refactor test to use a single stream call
GuptaManan100 May 25, 2023
aea8396
feat: revert changes to json marshal in config
GuptaManan100 May 25, 2023
707a460
feat: fix health stream timeout test
GuptaManan100 May 25, 2023
dfdb469
feat: refactor engine notifier to send tables instead of strings
GuptaManan100 May 25, 2023
16a4acf
feat: add tests for all the functions in the db.go file and fix a cou…
GuptaManan100 May 26, 2023
54943b2
feat: refactor code and fix a couple of bugs
GuptaManan100 May 26, 2023
877dc48
test: add more tests for the reload database logic
GuptaManan100 May 26, 2023
5ebe65d
test: fix schema version test
GuptaManan100 May 26, 2023
94f8c49
test: added more tests for all the schema.engine functions
GuptaManan100 May 26, 2023
f7b496e
test: fix engine test flakiness
GuptaManan100 May 29, 2023
c8d891e
Merge remote-tracking branch 'upstream/main' into get-schema-rpc
GuptaManan100 May 29, 2023
96cec2c
test: add health-streamer test
GuptaManan100 May 29, 2023
f778e0b
feat: fix data race in fakedb
GuptaManan100 May 29, 2023
ff01bf9
Merge remote-tracking branch 'upstream/main' into get-schema-rpc
GuptaManan100 May 30, 2023
dcff98a
Merge remote-tracking branch 'upstream/main' into get-schema-rpc
GuptaManan100 May 30, 2023
993fa6b
test: add header comments to tests
GuptaManan100 May 30, 2023
05d3546
feat: rename views table columns to table_schema and table_name
GuptaManan100 May 30, 2023
ee7562d
feat: fix detectViewChange query
GuptaManan100 May 30, 2023
e00653f
feat: fix the summary
GuptaManan100 May 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/17.0/17.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ Affected flags and YAML config keys:
- `shutdown_grace_period`
- `unhealthy_threshold`

The flag `queryserver-config-schema-change-signal-interval` is deprecated and will be removed in a later release.
Schema-tracking has been refactored in this release to not use polling anymore, therefore the signal interval isn't required anymore.

### Online DDL

#### <a id="online-ddl-cut-over-threshold-flag" /> --cut-over-threshold DDL strategy flag
Expand Down
1 change: 0 additions & 1 deletion go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ Usage of vttablet:
--queryserver-config-query-pool-waiter-cap int query server query pool waiter limit, this is the maximum number of queries that can be queued waiting to get a connection (default 5000)
--queryserver-config-query-timeout duration query server query timeout (in seconds), this is the query timeout in vttablet side. If a query takes more than this timeout, it will be killed. (default 30s)
--queryserver-config-schema-change-signal query server schema signal, will signal connected vtgates that schema has changed whenever this is detected. VTGates will need to have -schema_change_signal enabled for this to work (default true)
--queryserver-config-schema-change-signal-interval duration query server schema change signal interval defines at which interval the query server shall send schema updates to vtgate. (default 5s)
--queryserver-config-schema-reload-time duration query server schema reload time, how often vttablet reloads schemas from underlying MySQL instance in seconds. vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time. (default 30m0s)
--queryserver-config-stream-buffer-size int query server stream buffer size, the maximum number of bytes sent from vttablet for each stream call. It's recommended to keep this value in sync with vtgate's stream_buffer_size. (default 32768)
--queryserver-config-stream-pool-size int query server stream connection pool size, stream pool is used by stream queries: queries that return results to client in a streaming fashion (default 200)
Expand Down
40 changes: 40 additions & 0 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,46 @@ func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vtta
return responses, nil
}

// StreamTabletHealthUntil invokes a HealthStream on a local cluster Vttablet and
// returns the responses. It waits until a certain condition is met. The amount of time to wait is an input that it takes.
func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context, vttablet *Vttablet, timeout time.Duration, condition func(shr *querypb.StreamHealthResponse) bool) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are existing usages of StreamTabletHealth can those also use this utility function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an inherent difference between the two. StreamTabletHealth gets you specified number of stream responses back, which can then be processed. On the other hand StreamTabletHealthUntil is meant to be used where the test doesn't care if we consume more responses and only verifies if a certain check is ever true.
The tests currently using StreamTabletHealth actually can't really use StreamTabletHealthUntil because they want to run a test on the next packet, and not an eventual packet.

tablet, err := cluster.VtctlclientGetTablet(vttablet)
if err != nil {
return err
}

conn, err := tabletconn.GetDialer()(tablet, grpcclient.FailFast(false))
if err != nil {
return err
}

conditionSuccess := false
timeoutExceeded := false
go func() {
time.Sleep(timeout)
timeoutExceeded = true
}()

err = conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
if condition(shr) {
conditionSuccess = true
}
if timeoutExceeded || conditionSuccess {
return io.EOF
}
return nil
})

if conditionSuccess {
return nil
}

if timeoutExceeded {
return errors.New("timeout exceed while waiting for the condition in StreamHealth")
}
return err
}

func (cluster *LocalProcessCluster) VtctlclientGetTablet(tablet *Vttablet) (*topodatapb.Tablet, error) {
result, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", "--", tablet.Alias)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions go/test/endtoend/cluster/vtctldclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ func VtctldClientProcessInstance(hostname string, grpcPort int, tmpDirectory str
return vtctldclient
}

// PlannedReparentShard executes vtctlclient command to make specified tablet the primary for the shard.
func (vtctldclient *VtctldClientProcess) PlannedReparentShard(Keyspace string, Shard string, alias string) (err error) {
output, err := vtctldclient.ExecuteCommandWithOutput(
"PlannedReparentShard",
fmt.Sprintf("%s/%s", Keyspace, Shard),
"--new-primary", alias)
if err != nil {
log.Errorf("error in PlannedReparentShard output %s, err %s", output, err.Error())
Comment on lines +97 to +103
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are other places as well we are doing this call, like InitializeShard can it be reused

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but eventually we want to move to using vtctldclient and not vtctlclient. So I thought it prudent to add a PRS call to vtctldclient. This is what we should be using going forward in all our tests.

}
return err
}

// CreateKeyspace executes the vtctl command to create a keyspace
func (vtctldclient *VtctldClientProcess) CreateKeyspace(keyspaceName string, sidecarDBName string) (err error) {
var output string
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/tabletmanager/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func TestMain(m *testing.M) {
// List of users authorized to execute vschema ddl operations
clusterInstance.VtGateExtraArgs = []string{
"--vschema_ddl_authorized_users=%",
"--enable-views",
"--discovery_low_replication_lag", tabletUnhealthyThreshold.String(),
}
// Set extra tablet args for lock timeout
Expand Down
132 changes: 131 additions & 1 deletion go/test/endtoend/tabletmanager/tablet_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/utils/strings/slices"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
Expand Down Expand Up @@ -195,6 +195,136 @@ func TestHealthCheck(t *testing.T) {
killTablets(t, rTablet)
}

// TestHealthCheckSchemaChangeSignal tests the tables and views, which report their schemas have changed in the output of a StreamHealth.
func TestHealthCheckSchemaChangeSignal(t *testing.T) {
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
// Add one replica that starts not initialized
defer cluster.PanicHandler(t)
ctx := context.Background()

vtParams := clusterInstance.GetVTParams(keyspaceName)
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

// Make sure the primary is the primary when the test starts.
// This state should be ensured before we actually test anything.
checkTabletType(t, primaryTablet.Alias, "PRIMARY")

// Run a bunch of DDL queries and verify that the tables/views changed show up in the health stream.
// These tests are for the part where `--queryserver-enable-views` flag is not set.
verifyHealthStreamSchemaChangeSignals(t, conn, &primaryTablet, false)

// We start a new vttablet, this time with `--queryserver-enable-views` flag specified.
tempTablet := clusterInstance.NewVttabletInstance("replica", 0, "")
// Start Mysql Processes and return connection
_, err = cluster.StartMySQLAndGetConnection(ctx, tempTablet, username, clusterInstance.TmpDirectory)
require.NoError(t, err)
oldArgs := clusterInstance.VtTabletExtraArgs
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--queryserver-enable-views")
defer func() {
clusterInstance.VtTabletExtraArgs = oldArgs
}()
// start vttablet process, should be in SERVING state as we already have a primary.
err = clusterInstance.StartVttablet(tempTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

defer func() {
// Restore the primary tablet back to the original.
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shardName, primaryTablet.Alias)
require.NoError(t, err)
// Manual cleanup of processes
killTablets(t, tempTablet)
}()

// Now we reparent the cluster to the new tablet we have.
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shardName, tempTablet.Alias)
require.NoError(t, err)

checkTabletType(t, tempTablet.Alias, "PRIMARY")
// Run a bunch of DDL queries and verify that the tables/views changed show up in the health stream.
// These tests are for the part where `--queryserver-enable-views` flag is set.
verifyHealthStreamSchemaChangeSignals(t, conn, tempTablet, true)
}

func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn, primaryTablet *cluster.Vttablet, viewsEnabled bool) {
verifyTableDDLSchemaChangeSignal(t, vtgateConn, primaryTablet, "CREATE TABLE `area` (`id` int NOT NULL, `country` varchar(30), PRIMARY KEY (`id`))", "area")
verifyTableDDLSchemaChangeSignal(t, vtgateConn, primaryTablet, "CREATE TABLE `area2` (`id` int NOT NULL, PRIMARY KEY (`id`))", "area2")
verifyViewDDLSchemaChangeSignal(t, vtgateConn, primaryTablet, "CREATE VIEW v2 as select * from area", viewsEnabled)
verifyTableDDLSchemaChangeSignal(t, vtgateConn, primaryTablet, "ALTER TABLE `area` ADD COLUMN name varchar(30) NOT NULL", "area")
verifyTableDDLSchemaChangeSignal(t, vtgateConn, primaryTablet, "DROP TABLE `area2`", "area2")
verifyViewDDLSchemaChangeSignal(t, vtgateConn, primaryTablet, "ALTER VIEW v2 as select id from area", viewsEnabled)
verifyViewDDLSchemaChangeSignal(t, vtgateConn, primaryTablet, "DROP VIEW v2", viewsEnabled)
verifyTableDDLSchemaChangeSignal(t, vtgateConn, primaryTablet, "DROP TABLE `area`", "area")
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
}

func verifyTableDDLSchemaChangeSignal(t *testing.T, vtgateConn *mysql.Conn, primaryTablet *cluster.Vttablet, query string, table string) {
ctx := context.Background()
var streamErr error
wg := sync.WaitGroup{}
wg.Add(1)
ranOnce := false
go func() {
defer wg.Done()
streamErr = clusterInstance.StreamTabletHealthUntil(ctx, primaryTablet, 30*time.Second, func(shr *querypb.StreamHealthResponse) bool {
ranOnce = true
if shr != nil && shr.RealtimeStats != nil && slices.Contains(shr.RealtimeStats.TableSchemaChanged, table) {
return true
}
return false
})
}()
// The test becomes flaky if we run the DDL immediately after starting the above go routine because the client for the Stream
// sometimes isn't registered by the time DDL runs, and it misses the update we get. To prevent this situation, we wait for one Stream packet
// to have returned. Once we know we received a Stream packet, then we know that we are registered for the health stream and can execute the DDL.
for i := 0; i < 30; i++ {
if ranOnce {
break
}
time.Sleep(1 * time.Second)
}
require.True(t, ranOnce, "We should have received atleast 1 health stream")
_, err := vtgateConn.ExecuteFetch(query, 10000, false)
require.NoError(t, err)
wg.Wait()
require.NoError(t, streamErr)
}

func verifyViewDDLSchemaChangeSignal(t *testing.T, vtgateConn *mysql.Conn, primaryTablet *cluster.Vttablet, query string, viewsEnabled bool) {
ctx := context.Background()
var streamErr error
wg := sync.WaitGroup{}
wg.Add(1)
ranOnce := false
go func() {
defer wg.Done()
streamErr = clusterInstance.StreamTabletHealthUntil(ctx, primaryTablet, 30*time.Second, func(shr *querypb.StreamHealthResponse) bool {
ranOnce = true
listToUse := shr.RealtimeStats.TableSchemaChanged
if viewsEnabled {
listToUse = shr.RealtimeStats.ViewSchemaChanged
}
if shr != nil && shr.RealtimeStats != nil && slices.Contains(listToUse, "v2") {
return true
}
return false
})
}()
// The test becomes flaky if we run the DDL immediately after starting the above go routine because the client for the Stream
// sometimes isn't registered by the time DDL runs, and it misses the update we get. To prevent this situation, we wait for one Stream packet
// to have returned. Once we know we received a Stream packet, then we know that we are registered for the health stream and can execute the DDL.
for i := 0; i < 30; i++ {
if ranOnce {
break
}
time.Sleep(1 * time.Second)
}
require.True(t, ranOnce, "We should have received atleast 1 health stream")
_, err := vtgateConn.ExecuteFetch(query, 10000, false)
require.NoError(t, err)
wg.Wait()
require.NoError(t, streamErr)
}

func checkHealth(t *testing.T, port int, shouldError bool) {
url := fmt.Sprintf("http://localhost:%d/healthz", port)
resp, err := http.Get(url)
Expand Down
2 changes: 1 addition & 1 deletion go/test/fuzzing/tabletserver_schema_fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,5 @@ func newTestLoadTable(tableName, comment string, db *fakesqldb.DB) (*schema.Tabl
}
defer conn.Recycle()

return schema.LoadTable(conn, "fakesqldb", tableName, comment)
return schema.LoadTable(conn, "fakesqldb", tableName, "BASE_TABLE", comment)
}
24 changes: 24 additions & 0 deletions go/vt/sidecardb/schema/schemaengine/schema_engine_tables.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
Copyright 2023 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

CREATE TABLE IF NOT EXISTS schema_engine_tables
(
TABLE_SCHEMA varchar(64) NOT NULL,
TABLE_NAME varchar(64) NOT NULL,
CREATE_STATEMENT longtext,
CREATE_TIME BIGINT,
PRIMARY KEY (TABLE_SCHEMA, TABLE_NAME)
) engine = InnoDB
24 changes: 24 additions & 0 deletions go/vt/sidecardb/schema/schemaengine/schema_engine_views.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
Copyright 2023 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

CREATE TABLE IF NOT EXISTS schema_engine_views
(
VIEW_SCHEMA varchar(64) NOT NULL,
VIEW_NAME varchar(64) NOT NULL,
CREATE_STATEMENT longtext,
VIEW_DEFINITION longtext NOT NULL,
PRIMARY KEY (VIEW_SCHEMA, VIEW_NAME)
) engine = InnoDB
1 change: 0 additions & 1 deletion go/vt/vttablet/endtoend/framework/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ func StartServer(connParams, connAppDebugParams mysql.ConnParams, dbName string)
config.HotRowProtection.Mode = tabletenv.Enable
config.TrackSchemaVersions = true
_ = config.GracePeriods.ShutdownSeconds.Set("2s")
_ = config.SignalSchemaChangeReloadIntervalSeconds.Set("2100ms")
config.SignalWhenSchemaChange = true
_ = config.Healthcheck.IntervalSeconds.Set("100ms")
_ = config.Oltp.TxTimeoutSeconds.Set("5s")
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vttablet/tabletserver/binlog_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ limitations under the License.
package tabletserver

import (
"context"
"sync"
"time"

"context"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

Expand Down
Loading