Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: emit point deletes during delete fastpath #63416

Merged
merged 1 commit into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 22.1-44 set the active cluster version in the format '<major>.<minor>'
version version 22.1-46 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-44</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-46</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
2 changes: 1 addition & 1 deletion pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,7 @@ func removeDeadReplicas(
// A crude form of the intent resolution process: abort the
// transaction by deleting its record.
txnKey := keys.TransactionKey(intent.Txn.Key, intent.Txn.ID)
if err := storage.MVCCDelete(ctx, batch, &ms, txnKey, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil); err != nil {
if _, err := storage.MVCCDelete(ctx, batch, &ms, txnKey, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil); err != nil {
return nil, err
}
update := roachpb.LockUpdate{
Expand Down
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ const (
// SchemaChangeSupportsCreateFunction adds support of CREATE FUNCTION
// statement.
SchemaChangeSupportsCreateFunction
// DeleteRequestReturnKey is the version where the DeleteRequest began
// populating the FoundKey value in the response.
DeleteRequestReturnKey

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -604,6 +607,10 @@ var versionsSingleton = keyedVersions{
Key: SchemaChangeSupportsCreateFunction,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 44},
},
{
Key: DeleteRequestReturnKey,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 46},
},
// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 13 additions & 6 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,14 @@ func (b *Batch) fillResults(ctx context.Context) {
}
}
case *roachpb.DeleteRequest:
row := &result.Rows[k]
row.Key = []byte(args.(*roachpb.DeleteRequest).Key)
if result.Err == nil {
resp := reply.(*roachpb.DeleteResponse)
if resp.FoundKey {
// Accumulate all keys that were deleted as part of a
// single Del() operation.
result.Keys = append(result.Keys, args.(*roachpb.DeleteRequest).Key)
}
}
case *roachpb.DeleteRangeRequest:
if result.Err == nil {
result.Keys = reply.(*roachpb.DeleteRangeResponse).Keys
Expand Down Expand Up @@ -609,23 +615,24 @@ func (b *Batch) ReverseScanForUpdate(s, e interface{}) {

// Del deletes one or more keys.
//
// A new result will be appended to the batch and each key will have a
// corresponding row in the returned Result.
// A new result will be appended to the batch which will contain 0 rows and
// Result.Err will indicate success or failure. Each key will be included in
// Result.Keys if it was actually deleted.
//
// key can be either a byte slice or a string.
func (b *Batch) Del(keys ...interface{}) {
reqs := make([]roachpb.Request, 0, len(keys))
for _, key := range keys {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, len(keys), notRaw, err)
b.initResult(len(keys), 0, notRaw, err)
return
}
reqs = append(reqs, roachpb.NewDelete(k))
b.approxMutationReqBytes += len(k)
}
b.appendReqs(reqs...)
b.initResult(len(reqs), len(reqs), notRaw, nil)
b.initResult(len(reqs), 0, notRaw, nil)
}

// DelRange deletes the rows between begin (inclusive) and end (exclusive).
Expand Down
13 changes: 8 additions & 5 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ type Result struct {
// Err contains any error encountered when performing the operation.
Err error
// Rows contains the key/value pairs for the operation. The number of rows
// returned varies by operation. For Get, Put, CPut, Inc and Del the number
// returned varies by operation. For Get, Put, CPut, and Inc the number
// of rows returned is the number of keys operated on. For Scan the number of
// rows returned is the number or rows matching the scan capped by the
// maxRows parameter and other options. For DelRange Rows is nil.
// maxRows parameter and other options. For Del and DelRange Rows is nil.
Rows []KeyValue

// Keys is set by some operations instead of returning the rows themselves.
// Keys is set by Del and DelRange instead of returning the rows themselves.
Keys []roachpb.Key

// ResumeSpan is the span to be used on the next operation in a
Expand Down Expand Up @@ -526,11 +526,14 @@ func (db *DB) ReverseScanForUpdate(

// Del deletes one or more keys.
//
// The returned []roachpb.Key will contain the keys that were actually deleted.
//
// key can be either a byte slice or a string.
func (db *DB) Del(ctx context.Context, keys ...interface{}) error {
func (db *DB) Del(ctx context.Context, keys ...interface{}) ([]roachpb.Key, error) {
b := &Batch{}
b.Del(keys...)
return getOneErr(db.Run(ctx, b), b)
r, err := getOneResult(db.Run(ctx, b), b)
return r.Keys, err
}

// DelRange deletes the rows between begin (inclusive) and end (exclusive).
Expand Down
14 changes: 11 additions & 3 deletions pkg/kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func TestDB_InitPut(t *testing.T) {
if err := db.InitPut(ctx, "aa", "2", false); err == nil {
t.Fatal("expected error from init put")
}
if err := db.Del(ctx, "aa"); err != nil {
if _, err := db.Del(ctx, "aa"); err != nil {
t.Fatal(err)
}
if err := db.InitPut(ctx, "aa", "2", true); err == nil {
Expand Down Expand Up @@ -479,9 +479,16 @@ func TestDB_Del(t *testing.T) {
if err := db.Run(context.Background(), b); err != nil {
t.Fatal(err)
}
if err := db.Del(context.Background(), "ab"); err != nil {
if _, err := db.Del(context.Background(), "ab"); err != nil {
t.Fatal(err)
}
// Also try deleting a non-existent key and verify that no key is
// returned.
if deletedKeys, err := db.Del(context.Background(), "ad"); err != nil {
t.Fatal(err)
} else if len(deletedKeys) > 0 {
t.Errorf("expected deleted key to be empty when deleting a non-existent key, got %v", deletedKeys)
}
rows, err := db.Scan(context.Background(), "a", "b", 100)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -652,7 +659,8 @@ func TestDBDecommissionedOperations(t *testing.T) {
op func() error
}{
{"Del", func() error {
return db.Del(ctx, key)
_, err := db.Del(ctx, key)
return err
}},
{"DelRange", func() error {
_, err := db.DelRange(ctx, key, keyEnd, false)
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,8 @@ func (ds *DistSender) initAndVerifyBatch(
// Accepted reverse range requests.
foundReverse = true

case *roachpb.QueryIntentRequest, *roachpb.EndTxnRequest, *roachpb.GetRequest:
case *roachpb.QueryIntentRequest, *roachpb.EndTxnRequest,
*roachpb.GetRequest, *roachpb.DeleteRequest:
// Accepted point requests that can be in batches with limit.

default:
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2603,7 +2603,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
{
name: "write too old with initput failing on tombstone before",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Del(ctx, "iput")
_, err := db.Del(ctx, "iput")
return err
},
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "iput", "put2")
Expand All @@ -2619,7 +2620,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
return db.Put(ctx, "iput", "put")
},
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Del(ctx, "iput")
_, err := db.Del(ctx, "iput")
return err
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.InitPut(ctx, "iput", "put", true)
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1911,7 +1911,10 @@ func TestCommitMutatingTransaction(t *testing.T) {
pointWrite: true,
},
{
f: func(ctx context.Context, txn *kv.Txn) error { return txn.Del(ctx, "a") },
f: func(ctx context.Context, txn *kv.Txn) error {
_, err := txn.Del(ctx, "a")
return err
},
expMethod: roachpb.Delete,
pointWrite: true,
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_correctness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func readCmd(ctx context.Context, c *cmd, txn *kv.Txn) error {

// deleteCmd deletes the value at the given key from the db.
func deleteCmd(ctx context.Context, c *cmd, txn *kv.Txn) error {
return txn.Del(ctx, c.getKey())
_, err := txn.Del(ctx, c.getKey())
return err
}

// deleteRngCmd deletes the range of values from the db from [key, endKey).
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,8 @@ func TestRangefeedValueTimestamps(t *testing.T) {

{
beforeDelTS := db.Clock().Now()
require.NoError(t, db.Del(ctx, mkKey("a")))
_, err = db.Del(ctx, mkKey("a"))
require.NoError(t, err)
afterDelTS := db.Clock().Now()

v := <-rows
Expand All @@ -454,7 +455,8 @@ func TestRangefeedValueTimestamps(t *testing.T) {

{
beforeDelTS := db.Clock().Now()
require.NoError(t, db.Del(ctx, mkKey("a")))
_, err = db.Del(ctx, mkKey("a"))
require.NoError(t, err)
afterDelTS := db.Clock().Now()

v := <-rows
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeedcache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func TestCache(t *testing.T) {
require.NoError(t, txn.Put(ctx, mkKey("d"), 1))
})
writeAndCheck(t, func(t *testing.T, txn *kv.Txn) {
require.NoError(t, txn.Del(ctx, mkKey("a")))
_, err := txn.Del(ctx, mkKey("a"))
require.NoError(t, err)
})
writeAndCheck(t, func(t *testing.T, txn *kv.Txn) {
_, err := txn.DelRange(ctx, mkKey("a"), mkKey("c"), false)
Expand Down
14 changes: 11 additions & 3 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ type clientI interface {
ScanForUpdate(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error)
ReverseScan(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error)
ReverseScanForUpdate(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error)
Del(context.Context, ...interface{}) error
Del(context.Context, ...interface{}) ([]roachpb.Key, error)
DelRange(context.Context, interface{}, interface{}, bool) ([]roachpb.Key, error)
Run(context.Context, *kv.Batch) error
}
Expand Down Expand Up @@ -208,8 +208,16 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
}
}
case *DeleteOperation:
err := db.Del(ctx, o.Key)
o.Result = resultError(ctx, err)
deletedKeys, err := db.Del(ctx, o.Key)
if err != nil {
o.Result = resultError(ctx, err)
} else {
o.Result.Type = ResultType_Keys
o.Result.Keys = make([][]byte, len(deletedKeys))
for i, deletedKey := range deletedKeys {
o.Result.Keys[i] = deletedKey
}
}
case *DeleteRangeOperation:
if !inTxn {
panic(errors.AssertionFailedf(`non-transactional DelRange operations currently unsupported`))
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvnemesis/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ func TestApplier(t *testing.T) {
check(t, step(reverseScan(`a`, `c`)), `db0.ReverseScan(ctx, "a", "c", 0) // (["b":"2", "a":"1"], nil)`)
check(t, step(reverseScanForUpdate(`a`, `b`)), `db1.ReverseScanForUpdate(ctx, "a", "b", 0) // (["a":"1"], nil)`)

check(t, step(del(`b`)), `db0.Del(ctx, "b") // nil`)
check(t, step(del(`b`)), `db0.Del(ctx, "b")`)
check(t, step(get(`b`)), `db1.Get(ctx, "b") // (nil, nil)`)

check(t, step(put(`c`, `3`)), `db0.Put(ctx, "c", 3) // nil`)
check(t, step(put(`d`, `4`)), `db1.Put(ctx, "d", 4) // nil`)

check(t, step(del(`c`)), `db0.Del(ctx, "c") // nil`)
check(t, step(del(`c`)), `db0.Del(ctx, "c")`)
check(t, step(scan(`a`, `e`)), `db1.Scan(ctx, "a", "e", 0) // (["a":"1", "d":"4"], nil)`)

check(t, step(put(`c`, `5`)), `db0.Put(ctx, "c", 5) // nil`)
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/abortspan/abortspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ func (sc *AbortSpan) Del(
ctx context.Context, reader storage.ReadWriter, ms *enginepb.MVCCStats, txnID uuid.UUID,
) error {
key := keys.AbortSpanKey(sc.rangeID, txnID)
return storage.MVCCDelete(ctx, reader, ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil /* txn */)
_, err := storage.MVCCDelete(ctx, reader, ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil /* txn */)
return err
}

// Put writes an entry for the specified transaction ID.
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ func Delete(
) (result.Result, error) {
args := cArgs.Args.(*roachpb.DeleteRequest)
h := cArgs.Header
reply := resp.(*roachpb.DeleteResponse)

err := storage.MVCCDelete(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, cArgs.Now, h.Txn)
var err error
reply.FoundKey, err = storage.MVCCDelete(
ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, cArgs.Now, h.Txn,
)
// NB: even if MVCC returns an error, it may still have written an intent
// into the batch. This allows callers to consume errors like WriteTooOld
// without re-evaluating the batch. This behavior isn't particularly
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_delete_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ func TestDeleteRangeTombstone(t *testing.T) {
require.NoError(t, storage.MVCCPut(ctx, rw, nil, roachpb.Key("b"), hlc.Timestamp{WallTime: 2e9}, localTS, roachpb.MakeValueFromString("b2"), nil))
require.NoError(t, storage.MVCCPut(ctx, rw, nil, roachpb.Key("c"), hlc.Timestamp{WallTime: 4e9}, localTS, roachpb.MakeValueFromString("c4"), nil))
require.NoError(t, storage.MVCCPut(ctx, rw, nil, roachpb.Key("d"), hlc.Timestamp{WallTime: 2e9}, localTS, roachpb.MakeValueFromString("d2"), nil))
require.NoError(t, storage.MVCCDelete(ctx, rw, nil, roachpb.Key("d"), hlc.Timestamp{WallTime: 3e9}, localTS, nil))
_, err := storage.MVCCDelete(ctx, rw, nil, roachpb.Key("d"), hlc.Timestamp{WallTime: 3e9}, localTS, nil)
require.NoError(t, err)
require.NoError(t, storage.MVCCPut(ctx, rw, nil, roachpb.Key("i"), hlc.Timestamp{WallTime: 5e9}, localTS, roachpb.MakeValueFromString("i5"), &txn))
require.NoError(t, storage.MVCCDeleteRangeUsingTombstone(ctx, rw, nil, roachpb.Key("f"), roachpb.Key("h"), hlc.Timestamp{WallTime: 3e9}, localTS, nil, nil, 0, nil))
require.NoError(t, storage.MVCCDeleteRangeUsingTombstone(ctx, rw, nil, roachpb.Key("Z"), roachpb.Key("a"), hlc.Timestamp{WallTime: 100e9}, localTS, nil, nil, 0, nil))
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,8 @@ func updateFinalizedTxn(
// BatchRequest writes.
return nil
}
return storage.MVCCDelete(ctx, readWriter, ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil)
_, err := storage.MVCCDelete(ctx, readWriter, ms, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil)
return err
}
txn.LockSpans = externalLocks
txn.InFlightWrites = nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_is_span_empty_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func TestIsSpanEmpty(t *testing.T) {
requireEmpty(t, mkKey(""), mkKey("x"))
requireNotEmpty(t, mkKey(""), mkKey("").PrefixEnd())

require.NoError(t, kvDB.Del(ctx, mkKey("x")))
_, err := kvDB.Del(ctx, mkKey("x"))
require.NoError(t, err)
requireEmpty(t, mkKey(""), mkKey("x"))
requireNotEmpty(t, mkKey(""), mkKey("").PrefixEnd())

Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_query_intent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func TestQueryIntent(t *testing.T) {

writeIntent := func(k roachpb.Key, ts int64) roachpb.Transaction {
txn := roachpb.MakeTransaction("test", k, 0, makeTS(ts), 0, 1)
require.NoError(t, storage.MVCCDelete(ctx, db, nil, k, makeTS(ts), hlc.ClockTimestamp{}, &txn))
_, err := storage.MVCCDelete(ctx, db, nil, k, makeTS(ts), hlc.ClockTimestamp{}, &txn)
require.NoError(t, err)
return txn
}

Expand Down
Loading