Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
110709: plpgsql: implement OPEN statements r=DrewKimball a=DrewKimball

#### plpgsql: add parser support for cursors

This patch adds support in the PLpgSQL parser for the following commands
related to cursors: `DECLARE`, `OPEN`, `FETCH`, `MOVE`, and `CLOSE`.
The `OPEN ... FOR EXECUTE ...` syntax is not currently implemented.

Informs cockroachdb#105254

Release note: None

#### plpgsql: add execution support for OPEN statements

This patch adds support for executing PLpgSQL OPEN statements, which
open a SQL cursor in the current transaction. The name of the cursor
is supplied through a PLpgSQL variable. Since the `REFCURSOR` type
hasn't been implemented yet, this patch uses `STRING` in the
meantime.

Limitations that will be lifted in future PRs:
1. Unnamed cursor declarations are not supported. If a cursor is opened
   with no name supplied, a name should be automatically generated.
2. Bound cursors are not yet supported. It should be possible to declare
   a cursor in the `DECLARE` block with the query already defined, at
   which point it can be opened with `OPEN <cursor>;`.
3. A cursor cannot be opened in a routine with an exception block. This
   is because correct handling of this case is waiting on separate work
   to implement rollback of changes to database state on exceptions.

Informs cockroachdb#109709

Release note (sql change): Added initial support for executing the
PLpgSQL `OPEN` statement, which allows a PLpgSQL routine to create a
cursor. Currently, opening bound or unnamed cursors is not supported.
In addition, `OPEN` statements cannot be used in a routine with an
exception block.

111388: kvserver: latching changes for replicated shared locks r=nvanbenschoten a=arulajmani

Two locking requests from the same transaction that are trying to acquire replicated shared locks need to be isolated from one another. They don't need to be isolated against shared locking requests from other transactions and unreplicated shared lock attempts from the same transaction.

To achieve these semantics, we introduce a per-transaction range local key that all replicated shared locking requests declare non-MVCC write latches over.

Closes cockroachdb#109668

Release note: None

Co-authored-by: Drew Kimball <[email protected]>
Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
3 people committed Sep 28, 2023
3 parents f0f0f93 + 3ee8f89 + 6fa4870 commit 916f096
Show file tree
Hide file tree
Showing 38 changed files with 1,684 additions and 337 deletions.
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ var (
// AbortSpan protects a transaction from re-reading its own intents
// after it's been aborted.
LocalAbortSpanSuffix = []byte("abc-")
// LocalReplicatedSharedLocksTransactionLatchingKeySuffix specifies the key
// suffix ("rsl" = replicated shared locks) for all replicated shared lock
// attempts, per transaction. The detail about the transaction is the
// transaction id.
LocalReplicatedSharedLocksTransactionLatchingKeySuffix = roachpb.RKey("rsl-")
// localRangeFrozenStatusSuffix is DEPRECATED and remains to prevent reuse.
localRangeFrozenStatusSuffix = []byte("fzn-")
// LocalRangeGCThresholdSuffix is the suffix for the GC threshold. It keeps
Expand Down
13 changes: 7 additions & 6 deletions pkg/keys/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,13 @@ var _ = [...]interface{}{
// range as a whole. Though they are replicated, they are unaddressable.
// Typical examples are MVCC stats and the abort span. They all share
// `LocalRangeIDPrefix` and `LocalRangeIDReplicatedInfix`.
AbortSpanKey, // "abc-"
RangeGCThresholdKey, // "lgc-"
RangeAppliedStateKey, // "rask"
RangeLeaseKey, // "rll-"
RangePriorReadSummaryKey, // "rprs"
RangeVersionKey, // "rver"
AbortSpanKey, // "abc-"
ReplicatedSharedLocksTransactionLatchingKey, // "rsl-"
RangeGCThresholdKey, // "lgc-"
RangeAppliedStateKey, // "rask"
RangeLeaseKey, // "rll-"
RangePriorReadSummaryKey, // "rprs"
RangeVersionKey, // "rver"

// 2. Unreplicated range-ID local keys: These contain metadata that
// pertain to just one replica of a range. They are unreplicated and
Expand Down
19 changes: 19 additions & 0 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,16 @@ func AbortSpanKey(rangeID roachpb.RangeID, txnID uuid.UUID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).AbortSpanKey(txnID)
}

// ReplicatedSharedLocksTransactionLatchingKey returns a range-local key, based
// on the provided range ID and transaction ID, that all replicated shared
// locking requests from the specified transaction should use to serialize on
// latches.
func ReplicatedSharedLocksTransactionLatchingKey(
rangeID roachpb.RangeID, txnID uuid.UUID,
) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).ReplicatedSharedLocksTransactionLatchingKey(txnID)
}

// DecodeAbortSpanKey decodes the provided AbortSpan entry,
// returning the transaction ID.
func DecodeAbortSpanKey(key roachpb.Key, dest []byte) (uuid.UUID, error) {
Expand Down Expand Up @@ -1066,6 +1076,15 @@ func (b RangeIDPrefixBuf) AbortSpanKey(txnID uuid.UUID) roachpb.Key {
return encoding.EncodeBytesAscending(key, txnID.GetBytes())
}

// ReplicatedSharedLocksTransactionLatchingKey returns a range-local key, by
// range ID, for a key on which all replicated shared locking requests from a
// specific transaction should serialize on latches. The per-transaction bit is
// achieved by encoding the supplied transaction ID into the key.
func (b RangeIDPrefixBuf) ReplicatedSharedLocksTransactionLatchingKey(txnID uuid.UUID) roachpb.Key {
key := append(b.replicatedPrefix(), LocalReplicatedSharedLocksTransactionLatchingKeySuffix...)
return encoding.EncodeBytesAscending(key, txnID.GetBytes())
}

// RangeAppliedStateKey returns a system-local key for the range applied state key.
// See comment on RangeAppliedStateKey function.
func (b RangeIDPrefixBuf) RangeAppliedStateKey() roachpb.Key {
Expand Down
20 changes: 20 additions & 0 deletions pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ var (
psFunc func(rangeID roachpb.RangeID, input string) (string, roachpb.Key)
}{
{name: "AbortSpan", suffix: LocalAbortSpanSuffix, ppFunc: abortSpanKeyPrint, psFunc: abortSpanKeyParse},
{name: "ReplicatedSharedLocksTransactionLatch",
suffix: LocalReplicatedSharedLocksTransactionLatchingKeySuffix,
ppFunc: replicatedSharedLocksTransactionLatchingKeyPrint,
},
{name: "RangeTombstone", suffix: LocalRangeTombstoneSuffix},
{name: "RaftHardState", suffix: LocalRaftHardStateSuffix},
{name: "RangeAppliedState", suffix: LocalRangeAppliedStateSuffix},
Expand Down Expand Up @@ -567,6 +571,22 @@ func abortSpanKeyPrint(buf *redact.StringBuilder, key roachpb.Key) {
buf.Printf("/%q", txnID)
}

func replicatedSharedLocksTransactionLatchingKeyPrint(buf *redact.StringBuilder, key roachpb.Key) {
_, id, err := encoding.DecodeBytesAscending([]byte(key), nil)
if err != nil {
buf.Printf("/%q/err:%v", key, err)
return
}

txnID, err := uuid.FromBytes(id)
if err != nil {
buf.Printf("/%q/err:%v", key, err)
return
}

buf.Printf("/%q", txnID)
}

func print(buf *redact.StringBuilder, _ []encoding.Direction, key roachpb.Key) {
buf.Printf("/%q", []byte(key))
}
Expand Down
1 change: 1 addition & 0 deletions pkg/keys/printer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func TestPrettyPrint(t *testing.T) {
{keys.StoreLossOfQuorumRecoveryCleanupActionsKey(), "/Local/Store/lossOfQuorumRecovery/cleanup", revertSupportUnknown},

{keys.AbortSpanKey(roachpb.RangeID(1000001), txnID), fmt.Sprintf(`/Local/RangeID/1000001/r/AbortSpan/%q`, txnID), revertSupportUnknown},
{keys.ReplicatedSharedLocksTransactionLatchingKey(roachpb.RangeID(1000001), txnID), fmt.Sprintf(`/Local/RangeID/1000001/r/ReplicatedSharedLocksTransactionLatch/%q`, txnID), revertSupportUnknown},
{keys.RangeAppliedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeAppliedState", revertSupportUnknown},
{keys.RaftTruncatedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/u/RaftTruncatedState", revertSupportUnknown},
{keys.RangeLeaseKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLease", revertSupportUnknown},
Expand Down
14 changes: 12 additions & 2 deletions pkg/kv/kvserver/batcheval/declare.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func DefaultDeclareKeys(
// ensures that the commands are fully isolated from conflicting transactions
// when it evaluated.
func DefaultDeclareIsolatedKeys(
_ ImmutableRangeState,
rs ImmutableRangeState,
header *kvpb.Header,
req kvpb.Request,
latchSpans *spanset.SpanSet,
Expand Down Expand Up @@ -92,7 +92,8 @@ func DefaultDeclareIsolatedKeys(
// Get the correct lock strength to use for {lock,latch} spans if we're
// dealing with locking read requests.
if readOnlyReq, ok := req.(kvpb.LockingReadRequest); ok {
str, _ = readOnlyReq.KeyLocking()
var dur lock.Durability
str, dur = readOnlyReq.KeyLocking()
switch str {
case lock.None:
panic(errors.AssertionFailedf("unexpected non-locking read handling"))
Expand All @@ -109,6 +110,15 @@ func DefaultDeclareIsolatedKeys(
// from concurrent writers operating at lower timestamps, a shared-locking
// read extends this protection to all timestamps.
timestamp = hlc.MaxTimestamp
if dur == lock.Replicated && header.Txn != nil {
// Concurrent replicated shared lock attempts by the same transaction
// need to be isolated from one another. We acquire a write latch on
// a per-transaction local key to achieve this. See
// https://github.com/cockroachdb/cockroach/issues/109668.
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{
Key: keys.ReplicatedSharedLocksTransactionLatchingKey(rs.GetRangeID(), header.Txn.ID),
})
}
case lock.Exclusive:
// Reads that acquire exclusive locks acquire write latches at the
// request's timestamp. This isolates them from all concurrent writes,
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/concurrency/datadriven_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func scanUserPriority(t *testing.T, d *datadriven.TestData) roachpb.UserPriority
func scanLockDurability(t *testing.T, d *datadriven.TestData) lock.Durability {
var durS string
d.ScanArgs(t, "dur", &durS)
return getLockDurability(t, d, durS)
}

func getLockDurability(t *testing.T, d *datadriven.TestData, durS string) lock.Durability {
switch durS {
case "r":
return lock.Replicated
Expand Down Expand Up @@ -177,13 +181,21 @@ func scanSingleRequest(
}
return concurrency.GetStrength(t, d, s)
}
maybeGetDur := func() lock.Durability {
s, ok := fields["dur"]
if !ok {
return lock.Unreplicated
}
return getLockDurability(t, d, s)
}

switch cmd {
case "get":
var r kvpb.GetRequest
r.Sequence = maybeGetSeq()
r.Key = roachpb.Key(mustGetField("key"))
r.KeyLockingStrength = maybeGetStr()
r.KeyLockingDurability = maybeGetDur()
return &r

case "scan":
Expand All @@ -194,6 +206,7 @@ func scanSingleRequest(
r.EndKey = roachpb.Key(v)
}
r.KeyLockingStrength = maybeGetStr()
r.KeyLockingDurability = maybeGetDur()
return &r

case "put":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,3 +641,78 @@ finish req=req33
finish req=req34
----
[-] finish req34: finishing request

# ------------------------------------------------------------------------------
# Ensure concurrent replicated shared locking requests by the same transaction
# conflict on latches. Also ensure concurrent replicated shared lock attempts
# by different transactions do not.
# ------------------------------------------------------------------------------

new-request name=req35 txn=txn2 ts=11,1
get key=c str=shared dur=r
----

sequence req=req35
----
[35] sequence req35: sequencing request
[35] sequence req35: acquiring latches
[35] sequence req35: scanning lock table for conflicting locks
[35] sequence req35: sequencing complete, returned guard

new-request name=req36 txn=txn2 ts=11,1
scan key=a endkey=f str=shared dur=r
----

sequence req=req36
----
[36] sequence req36: sequencing request
[36] sequence req36: acquiring latches
[36] sequence req36: waiting to acquire write latch ‹/Local/RangeID/1/r/ReplicatedSharedLocksTransactionLatch/"00000002-0000-0000-0000-000000000000"›@0,0, held by write latch ‹/Local/RangeID/1/r/ReplicatedSharedLocksTransactionLatch/"00000002-0000-0000-0000-000000000000"›@0,0
[36] sequence req36: blocked on select in spanlatch.(*Manager).waitForSignal

new-request name=req37 txn=txn1 ts=11,1
get key=c str=shared dur=r
----

sequence req=req37
----
[37] sequence req37: sequencing request
[37] sequence req37: acquiring latches
[37] sequence req37: scanning lock table for conflicting locks
[37] sequence req37: sequencing complete, returned guard


# Unreplicated shared locking request from txn2. Shouldn't conflict on latches.
new-request name=req38 txn=txn2 ts=11,1
get key=c str=shared dur=u
----

sequence req=req38
----
[38] sequence req38: sequencing request
[38] sequence req38: acquiring latches
[38] sequence req38: scanning lock table for conflicting locks
[38] sequence req38: sequencing complete, returned guard

debug-latch-manager
----
write count: 3
read count: 4

finish req=req35
----
[-] finish req35: finishing request
[36] sequence req36: scanning lock table for conflicting locks
[36] sequence req36: sequencing complete, returned guard

finish req=req36
----
[-] finish req36: finishing request

finish req=req37
----
[-] finish req37: finishing request

finish req=req38
----
[-] finish req38: finishing request
11 changes: 6 additions & 5 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1214,13 +1214,16 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
txnEvType = txnRollback
}

// Close all portals, otherwise there will be leftover bytes.
// Close all portals and cursors, otherwise there will be leftover bytes.
ex.extraTxnState.prepStmtsNamespace.closeAllPortals(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
if err := ex.extraTxnState.sqlCursors.closeAll(false /* errorOnWithHold */); err != nil {
log.Warningf(ctx, "error closing cursors: %v", err)
}

var payloadErr error
if closeType == normalClose {
Expand Down Expand Up @@ -1271,17 +1274,15 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
}

if closeType != panicClose {
// Close all statements, prepared portals, and cursors.
// Close all statements and prepared portals. The cursors have already been
// closed.
ex.extraTxnState.prepStmtsNamespace.resetToEmpty(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetToEmpty(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.prepStmtsNamespaceMemAcc.Close(ctx)
if err := ex.extraTxnState.sqlCursors.closeAll(false /* errorOnWithHold */); err != nil {
log.Warningf(ctx, "error closing cursors: %v", err)
}
}

if ex.sessionTracing.Enabled() {
Expand Down
Loading

0 comments on commit 916f096

Please sign in to comment.