diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index 879b2ef8a0e..c92de019d90 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -56,6 +56,7 @@ type LocalProcessCluster struct { Cell string BaseTabletUID int Hostname string + TopoFlavor string TopoPort int TmpDirectory string OriginalVTDATAROOT string @@ -173,6 +174,8 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) { if cluster.Cell == "" { cluster.Cell = DefaultCell } + + topoFlavor = cluster.TopoFlavorString() cluster.TopoPort = cluster.GetAndReservePort() cluster.TmpDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/tmp_%d", cluster.GetAndReservePort())) cluster.TopoProcess = *TopoProcessInstance(cluster.TopoPort, cluster.GetAndReservePort(), cluster.Hostname, *topoFlavor, "global") @@ -758,9 +761,13 @@ func (cluster *LocalProcessCluster) StartVttablet(tablet *Vttablet, servingStatu return tablet.VttabletProcess.Setup() } -//func (cluster *LocalProcessCluster) NewOrcInstance() OrchestratorProcess { -// -//} +// TopoFlavorString returns the topo flavor +func (cluster *LocalProcessCluster) TopoFlavorString() *string { + if cluster.TopoFlavor != "" { + return &cluster.TopoFlavor + } + return topoFlavor +} func getCoveragePath(fileName string) string { covDir := os.Getenv("COV_DIR") diff --git a/go/test/endtoend/cluster/topo_process.go b/go/test/endtoend/cluster/topo_process.go index c78d251768f..01c90906482 100644 --- a/go/test/endtoend/cluster/topo_process.go +++ b/go/test/endtoend/cluster/topo_process.go @@ -230,7 +230,7 @@ func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoo // Attempt graceful shutdown with SIGTERM first _ = topo.proc.Process.Signal(syscall.SIGTERM) - if !*keepData { + if !(*keepData || keepdata) { _ = os.RemoveAll(topo.DataDirectory) _ = os.RemoveAll(currentRoot) } diff --git a/go/test/endtoend/topotest/consul/main_test.go b/go/test/endtoend/topotest/consul/main_test.go new file mode 100644 index 00000000000..f805c7c00b0 --- /dev/null +++ b/go/test/endtoend/topotest/consul/main_test.go @@ -0,0 +1,148 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package consul + +import ( + "context" + "flag" + "fmt" + "os" + "testing" + "time" + + "vitess.io/vitess/go/vt/log" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + cell = "zone1" + hostname = "localhost" + KeyspaceName = "customer" + SchemaSQL = ` +CREATE TABLE t1 ( + c1 BIGINT NOT NULL, + c2 BIGINT NOT NULL, + c3 BIGINT, + c4 varchar(100), + PRIMARY KEY (c1), + UNIQUE KEY (c2), + UNIQUE KEY (c3), + UNIQUE KEY (c4) +) ENGINE=Innodb;` + VSchema = ` +{ + "sharded": false, + "tables": { + "t1": {} + } +} +` +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + clusterInstance.TopoFlavor = "consul" + if err := clusterInstance.StartTopo(); err != nil { + return 1 + } + + // Start keyspace + Keyspace := &cluster.Keyspace{ + Name: KeyspaceName, + SchemaSQL: SchemaSQL, + VSchema: VSchema, + } + if err := clusterInstance.StartUnshardedKeyspace(*Keyspace, 0, false); err != nil { + log.Fatal(err.Error()) + return 1 + } + + // Start vtgate + if err := clusterInstance.StartVtgate(); err != nil { + log.Fatal(err.Error()) + return 1 + } + + return m.Run() + }() + os.Exit(exitCode) +} + +func TestTopoDownServingQuery(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + vtParams := mysql.ConnParams{ + Host: "localhost", + Port: clusterInstance.VtgateMySQLPort, + } + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + defer exec(t, conn, `delete from t1`) + + execMulti(t, conn, `insert into t1(c1, c2, c3, c4) values (300,100,300,'abc'); ;; insert into t1(c1, c2, c3, c4) values (301,101,301,'abcd');;`) + assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`) + clusterInstance.TopoProcess.TearDown(clusterInstance.Cell, clusterInstance.OriginalVTDATAROOT, clusterInstance.CurrentVTDATAROOT, true, *clusterInstance.TopoFlavorString()) + time.Sleep(3 * time.Second) + assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`) +} + +func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { + t.Helper() + qr, err := conn.ExecuteFetch(query, 1000, true) + require.NoError(t, err) + return qr +} + +func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result { + t.Helper() + var res []*sqltypes.Result + qr, more, err := conn.ExecuteFetchMulti(query, 1000, true) + res = append(res, qr) + require.NoError(t, err) + for more == true { + qr, more, _, err = conn.ReadQueryResult(1000, true) + require.NoError(t, err) + res = append(res, qr) + } + return res +} + +func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) { + t.Helper() + qr := exec(t, conn, query) + got := fmt.Sprintf("%v", qr.Rows) + diff := cmp.Diff(expected, got) + if diff != "" { + t.Errorf("Query: %s (-want +got):\n%s", query, diff) + } +} diff --git a/go/test/endtoend/topotest/etcd2/main_test.go b/go/test/endtoend/topotest/etcd2/main_test.go new file mode 100644 index 00000000000..67ffb32ee6e --- /dev/null +++ b/go/test/endtoend/topotest/etcd2/main_test.go @@ -0,0 +1,147 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ectd2 + +import ( + "context" + "flag" + "fmt" + "os" + "testing" + "time" + + "vitess.io/vitess/go/vt/log" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + cell = "zone1" + hostname = "localhost" + KeyspaceName = "customer" + SchemaSQL = ` +CREATE TABLE t1 ( + c1 BIGINT NOT NULL, + c2 BIGINT NOT NULL, + c3 BIGINT, + c4 varchar(100), + PRIMARY KEY (c1), + UNIQUE KEY (c2), + UNIQUE KEY (c3), + UNIQUE KEY (c4) +) ENGINE=Innodb;` + VSchema = ` +{ + "sharded": false, + "tables": { + "t1": {} + } +} +` +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + if err := clusterInstance.StartTopo(); err != nil { + return 1 + } + + // Start keyspace + Keyspace := &cluster.Keyspace{ + Name: KeyspaceName, + SchemaSQL: SchemaSQL, + VSchema: VSchema, + } + if err := clusterInstance.StartUnshardedKeyspace(*Keyspace, 0, false); err != nil { + log.Fatal(err.Error()) + return 1 + } + + // Start vtgate + if err := clusterInstance.StartVtgate(); err != nil { + log.Fatal(err.Error()) + return 1 + } + + return m.Run() + }() + os.Exit(exitCode) +} + +func TestTopoDownServingQuery(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + vtParams := mysql.ConnParams{ + Host: "localhost", + Port: clusterInstance.VtgateMySQLPort, + } + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + defer exec(t, conn, `delete from t1`) + + execMulti(t, conn, `insert into t1(c1, c2, c3, c4) values (300,100,300,'abc'); ;; insert into t1(c1, c2, c3, c4) values (301,101,301,'abcd');;`) + assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`) + clusterInstance.TopoProcess.TearDown(clusterInstance.Cell, clusterInstance.OriginalVTDATAROOT, clusterInstance.CurrentVTDATAROOT, true, *clusterInstance.TopoFlavorString()) + time.Sleep(3 * time.Second) + assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`) +} + +func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { + t.Helper() + qr, err := conn.ExecuteFetch(query, 1000, true) + require.NoError(t, err) + return qr +} + +func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result { + t.Helper() + var res []*sqltypes.Result + qr, more, err := conn.ExecuteFetchMulti(query, 1000, true) + res = append(res, qr) + require.NoError(t, err) + for more == true { + qr, more, _, err = conn.ReadQueryResult(1000, true) + require.NoError(t, err) + res = append(res, qr) + } + return res +} + +func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) { + t.Helper() + qr := exec(t, conn, query) + got := fmt.Sprintf("%v", qr.Rows) + diff := cmp.Diff(expected, got) + if diff != "" { + t.Errorf("Query: %s (-want +got):\n%s", query, diff) + } +} diff --git a/go/test/endtoend/topotest/zk2/main_test.go b/go/test/endtoend/topotest/zk2/main_test.go new file mode 100644 index 00000000000..611bee7d180 --- /dev/null +++ b/go/test/endtoend/topotest/zk2/main_test.go @@ -0,0 +1,148 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package zk2 + +import ( + "context" + "flag" + "fmt" + "os" + "testing" + "time" + + "vitess.io/vitess/go/vt/log" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + cell = "zone1" + hostname = "localhost" + KeyspaceName = "customer" + SchemaSQL = ` +CREATE TABLE t1 ( + c1 BIGINT NOT NULL, + c2 BIGINT NOT NULL, + c3 BIGINT, + c4 varchar(100), + PRIMARY KEY (c1), + UNIQUE KEY (c2), + UNIQUE KEY (c3), + UNIQUE KEY (c4) +) ENGINE=Innodb;` + VSchema = ` +{ + "sharded": false, + "tables": { + "t1": {} + } +} +` +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + clusterInstance.TopoFlavor = "zk2" + if err := clusterInstance.StartTopo(); err != nil { + return 1 + } + + // Start keyspace + Keyspace := &cluster.Keyspace{ + Name: KeyspaceName, + SchemaSQL: SchemaSQL, + VSchema: VSchema, + } + if err := clusterInstance.StartUnshardedKeyspace(*Keyspace, 0, false); err != nil { + log.Fatal(err.Error()) + return 1 + } + + // Start vtgate + if err := clusterInstance.StartVtgate(); err != nil { + log.Fatal(err.Error()) + return 1 + } + + return m.Run() + }() + os.Exit(exitCode) +} + +func TestTopoDownServingQuery(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + vtParams := mysql.ConnParams{ + Host: "localhost", + Port: clusterInstance.VtgateMySQLPort, + } + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + defer exec(t, conn, `delete from t1`) + + execMulti(t, conn, `insert into t1(c1, c2, c3, c4) values (300,100,300,'abc'); ;; insert into t1(c1, c2, c3, c4) values (301,101,301,'abcd');;`) + assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`) + clusterInstance.TopoProcess.TearDown(clusterInstance.Cell, clusterInstance.OriginalVTDATAROOT, clusterInstance.CurrentVTDATAROOT, true, *clusterInstance.TopoFlavorString()) + time.Sleep(3 * time.Second) + assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`) +} + +func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { + t.Helper() + qr, err := conn.ExecuteFetch(query, 1000, true) + require.NoError(t, err) + return qr +} + +func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result { + t.Helper() + var res []*sqltypes.Result + qr, more, err := conn.ExecuteFetchMulti(query, 1000, true) + res = append(res, qr) + require.NoError(t, err) + for more == true { + qr, more, _, err = conn.ReadQueryResult(1000, true) + require.NoError(t, err) + res = append(res, qr) + } + return res +} + +func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) { + t.Helper() + qr := exec(t, conn, query) + got := fmt.Sprintf("%v", qr.Rows) + diff := cmp.Diff(expected, got) + if diff != "" { + t.Errorf("Query: %s (-want +got):\n%s", query, diff) + } +} diff --git a/go/test/endtoend/vtgate/unsharded/main_test.go b/go/test/endtoend/vtgate/unsharded/main_test.go index 323316ea8f3..e447c7543ff 100644 --- a/go/test/endtoend/vtgate/unsharded/main_test.go +++ b/go/test/endtoend/vtgate/unsharded/main_test.go @@ -239,6 +239,26 @@ func TestEmptyStatement(t *testing.T) { assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`) } +func TestTopoDownServingQuery(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + vtParams := mysql.ConnParams{ + Host: "localhost", + Port: clusterInstance.VtgateMySQLPort, + } + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + defer exec(t, conn, `delete from t1`) + + execMulti(t, conn, `insert into t1(c1, c2, c3, c4) values (300,100,300,'abc'); ;; insert into t1(c1, c2, c3, c4) values (301,101,301,'abcd');;`) + assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`) + clusterInstance.TopoProcess.TearDown(clusterInstance.Cell, clusterInstance.OriginalVTDATAROOT, clusterInstance.CurrentVTDATAROOT, true, *clusterInstance.TopoFlavorString()) + time.Sleep(3 * time.Second) + assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`) +} + func TestInsertAllDefaults(t *testing.T) { defer cluster.PanicHandler(t) ctx := context.Background() diff --git a/go/vt/srvtopo/resilient_server.go b/go/vt/srvtopo/resilient_server.go index 1b0886c90da..cdb8b9c12e8 100644 --- a/go/vt/srvtopo/resilient_server.go +++ b/go/vt/srvtopo/resilient_server.go @@ -20,6 +20,7 @@ import ( "flag" "fmt" "html/template" + "net/url" "sort" "sync" "time" @@ -468,7 +469,9 @@ func (server *ResilientServer) watchSrvKeyspace(callerCtx context.Context, entry server.counts.Add(errorCategory, 1) log.Errorf("Initial WatchSrvKeyspace failed for %v/%v: %v", cell, keyspace, current.Err) - if time.Since(entry.lastValueTime) > server.cacheTTL { + // This watcher will able to continue to return the last value till it is not able to connect to the topo server even if the cache TTL is reached. + _, netErr := current.Err.(*url.Error) + if !netErr && time.Since(entry.lastValueTime) > server.cacheTTL { log.Errorf("WatchSrvKeyspace clearing cached entry for %v/%v", cell, keyspace) entry.value = nil } diff --git a/test.go b/test.go index 35c22b171b6..16fd50fc14f 100755 --- a/test.go +++ b/test.go @@ -69,7 +69,7 @@ To pass extra args to Python tests (test/*.py), terminate the list of test names with -- and then add them at the end. For example: - go run test.go test1 test2 -- --topo-server-flavor=etcd2 + go run test.go test1 test2 -- --topo-flavor=etcd2 ` // Flags diff --git a/test/config.json b/test/config.json index 12aaa4c02c3..b9a48bfe033 100644 --- a/test/config.json +++ b/test/config.json @@ -551,6 +551,33 @@ "RetryMax": 0, "Tags": [] }, + "topo_zk2": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/topotest/zk2", "--topo-flavor=zk2"], + "Command": [], + "Manual": false, + "Shard": "25", + "RetryMax": 0, + "Tags": [] + }, + "topo_consul": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/topotest/consul", "--topo-flavor=consul"], + "Command": [], + "Manual": false, + "Shard": "18", + "RetryMax": 0, + "Tags": [] + }, + "topo_etcd2": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/topotest/etcd2"], + "Command": [], + "Manual": false, + "Shard": "17", + "RetryMax": 0, + "Tags": [] + }, "web_test": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vtctldweb"],