diff --git a/changelog/17.0/17.0.3/summary.md b/changelog/17.0/17.0.3/summary.md
new file mode 100644
index 00000000000..9f1c9cdc120
--- /dev/null
+++ b/changelog/17.0/17.0.3/summary.md
@@ -0,0 +1,19 @@
+## Summary
+
+### Table of Contents
+
+- **[Major Changes](#major-changes)**
+ - **[New command line flags and behavior](#new-flag)**
+ - [VTGate flag `--grpc-send-session-in-streaming`](#new-vtgate-streaming-sesion)
+
+## Major Changes
+
+### New command line flags and behavior
+
+#### VTGate GRPC stream execute session flag `--grpc-send-session-in-streaming`
+
+This flag enables transaction support on `StreamExecute` api.
+One enabled, VTGate `StreamExecute` grpc api will send session as the last packet in the response.
+The client should enable it only when they have made the required changes to expect such a packet.
+
+It is disabled by default.
\ No newline at end of file
diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt
index cc1aabc339e..af0fe2c3d54 100644
--- a/go/flags/endtoend/vtgate.txt
+++ b/go/flags/endtoend/vtgate.txt
@@ -39,6 +39,7 @@ Usage of vtgate:
--gate_query_cache_memory int gate server query cache size in bytes, maximum amount of memory to be cached. vtgate analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache. (default 33554432)
--gate_query_cache_size int gate server query cache size, maximum number of queries to be cached. vtgate analyzes every incoming query and generate a query plan, these plans are being cached in a cache. This config controls the expected amount of unique entries in the cache. (default 5000)
--gateway_initial_tablet_timeout duration At startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type (default 30s)
+ --grpc-send-session-in-streaming If set, will send the session as last packet in streaming api to support transactions in streaming
--grpc-use-effective-groups If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups.
--grpc-use-static-authentication-callerid If set, will set the immediate caller id to the username authenticated by the static auth plugin.
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
diff --git a/go/test/endtoend/vtgate/grpc_api/main_test.go b/go/test/endtoend/vtgate/grpc_api/main_test.go
index a51c6d9e6f2..3c8605f79a0 100644
--- a/go/test/endtoend/vtgate/grpc_api/main_test.go
+++ b/go/test/endtoend/vtgate/grpc_api/main_test.go
@@ -111,6 +111,7 @@ func TestMain(m *testing.M) {
"--grpc_auth_static_password_file", grpcServerAuthStaticPath,
"--grpc_use_effective_callerid",
"--grpc-use-static-authentication-callerid",
+ "--grpc-send-session-in-streaming",
}
// Configure vttablet to use table ACL
diff --git a/go/vt/vtgate/grpcvtgateservice/server.go b/go/vt/vtgate/grpcvtgateservice/server.go
index 7b87b6ed708..00d24281949 100644
--- a/go/vt/vtgate/grpcvtgateservice/server.go
+++ b/go/vt/vtgate/grpcvtgateservice/server.go
@@ -48,12 +48,15 @@ var (
useEffective bool
useEffectiveGroups bool
useStaticAuthenticationIdentity bool
+
+ sendSessionInStreaming bool
)
func registerFlags(fs *pflag.FlagSet) {
fs.BoolVar(&useEffective, "grpc_use_effective_callerid", false, "If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.")
fs.BoolVar(&useEffectiveGroups, "grpc-use-effective-groups", false, "If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups.")
fs.BoolVar(&useStaticAuthenticationIdentity, "grpc-use-static-authentication-callerid", false, "If set, will set the immediate caller id to the username authenticated by the static auth plugin.")
+ fs.BoolVar(&sendSessionInStreaming, "grpc-send-session-in-streaming", false, "If set, will send the session as last packet in streaming api to support transactions in streaming")
}
func init() {
@@ -192,19 +195,22 @@ func (vtg *VTGate) StreamExecute(request *vtgatepb.StreamExecuteRequest, stream
})
})
- // even if there is an error, session could have been modified.
- // So, this needs to be sent back to the client. Session is sent in the last stream response.
- lastErr := stream.Send(&vtgatepb.StreamExecuteResponse{
- Session: session,
- })
-
var errs []error
if vtgErr != nil {
errs = append(errs, vtgErr)
}
- if lastErr != nil {
- errs = append(errs, lastErr)
+
+ if sendSessionInStreaming {
+ // even if there is an error, session could have been modified.
+ // So, this needs to be sent back to the client. Session is sent in the last stream response.
+ lastErr := stream.Send(&vtgatepb.StreamExecuteResponse{
+ Session: session,
+ })
+ if lastErr != nil {
+ errs = append(errs, lastErr)
+ }
}
+
return vterrors.ToGRPC(vterrors.Aggregate(errs))
}
diff --git a/go/vt/vttablet/tabletserver/exclude_race_test.go b/go/vt/vttablet/tabletserver/exclude_race_test.go
new file mode 100644
index 00000000000..2610a98da26
--- /dev/null
+++ b/go/vt/vttablet/tabletserver/exclude_race_test.go
@@ -0,0 +1,62 @@
+//go:build !race
+
+package tabletserver
+
+import (
+ "context"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "vitess.io/vitess/go/sqltypes"
+ querypb "vitess.io/vitess/go/vt/proto/query"
+ "vitess.io/vitess/go/vt/sqlparser"
+ "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
+)
+
+// TestHandlePanicAndSendLogStatsMessageTruncation tests that when an error truncation
+// length is set and a panic occurs, the code in handlePanicAndSendLogStats will
+// truncate the error text in logs, but will not truncate the error text in the
+// error value.
+func TestHandlePanicAndSendLogStatsMessageTruncation(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ tl := newTestLogger()
+ defer tl.Close()
+ logStats := tabletenv.NewLogStats(ctx, "TestHandlePanicAndSendLogStatsMessageTruncation")
+ db, tsv := setupTabletServerTest(t, "")
+ defer tsv.StopService()
+ defer db.Close()
+
+ longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
+ longBv := map[string]*querypb.BindVariable{
+ "bv1": sqltypes.Int64BindVariable(1111111111),
+ "bv2": sqltypes.Int64BindVariable(2222222222),
+ "bv3": sqltypes.Int64BindVariable(3333333333),
+ "bv4": sqltypes.Int64BindVariable(4444444444),
+ }
+ origTruncateErrLen := sqlparser.GetTruncateErrLen()
+ sqlparser.SetTruncateErrLen(32)
+ defer sqlparser.SetTruncateErrLen(origTruncateErrLen)
+
+ defer func() {
+ err := logStats.Error
+ want := "Uncaught panic for Sql: \"select * from test_table_loooooooooooooooooooooooooooooooooooong\", BindVars: {bv1: \"type:INT64 value:\\\"1111111111\\\"\"bv2: \"type:INT64 value:\\\"2222222222\\\"\"bv3: \"type:INT64 value:\\\"3333333333\\\"\"bv4: \"type:INT64 value:\\\"4444444444\\\"\"}"
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), want)
+ want = "Uncaught panic for Sql: \"select * from test_t [TRUNCATED]\", BindVars: {bv1: \"typ [TRUNCATED]"
+ gotWhatWeWant := false
+ for _, log := range tl.getLogs() {
+ if strings.HasPrefix(log, want) {
+ gotWhatWeWant = true
+ break
+ }
+ }
+ assert.True(t, gotWhatWeWant)
+ }()
+
+ defer tsv.handlePanicAndSendLogStats(longSql, longBv, logStats)
+ panic("panic from TestHandlePanicAndSendLogStatsMessageTruncation")
+}
diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go
index b26baa35cac..51504ffb8ed 100644
--- a/go/vt/vttablet/tabletserver/tabletserver_test.go
+++ b/go/vt/vttablet/tabletserver/tabletserver_test.go
@@ -1421,49 +1421,6 @@ func TestHandleExecUnknownError(t *testing.T) {
panic("unknown exec error")
}
-// TestHandlePanicAndSendLogStatsMessageTruncation tests that when an error truncation
-// length is set and a panic occurs, the code in handlePanicAndSendLogStats will
-// truncate the error text in logs, but will not truncate the error text in the
-// error value.
-func TestHandlePanicAndSendLogStatsMessageTruncation(t *testing.T) {
- tl := newTestLogger()
- defer tl.Close()
- logStats := tabletenv.NewLogStats(ctx, "TestHandlePanicAndSendLogStatsMessageTruncation")
- db, tsv := setupTabletServerTest(t, "")
- defer tsv.StopService()
- defer db.Close()
-
- longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
- longBv := map[string]*querypb.BindVariable{
- "bv1": sqltypes.Int64BindVariable(1111111111),
- "bv2": sqltypes.Int64BindVariable(2222222222),
- "bv3": sqltypes.Int64BindVariable(3333333333),
- "bv4": sqltypes.Int64BindVariable(4444444444),
- }
- origTruncateErrLen := sqlparser.GetTruncateErrLen()
- sqlparser.SetTruncateErrLen(32)
- defer sqlparser.SetTruncateErrLen(origTruncateErrLen)
-
- defer func() {
- err := logStats.Error
- want := "Uncaught panic for Sql: \"select * from test_table_loooooooooooooooooooooooooooooooooooong\", BindVars: {bv1: \"type:INT64 value:\\\"1111111111\\\"\"bv2: \"type:INT64 value:\\\"2222222222\\\"\"bv3: \"type:INT64 value:\\\"3333333333\\\"\"bv4: \"type:INT64 value:\\\"4444444444\\\"\"}"
- require.Error(t, err)
- assert.Contains(t, err.Error(), want)
- want = "Uncaught panic for Sql: \"select * from test_t [TRUNCATED]\", BindVars: {bv1: \"typ [TRUNCATED]"
- gotWhatWeWant := false
- for _, log := range tl.getLogs() {
- if strings.HasPrefix(log, want) {
- gotWhatWeWant = true
- break
- }
- }
- assert.True(t, gotWhatWeWant)
- }()
-
- defer tsv.handlePanicAndSendLogStats(longSql, longBv, logStats)
- panic("panic from TestHandlePanicAndSendLogStatsMessageTruncation")
-}
-
func TestQueryAsString(t *testing.T) {
longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
longBv := map[string]*querypb.BindVariable{