Skip to content

Commit

Permalink
Merge #55569
Browse files Browse the repository at this point in the history
55569: sessiondata: extract all non-local parameters into a protobuf r=yuzefovich a=yuzefovich

This commit divides up all the session parameters into two categories:
local only (those that don't influence the execution if not propagated
to the remote nodes) and non-local (those that must be propagated). The
former are extracted into a separate struct, and the latter are now
mostly stored directly in a protobuf with a few exceptions having custom
serialization/deserialization logic. This allows us to clean up the eval
context proto and centralize the custom logic and the definition.

This commit also fixed an issue of not propagating the default int size
which - in theory - could influence the execution on the remote nodes
(e.g. when performing a cast by an internal executor).

Fixes: #51075.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Oct 22, 2020
2 parents 8d5790f + ab19e5c commit dd647f5
Show file tree
Hide file tree
Showing 44 changed files with 2,059 additions and 1,329 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func descForTable(

var testEvalCtx = &tree.EvalContext{
SessionData: &sessiondata.SessionData{
DataConversion: sessiondata.DataConversionConfig{Location: time.UTC},
Location: time.UTC,
},
StmtTimestamp: timeutil.Unix(100000000, 0),
Settings: cluster.MakeTestingClusterSettings(),
Expand Down
3 changes: 2 additions & 1 deletion pkg/cli/sql_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/lex"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -1154,7 +1155,7 @@ func formatVal(val driver.Value, showPrintableUnicode bool, showNewLinesAndTabs
// that we can let the user see and control the result using
// `bytea_output`.
return lex.EncodeByteArrayToRawBytes(string(t),
lex.BytesEncodeEscape, false /* skipHexPrefix */)
sessiondatapb.BytesEncodeEscape, false /* skipHexPrefix */)

case time.Time:
// Since we do not know whether the datum is Timestamp or TimestampTZ,
Expand Down
18 changes: 10 additions & 8 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,14 +708,16 @@ func (s *sqlServer) preStart(
ctx, s.pgServer.SQLServer, s.internalMemMetrics, s.execCfg.Settings)
migrationsExecutor.SetSessionData(
&sessiondata.SessionData{
// Migrations need an executor with query distribution turned off. This is
// because the node crashes if migrations fail to execute, and query
// distribution introduces more moving parts. Local execution is more
// robust; for example, the DistSender has retries if it can't connect to
// another node, but DistSQL doesn't. Also see #44101 for why DistSQL is
// particularly fragile immediately after a node is started (i.e. the
// present situation).
DistSQLMode: sessiondata.DistSQLOff,
LocalOnlySessionData: sessiondata.LocalOnlySessionData{
// Migrations need an executor with query distribution turned off. This is
// because the node crashes if migrations fail to execute, and query
// distribution introduces more moving parts. Local execution is more
// robust; for example, the DistSender has retries if it can't connect to
// another node, but DistSQL doesn't. Also see #44101 for why DistSQL is
// particularly fragile immediately after a node is started (i.e. the
// present situation).
DistSQLMode: sessiondata.DistSQLOff,
},
})
migMgr := sqlmigrations.NewManager(
stopper,
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -147,10 +147,10 @@ func needHashAggregator(aggSpec *execinfrapb.AggregatorSpec) (bool, error) {
// isSupported checks whether we have a columnar operator equivalent to a
// processor described by spec. Note that it doesn't perform any other checks
// (like validity of the number of inputs).
func isSupported(mode sessiondata.VectorizeExecMode, spec *execinfrapb.ProcessorSpec) error {
func isSupported(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.ProcessorSpec) error {
core := spec.Core
isFullVectorization := mode == sessiondata.VectorizeOn ||
mode == sessiondata.VectorizeExperimentalAlways
isFullVectorization := mode == sessiondatapb.VectorizeOn ||
mode == sessiondatapb.VectorizeExperimentalAlways

switch {
case core.Noop != nil:
Expand Down Expand Up @@ -429,9 +429,9 @@ func (r opResult) createAndWrapRowSource(
}
if spec.Core.JoinReader == nil {
switch flowCtx.EvalCtx.SessionData.VectorizeMode {
case sessiondata.Vectorize201Auto:
case sessiondatapb.Vectorize201Auto:
return errors.New("rowexec processor wrapping for non-JoinReader core unsupported in vectorize=201auto mode")
case sessiondata.VectorizeExperimentalAlways:
case sessiondatapb.VectorizeExperimentalAlways:
return causeToWrap
}
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/colexec/window_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils/colcontainerutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -48,7 +47,6 @@ func TestWindowFunctions(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
evalCtx.SessionData.VectorizeMode = sessiondata.VectorizeOn
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -1034,7 +1034,7 @@ func (s *vectorizedFlowCreator) setupFlow(
if flowCtx.EvalCtx.SessionData.TestingVectorizeInjectPanics {
result.Op = colexec.NewPanicInjector(result.Op)
}
if flowCtx.EvalCtx.SessionData.VectorizeMode == sessiondata.Vectorize201Auto &&
if flowCtx.EvalCtx.SessionData.VectorizeMode == sessiondatapb.Vectorize201Auto &&
!result.IsStreaming {
err = errors.Errorf("non-streaming operator encountered when vectorize=201auto")
return
Expand Down Expand Up @@ -1076,7 +1076,7 @@ func (s *vectorizedFlowCreator) setupFlow(
}
}

if (flowCtx.EvalCtx.SessionData.VectorizeMode == sessiondata.Vectorize201Auto) &&
if (flowCtx.EvalCtx.SessionData.VectorizeMode == sessiondatapb.Vectorize201Auto) &&
pspec.Output[0].Type == execinfrapb.OutputRouterSpec_BY_HASH {
// colexec.HashRouter is not supported when vectorize=auto since it can
// buffer an unlimited number of tuples, even though it falls back to
Expand Down
32 changes: 16 additions & 16 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
Expand Down Expand Up @@ -434,18 +435,13 @@ type ConnectionHandler struct {
// GetUnqualifiedIntSize implements pgwire.sessionDataProvider and returns
// the type that INT should be parsed as.
func (h ConnectionHandler) GetUnqualifiedIntSize() *types.T {
var size int
var size int32
if h.ex != nil {
// The executor will be nil in certain testing situations where
// no server is actually present.
size = h.ex.sessionData.DefaultIntSize
}
switch size {
case 4, 32:
return types.Int4
default:
return types.Int
}
return parser.NakedIntTypeFromDefaultIntSize(size)
}

// GetParamStatus retrieves the configured value of the session
Expand Down Expand Up @@ -484,9 +480,13 @@ func (s *Server) ServeConn(
// newSessionData a SessionData that can be passed to newConnExecutor.
func (s *Server) newSessionData(args SessionArgs) *sessiondata.SessionData {
sd := &sessiondata.SessionData{
User: args.User,
RemoteAddr: args.RemoteAddr,
ResultsBufferSize: args.ConnResultsBufferSize,
SessionData: sessiondatapb.SessionData{
User: args.User,
},
LocalOnlySessionData: sessiondata.LocalOnlySessionData{
RemoteAddr: args.RemoteAddr,
ResultsBufferSize: args.ConnResultsBufferSize,
},
}
s.populateMinimalSessionData(sd)
return sd
Expand All @@ -509,10 +509,8 @@ func (s *Server) populateMinimalSessionData(sd *sessiondata.SessionData) {
if sd.SequenceState == nil {
sd.SequenceState = sessiondata.NewSequenceState()
}
if sd.DataConversion == (sessiondata.DataConversionConfig{}) {
sd.DataConversion = sessiondata.DataConversionConfig{
Location: time.UTC,
}
if sd.Location == nil {
sd.Location = time.UTC
}
if len(sd.SearchPath.GetPathArray()) == 0 {
sd.SearchPath = sessiondata.DefaultSearchPathForUser(sd.User)
Expand Down Expand Up @@ -1431,7 +1429,8 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
NeedRowDesc,
pos,
nil, /* formatCodes */
ex.sessionData.DataConversion,
ex.sessionData.DataConversionConfig,
ex.sessionData.GetLocation(),
0, /* limit */
"", /* portalName */
ex.implicitTxn(),
Expand Down Expand Up @@ -1502,7 +1501,8 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
// needed.
DontNeedRowDesc,
pos, portal.OutFormats,
ex.sessionData.DataConversion,
ex.sessionData.DataConversionConfig,
ex.sessionData.GetLocation(),
tcmd.Limit,
portalName,
ex.implicitTxn(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (ex *connExecutor) execBind(
"expected %d arguments, got %d", numQArgs, len(bindCmd.Args)))
}

ptCtx := tree.NewParseTimeContext(ex.state.sqlTimestamp.In(ex.sessionData.DataConversion.Location))
ptCtx := tree.NewParseTimeContext(ex.state.sqlTimestamp.In(ex.sessionData.GetLocation()))

for i, arg := range bindCmd.Args {
k := tree.PlaceholderIdx(i)
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/ring"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -591,7 +591,8 @@ type ClientComm interface {
descOpt RowDescOpt,
pos CmdPos,
formatCodes []pgwirebase.FormatCode,
conv sessiondata.DataConversionConfig,
conv sessiondatapb.DataConversionConfig,
location *time.Location,
limit int,
portalName string,
implicitTxn bool,
Expand Down
48 changes: 3 additions & 45 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/faketreeeval"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/lex"
"github.com/cockroachdb/cockroach/pkg/sql/rowflow"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
Expand Down Expand Up @@ -259,44 +259,11 @@ func (ds *ServerImpl) setupFlow(
"EvalContext expected to be populated when IsLocal is set")
}

location, err := timeutil.TimeZoneStringToLocation(
req.EvalContext.Location,
timeutil.TimeZoneStringToLocationISO8601Standard,
)
sd, err := sessiondata.UnmarshalNonLocal(req.EvalContext.SessionData)
if err != nil {
tracing.FinishSpan(sp)
return ctx, nil, err
}

var be lex.BytesEncodeFormat
switch req.EvalContext.BytesEncodeFormat {
case execinfrapb.BytesEncodeFormat_HEX:
be = lex.BytesEncodeHex
case execinfrapb.BytesEncodeFormat_ESCAPE:
be = lex.BytesEncodeEscape
case execinfrapb.BytesEncodeFormat_BASE64:
be = lex.BytesEncodeBase64
default:
return nil, nil, errors.AssertionFailedf("unknown byte encode format: %s",
errors.Safe(req.EvalContext.BytesEncodeFormat))
}
sd := &sessiondata.SessionData{
ApplicationName: req.EvalContext.ApplicationName,
Database: req.EvalContext.Database,
User: req.EvalContext.User,
SearchPath: sessiondata.MakeSearchPath(
req.EvalContext.SearchPath,
).WithTemporarySchemaName(
req.EvalContext.TemporarySchemaName,
).WithUserSchemaName(req.EvalContext.User),
SequenceState: sessiondata.NewSequenceState(),
DataConversion: sessiondata.DataConversionConfig{
Location: location,
BytesEncodeFormat: be,
ExtraFloatDigits: int(req.EvalContext.ExtraFloatDigits),
},
VectorizeMode: sessiondata.VectorizeExecMode(req.EvalContext.Vectorize),
}
ie := &lazyInternalExecutor{
newInternalExecutor: func() sqlutil.InternalExecutor {
return ds.SessionBoundInternalExecutorFactory(ctx, sd)
Expand Down Expand Up @@ -334,15 +301,6 @@ func (ds *ServerImpl) setupFlow(
}
evalCtx.SetStmtTimestamp(timeutil.Unix(0 /* sec */, req.EvalContext.StmtTimestampNanos))
evalCtx.SetTxnTimestamp(timeutil.Unix(0 /* sec */, req.EvalContext.TxnTimestampNanos))
var haveSequences bool
for _, seq := range req.EvalContext.SeqState.Seqs {
evalCtx.SessionData.SequenceState.RecordValue(seq.SeqID, seq.LatestVal)
haveSequences = true
}
if haveSequences {
evalCtx.SessionData.SequenceState.SetLastSequenceIncremented(
*req.EvalContext.SeqState.LastSeqIncremented)
}
}

// Create the FlowCtx for the flow.
Expand All @@ -352,7 +310,7 @@ func (ds *ServerImpl) setupFlow(
// have non-nil localState.EvalContext. We don't want to update EvalContext
// itself when the vectorize mode needs to be changed because we would need
// to restore the original value which can have data races under stress.
isVectorized := sessiondata.VectorizeExecMode(req.EvalContext.Vectorize) != sessiondata.VectorizeOff
isVectorized := req.EvalContext.SessionData.VectorizeMode != sessiondatapb.VectorizeOff
f := newFlow(flowCtx, ds.flowRegistry, syncFlowConsumer, localState.LocalProcs, isVectorized)
opt := flowinfra.FuseNormally
if localState.IsLocal {
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
Expand Down Expand Up @@ -151,13 +151,13 @@ func (dsp *DistSQLPlanner) setupFlows(
resultChan = make(chan runnerResult, len(flows)-1)
}

if evalCtx.SessionData.VectorizeMode != sessiondata.VectorizeOff {
if !vectorizeThresholdMet && (evalCtx.SessionData.VectorizeMode == sessiondata.Vectorize201Auto || evalCtx.SessionData.VectorizeMode == sessiondata.VectorizeOn) {
if evalCtx.SessionData.VectorizeMode != sessiondatapb.VectorizeOff {
if !vectorizeThresholdMet && (evalCtx.SessionData.VectorizeMode == sessiondatapb.Vectorize201Auto || evalCtx.SessionData.VectorizeMode == sessiondatapb.VectorizeOn) {
// Vectorization is not justified for this flow because the expected
// amount of data is too small and the overhead of pre-allocating data
// structures needed for the vectorized engine is expected to dominate
// the execution time.
setupReq.EvalContext.Vectorize = int32(sessiondata.VectorizeOff)
setupReq.EvalContext.SessionData.VectorizeMode = sessiondatapb.VectorizeOff
} else {
// Now we check to see whether or not to even try vectorizing the flow.
// The goal here is to determine up front whether all of the flows can be
Expand All @@ -182,7 +182,7 @@ func (dsp *DistSQLPlanner) setupFlows(
); err != nil {
// Vectorization attempt failed with an error.
returnVectorizationSetupError := false
if evalCtx.SessionData.VectorizeMode == sessiondata.VectorizeExperimentalAlways {
if evalCtx.SessionData.VectorizeMode == sessiondatapb.VectorizeExperimentalAlways {
returnVectorizationSetupError = true
// If running with VectorizeExperimentalAlways, this check makes sure
// that we can still run SET statements (mostly to set vectorize to
Expand All @@ -206,7 +206,7 @@ func (dsp *DistSQLPlanner) setupFlows(
}
// Vectorization is not supported for this flow, so we override the
// setting.
setupReq.EvalContext.Vectorize = int32(sessiondata.VectorizeOff)
setupReq.EvalContext.SessionData.VectorizeMode = sessiondatapb.VectorizeOff
break
}
}
Expand Down
Loading

0 comments on commit dd647f5

Please sign in to comment.