Skip to content

Commit

Permalink
Merge #68792
Browse files Browse the repository at this point in the history
68792: builtins: implement session serialization and deserialization  r=rafiss a=otan

See individual commits for details.

Resolves #68464

Co-authored-by: Oliver Tan <[email protected]>
  • Loading branch information
craig[bot] and otan committed Aug 20, 2021
2 parents 74f0ac4 + fcdb310 commit 53a8df0
Show file tree
Hide file tree
Showing 21 changed files with 1,357 additions and 149 deletions.
4 changes: 4 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2801,6 +2801,8 @@ SELECT * FROM crdb_internal.check_consistency(true, ‘\x02’, ‘\x04’)</p>
</span></td></tr>
<tr><td><a name="crdb_internal.create_join_token"></a><code>crdb_internal.create_join_token() &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>Creates a join token for use when adding a new node to a secure cluster.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.deserialize_session"></a><code>crdb_internal.deserialize_session(session: <a href="bytes.html">bytes</a>) &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>This function deserializes the serialized variables into the current session.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.encode_key"></a><code>crdb_internal.encode_key(table_id: <a href="int.html">int</a>, index_id: <a href="int.html">int</a>, row_tuple: anyelement) &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>Generate the key for a row on a particular table and index.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.force_assertion_error"></a><code>crdb_internal.force_assertion_error(msg: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
Expand Down Expand Up @@ -2864,6 +2866,8 @@ SELECT * FROM crdb_internal.check_consistency(true, ‘\x02’, ‘\x04’)</p>
</span></td></tr>
<tr><td><a name="crdb_internal.round_decimal_values"></a><code>crdb_internal.round_decimal_values(val: <a href="decimal.html">decimal</a>[], scale: <a href="int.html">int</a>) &rarr; <a href="decimal.html">decimal</a>[]</code></td><td><span class="funcdesc"><p>This function is used internally to round decimal array values during mutations.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.serialize_session"></a><code>crdb_internal.serialize_session() &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>This function serializes the variables in the current session.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.set_trace_verbose"></a><code>crdb_internal.set_trace_verbose(trace_id: <a href="int.html">int</a>, verbosity: <a href="bool.html">bool</a>) &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>Returns true if root span was found and verbosity was set, false otherwise.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.set_vmodule"></a><code>crdb_internal.set_vmodule(vmodule_string: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>Set the equivalent of the <code>--vmodule</code> flag on the gateway node processing this request; it affords control over the logging verbosity of different files. Example syntax: <code>crdb_internal.set_vmodule('recordio=2,file=1,gfs*=3')</code>. Reset with: <code>crdb_internal.set_vmodule('')</code>. Raising the verbosity can severely affect performance.</p>
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ go_test(
"schema_changer_test.go",
"scrub_test.go",
"sequence_test.go",
"session_migration_test.go",
"set_zone_config_test.go",
"show_create_all_tables_builtin_test.go",
"show_fingerprints_test.go",
Expand Down
49 changes: 28 additions & 21 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,12 @@ type prepStmtNamespace struct {
portals map[string]PreparedPortal
}

// HasPrepared returns true if there are prepared statements or portals
// in the session.
func (ns prepStmtNamespace) HasPrepared() bool {
return len(ns.prepStmts) > 0 || len(ns.portals) > 0
}

func (ns prepStmtNamespace) String() string {
var sb strings.Builder
sb.WriteString("Prep stmts: ")
Expand Down Expand Up @@ -2341,27 +2347,28 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo

*evalCtx = extendedEvalContext{
EvalContext: tree.EvalContext{
Planner: p,
PrivilegedAccessor: p,
SessionAccessor: p,
ClientNoticeSender: p,
Sequence: p,
Tenant: p,
JoinTokenCreator: p,
SessionData: ex.sessionData,
Settings: ex.server.cfg.Settings,
TestingKnobs: ex.server.cfg.EvalContextTestingKnobs,
ClusterID: ex.server.cfg.ClusterID(),
ClusterName: ex.server.cfg.RPCContext.ClusterName(),
NodeID: ex.server.cfg.NodeID,
Codec: ex.server.cfg.Codec,
Locality: ex.server.cfg.Locality,
ReCache: ex.server.reCache,
InternalExecutor: &ie,
DB: ex.server.cfg.DB,
SQLLivenessReader: ex.server.cfg.SQLLivenessReader,
SQLStatsController: ex.server.sqlStatsController,
CompactEngineSpan: ex.server.cfg.CompactEngineSpanFunc,
Planner: p,
PrivilegedAccessor: p,
SessionAccessor: p,
ClientNoticeSender: p,
Sequence: p,
Tenant: p,
JoinTokenCreator: p,
PreparedStatementState: &ex.extraTxnState.prepStmtsNamespace,
SessionData: ex.sessionData,
Settings: ex.server.cfg.Settings,
TestingKnobs: ex.server.cfg.EvalContextTestingKnobs,
ClusterID: ex.server.cfg.ClusterID(),
ClusterName: ex.server.cfg.RPCContext.ClusterName(),
NodeID: ex.server.cfg.NodeID,
Codec: ex.server.cfg.Codec,
Locality: ex.server.cfg.Locality,
ReCache: ex.server.reCache,
InternalExecutor: &ie,
DB: ex.server.cfg.DB,
SQLLivenessReader: ex.server.cfg.SQLLivenessReader,
SQLStatsController: ex.server.sqlStatsController,
CompactEngineSpan: ex.server.cfg.CompactEngineSpanFunc,
},
SessionMutator: ex.dataMutator,
VirtualSchemas: ex.server.cfg.VirtualSchemas,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2639,7 +2639,7 @@ func (m *sessionDataMutator) SetNoticeDisplaySeverity(severity pgnotice.DisplayS

// initSequenceCache creates an empty sequence cache instance for the session.
func (m *sessionDataMutator) initSequenceCache() {
m.data.SequenceCache = sessiondata.SequenceCache{}
m.data.SequenceCache = sessiondatapb.SequenceCache{}
}

// SetIntervalStyle sets the IntervalStyle for the given session.
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/faketreeeval/evalctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,3 +433,14 @@ func (c *DummyTenantOperator) UpdateTenantResourceLimits(
) error {
return errors.WithStack(errEvalTenant)
}

// DummyPreparedStatementState implements the tree.PreparedStatementState
// interface.
type DummyPreparedStatementState struct{}

var _ tree.PreparedStatementState = (*DummyPreparedStatementState)(nil)

// HasPrepared is part of the tree.PreparedStatementState interface.
func (ps *DummyPreparedStatementState) HasPrepared() bool {
return false
}
2 changes: 1 addition & 1 deletion pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (p *planner) ExecCfg() *ExecutorConfig {
// GetOrInitSequenceCache returns the sequence cache for the session.
// If the sequence cache has not been used yet, it initializes the cache
// inside the session data.
func (p *planner) GetOrInitSequenceCache() sessiondata.SequenceCache {
func (p *planner) GetOrInitSequenceCache() sessiondatapb.SequenceCache {
if p.SessionData().SequenceCache == nil {
p.sessionDataMutator.initSequenceCache()
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/builtins/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ go_library(
"//pkg/util/json",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/ring",
"//pkg/util/syncutil",
"//pkg/util/timeofday",
Expand Down
97 changes: 97 additions & 0 deletions pkg/sql/sem/builtins/builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ipaddr"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeofday"
"github.com/cockroachdb/cockroach/pkg/util/timetz"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -5879,6 +5880,102 @@ table's zone configuration this will return NULL.`,
Volatility: tree.VolatilityVolatile,
},
),

"crdb_internal.serialize_session": makeBuiltin(
tree.FunctionProperties{
Category: categorySystemInfo,
},
tree.Overload{
Types: tree.ArgTypes{},
ReturnType: tree.FixedReturnType(types.Bytes),
Fn: func(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
if !evalCtx.TxnImplicit {
return nil, pgerror.Newf(
pgcode.InvalidTransactionState,
"cannot serialize a session which is inside a transaction",
)
}

if evalCtx.PreparedStatementState.HasPrepared() {
return nil, pgerror.Newf(
pgcode.InvalidTransactionState,
"cannot serialize a session which has portals or prepared statements",
)
}

sd := evalCtx.SessionData
if sd == nil {
return nil, pgerror.Newf(
pgcode.InvalidTransactionState,
"no session is active",
)
}

if len(sd.DatabaseIDToTempSchemaID) > 0 {
return nil, pgerror.Newf(
pgcode.InvalidTransactionState,
"cannot serialize session with temporary schemas",
)
}

var m sessiondatapb.MigratableSession
m.SessionData = sd.SessionData
sessiondata.MarshalNonLocal(sd, &m.SessionData)
m.LocalOnlySessionData = sd.LocalOnlySessionData

b, err := protoutil.Marshal(&m)
if err != nil {
return nil, err
}
return tree.NewDBytes(tree.DBytes(b)), nil
},
Info: `This function serializes the variables in the current session.`,
Volatility: tree.VolatilityVolatile,
},
),

"crdb_internal.deserialize_session": makeBuiltin(
tree.FunctionProperties{
Category: categorySystemInfo,
},
tree.Overload{
Types: tree.ArgTypes{{"session", types.Bytes}},
ReturnType: tree.FixedReturnType(types.Bool),
Fn: func(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
if !evalCtx.TxnImplicit {
return nil, pgerror.Newf(
pgcode.InvalidTransactionState,
"cannot deserialize a session which is inside a transaction",
)
}

var m sessiondatapb.MigratableSession
if err := protoutil.Unmarshal([]byte(tree.MustBeDBytes(args[0])), &m); err != nil {
return nil, pgerror.WithCandidateCode(
errors.Wrapf(err, "error deserializing session"),
pgcode.InvalidParameterValue,
)
}
sd, err := sessiondata.UnmarshalNonLocal(m.SessionData)
if err != nil {
return nil, err
}
sd.SessionData = m.SessionData
sd.LocalUnmigratableSessionData = evalCtx.SessionData.LocalUnmigratableSessionData
sd.LocalOnlySessionData = m.LocalOnlySessionData
if sd.SessionUser().Normalized() != evalCtx.SessionData.SessionUser().Normalized() {
return nil, pgerror.Newf(
pgcode.InsufficientPrivilege,
"can only serialize matching session users",
)
}
*evalCtx.SessionData = *sd
return tree.MakeDBool(true), nil
},
Info: `This function deserializes the serialized variables into the current session.`,
Volatility: tree.VolatilityVolatile,
},
),
}

var lengthImpls = func(incBitOverload bool) builtinDefinition {
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/sem/tree/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -3258,6 +3258,12 @@ type EvalSessionAccessor interface {
HasRoleOption(ctx context.Context, roleOption roleoption.Option) (bool, error)
}

// PreparedStatementState is a limited interface that exposes metadata about
// prepared statements.
type PreparedStatementState interface {
HasPrepared() bool
}

// ClientNoticeSender is a limited interface to send notices to the
// client.
//
Expand Down Expand Up @@ -3531,6 +3537,8 @@ type EvalContext struct {

JoinTokenCreator JoinTokenCreator

PreparedStatementState PreparedStatementState

// The transaction in which the statement is executing.
Txn *kv.Txn
// A handle to the database.
Expand Down
135 changes: 135 additions & 0 deletions pkg/sql/session_migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql_test

import (
"context"
gosql "database/sql"
"fmt"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
)

// TestSessionMigration tests migrating a session as a data driven test.
// It supports the following directives:
// * reset: resets the connection.
// * exec: executes a SQL command
// * query: executes a SQL command and returns the output
// * dump_vars: dumps variables into a variable called with the given input.
// * compare_vars: compares two dumped variables.
func TestSessionMigration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
datadriven.Walk(t, "testdata/session_migration", func(t *testing.T, path string) {
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

openConnFunc := func() *gosql.DB {
return serverutils.OpenDBConn(
t,
tc.Server(0).ServingSQLAddr(),
"defaultdb", /* database */
false, /* insecure */
tc.Server(0).Stopper(),
)
}
dbConn := openConnFunc()
defer func() {
_ = dbConn.Close()
}()

vars := make(map[string]string)
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
getQuery := func() string {
q := d.Input
for k, v := range vars {
q = strings.ReplaceAll(q, k, v)
}
return q
}
switch d.Cmd {
case "reset":
require.NoError(t, dbConn.Close())
dbConn = openConnFunc()
return ""
case "exec":
_, err := dbConn.Exec(getQuery())
if err != nil {
return err.Error()
}
return ""
case "dump_vars":
require.NotEmpty(t, d.Input, "expected table name")
_, err := dbConn.Exec(fmt.Sprintf("CREATE TABLE %s AS SELECT * FROM [SHOW ALL]", d.Input))
require.NoError(t, err)
return ""
case "compare_vars":
tables := strings.Split(d.Input, "\n")
require.Len(t, tables, 2, "require 2 tables to compare against")

q := `SELECT dump.variable, dump.value, dump2.variable, dump2.value
FROM dump
FULL OUTER JOIN dump2
ON ( dump.variable = dump2.variable )
WHERE dump.variable IS NULL OR dump2.variable IS NULL OR dump.variable != dump2.variable`
for _, repl := range []struct {
from string
to string
}{
{"dump2", tables[1]},
{"dump", tables[0]},
} {
q = strings.ReplaceAll(q, repl.from, repl.to)
}
rows, err := dbConn.Query(q)
require.NoError(t, err)
ret, err := sqlutils.RowsToDataDrivenOutput(rows)
require.NoError(t, err)
return ret
case "query":
q := d.Input
for k, v := range vars {
q = strings.ReplaceAll(q, k, v)
}
rows, err := dbConn.Query(getQuery())
if err != nil {
return err.Error()
}
ret, err := sqlutils.RowsToDataDrivenOutput(rows)
require.NoError(t, err)
return ret
case "let":
row := dbConn.QueryRow(getQuery())
var v string
require.NoError(t, row.Err())
require.NoError(t, row.Scan(&v))
require.Len(t, d.CmdArgs, 1, "only one argument permitted for let")
for _, arg := range d.CmdArgs {
vars[arg.Key] = v
}
return ""
}
t.Fatalf("unknown command: %s", d.Cmd)
return "unexpected"
})
})
}
Loading

0 comments on commit 53a8df0

Please sign in to comment.