Merge #83366 #83629
83366: kv: fix conflict resolution for high-priority, non-txn'al requests r=andreimatei a=nvanbenschoten

Fixes #83342.

This commit reworks the behavior of non-transactional requests in lock
wait-queues when the lock holder or the lock queue waiter has an extreme
priority (min or max priority). In such cases, we allow the lock queue
waiter to immediately push the lock holder out of its way, either by
moving its timestamp to resolve a read-write conflict or aborting it to
resolve a write-write conflict.

This handling was broken in two ways for non-transactional requests.
1. These requests' priorities were not consulted when deciding whether
   to push immediately instead of temporarily delaying while waiting in the
   lock wait-queue. This meant that a high-priority, non-txn request might
   still wait for 50ms (`kv.lock_table.coordinator_liveness_push_delay`)
   before pushing a lower-priority lock holder out of its way.
2. Worse, if these requests were not at the front of a lock wait-queue,
   they might never push at all. This was because we had logic that
   disabled a push if it was not needed for the purposes of checking
   liveness, detecting deadlocks, or enforcing timeouts.

This commit resolves both of these issues. It also improves the testing
of transaction priorities in the `kv/kvserver/concurrency` package.
Finally, it consolidates the determination of when a pusher should be
able to push/abort a pushee into a single location.
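
For reference, the consolidated pusher-wins rule (now called via
`txnwait.CanPushWithPriority` at the call sites below) amounts to the
following sketch, adapted from the `batcheval.CanPushWithPriority` body
removed in this diff; it operates on raw priorities rather than full
transaction protos:

```go
// CanPushWithPriority returns true if the pusher can push the pushee
// based on priorities alone: either the pushee has the minimum priority
// (and the pusher does not), or the pusher has the maximum priority
// (and the pushee does not).
func CanPushWithPriority(pusher, pushee enginepb.TxnPriority) bool {
	return (pusher > enginepb.MinTxnPriority && pushee == enginepb.MinTxnPriority) ||
		(pusher == enginepb.MaxTxnPriority && pushee < pusher)
}
```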

Release note (bug fix): a bug in transaction conflict resolution which
could allow backups to wait on long-running transactions has been
resolved.

83629: util/mon: pass reserved account by reference r=yuzefovich a=yuzefovich

**util/mon: pass reserved account by reference**

This commit changes memory monitor startup to pass the `reserved` memory
account by reference. Previously, because the account was passed by
value, stopping the monitor operated on a copy, so the actual "reserved"
memory account would get out of sync with its user. This is now fixed.
That said, the bug had no real production impact: we use the "reserved"
feature in only a handful of places, and those "reserved" memory accounts
are not reused between different usages.
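
The bug class here is ordinary Go pass-by-value semantics: mutating a
struct received by value only mutates the callee's copy. A minimal,
self-contained illustration (hypothetical `boundAccount` type, not the
actual `util/mon` code):

```go
package main

import "fmt"

// boundAccount is a hypothetical stand-in for a monitor's reserved account.
type boundAccount struct{ used int64 }

// stopByValue clears a copy; the caller's account never sees the change.
func stopByValue(acc boundAccount) { acc.used = 0 }

// stopByRef clears the caller's account through the pointer.
func stopByRef(acc *boundAccount) { acc.used = 0 }

func main() {
	acc := boundAccount{used: 128}
	stopByValue(acc)
	fmt.Println(acc.used) // 128: the account is out of sync with its user
	stopByRef(&acc)
	fmt.Println(acc.used) // 0: the clear is visible to the caller
}
```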

Additionally, this commit renames `MakeStandaloneBudget` to
`NewStandaloneBudget` (since it now returns a reference) and adds a
separate `StartNoReserved` method for starting up a monitor when the
caller doesn't want to pre-reserve anything.

Release note: None

**sql: make sure to close the reserved account for each session**

This commit makes it clearer that the "reserved" memory account created
for each connection is closed when that connection is closed. Previously,
the account was already cleared when the "session root" monitor was
stopped (which happens when the connExecutor stops), so the accounting
system did not leak because of it; it just wasn't obvious. This commit
makes it explicit.
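
A minimal sketch of the now-explicit teardown (hypothetical wiring and
names; the real code lives in the connExecutor's close path):

```go
// reserved is the per-session pre-reserved memory account, drawn from
// the root SQL memory monitor when the connection is set up.
reserved := rootSQLMemoryMonitor.MakeBoundAccount()
defer reserved.Close(ctx) // close the reserved account when the session ends
```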

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
3 people committed Jul 6, 2022
3 parents 90b15cf + f6a7a2c + f1a8710 commit d239a2f
Showing 82 changed files with 988 additions and 192 deletions.
2 changes: 1 addition & 1 deletion pkg/base/config.go
@@ -615,7 +615,7 @@ func TempStorageConfigFromEnv(
maxSizeBytes/10, /* noteworthy */
st,
)
- monitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(maxSizeBytes))
+ monitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(maxSizeBytes))
return TempStorageConfig{
InMemory: inMem,
Mon: monitor,
2 changes: 1 addition & 1 deletion pkg/base/test_server_args.go
@@ -219,7 +219,7 @@ func DefaultTestTempStorageConfigWithSize(
maxSizeBytes/10, /* noteworthy */
st,
)
- monitor.Start(context.Background(), nil /* pool */, mon.MakeStandaloneBudget(maxSizeBytes))
+ monitor.Start(context.Background(), nil /* pool */, mon.NewStandaloneBudget(maxSizeBytes))
return TempStorageConfig{
InMemory: true,
Mon: monitor,
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
@@ -8027,7 +8027,7 @@ func TestReadBackupManifestMemoryMonitoring(t *testing.T) {
require.NoError(t, err)

m := mon.NewMonitor("test-monitor", mon.MemoryResource, nil, nil, 0, 0, st)
- m.Start(ctx, nil, mon.MakeStandaloneBudget(128<<20))
+ m.Start(ctx, nil, mon.NewStandaloneBudget(128<<20))
mem := m.MakeBoundAccount()
encOpts := &jobspb.BackupEncryptionOptions{
Mode: jobspb.EncryptionMode_Passphrase,
@@ -9040,7 +9040,7 @@ func TestBackupMemMonitorSSTSinkQueueSize(t *testing.T) {
)
ctx := context.Background()
byteLimit := 14 << 20 // 14 MiB
- memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(int64(byteLimit)))
+ memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(int64(byteLimit)))
defer memoryMonitor.Stop(ctx)
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
@@ -251,7 +251,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
}
limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.flowCtx.Cfg.Settings.SV)
kvFeedMemMon := mon.NewMonitorInheritWithLimit("kvFeed", limit, pool)
- kvFeedMemMon.Start(ctx, pool, mon.BoundAccount{})
+ kvFeedMemMon.StartNoReserved(ctx, pool)
ca.kvFeedMemMon = kvFeedMemMon

// The job registry has a set of metrics used to monitor the various jobs it
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
@@ -6469,7 +6469,7 @@ func startMonitorWithBudget(budget int64) *mon.BytesMonitor {
nil, nil,
128 /* small allocation increment */, 100,
cluster.MakeTestingClusterSettings())
- mm.Start(context.Background(), nil, mon.MakeStandaloneBudget(budget))
+ mm.Start(context.Background(), nil, mon.NewStandaloneBudget(budget))
return mm
}

2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go
@@ -57,7 +57,7 @@ func getBoundAccountWithBudget(budget int64) (account mon.BoundAccount, cleanup
nil, nil,
128 /* small allocation increment */, 100,
cluster.MakeTestingClusterSettings())
- mm.Start(context.Background(), nil, mon.MakeStandaloneBudget(budget))
+ mm.Start(context.Background(), nil, mon.NewStandaloneBudget(budget))
return mm.MakeBoundAccount(), func() { mm.Stop(context.Background()) }
}

4 changes: 3 additions & 1 deletion pkg/kv/bulk/kv_buf_test.go
@@ -56,7 +56,9 @@ func TestKvBuf(t *testing.T) {
src, totalSize := makeTestData(50000)

ctx := context.Background()
- none := mon.NewMonitorWithLimit("none", mon.MemoryResource, 0, nil, nil, 0, 0, nil).MakeBoundAccount()
+ noneMonitor := mon.NewMonitorWithLimit("none", mon.MemoryResource, 0, nil, nil, 0, 0, nil)
+ noneMonitor.StartNoReserved(ctx, nil /* pool */)
+ none := noneMonitor.MakeBoundAccount()
lots := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil).MakeBoundAccount()

// Write everything to our buf.
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvstreamer/results_buffer_test.go
@@ -55,7 +55,7 @@ func TestInOrderResultsBuffer(t *testing.T) {
math.MaxInt64, /* noteworthy */
st,
)
- diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64))
+ diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64))
defer diskMonitor.Stop(ctx)

budget := newBudget(nil /* acc */, math.MaxInt /* limitBytes */)
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvstreamer/streamer_test.go
@@ -166,7 +166,7 @@ func TestStreamerBudgetErrorInEnqueue(t *testing.T) {
math.MaxInt64, /* noteworthy */
cluster.MakeTestingClusterSettings(),
)
- rootMemMonitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(rootPoolSize))
+ rootMemMonitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(rootPoolSize))
defer rootMemMonitor.Stop(ctx)

acc := rootMemMonitor.MakeBoundAccount()
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/rangefeed/db_adapter_external_test.go
@@ -39,7 +39,7 @@ func startMonitorWithBudget(budget int64) *mon.BytesMonitor {
nil, nil,
128 /* small allocation increment */, 100,
cluster.MakeTestingClusterSettings())
- mm.Start(context.Background(), nil, mon.MakeStandaloneBudget(budget))
+ mm.Start(context.Background(), nil, mon.NewStandaloneBudget(budget))
return mm
}

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_push_txn.go
@@ -255,7 +255,7 @@ func PushTxn(
// If just attempting to cleanup old or already-committed txns,
// pusher always fails.
pusherWins = false
- case CanPushWithPriority(&args.PusherTxn, &reply.PusheeTxn):
+ case txnwait.CanPushWithPriority(args.PusherTxn.Priority, reply.PusheeTxn.Priority):
reason = "pusher has priority"
pusherWins = true
case args.Force:
7 changes: 0 additions & 7 deletions pkg/kv/kvserver/batcheval/transaction.go
@@ -118,13 +118,6 @@ func UpdateAbortSpan(
return rec.AbortSpan().Put(ctx, readWriter, ms, txn.ID, &curEntry)
}

- // CanPushWithPriority returns true if the given pusher can push the pushee
- // based on its priority.
- func CanPushWithPriority(pusher, pushee *roachpb.Transaction) bool {
- return (pusher.Priority > enginepb.MinTxnPriority && pushee.Priority == enginepb.MinTxnPriority) ||
- (pusher.Priority == enginepb.MaxTxnPriority && pushee.Priority < pusher.Priority)
- }

// CanCreateTxnRecord determines whether a transaction record can be created for
// the provided transaction. If not, the function will return an error. If so,
// the function may modify the provided transaction.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -380,7 +380,7 @@ type Request struct {
Timestamp hlc.Timestamp

// The priority of the request. Only set if Txn is nil.
- Priority roachpb.UserPriority
+ NonTxnPriority roachpb.UserPriority

// The consistency level of the request. Only set if Txn is nil.
ReadConsistency roachpb.ReadConsistencyType
63 changes: 48 additions & 15 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -55,7 +55,7 @@ import (
//
// The input files use the following DSL:
//
- // new-txn name=<txn-name> ts=<int>[,<int>] epoch=<int> [uncertainty-limit=<int>[,<int>]]
+ // new-txn name=<txn-name> ts=<int>[,<int>] [epoch=<int>] [priority] [uncertainty-limit=<int>[,<int>]]
// new-request name=<req-name> txn=<txn-name>|none ts=<int>[,<int>] [priority] [inconsistent] [wait-policy=<policy>] [lock-timeout] [max-lock-wait-queue-length=<int>] [poison-policy=[err|wait]]
// <proto-name> [<field-name>=<field-value>...] (hint: see scanSingleRequest)
// sequence req=<req-name> [eval-kind=<pess|opt|pess-after-opt]
@@ -104,8 +104,12 @@ func TestConcurrencyManagerBasic(t *testing.T) {
d.ScanArgs(t, "name", &txnName)
ts := scanTimestamp(t, d)

- var epoch int
- d.ScanArgs(t, "epoch", &epoch)
+ epoch := 0
+ if d.HasArg("epoch") {
+ d.ScanArgs(t, "epoch", &epoch)
+ }
+
+ priority := scanTxnPriority(t, d)

uncertaintyLimit := ts
if d.HasArg("uncertainty-limit") {
@@ -125,7 +129,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
Epoch: enginepb.TxnEpoch(epoch),
WriteTimestamp: ts,
MinTimestamp: ts,
- Priority: 1, // not min or max
+ Priority: priority,
},
ReadTimestamp: ts,
GlobalUncertaintyLimit: uncertaintyLimit,
@@ -159,6 +163,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
readConsistency = roachpb.INCONSISTENT
}

+ priority := scanUserPriority(t, d)
waitPolicy := scanWaitPolicy(t, d, false /* required */)

var lockTimeout time.Duration
@@ -182,9 +187,9 @@
latchSpans, lockSpans := c.collectSpans(t, txn, ts, reqs)

c.requestsByName[reqName] = concurrency.Request{
- Txn: txn,
- Timestamp: ts,
- // TODO(nvanbenschoten): test Priority
+ Txn: txn,
+ Timestamp: ts,
+ NonTxnPriority: priority,
ReadConsistency: readConsistency,
WaitPolicy: waitPolicy,
LockTimeout: lockTimeout,
@@ -693,19 +698,47 @@ func (c *cluster) PushTransaction(
}
defer c.unregisterPush(push)
}
+ var pusherPriority enginepb.TxnPriority
+ if h.Txn != nil {
+ pusherPriority = h.Txn.Priority
+ } else {
+ pusherPriority = roachpb.MakePriority(h.UserPriority)
+ }
+ pushTo := h.Timestamp.Next()
for {
// Is the pushee pushed?
pusheeTxn, pusheeRecordSig := pusheeRecord.asTxn()
- var pushed bool
- switch pushType {
- case roachpb.PUSH_TIMESTAMP:
- pushed = h.Timestamp.Less(pusheeTxn.WriteTimestamp) || pusheeTxn.Status.IsFinalized()
- case roachpb.PUSH_ABORT, roachpb.PUSH_TOUCH:
- pushed = pusheeTxn.Status.IsFinalized()
+ // NOTE: this logic is adapted from cmd_push_txn.go.
+ var pusherWins bool
+ switch {
+ case pusheeTxn.Status.IsFinalized():
+ // Already finalized.
+ return pusheeTxn, nil
+ case pushType == roachpb.PUSH_TIMESTAMP && pushTo.LessEq(pusheeTxn.WriteTimestamp):
+ // Already pushed.
+ return pusheeTxn, nil
+ case pushType == roachpb.PUSH_TOUCH:
+ pusherWins = false
+ case txnwait.CanPushWithPriority(pusherPriority, pusheeTxn.Priority):
+ pusherWins = true
default:
- return nil, roachpb.NewErrorf("unexpected push type: %s", pushType)
+ pusherWins = false
}
- if pushed {
+ if pusherWins {
+ switch pushType {
+ case roachpb.PUSH_ABORT:
+ log.Eventf(ctx, "pusher aborted pushee")
+ err = c.updateTxnRecord(pusheeTxn.ID, roachpb.ABORTED, pusheeTxn.WriteTimestamp)
+ case roachpb.PUSH_TIMESTAMP:
+ log.Eventf(ctx, "pusher pushed pushee to %s", pushTo)
+ err = c.updateTxnRecord(pusheeTxn.ID, pusheeTxn.Status, pushTo)
+ default:
+ err = errors.Errorf("unexpected push type: %s", pushType)
+ }
+ if err != nil {
+ return nil, roachpb.NewError(err)
+ }
+ pusheeTxn, _ = pusheeRecord.asTxn()
return pusheeTxn, nil
}
// If PUSH_TOUCH, return error instead of waiting.
35 changes: 35 additions & 0 deletions pkg/kv/kvserver/concurrency/datadriven_util_test.go
@@ -44,6 +44,41 @@ func scanTimestampWithName(t *testing.T, d *datadriven.TestData, name string) hl
return ts
}

+ func scanTxnPriority(t *testing.T, d *datadriven.TestData) enginepb.TxnPriority {
+ priority := scanUserPriority(t, d)
+ // NB: don't use roachpb.MakePriority to avoid randomness.
+ switch priority {
+ case roachpb.MinUserPriority:
+ return enginepb.MinTxnPriority
+ case roachpb.NormalUserPriority:
+ return 1 // not min nor max
+ case roachpb.MaxUserPriority:
+ return enginepb.MaxTxnPriority
+ default:
+ d.Fatalf(t, "unknown priority: %s", priority)
+ return 0
+ }
+ }
+
+ func scanUserPriority(t *testing.T, d *datadriven.TestData) roachpb.UserPriority {
+ const key = "priority"
+ priS := "normal"
+ if d.HasArg(key) {
+ d.ScanArgs(t, key, &priS)
+ }
+ switch priS {
+ case "low":
+ return roachpb.MinUserPriority
+ case "normal":
+ return roachpb.NormalUserPriority
+ case "high":
+ return roachpb.MaxUserPriority
+ default:
+ d.Fatalf(t, "unknown priority: %s", priS)
+ return 0
+ }
+ }

func scanLockStrength(t *testing.T, d *datadriven.TestData) lock.Strength {
var strS string
d.ScanArgs(t, "strength", &strS)
42 changes: 28 additions & 14 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -221,9 +222,15 @@ func (w *lockTableWaiterImpl) WaitOn(
// still active.
timeoutPush := req.LockTimeout != 0

+ // If the pushee has the minimum priority or if the pusher has the
+ // maximum priority, push immediately to proceed without queueing.
+ // The push should succeed without entering the txn wait-queue.
+ priorityPush := canPushWithPriority(req, state)
+
// If the request doesn't want to perform a delayed push for any
// reason, continue waiting without a timer.
- if !livenessPush && !deadlockPush && !timeoutPush {
+ if !(livenessPush || deadlockPush || timeoutPush || priorityPush) {
+ log.Eventf(ctx, "not pushing")
continue
}

Expand All @@ -250,15 +257,15 @@ func (w *lockTableWaiterImpl) WaitOn(
}
delay = minDuration(delay, w.timeUntilDeadline(lockDeadline))
}

- // However, if the pushee has the minimum priority or if the
- // pusher has the maximum priority, push immediately.
- // TODO(nvanbenschoten): flesh these interactions out more and
- // add some testing.
- if hasMinPriority(state.txn) || hasMaxPriority(req.Txn) {
+ if priorityPush {
delay = 0
}

+ log.Eventf(ctx, "pushing after %s for: "+
+ "liveness detection = %t, deadlock detection = %t, "+
+ "timeout enforcement = %t, priority enforcement = %t",
+ delay, livenessPush, deadlockPush, timeoutPush, priorityPush)

if delay > 0 {
if timer == nil {
timer = timeutil.NewTimer()
@@ -754,7 +761,7 @@ func (w *lockTableWaiterImpl) pushRequestTxn(
func (w *lockTableWaiterImpl) pushHeader(req Request) roachpb.Header {
h := roachpb.Header{
Timestamp: req.Timestamp,
- UserPriority: req.Priority,
+ UserPriority: req.NonTxnPriority,
}
if req.Txn != nil {
// We are going to hand the header (and thus the transaction proto) to
@@ -1224,12 +1231,19 @@ func newWriteIntentErr(
return err
}

- func hasMinPriority(txn *enginepb.TxnMeta) bool {
- return txn != nil && txn.Priority == enginepb.MinTxnPriority
- }
-
- func hasMaxPriority(txn *roachpb.Transaction) bool {
- return txn != nil && txn.Priority == enginepb.MaxTxnPriority
+ func canPushWithPriority(req Request, s waitingState) bool {
+ var pusher, pushee enginepb.TxnPriority
+ if req.Txn != nil {
+ pusher = req.Txn.Priority
+ } else {
+ pusher = roachpb.MakePriority(req.NonTxnPriority)
+ }
+ if s.txn == nil {
+ // Can't push a non-transactional request.
+ return false
+ }
+ pushee = s.txn.Priority
+ return txnwait.CanPushWithPriority(pusher, pushee)
}

func minDuration(a, b time.Duration) time.Duration {
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
@@ -241,8 +241,8 @@ func TestLockTableWaiterWithNonTxn(t *testing.T) {
reqHeaderTS := hlc.Timestamp{WallTime: 10}
makeReq := func() Request {
return Request{
- Timestamp: reqHeaderTS,
- Priority: roachpb.NormalUserPriority,
+ Timestamp: reqHeaderTS,
+ NonTxnPriority: roachpb.NormalUserPriority,
}
}

