Skip to content

Commit

Permalink
Queryservice fix
Browse files Browse the repository at this point in the history
Backport of vitessio#8089
This is a combination of 3 commits.

* remove precheck of tablet serving and target
* remove the additional logic and return error if queryservice not found to serve query
* fix test as per new change

Signed-off-by: Harshit Gangal <[email protected]>

Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
harshit-gangal authored and systay committed May 11, 2021
1 parent 79e3069 commit 7e448aa
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 56 deletions.
7 changes: 0 additions & 7 deletions go/vt/discovery/fake_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,6 @@ func (fhc *FakeHealthCheck) TabletConnection(alias *topodatapb.TabletAlias, targ
defer fhc.mu.RUnlock()
for _, item := range fhc.items {
if proto.Equal(alias, item.ts.Tablet.Alias) {
if !item.ts.Serving {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.NotServing)
}
if target != nil && !proto.Equal(item.ts.Target, target) {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s: target mismatch %v vs %v", vterrors.WrongTablet, item.ts.Target, target)
}

return item.ts.Conn, nil
}
}
Expand Down
8 changes: 0 additions & 8 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ import (
"sync"
"time"

"github.com/golang/protobuf/proto"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -700,12 +698,6 @@ func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias, target
//TODO: test that throws this error
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias)
}
if !thc.Serving {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, vterrors.NotServing)
}
if target != nil && !proto.Equal(thc.Target, target) {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "%s: target mismatch %v vs %v", vterrors.WrongTablet, thc.Target, target)
}
return thc.Connection(), nil
}

Expand Down
25 changes: 13 additions & 12 deletions go/vt/vtgate/legacy_scatter_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,26 @@ limitations under the License.
package vtgate

import (
"context"
"fmt"
"reflect"
"strings"
"testing"

"vitess.io/vitess/go/test/utils"

"github.com/stretchr/testify/assert"

"context"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"
)

// This file uses the sandbox_test framework.
Expand Down Expand Up @@ -391,15 +389,17 @@ func TestMultiExecs(t *testing.T) {
rss := []*srvtopo.ResolvedShard{
{
Target: &querypb.Target{
Keyspace: "TestMultiExecs",
Shard: "0",
Keyspace: "TestMultiExecs",
Shard: "0",
TabletType: topodatapb.TabletType_REPLICA,
},
Gateway: sbc0,
},
{
Target: &querypb.Target{
Keyspace: "TestMultiExecs",
Shard: "1",
Keyspace: "TestMultiExecs",
Shard: "1",
TabletType: topodatapb.TabletType_REPLICA,
},
Gateway: sbc1,
},
Expand All @@ -419,7 +419,8 @@ func TestMultiExecs(t *testing.T) {
},
}

_, _ = sc.ExecuteMultiShard(ctx, rss, queries, NewSafeSession(nil), false, false)
_, err := sc.ExecuteMultiShard(ctx, rss, queries, NewSafeSession(nil), false, false)
require.NoError(t, vterrors.Aggregate(err))
if len(sbc0.Queries) == 0 || len(sbc1.Queries) == 0 {
t.Fatalf("didn't get expected query")
}
Expand Down
18 changes: 1 addition & 17 deletions go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,23 +221,7 @@ func (stc *ScatterConn) ExecuteMultiShard(

qs, err = getQueryService(rs, info)
if err != nil {
// an error here could mean that the tablet we were targeting earlier has changed type.
// if we have a transaction, we'll have to fail, but if we only had a reserved connection,
// we can create a new reserved connection to a new tablet that is on the right shard
// and has the right type
switch info.actionNeeded {
case nothing:
info.actionNeeded = reserve
case begin:
info.actionNeeded = reserveBegin
default:
return nil, err
}
retry := checkAndResetShardSession(info, err, session)
if retry != newQS {
return nil, err
}
qs = rs.Gateway
return nil, err
}

retryRequest := func(exec func()) {
Expand Down
16 changes: 12 additions & 4 deletions go/vt/vtgate/scatter_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,15 @@ func TestReservedConnFail(t *testing.T) {
})
sbc0Th := ths[0]
sbc0Th.Serving = false
sbc0.NotServing = true
sbc0Rep := hc.AddTestTablet("aa", "0", 2, keyspace, "0", topodatapb.TabletType_REPLICA, true, 1, nil)

sbc0.Queries = nil
sbc0.ExecCount.Set(0)
_ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations)
assert.Equal(t, 0, len(sbc0.Queries), "no attempt should be made as the tablet is not serving")
assert.Equal(t, 1, len(sbc0Rep.Queries), "first attempt should pass as it is healthy")
assert.EqualValues(t, 1, sbc0.ExecCount.Get(), "first attempt should be made on original tablet")
assert.EqualValues(t, 0, len(sbc0.Queries), "no query should be executed on it")
assert.Equal(t, 1, len(sbc0Rep.Queries), "this attempt on new healthy tablet should pass")
require.Equal(t, 1, len(session.ShardSessions))
assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost")
assert.NotEqual(t, oldAlias, session.Session.ShardSessions[0].TabletAlias, "tablet alias should have changed as this is a different tablet")
Expand All @@ -376,12 +379,17 @@ func TestReservedConnFail(t *testing.T) {
Shard: tablet0Rep.GetShard(),
TabletType: topodatapb.TabletType_SPARE,
}
sbc0Rep.Tablet().Type = topodatapb.TabletType_SPARE
sbc0Th.Serving = true
sbc0.NotServing = false
sbc0.ExecCount.Set(0)

sbc0Rep.Queries = nil
sbc0Rep.ExecCount.Set(0)
_ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations)
assert.Equal(t, 1, len(sbc0.Queries), "first attempt should pass as it is healthy and matches the target")
assert.Equal(t, 0, len(sbc0Rep.Queries), " no attempt should be made as the tablet target is changed")
assert.EqualValues(t, 1, sbc0Rep.ExecCount.Get(), "first attempt should be made on the changed tablet type")
assert.EqualValues(t, 0, len(sbc0Rep.Queries), "no query should be executed on it")
assert.Equal(t, 1, len(sbc0.Queries), "this attempt should pass as it is on new healthy tablet and matches the target")
require.Equal(t, 1, len(session.ShardSessions))
assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost")
assert.NotEqual(t, oldAlias, session.Session.ShardSessions[0].TabletAlias, "tablet alias should have changed as this is a different tablet")
Expand Down
71 changes: 63 additions & 8 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"
"sync"

"vitess.io/vitess/go/vt/sqlparser"

"context"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -108,6 +110,8 @@ type SandboxConn struct {

// this error will only happen once
EphemeralShardErr error

NotServing bool
}

var _ queryservice.QueryService = (*SandboxConn)(nil) // compile-time interface check
Expand Down Expand Up @@ -147,6 +151,12 @@ func (sbc *SandboxConn) Execute(ctx context.Context, target *querypb.Target, que
sbc.execMu.Lock()
defer sbc.execMu.Unlock()
sbc.ExecCount.Add(1)
if sbc.NotServing {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.NotServing)
}
if sbc.tablet.Type != target.TabletType {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s: %v, want: %v", vterrors.WrongTablet, target.TabletType, sbc.tablet.Type)
}
bv := make(map[string]*querypb.BindVariable)
for k, v := range bindVars {
bv[k] = v
Expand All @@ -159,7 +169,8 @@ func (sbc *SandboxConn) Execute(ctx context.Context, target *querypb.Target, que
if err := sbc.getError(); err != nil {
return nil, err
}
return sbc.getNextResult(), nil
parse, _ := sqlparser.Parse(query)
return sbc.getNextResult(parse), nil
}

// ExecuteBatch is part of the QueryService interface.
Expand All @@ -174,8 +185,9 @@ func (sbc *SandboxConn) ExecuteBatch(ctx context.Context, target *querypb.Target
sbc.BatchQueries = append(sbc.BatchQueries, queries)
sbc.Options = append(sbc.Options, options)
result := make([]sqltypes.Result, 0, len(queries))
for range queries {
result = append(result, *(sbc.getNextResult()))
for _, query := range queries {
parse, _ := sqlparser.Parse(query.Sql)
result = append(result, *(sbc.getNextResult(parse)))
}
return result, nil
}
Expand All @@ -198,14 +210,15 @@ func (sbc *SandboxConn) StreamExecute(ctx context.Context, target *querypb.Targe
sbc.sExecMu.Unlock()
return err
}
ast, _ := sqlparser.Parse(query)
if sbc.results == nil {
nextRs := sbc.getNextResult()
nextRs := sbc.getNextResult(ast)
sbc.sExecMu.Unlock()
return callback(nextRs)
}

for len(sbc.results) > 0 {
nextRs := sbc.getNextResult()
nextRs := sbc.getNextResult(ast)
sbc.sExecMu.Unlock()
err := callback(nextRs)
if err != nil {
Expand Down Expand Up @@ -378,7 +391,7 @@ func (sbc *SandboxConn) MessageStream(ctx context.Context, target *querypb.Targe
if err := sbc.getError(); err != nil {
return err
}
r := sbc.getNextResult()
r := sbc.getNextResult(nil)
if r == nil {
return nil
}
Expand Down Expand Up @@ -505,13 +518,55 @@ func (sbc *SandboxConn) Tablet() *topodatapb.Tablet {
return sbc.tablet
}

func (sbc *SandboxConn) getNextResult() *sqltypes.Result {
func (sbc *SandboxConn) getNextResult(stmt sqlparser.Statement) *sqltypes.Result {
if len(sbc.results) != 0 {
r := sbc.results[0]
sbc.results = sbc.results[1:]
return r
}
return SingleRowResult
if stmt == nil {
// if we didn't get a valid query, we'll assume we need a SELECT
return getSingleRowResult()
}
switch stmt.(type) {
case *sqlparser.Select,
*sqlparser.Union,
*sqlparser.Show,
*sqlparser.Explain,
*sqlparser.OtherRead:
return getSingleRowResult()
case *sqlparser.Set,
sqlparser.DDLStatement,
*sqlparser.AlterVschema,
*sqlparser.Use,
*sqlparser.OtherAdmin,
*sqlparser.SetTransaction,
*sqlparser.Savepoint,
*sqlparser.SRollback,
*sqlparser.Release:
return &sqltypes.Result{}
}

// for everything else we fake a single row being affected
return &sqltypes.Result{RowsAffected: 1}
}

// getSingleRowResult is used to get a SingleRowResult but it creates separate fields because some tests change the fields
// If these fields are not created separately then the constants value also changes which leads to some other tests failing later
func getSingleRowResult() *sqltypes.Result {
singleRowResult := &sqltypes.Result{
InsertID: SingleRowResult.InsertID,
Rows: SingleRowResult.Rows,
}

for _, field := range SingleRowResult.Fields {
singleRowResult.Fields = append(singleRowResult.Fields, &querypb.Field{
Name: field.Name,
Type: field.Type,
})
}

return singleRowResult
}

//StringQueries returns the queries executed as a slice of strings
Expand Down

0 comments on commit 7e448aa

Please sign in to comment.