Skip to content

Commit

Permalink
Merge pull request #7484 from planetscale/consul-qs-fix
Browse files Browse the repository at this point in the history
Fix for Query Serving when Toposerver is Down
  • Loading branch information
harshit-gangal authored Feb 17, 2021
2 parents 84b5ab3 + f5384de commit 8b6e690
Show file tree
Hide file tree
Showing 9 changed files with 506 additions and 6 deletions.
13 changes: 10 additions & 3 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type LocalProcessCluster struct {
Cell string
BaseTabletUID int
Hostname string
TopoFlavor string
TopoPort int
TmpDirectory string
OriginalVTDATAROOT string
Expand Down Expand Up @@ -173,6 +174,8 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) {
if cluster.Cell == "" {
cluster.Cell = DefaultCell
}

topoFlavor = cluster.TopoFlavorString()
cluster.TopoPort = cluster.GetAndReservePort()
cluster.TmpDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/tmp_%d", cluster.GetAndReservePort()))
cluster.TopoProcess = *TopoProcessInstance(cluster.TopoPort, cluster.GetAndReservePort(), cluster.Hostname, *topoFlavor, "global")
Expand Down Expand Up @@ -758,9 +761,13 @@ func (cluster *LocalProcessCluster) StartVttablet(tablet *Vttablet, servingStatu
return tablet.VttabletProcess.Setup()
}

//func (cluster *LocalProcessCluster) NewOrcInstance() OrchestratorProcess {
//
//}
// TopoFlavorString returns the topo flavor
func (cluster *LocalProcessCluster) TopoFlavorString() *string {
if cluster.TopoFlavor != "" {
return &cluster.TopoFlavor
}
return topoFlavor
}

func getCoveragePath(fileName string) string {
covDir := os.Getenv("COV_DIR")
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/topo_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoo
// Attempt graceful shutdown with SIGTERM first
_ = topo.proc.Process.Signal(syscall.SIGTERM)

if !*keepData {
if !(*keepData || keepdata) {
_ = os.RemoveAll(topo.DataDirectory)
_ = os.RemoveAll(currentRoot)
}
Expand Down
148 changes: 148 additions & 0 deletions go/test/endtoend/topotest/consul/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package consul

import (
"context"
"flag"
"fmt"
"os"
"testing"
"time"

"vitess.io/vitess/go/vt/log"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
)

var (
clusterInstance *cluster.LocalProcessCluster
cell = "zone1"
hostname = "localhost"
KeyspaceName = "customer"
SchemaSQL = `
CREATE TABLE t1 (
c1 BIGINT NOT NULL,
c2 BIGINT NOT NULL,
c3 BIGINT,
c4 varchar(100),
PRIMARY KEY (c1),
UNIQUE KEY (c2),
UNIQUE KEY (c3),
UNIQUE KEY (c4)
) ENGINE=Innodb;`
VSchema = `
{
"sharded": false,
"tables": {
"t1": {}
}
}
`
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()

// Start topo server
clusterInstance.TopoFlavor = "consul"
if err := clusterInstance.StartTopo(); err != nil {
return 1
}

// Start keyspace
Keyspace := &cluster.Keyspace{
Name: KeyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
}
if err := clusterInstance.StartUnshardedKeyspace(*Keyspace, 0, false); err != nil {
log.Fatal(err.Error())
return 1
}

// Start vtgate
if err := clusterInstance.StartVtgate(); err != nil {
log.Fatal(err.Error())
return 1
}

return m.Run()
}()
os.Exit(exitCode)
}

func TestTopoDownServingQuery(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
vtParams := mysql.ConnParams{
Host: "localhost",
Port: clusterInstance.VtgateMySQLPort,
}
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
defer conn.Close()

defer exec(t, conn, `delete from t1`)

execMulti(t, conn, `insert into t1(c1, c2, c3, c4) values (300,100,300,'abc'); ;; insert into t1(c1, c2, c3, c4) values (301,101,301,'abcd');;`)
assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
clusterInstance.TopoProcess.TearDown(clusterInstance.Cell, clusterInstance.OriginalVTDATAROOT, clusterInstance.CurrentVTDATAROOT, true, *clusterInstance.TopoFlavorString())
time.Sleep(3 * time.Second)
assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
}

func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
require.NoError(t, err)
return qr
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
qr, more, err := conn.ExecuteFetchMulti(query, 1000, true)
res = append(res, qr)
require.NoError(t, err)
for more == true {
qr, more, _, err = conn.ReadQueryResult(1000, true)
require.NoError(t, err)
res = append(res, qr)
}
return res
}

func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) {
t.Helper()
qr := exec(t, conn, query)
got := fmt.Sprintf("%v", qr.Rows)
diff := cmp.Diff(expected, got)
if diff != "" {
t.Errorf("Query: %s (-want +got):\n%s", query, diff)
}
}
147 changes: 147 additions & 0 deletions go/test/endtoend/topotest/etcd2/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ectd2

import (
"context"
"flag"
"fmt"
"os"
"testing"
"time"

"vitess.io/vitess/go/vt/log"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
)

var (
clusterInstance *cluster.LocalProcessCluster
cell = "zone1"
hostname = "localhost"
KeyspaceName = "customer"
SchemaSQL = `
CREATE TABLE t1 (
c1 BIGINT NOT NULL,
c2 BIGINT NOT NULL,
c3 BIGINT,
c4 varchar(100),
PRIMARY KEY (c1),
UNIQUE KEY (c2),
UNIQUE KEY (c3),
UNIQUE KEY (c4)
) ENGINE=Innodb;`
VSchema = `
{
"sharded": false,
"tables": {
"t1": {}
}
}
`
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()

// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}

// Start keyspace
Keyspace := &cluster.Keyspace{
Name: KeyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
}
if err := clusterInstance.StartUnshardedKeyspace(*Keyspace, 0, false); err != nil {
log.Fatal(err.Error())
return 1
}

// Start vtgate
if err := clusterInstance.StartVtgate(); err != nil {
log.Fatal(err.Error())
return 1
}

return m.Run()
}()
os.Exit(exitCode)
}

func TestTopoDownServingQuery(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
vtParams := mysql.ConnParams{
Host: "localhost",
Port: clusterInstance.VtgateMySQLPort,
}
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
defer conn.Close()

defer exec(t, conn, `delete from t1`)

execMulti(t, conn, `insert into t1(c1, c2, c3, c4) values (300,100,300,'abc'); ;; insert into t1(c1, c2, c3, c4) values (301,101,301,'abcd');;`)
assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
clusterInstance.TopoProcess.TearDown(clusterInstance.Cell, clusterInstance.OriginalVTDATAROOT, clusterInstance.CurrentVTDATAROOT, true, *clusterInstance.TopoFlavorString())
time.Sleep(3 * time.Second)
assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
}

func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
require.NoError(t, err)
return qr
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
qr, more, err := conn.ExecuteFetchMulti(query, 1000, true)
res = append(res, qr)
require.NoError(t, err)
for more == true {
qr, more, _, err = conn.ReadQueryResult(1000, true)
require.NoError(t, err)
res = append(res, qr)
}
return res
}

func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) {
t.Helper()
qr := exec(t, conn, query)
got := fmt.Sprintf("%v", qr.Rows)
diff := cmp.Diff(expected, got)
if diff != "" {
t.Errorf("Query: %s (-want +got):\n%s", query, diff)
}
}
Loading

0 comments on commit 8b6e690

Please sign in to comment.