Skip to content

Commit

Permalink
sql/pgwire: improve detection of Sync message in extended protocol
Browse files Browse the repository at this point in the history
A previous commit (1b42d0a) added code
to detect when the next message in the extended protocol is a Sync
message, in order to allow an optimization in some cases.

That approach wasn't reliable: even if the pgwire buffer receives an Execute
and a Sync message together, it may not push them into the connExecutor's
StmtBuf at the same time. If the connExecutor peeked at the StmtBuf before
the Sync had been pushed, it would not see the Sync, and the optimization
would be skipped.

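This is fixed by checking for the Sync at the pgwire level instead: when an
Execute message is read, pgwire peeks at the type of the next message and
records the result in the FollowedBySync field of the ExecPortal command.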

Release note: None
  • Loading branch information
rafiss committed Jan 13, 2022
1 parent 3d924ff commit bc38925
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 56 deletions.
15 changes: 13 additions & 2 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1749,7 +1749,10 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
)
res = stmtRes

ev, payload, err = ex.execStmt(ctx, tcmd.Statement, nil /* prepared */, nil /* pinfo */, stmtRes)
canAutoCommit := ex.implicitTxn()
ev, payload, err = ex.execStmt(
ctx, tcmd.Statement, nil /* prepared */, nil /* pinfo */, stmtRes, canAutoCommit,
)
return err
}()
// Note: we write to ex.statsCollector.PhaseTimes, instead of ex.phaseTimes,
Expand Down Expand Up @@ -1817,7 +1820,15 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
ex.implicitTxn(),
)
res = stmtRes
ev, payload, err = ex.execPortal(ctx, portal, portalName, stmtRes, pinfo)

// In the extended protocol, autocommit is not always allowed. The postgres
// docs say that commands in the extended protocol are all treated as an
// implicit transaction that does not get committed until a Sync message is
// received. However, if we are executing a statement that is immediately
// followed by Sync (which is the common case), then we still can auto-commit,
// which allows the 1PC txn fast path to be used.
canAutoCommit := ex.implicitTxn() && tcmd.FollowedBySync
ev, payload, err = ex.execPortal(ctx, portal, portalName, stmtRes, pinfo, canAutoCommit)
return err
}()
// Note: we write to ex.statsCollector.phaseTimes, instead of ex.phaseTimes,
Expand Down
26 changes: 7 additions & 19 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (ex *connExecutor) execStmt(
prepared *PreparedStatement,
pinfo *tree.PlaceholderInfo,
res RestrictedCommandResult,
canAutoCommit bool,
) (fsm.Event, fsm.EventPayload, error) {
ast := parserStmt.AST
if log.V(2) || logStatementsExecuteEnabled.Get(&ex.server.cfg.Settings.SV) ||
Expand Down Expand Up @@ -129,10 +130,10 @@ func (ex *connExecutor) execStmt(
"stmt.no.constants", stmtNoConstants,
)
pprof.Do(ctx, labels, func(ctx context.Context) {
ev, payload, err = ex.execStmtInOpenState(ctx, parserStmt, prepared, pinfo, res)
ev, payload, err = ex.execStmtInOpenState(ctx, parserStmt, prepared, pinfo, res, canAutoCommit)
})
} else {
ev, payload, err = ex.execStmtInOpenState(ctx, parserStmt, prepared, pinfo, res)
ev, payload, err = ex.execStmtInOpenState(ctx, parserStmt, prepared, pinfo, res, canAutoCommit)
}
switch ev.(type) {
case eventNonRetriableErr:
Expand Down Expand Up @@ -197,6 +198,7 @@ func (ex *connExecutor) execPortal(
portalName string,
stmtRes CommandResult,
pinfo *tree.PlaceholderInfo,
canAutoCommit bool,
) (ev fsm.Event, payload fsm.EventPayload, err error) {
switch ex.machine.CurState().(type) {
case stateOpen:
Expand All @@ -219,7 +221,7 @@ func (ex *connExecutor) execPortal(
if portal.exhausted {
return nil, nil, nil
}
ev, payload, err = ex.execStmt(ctx, portal.Stmt.Statement, portal.Stmt, pinfo, stmtRes)
ev, payload, err = ex.execStmt(ctx, portal.Stmt.Statement, portal.Stmt, pinfo, stmtRes, canAutoCommit)
// Portal suspension is supported via a "side" state machine
// (see pgwire.limitedCommandResult for details), so when
// execStmt returns, we know for sure that the portal has been
Expand All @@ -235,7 +237,7 @@ func (ex *connExecutor) execPortal(
return ev, payload, err

default:
return ex.execStmt(ctx, portal.Stmt.Statement, portal.Stmt, pinfo, stmtRes)
return ex.execStmt(ctx, portal.Stmt.Statement, portal.Stmt, pinfo, stmtRes, canAutoCommit)
}
}

Expand All @@ -258,6 +260,7 @@ func (ex *connExecutor) execStmtInOpenState(
prepared *PreparedStatement,
pinfo *tree.PlaceholderInfo,
res RestrictedCommandResult,
canAutoCommit bool,
) (retEv fsm.Event, retPayload fsm.EventPayload, retErr error) {
ast := parserStmt.AST
ctx = withStatement(ctx, ast)
Expand All @@ -276,28 +279,13 @@ func (ex *connExecutor) execStmtInOpenState(
}
os := ex.machine.CurState().(stateOpen)

isNextCmdSync := false
isExtendedProtocol := prepared != nil
if isExtendedProtocol {
stmt = makeStatementFromPrepared(prepared, queryID)
// Only check for Sync in the extended protocol. In the simple protocol,
// Sync is meaningless, so it's OK to let isNextCmdSync default to false.
isNextCmdSync, err = ex.stmtBuf.isNextCmdSync()
if err != nil {
return makeErrEvent(err)
}
} else {
stmt = makeStatement(parserStmt, queryID)
}

// In some cases, we need to turn off autocommit behavior here. The postgres
// docs say that commands in the extended protocol are all treated as an
// implicit transaction that does not get committed until a Sync message is
// received. However, if we are executing a statement that is immediately
// followed by Sync (which is the common case), then we still can auto-commit,
// which allows the "insert fast path" (1PC optimization) to be used.
canAutoCommit := os.ImplicitTxn.Get() && (!isExtendedProtocol || isNextCmdSync)

ex.incrementStartedStmtCounter(ast)
defer func() {
if retErr == nil && !payloadHasError(retPayload) {
Expand Down
33 changes: 3 additions & 30 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ type ExecPortal struct {
// TimeReceived is the time at which the exec message was received
// from the client. Used to compute the service latency.
TimeReceived time.Time
// FollowedBySync is true if the next command after this is a Sync. This is
// used to enable the 1PC txn fast path in the extended protocol.
FollowedBySync bool
}

// command implements the Command interface.
Expand Down Expand Up @@ -442,36 +445,6 @@ func (buf *StmtBuf) CurCmd() (Command, CmdPos, error) {
}
}

// isNextCmdSync reports whether the command immediately after the current
// one is a Sync. It returns false when no such command has been buffered yet.
//
// The cursor (curPos) is advanced by one only for the duration of the call
// and is put back to its previous value before the mutex is released, so
// callers never observe a changed position.
//
// io.EOF is returned if the buffer has been closed. Any error from
// translating the position into a buffer index is returned as-is.
func (buf *StmtBuf) isNextCmdSync() (bool, error) {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	// Step the cursor forward to look at the following command. Deferred
	// calls run last-in-first-out, so the cursor is restored before the
	// unlock above executes.
	savedPos := buf.mu.curPos
	buf.mu.curPos = savedPos + 1
	defer func() { buf.mu.curPos = savedPos }()

	if buf.mu.closed {
		return false, io.EOF
	}

	nextIdx, err := buf.translatePosLocked(buf.mu.curPos)
	if err != nil {
		return false, err
	}
	// Nothing buffered past the current command: we cannot tell what comes
	// next, so report "not a Sync".
	if nextIdx >= buf.mu.data.Len() {
		return false, nil
	}
	_, followedBySync := buf.mu.data.Get(nextIdx).(Sync)
	return followedBySync, nil
}

// translatePosLocked translates an absolute position of a command (counting
// from the connection start) to the index of the respective command in the
// buffer (so, it returns an index relative to the start of the buffer).
Expand Down
22 changes: 17 additions & 5 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,18 @@ func (c *conn) serveImpl(
return false, isSimpleQuery, c.stmtBuf.Push(ctx, sql.Sync{})

case pgwirebase.ClientMsgExecute:
return false, isSimpleQuery, c.handleExecute(ctx, &c.readBuf, timeReceived)
// To support the 1PC txn fast path, we peek at the next command to
// see if it is a Sync. This is because in the extended protocol, an
// implicit transaction cannot commit until the Sync is seen. If there's
// an error while peeking (for example, there are no bytes in the
// buffer), the error is ignored since it will be handled on the next
// loop iteration.
followedBySync := false
if nextMsgType, err := c.rd.Peek(1); err == nil &&
pgwirebase.ClientMessageType(nextMsgType[0]) == pgwirebase.ClientMsgSync {
followedBySync = true
}
return false, isSimpleQuery, c.handleExecute(ctx, &c.readBuf, timeReceived, followedBySync)

case pgwirebase.ClientMsgParse:
return false, isSimpleQuery, c.handleParse(ctx, &c.readBuf, parser.NakedIntTypeFromDefaultIntSize(atomic.LoadInt32(atomicUnqualifiedIntSize)))
Expand Down Expand Up @@ -1098,7 +1109,7 @@ func (c *conn) handleBind(ctx context.Context, buf *pgwirebase.ReadBuffer) error
// An error is returned iff the statement buffer has been closed. In that case,
// the connection should be considered toast.
func (c *conn) handleExecute(
ctx context.Context, buf *pgwirebase.ReadBuffer, timeReceived time.Time,
ctx context.Context, buf *pgwirebase.ReadBuffer, timeReceived time.Time, followedBySync bool,
) error {
telemetry.Inc(sqltelemetry.ExecuteRequestCounter)
portalName, err := buf.GetString()
Expand All @@ -1110,9 +1121,10 @@ func (c *conn) handleExecute(
return c.stmtBuf.Push(ctx, sql.SendError{Err: err})
}
return c.stmtBuf.Push(ctx, sql.ExecPortal{
Name: portalName,
TimeReceived: timeReceived,
Limit: int(limit),
Name: portalName,
TimeReceived: timeReceived,
Limit: int(limit),
FollowedBySync: followedBySync,
})
}

Expand Down

0 comments on commit bc38925

Please sign in to comment.