Skip to content

Commit

Permalink
builtins: add pg_backend_pid()
Browse files Browse the repository at this point in the history
Release note (sql change): Updated the pg_backend_pid() builtin function
so it matches with the data in the query cancellation key created during
session initialization. The function is just for compatibility, and it
does not return a real process ID.
  • Loading branch information
rafiss committed Jun 16, 2022
1 parent c030b8b commit 343335a
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 3 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3419,6 +3419,8 @@ A write probe will effectively probe reads as well.</p>
</span></td></tr>
<tr><td><a name="oid"></a><code>oid(int: <a href="int.html">int</a>) &rarr; oid</code></td><td><span class="funcdesc"><p>Converts an integer to an OID.</p>
</span></td></tr>
<tr><td><a name="pg_backend_pid"></a><code>pg_backend_pid() &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>Returns a numerical ID attached to this session. This ID is part of the query cancellation key used by the wire protocol. This function was only added for compatibility, and unlike in Postgres, thereturned value does not correspond to a real process ID.</p>
</span></td></tr>
<tr><td><a name="pg_collation_for"></a><code>pg_collation_for(str: anyelement) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>Returns the collation of the argument</p>
</span></td></tr>
<tr><td><a name="pg_column_is_updatable"></a><code>pg_column_is_updatable(reloid: oid, attnum: int2, include_triggers: <a href="bool.html">bool</a>) &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>Returns whether the given column can be updated.</p>
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2674,6 +2674,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
RangeProber: p.execCfg.RangeProber,
StmtDiagnosticsRequestInserter: ex.server.cfg.StmtDiagnosticsRecorder.InsertRequest,
CatalogBuiltins: &p.evalCatalogBuiltins,
QueryCancelKey: ex.queryCancelKey,
},
Tracing: &ex.sessionTracing,
MemMetrics: &ex.memMetrics,
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/pgwire/pgwirecancel/backend_key_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,12 @@ func (b BackendKeyData) GetSQLInstanceID() base.SQLInstanceID {
bits = bits &^ leadingBitMask
// Use the upper 32 bits as the sqlInstanceID.
return base.SQLInstanceID(bits >> 32)
}

// GetPGBackendPID returns the upper 32 bits of this BackendKeyData. In Postgres,
// this is the process ID, but we expose it only for compatibility.
func (b BackendKeyData) GetPGBackendPID() uint32 {
bits := uint64(b)
return uint32(bits >> 32)

}
10 changes: 7 additions & 3 deletions pkg/sql/sem/builtins/pg_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,10 +540,14 @@ var pgBuiltins = map[string]builtinDefinition{
tree.Overload{
Types: tree.ArgTypes{},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(_ *eval.Context, _ tree.Datums) (tree.Datum, error) {
return tree.NewDInt(-1), nil
Fn: func(ctx *eval.Context, _ tree.Datums) (tree.Datum, error) {
pid := ctx.QueryCancelKey.GetPGBackendPID()
return tree.NewDInt(tree.DInt(pid)), nil
},
Info: notUsableInfo,
Info: "Returns a numerical ID attached to this session. This ID is " +
"part of the query cancellation key used by the wire protocol. This " +
"function was only added for compatibility, and unlike in Postgres, the" +
"returned value does not correspond to a real process ID.",
Volatility: volatility.Stable,
},
),
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/eval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgnotice",
"//pkg/sql/pgwire/pgwirecancel",
"//pkg/sql/privilege",
"//pkg/sql/roleoption",
"//pkg/sql/sem/cast",
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/sem/eval/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirecancel"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
Expand Down Expand Up @@ -211,6 +212,10 @@ type Context struct {
// CatalogBuiltins is used by various builtins which depend on looking up
// catalog information. Unlike the Planner, it is available in DistSQL.
CatalogBuiltins CatalogBuiltins

// QueryCancelKey is the key used by the pgwire protocol to cancel the
// query currently running in this session.
QueryCancelKey pgwirecancel.BackendKeyData
}

var _ tree.ParseTimeContext = &Context{}
Expand Down
34 changes: 34 additions & 0 deletions pkg/testutils/pgtest/pgtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net"
"reflect"
"strconv"
"strings"
"testing"

Expand Down Expand Up @@ -78,11 +79,44 @@ func NewPGTest(ctx context.Context, addr, user string) (*PGTest, error) {
if backendKeyData == nil {
return nil, errors.Errorf("did not receive BackendKeyData")
}
if err := checkPGBackendPID(p, backendKeyData); err != nil {
return nil, err
}

p.isCockroachDB = foundCrdb
success = err == nil
return p, err
}

func checkPGBackendPID(p *PGTest, backendKeyData *pgproto3.BackendKeyData) error {
if err := p.fe.Send(&pgproto3.Query{
String: "SELECT pg_backend_pid();",
}); err != nil {
return errors.Wrap(err, "fetching pg_backend_pid")
}
msgs, err := p.Until(false /* keepErrMsg */, &pgproto3.ReadyForQuery{})
if err != nil {
return errors.Wrap(err, "fetching pg_backend_pid")
}
matched := false
for _, msg := range msgs {
if d, ok := msg.(*pgproto3.DataRow); ok {
pid, err := strconv.Atoi(string(d.Values[0]))
if err != nil {
return errors.Wrap(err, "parsing pg_backend_pid")
}
if uint32(pid) != backendKeyData.ProcessID {
return errors.Errorf("wrong pg_backend_pid; wanted %d, got %d", backendKeyData.ProcessID, pid)
}
matched = true
}
}
if !matched {
return errors.Errorf("could not retrieve pg_backend_pid")
}
return nil
}

// Close sends a Terminate message and closes the connection.
func (p *PGTest) Close() error {
defer p.conn.Close()
Expand Down

0 comments on commit 343335a

Please sign in to comment.