Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-20.1: sql: fix portals after exhausting rows #52443

Merged
merged 2 commits into from
Aug 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 34 additions & 23 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,12 +645,13 @@ func (s *Server) newConnExecutor(
ex.phaseTimes[sessionInit] = timeutil.Now()
ex.extraTxnState.prepStmtsNamespace = prepStmtNamespace{
prepStmts: make(map[string]*PreparedStatement),
portals: make(map[string]*PreparedPortal),
portals: make(map[string]PreparedPortal),
}
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos = prepStmtNamespace{
prepStmts: make(map[string]*PreparedStatement),
portals: make(map[string]*PreparedPortal),
portals: make(map[string]PreparedPortal),
}
ex.extraTxnState.prepStmtsNamespaceMemAcc = ex.sessionMon.MakeBoundAccount()
ex.extraTxnState.tables = TableCollection{
leaseMgr: s.cfg.LeaseManager,
databaseCache: s.dbCache.getDatabaseCache(),
Expand Down Expand Up @@ -875,8 +876,13 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {

if closeType != panicClose {
// Close all statements and prepared portals.
ex.extraTxnState.prepStmtsNamespace.resetTo(ctx, prepStmtNamespace{})
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetTo(ctx, prepStmtNamespace{})
ex.extraTxnState.prepStmtsNamespace.resetTo(
ctx, prepStmtNamespace{}, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetTo(
ctx, prepStmtNamespace{}, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.prepStmtsNamespaceMemAcc.Close(ctx)
}

if ex.sessionTracing.Enabled() {
Expand Down Expand Up @@ -1007,6 +1013,12 @@ type connExecutor struct {
// collections, but these collections are periodically reconciled.
prepStmtsNamespaceAtTxnRewindPos prepStmtNamespace

// prepStmtsNamespaceMemAcc is the memory account that is shared
// between prepStmtsNamespace and prepStmtsNamespaceAtTxnRewindPos. It
// tracks the memory usage of portals and should be closed upon
// connExecutor's closure.
prepStmtsNamespaceMemAcc mon.BoundAccount

// onTxnFinish (if non-nil) will be called when txn is finished (either
// committed or aborted). It is set when txn is started but can remain
// unset when txn is executed within another higher-level txn.
Expand Down Expand Up @@ -1135,7 +1147,7 @@ type prepStmtNamespace struct {
// session.
prepStmts map[string]*PreparedStatement
// portals contains the portals currently available on the session.
portals map[string]*PreparedPortal
portals map[string]PreparedPortal
}

func (ns prepStmtNamespace) String() string {
Expand All @@ -1155,13 +1167,15 @@ func (ns prepStmtNamespace) String() string {
// references are release and all the to's references are duplicated.
//
// An empty `to` can be passed in to deallocate everything.
func (ns *prepStmtNamespace) resetTo(ctx context.Context, to prepStmtNamespace) {
func (ns *prepStmtNamespace) resetTo(
ctx context.Context, to prepStmtNamespace, prepStmtsNamespaceMemAcc *mon.BoundAccount,
) {
for name, p := range ns.prepStmts {
p.decRef(ctx)
delete(ns.prepStmts, name)
}
for name, p := range ns.portals {
p.decRef(ctx)
p.decRef(ctx, prepStmtsNamespaceMemAcc, name)
delete(ns.portals, name)
}

Expand Down Expand Up @@ -1192,7 +1206,7 @@ func (ex *connExecutor) resetExtraTxnState(

// Close all portals.
for name, p := range ex.extraTxnState.prepStmtsNamespace.portals {
p.decRef(ctx)
p.decRef(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
delete(ex.extraTxnState.prepStmtsNamespace.portals, name)
}

Expand Down Expand Up @@ -1397,10 +1411,10 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
// ExecPortal is handled like ExecStmt, except that the placeholder info
// is taken from the portal.

portal, ok := ex.extraTxnState.prepStmtsNamespace.portals[tcmd.Name]
portalName := tcmd.Name
portal, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]
if !ok {
err := pgerror.Newf(
pgcode.InvalidCursorName, "unknown portal %q", tcmd.Name)
err := pgerror.Newf(pgcode.InvalidCursorName, "unknown portal %q", portalName)
ev = eventNonRetriableErr{IsCommit: fsm.False}
payload = eventNonRetriableErrPayload{err: err}
res = ex.clientComm.CreateErrorResult(pos)
Expand Down Expand Up @@ -1440,18 +1454,11 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
pos, portal.OutFormats,
ex.sessionData.DataConversion,
tcmd.Limit,
tcmd.Name,
portalName,
ex.implicitTxn(),
)
res = stmtRes
curStmt := Statement{
Statement: portal.Stmt.Statement,
Prepared: portal.Stmt,
ExpectedTypes: portal.Stmt.Columns,
AnonymizedStr: portal.Stmt.AnonymizedStr,
}
stmtCtx := withStatement(ctx, ex.curStmt)
ev, payload, err = ex.execStmt(stmtCtx, curStmt, stmtRes, pinfo)
ev, payload, err = ex.execPortal(ctx, portal, portalName, stmtRes, pinfo)
if err != nil {
return err
}
Expand Down Expand Up @@ -1854,14 +1861,16 @@ func (ex *connExecutor) generateID() ClusterWideID {
// prepStmtsNamespaceAtTxnRewindPos that's not part of prepStmtsNamespace.
func (ex *connExecutor) commitPrepStmtNamespace(ctx context.Context) {
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetTo(
ctx, ex.extraTxnState.prepStmtsNamespace)
ctx, ex.extraTxnState.prepStmtsNamespace, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
}

// commitPrepStmtNamespace deallocates everything in prepStmtsNamespace that's
// not part of prepStmtsNamespaceAtTxnRewindPos.
func (ex *connExecutor) rewindPrepStmtNamespace(ctx context.Context) {
ex.extraTxnState.prepStmtsNamespace.resetTo(
ctx, ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos)
ctx, ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
}

// getRewindTxnCapability checks whether rewinding to the position previously
Expand Down Expand Up @@ -2572,7 +2581,9 @@ func (ps connExPrepStmtsAccessor) Delete(ctx context.Context, name string) bool

// DeleteAll is part of the preparedStatementsAccessor interface.
func (ps connExPrepStmtsAccessor) DeleteAll(ctx context.Context) {
ps.ex.extraTxnState.prepStmtsNamespace.resetTo(ctx, prepStmtNamespace{})
ps.ex.extraTxnState.prepStmtsNamespace.resetTo(
ctx, prepStmtNamespace{}, &ps.ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
}

// contextStatementKey is an empty type for the handle associated with the
Expand Down
59 changes: 57 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,61 @@ func (ex *connExecutor) recordFailure() {
ex.metrics.EngineMetrics.FailureCount.Inc(1)
}

// execPortal executes a prepared statement. It is a "wrapper" around execStmt
// method that is performing additional work to track portal's state.
func (ex *connExecutor) execPortal(
ctx context.Context,
portal PreparedPortal,
portalName string,
stmtRes CommandResult,
pinfo *tree.PlaceholderInfo,
) (ev fsm.Event, payload fsm.EventPayload, err error) {
curStmt := Statement{
Statement: portal.Stmt.Statement,
Prepared: portal.Stmt,
ExpectedTypes: portal.Stmt.Columns,
AnonymizedStr: portal.Stmt.AnonymizedStr,
}
stmtCtx := withStatement(ctx, ex.curStmt)
switch ex.machine.CurState().(type) {
case stateOpen:
// We're about to execute the statement in an open state which
// could trigger the dispatch to the execution engine. However, it
// is possible that we're trying to execute an already exhausted
// portal - in such a scenario we should return no rows, but the
// execution engine is not aware of that and would run the
// statement as if it was running it for the first time. In order
// to prevent such behavior, we check whether the portal has been
// exhausted and execute the statement only if it hasn't. If it has
// been exhausted, then we do not dispatch the query for execution,
// but connExecutor will still perform necessary state transitions
// which will emit CommandComplete messages and alike (in a sense,
// by not calling execStmt we "execute" the portal in such a way
// that it returns 0 rows).
// Note that here we deviate from Postgres which returns an error
// when attempting to execute an exhausted portal which has a
// StatementType() different from "Rows".
if !portal.exhausted {
ev, payload, err = ex.execStmt(stmtCtx, curStmt, stmtRes, pinfo)
// Portal suspension is supported via a "side" state machine
// (see pgwire.limitedCommandResult for details), so when
// execStmt returns, we know for sure that the portal has been
// executed to completion, thus, it is exhausted.
// Note that the portal is considered exhausted regardless of
// the fact whether an error occurred or not - if it did, we
// still don't want to re-execute the portal from scratch.
// The current statement may have just closed and deleted the portal,
// so only exhaust it if it still exists.
if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]; ok {
ex.exhaustPortal(portalName)
}
}
default:
ev, payload, err = ex.execStmt(stmtCtx, curStmt, stmtRes, pinfo)
}
return
}

// execStmtInOpenState executes one statement in the context of the session's
// current transaction.
// It handles statements that affect the transaction state (BEGIN, COMMIT)
Expand Down Expand Up @@ -436,10 +491,10 @@ func (ex *connExecutor) execStmtInOpenState(
// is re-configured, re-set etc without using NewTxnWithSteppingEnabled().
//
// Manually hunting them down and calling ConfigureStepping() each
// time would be error prone (and increase the change that a future
// time would be error prone (and increase the chance that a future
// change would forget to add the call).
//
// TODO(andrei): really the code should be re-architectued to ensure
// TODO(andrei): really the code should be rearchitected to ensure
// that all uses of SQL execution initialize the client.Txn using a
// single/common function. That would be where the stepping mode
// gets enabled once for all SQL statements executed "underneath".
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/conn_executor_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import (
)

// Test portal implicit destruction. Unless destroying a portal is explicitly
// requested, portals live until the end of the transaction in which
// they'recreated. If they're created outside of a transaction, they live until
// requested, portals live until the end of the transaction in which they're
// created. If they're created outside of a transaction, they live until
// the next transaction completes (so until the next statement is executed,
// which statement is expected to be the execution of the portal that was just
// created).
Expand Down
20 changes: 14 additions & 6 deletions pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,7 @@ func (ex *connExecutor) execBind(
}

// Create the new PreparedPortal.
if err := ex.addPortal(
ctx, portalName, bindCmd.PreparedStatementName, ps, qargs, columnFormatCodes,
); err != nil {
if err := ex.addPortal(ctx, portalName, ps, qargs, columnFormatCodes); err != nil {
return retErr(err)
}

Expand All @@ -380,7 +378,6 @@ func (ex *connExecutor) execBind(
func (ex *connExecutor) addPortal(
ctx context.Context,
portalName string,
psName string,
stmt *PreparedStatement,
qargs tree.QueryArguments,
outFormats []pgwirebase.FormatCode,
Expand All @@ -389,7 +386,7 @@ func (ex *connExecutor) addPortal(
panic(fmt.Sprintf("portal already exists: %q", portalName))
}

portal, err := ex.newPreparedPortal(ctx, portalName, stmt, qargs, outFormats)
portal, err := ex.makePreparedPortal(ctx, portalName, stmt, qargs, outFormats)
if err != nil {
return err
}
Expand All @@ -398,6 +395,17 @@ func (ex *connExecutor) addPortal(
return nil
}

// exhaustPortal marks a portal with the provided name as "exhausted" and
// panics if there is no portal with such name.
func (ex *connExecutor) exhaustPortal(portalName string) {
portal, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]
if !ok {
panic(errors.AssertionFailedf("portal %s doesn't exist", portalName))
}
portal.exhausted = true
ex.extraTxnState.prepStmtsNamespace.portals[portalName] = portal
}

func (ex *connExecutor) deletePreparedStmt(ctx context.Context, name string) {
ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[name]
if !ok {
Expand All @@ -412,7 +420,7 @@ func (ex *connExecutor) deletePortal(ctx context.Context, name string) {
if !ok {
return
}
portal.decRef(ctx)
portal.decRef(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
delete(ex.extraTxnState.prepStmtsNamespace.portals, name)
}

Expand Down
52 changes: 52 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/pgjdbc
Original file line number Diff line number Diff line change
@@ -1,3 +1,55 @@
# deallocate_test checks that we can run DEALLOCATE ALL using a prepared
# statement. See #52915.
send
Query {"String": "DROP TABLE IF EXISTS deallocate_test"}
----

until ignore=NoticeResponse
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"DROP TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "CREATE TABLE deallocate_test (a INT)"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"CREATE TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# 80 = ASCII 'P' for Portal
send
Parse {"Name": "s1", "Query": "DEALLOCATE ALL"}
Bind {"DestinationPortal": "p1", "PreparedStatement": "s1"}
Describe {"ObjectType": 80, "Name": "p1"}
Execute {"Portal": "p1"}
Sync
----

until
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"NoData"}
{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "DISCARD ALL"}
----

until
ReadyForQuery
----
{"Type":"ParameterStatus","Name":"application_name","Value":""}
{"Type":"ParameterStatus","Name":"TimeZone","Value":"UTC"}
{"Type":"CommandComplete","CommandTag":"DISCARD"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Send a simple query in the middle of extended protocol, which is apparently
# allowed. (See #41511, #33693)
send
Expand Down
Loading