Skip to content

Commit

Permalink
Add session flag for stream execute grpc api (#14046)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Sep 20, 2023
1 parent 3a8fce7 commit 3877a94
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 51 deletions.
19 changes: 19 additions & 0 deletions changelog/17.0/17.0.3/summary.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
## Summary

### Table of Contents

- **[Major Changes](#major-changes)**
- **[New command line flags and behavior](#new-flag)**
- [VTGate flag `--grpc-send-session-in-streaming`](#new-vtgate-streaming-sesion)

## <a id="major-changes"/>Major Changes

### <a id="new-flag"/>New command line flags and behavior

#### <a id="new-vtgate-streaming-sesion"/>VTGate GRPC stream execute session flag `--grpc-send-session-in-streaming`

This flag enables transaction support on `StreamExecute` api.
One enabled, VTGate `StreamExecute` grpc api will send session as the last packet in the response.
The client should enable it only when they have made the required changes to expect such a packet.

It is disabled by default.
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Usage of vtgate:
--gate_query_cache_memory int gate server query cache size in bytes, maximum amount of memory to be cached. vtgate analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache. (default 33554432)
--gate_query_cache_size int gate server query cache size, maximum number of queries to be cached. vtgate analyzes every incoming query and generate a query plan, these plans are being cached in a cache. This config controls the expected amount of unique entries in the cache. (default 5000)
--gateway_initial_tablet_timeout duration At startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type (default 30s)
--grpc-send-session-in-streaming If set, will send the session as last packet in streaming api to support transactions in streaming
--grpc-use-effective-groups If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups.
--grpc-use-static-authentication-callerid If set, will set the immediate caller id to the username authenticated by the static auth plugin.
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/vtgate/grpc_api/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func TestMain(m *testing.M) {
"--grpc_auth_static_password_file", grpcServerAuthStaticPath,
"--grpc_use_effective_callerid",
"--grpc-use-static-authentication-callerid",
"--grpc-send-session-in-streaming",
}

// Configure vttablet to use table ACL
Expand Down
22 changes: 14 additions & 8 deletions go/vt/vtgate/grpcvtgateservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,15 @@ var (
useEffective bool
useEffectiveGroups bool
useStaticAuthenticationIdentity bool

sendSessionInStreaming bool
)

func registerFlags(fs *pflag.FlagSet) {
fs.BoolVar(&useEffective, "grpc_use_effective_callerid", false, "If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.")
fs.BoolVar(&useEffectiveGroups, "grpc-use-effective-groups", false, "If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups.")
fs.BoolVar(&useStaticAuthenticationIdentity, "grpc-use-static-authentication-callerid", false, "If set, will set the immediate caller id to the username authenticated by the static auth plugin.")
fs.BoolVar(&sendSessionInStreaming, "grpc-send-session-in-streaming", false, "If set, will send the session as last packet in streaming api to support transactions in streaming")
}

func init() {
Expand Down Expand Up @@ -192,19 +195,22 @@ func (vtg *VTGate) StreamExecute(request *vtgatepb.StreamExecuteRequest, stream
})
})

// even if there is an error, session could have been modified.
// So, this needs to be sent back to the client. Session is sent in the last stream response.
lastErr := stream.Send(&vtgatepb.StreamExecuteResponse{
Session: session,
})

var errs []error
if vtgErr != nil {
errs = append(errs, vtgErr)
}
if lastErr != nil {
errs = append(errs, lastErr)

if sendSessionInStreaming {
// even if there is an error, session could have been modified.
// So, this needs to be sent back to the client. Session is sent in the last stream response.
lastErr := stream.Send(&vtgatepb.StreamExecuteResponse{
Session: session,
})
if lastErr != nil {
errs = append(errs, lastErr)
}
}

return vterrors.ToGRPC(vterrors.Aggregate(errs))
}

Expand Down
62 changes: 62 additions & 0 deletions go/vt/vttablet/tabletserver/exclude_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//go:build !race

package tabletserver

import (
"context"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

// TestHandlePanicAndSendLogStatsMessageTruncation tests that when an error truncation
// length is set and a panic occurs, the code in handlePanicAndSendLogStats will
// truncate the error text in logs, but will not truncate the error text in the
// error value.
func TestHandlePanicAndSendLogStatsMessageTruncation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tl := newTestLogger()
defer tl.Close()
logStats := tabletenv.NewLogStats(ctx, "TestHandlePanicAndSendLogStatsMessageTruncation")
db, tsv := setupTabletServerTest(t, "")
defer tsv.StopService()
defer db.Close()

longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
longBv := map[string]*querypb.BindVariable{
"bv1": sqltypes.Int64BindVariable(1111111111),
"bv2": sqltypes.Int64BindVariable(2222222222),
"bv3": sqltypes.Int64BindVariable(3333333333),
"bv4": sqltypes.Int64BindVariable(4444444444),
}
origTruncateErrLen := sqlparser.GetTruncateErrLen()
sqlparser.SetTruncateErrLen(32)
defer sqlparser.SetTruncateErrLen(origTruncateErrLen)

defer func() {
err := logStats.Error
want := "Uncaught panic for Sql: \"select * from test_table_loooooooooooooooooooooooooooooooooooong\", BindVars: {bv1: \"type:INT64 value:\\\"1111111111\\\"\"bv2: \"type:INT64 value:\\\"2222222222\\\"\"bv3: \"type:INT64 value:\\\"3333333333\\\"\"bv4: \"type:INT64 value:\\\"4444444444\\\"\"}"
require.Error(t, err)
assert.Contains(t, err.Error(), want)
want = "Uncaught panic for Sql: \"select * from test_t [TRUNCATED]\", BindVars: {bv1: \"typ [TRUNCATED]"
gotWhatWeWant := false
for _, log := range tl.getLogs() {
if strings.HasPrefix(log, want) {
gotWhatWeWant = true
break
}
}
assert.True(t, gotWhatWeWant)
}()

defer tsv.handlePanicAndSendLogStats(longSql, longBv, logStats)
panic("panic from TestHandlePanicAndSendLogStatsMessageTruncation")
}
43 changes: 0 additions & 43 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,49 +1421,6 @@ func TestHandleExecUnknownError(t *testing.T) {
panic("unknown exec error")
}

// TestHandlePanicAndSendLogStatsMessageTruncation tests that when an error truncation
// length is set and a panic occurs, the code in handlePanicAndSendLogStats will
// truncate the error text in logs, but will not truncate the error text in the
// error value.
func TestHandlePanicAndSendLogStatsMessageTruncation(t *testing.T) {
tl := newTestLogger()
defer tl.Close()
logStats := tabletenv.NewLogStats(ctx, "TestHandlePanicAndSendLogStatsMessageTruncation")
db, tsv := setupTabletServerTest(t, "")
defer tsv.StopService()
defer db.Close()

longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
longBv := map[string]*querypb.BindVariable{
"bv1": sqltypes.Int64BindVariable(1111111111),
"bv2": sqltypes.Int64BindVariable(2222222222),
"bv3": sqltypes.Int64BindVariable(3333333333),
"bv4": sqltypes.Int64BindVariable(4444444444),
}
origTruncateErrLen := sqlparser.GetTruncateErrLen()
sqlparser.SetTruncateErrLen(32)
defer sqlparser.SetTruncateErrLen(origTruncateErrLen)

defer func() {
err := logStats.Error
want := "Uncaught panic for Sql: \"select * from test_table_loooooooooooooooooooooooooooooooooooong\", BindVars: {bv1: \"type:INT64 value:\\\"1111111111\\\"\"bv2: \"type:INT64 value:\\\"2222222222\\\"\"bv3: \"type:INT64 value:\\\"3333333333\\\"\"bv4: \"type:INT64 value:\\\"4444444444\\\"\"}"
require.Error(t, err)
assert.Contains(t, err.Error(), want)
want = "Uncaught panic for Sql: \"select * from test_t [TRUNCATED]\", BindVars: {bv1: \"typ [TRUNCATED]"
gotWhatWeWant := false
for _, log := range tl.getLogs() {
if strings.HasPrefix(log, want) {
gotWhatWeWant = true
break
}
}
assert.True(t, gotWhatWeWant)
}()

defer tsv.handlePanicAndSendLogStats(longSql, longBv, logStats)
panic("panic from TestHandlePanicAndSendLogStatsMessageTruncation")
}

func TestQueryAsString(t *testing.T) {
longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
longBv := map[string]*querypb.BindVariable{
Expand Down

0 comments on commit 3877a94

Please sign in to comment.