Skip to content

Commit

Permalink
kv: don't heap allocate transaction deadline timestamp
Browse files Browse the repository at this point in the history
Fixes #74224.
Second half of #74225.

This commit changes `EndTxnRequest.Deadline` from a nullable `*hlc.Timestamp` to
a non-nullable `hlc.Timestamp`. This avoids the need to heap allocate the
transaction deadline.

In #74224, we saw that in a run of sysbench's `oltp_point_select` workload, this
was the 9th most frequent source of heap allocations, accounting for over **1%**
of total allocations.

```
----------------------------------------------------------+-------------
      flat  flat%   sum%        cum   cum%   calls calls% + context
----------------------------------------------------------+-------------
...
----------------------------------------------------------+-------------
                                          63603658   100% |   github.com/cockroachdb/cockroach/pkg/sql/catalog/descs.(*leasedDescriptors).maybeUpdateDeadline /go/src/github.com/cockroachdb/cockroach/pkg/sql/catalog/descs/leased_descriptors.go:217
  63603658  1.06% 13.90%   63603658  1.06%                | github.com/cockroachdb/cockroach/pkg/kv.(*Txn).UpdateDeadline /go/src/github.com/cockroachdb/cockroach/pkg/kv/txn.go:784
----------------------------------------------------------+-------------
```

Release note: None
  • Loading branch information
nvanbenschoten committed Aug 1, 2022
1 parent b83e83b commit d06a355
Show file tree
Hide file tree
Showing 13 changed files with 42 additions and 46 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,9 @@ func (tc *TxnCoordSender) finalizeNonLockingTxnLocked(
et := ba.Requests[0].GetEndTxn()
if et.Commit {
deadline := et.Deadline
if deadline != nil && !deadline.IsEmpty() && deadline.LessEq(tc.mu.txn.WriteTimestamp) {
if !deadline.IsEmpty() && deadline.LessEq(tc.mu.txn.WriteTimestamp) {
txn := tc.mu.txn.Clone()
pErr := generateTxnDeadlineExceededErr(txn, *deadline)
pErr := generateTxnDeadlineExceededErr(txn, deadline)
// We need to bump the epoch and transform this retriable error.
ba.Txn = txn
return tc.updateStateLocked(ctx, ba, nil /* br */, pErr)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ func (tc *txnCommitter) sendLockedWithElidedEndTxn(
// transaction is trying to commit.
if et.Commit {
deadline := et.Deadline
if deadline != nil && !deadline.IsEmpty() && deadline.LessEq(br.Txn.WriteTimestamp) {
return nil, generateTxnDeadlineExceededErr(ba.Txn, *deadline)
if !deadline.IsEmpty() && deadline.LessEq(br.Txn.WriteTimestamp) {
return nil, generateTxnDeadlineExceededErr(ba.Txn, deadline)
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,8 @@ func EndTxn(
// IsEndTxnExceedingDeadline returns true if the transaction's provisional
// commit timestamp exceeded its deadline. If so, the transaction should not be
// allowed to commit.
func IsEndTxnExceedingDeadline(commitTS hlc.Timestamp, deadline *hlc.Timestamp) bool {
return deadline != nil && !deadline.IsEmpty() && deadline.LessEq(commitTS)
func IsEndTxnExceedingDeadline(commitTS hlc.Timestamp, deadline hlc.Timestamp) bool {
return !deadline.IsEmpty() && deadline.LessEq(commitTS)
}

// IsEndTxnTriggeringRetryError returns true if the EndTxnRequest cannot be
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
commit bool
noLockSpans bool
inFlightWrites []roachpb.SequencedWrite
deadline *hlc.Timestamp
deadline hlc.Timestamp
// Expected result.
expError string
expTxn *roachpb.TransactionRecord
Expand Down Expand Up @@ -917,7 +917,7 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
// Sanity check request args.
if !c.commit {
require.Nil(t, c.inFlightWrites)
require.Nil(t, c.deadline)
require.Zero(t, c.deadline)
}

// Issue an EndTxn request.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,13 +557,13 @@ func canDoServersideRetry(
ba *roachpb.BatchRequest,
br *roachpb.BatchResponse,
g *concurrency.Guard,
deadline *hlc.Timestamp,
deadline hlc.Timestamp,
) bool {
if ba.Txn != nil {
if !ba.CanForwardReadTimestamp {
return false
}
if deadline != nil {
if !deadline.IsEmpty() {
log.Fatal(ctx, "deadline passed for transactional request")
}
if etArg, ok := ba.GetArg(roachpb.EndTxn); ok {
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -311,7 +312,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
break
}
// If we can retry, set a higher batch timestamp and continue.
if !canDoServersideRetry(ctx, pErr, ba, br, g, nil /* deadline */) {
if !canDoServersideRetry(ctx, pErr, ba, br, g, hlc.Timestamp{} /* deadline */) {
r.store.Metrics().ReadEvaluationServerSideRetryFailure.Inc(1)
break
} else {
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -794,7 +795,7 @@ func (r *Replica) handleReadWithinUncertaintyIntervalError(
// Attempt a server-side retry of the request. Note that we pass nil for
// latchSpans, because we have already released our latches and plan to
// re-acquire them if the retry is allowed.
if !canDoServersideRetry(ctx, pErr, ba, nil /* br */, nil /* g */, nil /* deadline */) {
if !canDoServersideRetry(ctx, pErr, ba, nil /* br */, nil /* g */, hlc.Timestamp{} /* deadline */) {
r.store.Metrics().ReadWithinUncertaintyIntervalErrorServerSideRetryFailure.Inc(1)
return nil, pErr
}
Expand Down
16 changes: 6 additions & 10 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3946,15 +3946,13 @@ func TestEndTxnDeadline(t *testing.T) {
// No deadline.
case 1:
// Past deadline.
ts := txn.WriteTimestamp.Prev()
etArgs.Deadline = &ts
etArgs.Deadline = txn.WriteTimestamp.Prev()
case 2:
// Equal deadline.
etArgs.Deadline = &txn.WriteTimestamp
etArgs.Deadline = txn.WriteTimestamp
case 3:
// Future deadline.
ts := txn.WriteTimestamp.Next()
etArgs.Deadline = &ts
etArgs.Deadline = txn.WriteTimestamp.Next()
}

{
Expand Down Expand Up @@ -4016,9 +4014,8 @@ func TestSerializableDeadline(t *testing.T) {
// Send an EndTxn with a deadline below the point where the txn has been
// pushed.
etArgs, etHeader := endTxnArgs(txn, true /* commit */)
deadline := updatedPushee.WriteTimestamp
deadline.Logical--
etArgs.Deadline = &deadline
etArgs.Deadline = updatedPushee.WriteTimestamp
etArgs.Deadline.Logical--
_, pErr = tc.SendWrappedWith(etHeader, &etArgs)
const expectedErrMsg = "TransactionRetryError: retry txn \\(RETRY_SERIALIZABLE\\)"
if pErr == nil {
Expand Down Expand Up @@ -4152,8 +4149,7 @@ func TestEndTxnDeadline_1PC(t *testing.T) {
put := putArgs(key, []byte("value"))
et, etH := endTxnArgs(txn, true)
// Past deadline.
ts := txn.WriteTimestamp.Prev()
et.Deadline = &ts
et.Deadline = txn.WriteTimestamp.Prev()

var ba roachpb.BatchRequest
ba.Header = etH
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func (r *Replica) evaluateWriteBatch(
rec := NewReplicaEvalContext(ctx, r, g.LatchSpans(), ba.RequiresClosedTSOlderThanStorageSnapshot())
defer rec.Release()
batch, br, res, pErr := r.evaluateWriteBatchWithServersideRefreshes(
ctx, idKey, rec, ms, ba, g, st, ui, nil /* deadline */)
ctx, idKey, rec, ms, ba, g, st, ui, hlc.Timestamp{} /* deadline */)
return batch, *ms, br, res, pErr
}

Expand Down Expand Up @@ -606,7 +606,7 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
g *concurrency.Guard,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
deadline *hlc.Timestamp,
deadline hlc.Timestamp,
) (batch storage.Batch, br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) {
goldenMS := *ms
for retries := 0; ; retries++ {
Expand Down
28 changes: 13 additions & 15 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ type Txn struct {
// retries).
sender TxnSender

// The txn has to be committed by this deadline. A nil value indicates no
// The txn has to be committed by this deadline. A zero value indicates no
// deadline.
deadline *hlc.Timestamp
deadline hlc.Timestamp
}

// admissionHeader is used for admission control for work done in this
Expand Down Expand Up @@ -772,8 +772,7 @@ func (txn *Txn) UpdateDeadline(ctx context.Context, deadline hlc.Timestamp) erro
"txn has would have no chance to commit. Deadline: %s. Read timestamp: %s Previous Deadline: %s.",
deadline, readTimestamp, txn.mu.deadline)
}
txn.mu.deadline = new(hlc.Timestamp)
*txn.mu.deadline = deadline
txn.mu.deadline = deadline
return nil
}

Expand Down Expand Up @@ -812,8 +811,7 @@ func (txn *Txn) DeadlineLikelySufficient(sv *settings.Values) bool {
roachpb.LEAD_FOR_GLOBAL_READS).Add(int64(time.Second), 0)
}

return txn.mu.deadline != nil &&
!txn.mu.deadline.IsEmpty() &&
return !txn.mu.deadline.IsEmpty() &&
// Avoid trying to get get the txn mutex again by directly
// invoking ProvisionalCommitTimestamp versus calling
// ProvisionalCommitTimestampLocked on the Txn.
Expand All @@ -826,7 +824,7 @@ func (txn *Txn) DeadlineLikelySufficient(sv *settings.Values) bool {

// resetDeadlineLocked resets the deadline.
func (txn *Txn) resetDeadlineLocked() {
txn.mu.deadline = nil
txn.mu.deadline = hlc.Timestamp{}
}

// Rollback sends an EndTxnRequest with Commit=false.
Expand All @@ -850,7 +848,7 @@ func (txn *Txn) rollback(ctx context.Context) *roachpb.Error {
// order to reduce contention by releasing locks. In multi-tenant
// settings, it will be subject to admission control, and the zero
// CreateTime will give it preference within the tenant.
et := endTxnReq(false, nil /* deadline */)
et := endTxnReq(false, hlc.Timestamp{} /* deadline */)
ba := roachpb.BatchRequest{Requests: et.unionArr[:]}
_, pErr := txn.Send(ctx, ba)
if pErr == nil {
Expand All @@ -876,7 +874,7 @@ func (txn *Txn) rollback(ctx context.Context) *roachpb.Error {
// order to reduce contention by releasing locks. In multi-tenant
// settings, it will be subject to admission control, and the zero
// CreateTime will give it preference within the tenant.
et := endTxnReq(false, nil /* deadline */)
et := endTxnReq(false, hlc.Timestamp{} /* deadline */)
ba := roachpb.BatchRequest{Requests: et.unionArr[:]}
_ = contextutil.RunWithTimeout(ctx, "async txn rollback", asyncRollbackTimeout,
func(ctx context.Context) error {
Expand Down Expand Up @@ -918,7 +916,7 @@ type endTxnReqAlloc struct {
unionArr [1]roachpb.RequestUnion
}

func endTxnReq(commit bool, deadline *hlc.Timestamp) *endTxnReqAlloc {
func endTxnReq(commit bool, deadline hlc.Timestamp) *endTxnReqAlloc {
alloc := new(endTxnReqAlloc)
alloc.req.Commit = commit
alloc.req.Deadline = deadline
Expand Down Expand Up @@ -1246,18 +1244,18 @@ func (txn *Txn) applyDeadlineToBoundedStaleness(
ctx context.Context, bs *roachpb.BoundedStalenessHeader,
) error {
d := txn.deadline()
if d == nil {
if d.IsEmpty() {
return nil
}
if d.LessEq(bs.MinTimestampBound) {
return errors.WithContextTags(errors.AssertionFailedf(
"transaction deadline %s equal to or below min_timestamp_bound %s",
*d, bs.MinTimestampBound), ctx)
d, bs.MinTimestampBound), ctx)
}
if bs.MaxTimestampBound.IsEmpty() {
bs.MaxTimestampBound = *d
bs.MaxTimestampBound = d
} else {
bs.MaxTimestampBound.Backward(*d)
bs.MaxTimestampBound.Backward(d)
}
return nil
}
Expand Down Expand Up @@ -1525,7 +1523,7 @@ func (txn *Txn) TestingCloneTxn() *roachpb.Transaction {
return txn.mu.sender.TestingCloneTxn()
}

func (txn *Txn) deadline() *hlc.Timestamp {
func (txn *Txn) deadline() hlc.Timestamp {
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.deadline
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,14 +511,14 @@ func TestUpdateDeadlineMaybe(t *testing.T) {
}), clock, stopper)
txn := NewTxn(ctx, db, 0 /* gatewayNodeID */)

if txn.deadline() != nil {
if !txn.deadline().IsEmpty() {
t.Errorf("unexpected initial deadline: %s", txn.deadline())
}

deadline := hlc.Timestamp{WallTime: 10, Logical: 1}
err := txn.UpdateDeadline(ctx, deadline)
require.NoError(t, err, "Deadline update failed")
if d := *txn.deadline(); d != deadline {
if d := txn.deadline(); d != deadline {
t.Errorf("unexpected deadline: %s", d)
}

Expand All @@ -527,14 +527,14 @@ func TestUpdateDeadlineMaybe(t *testing.T) {
futureDeadline := hlc.Timestamp{WallTime: 11, Logical: 1}
err = txn.UpdateDeadline(ctx, futureDeadline)
require.NoError(t, err, "Future deadline update failed")
if d := *txn.deadline(); d != futureDeadline {
if d := txn.deadline(); d != futureDeadline {
t.Errorf("unexpected deadline: %s", d)
}

pastDeadline := hlc.Timestamp{WallTime: 9, Logical: 1}
err = txn.UpdateDeadline(ctx, pastDeadline)
require.NoError(t, err, "Past deadline update failed")
if d := *txn.deadline(); d != pastDeadline {
if d := txn.deadline(); d != pastDeadline {
t.Errorf("unexpected deadline: %s", d)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ message EndTxnRequest {
// If EndTxn(Commit=true) finds that the txn's timestamp has been pushed above
// this deadline, an error will be returned and the client is supposed to
// rollback the txn.
util.hlc.Timestamp deadline = 3;
util.hlc.Timestamp deadline = 3 [(gogoproto.nullable) = false];
// commit triggers. Note that commit triggers are for
// internal use only and will cause an error if requested through the
// external-facing KV API.
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,10 +1143,10 @@ func TestTransactionDeadline(t *testing.T) {

if args, ok := ba.GetArg(roachpb.EndTxn); ok {
et := args.(*roachpb.EndTxnRequest)
if et.Deadline == nil || et.Deadline.IsEmpty() {
if et.Deadline.IsEmpty() {
return nil
}
mu.txnDeadline = *et.Deadline
mu.txnDeadline = et.Deadline
}
return nil
}
Expand Down

0 comments on commit d06a355

Please sign in to comment.