From aef734ec0bb75c870c0755c7c34e892f32b9dbe4 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 4 Oct 2021 20:12:42 +0530 Subject: [PATCH 1/3] e2e test for stream messages Signed-off-by: Harshit Gangal --- go/test/endtoend/cluster/cluster_process.go | 4 +- go/test/endtoend/vtgate/godriver/main_test.go | 155 ++++++++++++++++++ 2 files changed, 157 insertions(+), 2 deletions(-) create mode 100644 go/test/endtoend/vtgate/godriver/main_test.go diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index 64ba79b485a..801c62a69fa 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -446,11 +446,11 @@ func (cluster *LocalProcessCluster) StartVtgate() (err error) { // NewVtgateInstance returns an instance of vtgateprocess func (cluster *LocalProcessCluster) NewVtgateInstance() *VtgateProcess { vtgateHTTPPort := cluster.GetAndReservePort() - vtgateGrpcPort := cluster.GetAndReservePort() + cluster.VtgateGrpcPort = cluster.GetAndReservePort() cluster.VtgateMySQLPort = cluster.GetAndReservePort() vtgateProcInstance := VtgateProcessInstance( vtgateHTTPPort, - vtgateGrpcPort, + cluster.VtgateGrpcPort, cluster.VtgateMySQLPort, cluster.Cell, cluster.Cell, diff --git a/go/test/endtoend/vtgate/godriver/main_test.go b/go/test/endtoend/vtgate/godriver/main_test.go new file mode 100644 index 00000000000..0bf81e75bd6 --- /dev/null +++ b/go/test/endtoend/vtgate/godriver/main_test.go @@ -0,0 +1,155 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package godriver + +import ( + "database/sql" + "flag" + "os" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" + + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vitessdriver" + + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + cell = "zone1" + hostname = "localhost" + KeyspaceName = "customer" + SchemaSQL = ` +create table my_message( + time_scheduled bigint, + id bigint, + time_next bigint, + epoch bigint, + time_created bigint, + time_acked bigint, + message varchar(128), + priority tinyint NOT NULL DEFAULT '0', + primary key(time_scheduled, id), + unique index id_idx(id), + index next_idx(priority, time_next) +) comment 'vitess_message,vt_ack_wait=30,vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30'; +` + VSchema = ` +{ + "sharded": true, + "vindexes": { + "hash": { + "type": "hash" + } + }, + "tables": { + "my_message": { + "column_vindexes": [ + { + "column": "id", + "name": "hash" + } + ] + } + } +} +` +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + if err := clusterInstance.StartTopo(); err != nil { + return 1 + } + + // Start keyspace + Keyspace := &cluster.Keyspace{ + Name: KeyspaceName, + SchemaSQL: SchemaSQL, + VSchema: VSchema, + } + clusterInstance.VtTabletExtraArgs = []string{"-queryserver-config-transaction-timeout", "3"} + if err := clusterInstance.StartKeyspace(*Keyspace, []string{"-80", "80-"}, 1, false); err != nil { + log.Fatal(err.Error()) + return 1 + } + + // Start vtgate + clusterInstance.VtGateExtraArgs = []string{"-warn_sharded_only=true"} + if err := clusterInstance.StartVtgate(); err != nil { + log.Fatal(err.Error()) + return 1 + } + + return m.Run() + }() + os.Exit(exitCode) +} + +func TestStreamMessaging(t *testing.T) { + defer cluster.PanicHandler(t) + + cnf := vitessdriver.Configuration{ + Protocol: "grpc", + Address: clusterInstance.Hostname + ":" + strconv.Itoa(clusterInstance.VtgateGrpcPort), + GRPCDialOptions: []grpc.DialOption{ + grpc.WithDefaultCallOptions(), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 300 * time.Second, + Timeout: 600 * time.Second, + PermitWithoutStream: true, + }), + }, + } + + // for inserting data + db, err := vitessdriver.OpenWithConfiguration(cnf) + require.NoError(t, err) + defer db.Close() + + // Exec not allowed in streaming + timenow := time.Now().Add(time.Second * 60).UnixNano() + _, err = db.Exec("insert into my_message(id, message, time_scheduled) values(1, 'hello world', :curr_time)", sql.Named("curr_time", timenow)) + require.NoError(t, err) + + // for streaming messages + cnf.Streaming = true + streamDB, err := vitessdriver.OpenWithConfiguration(cnf) + require.NoError(t, err) + defer streamDB.Close() + + // Exec not allowed in streaming + _, err = streamDB.Exec("stream * from my_message") + assert.EqualError(t, err, "Exec not allowed for streaming connections") + + row := streamDB.QueryRow("stream * from my_message") + require.NoError(t, row.Err()) +} From ce9f1adfdbc7550375f7f713edad92ff9988c6e9 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 4 Oct 2021 20:13:12 +0530 Subject: [PATCH 2/3] StreamExecute grpc fix to set default tablettype if not provided Signed-off-by: Harshit Gangal --- go/vt/vtgate/grpcvtgateservice/server.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/grpcvtgateservice/server.go b/go/vt/vtgate/grpcvtgateservice/server.go index 115936bface..a99a8f8bc85 100644 --- a/go/vt/vtgate/grpcvtgateservice/server.go +++ b/go/vt/vtgate/grpcvtgateservice/server.go @@ -163,8 +163,16 @@ func (vtg *VTGate) StreamExecute(request *vtgatepb.StreamExecuteRequest, stream if session == nil { session = &vtgatepb.Session{Autocommit: true} } + + // For backward compatibility. + // The mysql query equivalent has logic to use topodatapb.TabletType_PRIMARY if tablet_type is not set. + tabletType := request.TabletType + if tabletType == topodatapb.TabletType_UNKNOWN { + tabletType = topodatapb.TabletType_PRIMARY + } + if session.TargetString == "" { - session.TargetString = request.KeyspaceShard + "@" + topoproto.TabletTypeLString(request.TabletType) + session.TargetString = request.KeyspaceShard + "@" + topoproto.TabletTypeLString(tabletType) } if session.Options == nil { session.Options = request.Options From 65fa4d239eb4dfea0bb7f00e18d0043d768fb6b7 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 4 Oct 2021 20:44:09 +0530 Subject: [PATCH 3/3] set default target only when tablet_type is present Signed-off-by: Harshit Gangal --- go/vt/vtgate/grpcvtgateservice/server.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/go/vt/vtgate/grpcvtgateservice/server.go b/go/vt/vtgate/grpcvtgateservice/server.go index a99a8f8bc85..7fb6ec1f396 100644 --- a/go/vt/vtgate/grpcvtgateservice/server.go +++ b/go/vt/vtgate/grpcvtgateservice/server.go @@ -164,15 +164,9 @@ func (vtg *VTGate) StreamExecute(request *vtgatepb.StreamExecuteRequest, stream session = &vtgatepb.Session{Autocommit: true} } - // For backward compatibility. - // The mysql query equivalent has logic to use topodatapb.TabletType_PRIMARY if tablet_type is not set. - tabletType := request.TabletType - if tabletType == topodatapb.TabletType_UNKNOWN { - tabletType = topodatapb.TabletType_PRIMARY - } - - if session.TargetString == "" { - session.TargetString = request.KeyspaceShard + "@" + topoproto.TabletTypeLString(tabletType) + // Do not set target if the tablet_type is not set correctly. + if session.TargetString == "" && request.TabletType != topodatapb.TabletType_UNKNOWN { + session.TargetString = request.KeyspaceShard + "@" + topoproto.TabletTypeLString(request.TabletType) } if session.Options == nil { session.Options = request.Options