Skip to content

Commit

Permalink
Merge #39714
Browse files Browse the repository at this point in the history
39714: client: force CPut callers to use a *roachpb.Value expected value r=tbg a=danhhz

As seen in #38004 and #38147, this first came up in the context of CPut
with a proto expected value. If the serialization didn't exactly match,
the CPut would get a condition failed error, even if the two protos were
logically equivalent (missing field vs field with default value). We
were hitting this in a mixed version cluster when trying to add a
non-nullable field to RangeDescriptor, which is updated via CPut to
detect update races. The new fields ended up having to be nullable,
which has allocation consequences as well as ergonomic ones.

In #38302, we changed the RangeDescriptor CPut to use a *roachpb.Value
exactly as it was read from KV for the expected value, but no other
callsites were updated. To prevent future issues, this commit changes
the CPut signature to accept a *roachpb.Value for the expected value
instead of an interface{}.

The old method signature is left as CPutDeprecated for now. The CPut
usages that are trivial to switch are switched and ones that require any
thought are left for a followup PR.

Release note: None

Co-authored-by: Daniel Harrison <[email protected]>
  • Loading branch information
craig[bot] and danhhz committed Aug 20, 2019
2 parents d6e63ea + a32c7d2 commit dc70bfd
Show file tree
Hide file tree
Showing 16 changed files with 134 additions and 46 deletions.
8 changes: 4 additions & 4 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ func prepareExistingTableDescForIngestion(
// upgrade and downgrade, because IMPORT does not operate in mixed-version
// states.
// TODO(jordan,lucy): remove this comment once 19.2 is released.
err := txn.CPut(ctx, sqlbase.MakeDescMetadataKey(desc.ID),
err := txn.CPutDeprecated(ctx, sqlbase.MakeDescMetadataKey(desc.ID),
sqlbase.WrapDescriptor(&importing), sqlbase.WrapDescriptor(desc))
if err != nil {
return nil, errors.Wrap(err, "another operation is currently operating on the table")
Expand Down Expand Up @@ -995,7 +995,7 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) err
// possible. This is safe since the table data was never visible to users,
// and so we don't need to preserve MVCC semantics.
tableDesc.DropTime = 1
b.CPut(sqlbase.MakeNameMetadataKey(tableDesc.ParentID, tableDesc.Name), nil, tableDesc.ID)
b.CPutDeprecated(sqlbase.MakeNameMetadataKey(tableDesc.ParentID, tableDesc.Name), nil, tableDesc.ID)
} else {
// IMPORT did not create this table, so we should not drop it.
tableDesc.State = sqlbase.TableDescriptor_PUBLIC
Expand All @@ -1004,7 +1004,7 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) err
// upgrade and downgrade, because IMPORT does not operate in mixed-version
// states.
// TODO(jordan,lucy): remove this comment once 19.2 is released.
b.CPut(sqlbase.MakeDescMetadataKey(tableDesc.ID), sqlbase.WrapDescriptor(&tableDesc), sqlbase.WrapDescriptor(tbl.Desc))
b.CPutDeprecated(sqlbase.MakeDescMetadataKey(tableDesc.ID), sqlbase.WrapDescriptor(&tableDesc), sqlbase.WrapDescriptor(tbl.Desc))
}
return errors.Wrap(txn.Run(ctx, b), "rolling back tables")
}
Expand Down Expand Up @@ -1054,7 +1054,7 @@ func (r *importResumer) OnSuccess(ctx context.Context, txn *client.Txn) error {
// upgrade and downgrade, because IMPORT does not operate in mixed-version
// states.
// TODO(jordan,lucy): remove this comment once 19.2 is released.
b.CPut(sqlbase.MakeDescMetadataKey(tableDesc.ID), sqlbase.WrapDescriptor(&tableDesc), sqlbase.WrapDescriptor(tbl.Desc))
b.CPutDeprecated(sqlbase.MakeDescMetadataKey(tableDesc.ID), sqlbase.WrapDescriptor(&tableDesc), sqlbase.WrapDescriptor(tbl.Desc))
}
if err := txn.Run(ctx, b); err != nil {
return errors.Wrap(err, "publishing tables")
Expand Down
39 changes: 36 additions & 3 deletions pkg/internal/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,18 +413,51 @@ func (b *Batch) PutInline(key, value interface{}) {
//
// key can be either a byte slice or a string. value can be any key type, a
// protoutil.Message or any Go primitive type (bool, int, etc).
func (b *Batch) CPut(key, value, expValue interface{}) {
func (b *Batch) CPut(key, value interface{}, expValue *roachpb.Value) {
b.cputInternal(key, value, expValue, false)
}

// CPutDeprecated conditionally sets the value for a key if the existing value is equal
// to expValue. To conditionally set a value only if there is no existing entry
// pass nil for expValue. Note that this must be an interface{}(nil), not a
// typed nil value (e.g. []byte(nil)).
//
// A new result will be appended to the batch which will contain a single row
// and Result.Err will indicate success or failure.
//
// key can be either a byte slice or a string. value can be any key type, a
// protoutil.Message or any Go primitive type (bool, int, etc).
func (b *Batch) CPutDeprecated(key, value, expValue interface{}) {
b.cputInternalDeprecated(key, value, expValue, false)
}

// CPutAllowingIfNotExists is like CPut except it also allows the Put when the
// existing entry does not exist -- i.e. it succeeds if there is no existing
// entry or the existing entry has the expected value.
func (b *Batch) CPutAllowingIfNotExists(key, value, expValue interface{}) {
b.cputInternal(key, value, expValue, true)
b.cputInternalDeprecated(key, value, expValue, true)
}

func (b *Batch) cputInternal(key, value interface{}, expValue *roachpb.Value, allowNotExist bool) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 1, notRaw, err)
return
}
v, err := marshalValue(value)
if err != nil {
b.initResult(0, 1, notRaw, err)
return
}
var ev roachpb.Value
if expValue != nil {
ev = *expValue
}
b.appendReqs(roachpb.NewConditionalPut(k, v, ev, allowNotExist))
b.initResult(1, 1, notRaw, nil)
}

func (b *Batch) cputInternal(key, value, expValue interface{}, allowNotExist bool) {
func (b *Batch) cputInternalDeprecated(key, value, expValue interface{}, allowNotExist bool) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 1, notRaw, err)
Expand Down
17 changes: 16 additions & 1 deletion pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,27 @@ func (db *DB) PutInline(ctx context.Context, key, value interface{}) error {
//
// key can be either a byte slice or a string. value can be any key type, a
// protoutil.Message or any Go primitive type (bool, int, etc).
func (db *DB) CPut(ctx context.Context, key, value, expValue interface{}) error {
func (db *DB) CPut(ctx context.Context, key, value interface{}, expValue *roachpb.Value) error {
b := &Batch{}
b.CPut(key, value, expValue)
return getOneErr(db.Run(ctx, b), b)
}

// CPutDeprecated conditionally sets the value for a key if the existing value is equal
// to expValue. To conditionally set a value only if there is no existing entry
// pass nil for expValue. Note that this must be an interface{}(nil), not a
// typed nil value (e.g. []byte(nil)).
//
// Returns an error if the existing value is not equal to expValue.
//
// key can be either a byte slice or a string. value can be any key type, a
// protoutil.Message or any Go primitive type (bool, int, etc).
func (db *DB) CPutDeprecated(ctx context.Context, key, value, expValue interface{}) error {
b := &Batch{}
b.CPutDeprecated(key, value, expValue)
return getOneErr(db.Run(ctx, b), b)
}

// InitPut sets the first value for a key to value. A ConditionFailedError is
// reported if a value already exists for the key and it's not equal to the
// value passed in. If failOnTombstones is set to true, tombstones count as
Expand Down
12 changes: 9 additions & 3 deletions pkg/internal/client/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

func strToValue(s string) *roachpb.Value {
v := roachpb.MakeValueFromBytes([]byte(s))
return &v
}

func setup(t *testing.T) (serverutils.TestServerInterface, *client.DB) {
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
return s, kvDB
Expand Down Expand Up @@ -103,7 +109,7 @@ func TestDB_CPut(t *testing.T) {
if err := db.Put(ctx, "aa", "1"); err != nil {
t.Fatal(err)
}
if err := db.CPut(ctx, "aa", "2", "1"); err != nil {
if err := db.CPut(ctx, "aa", "2", strToValue("1")); err != nil {
t.Fatal(err)
}
result, err := db.Get(ctx, "aa")
Expand All @@ -112,7 +118,7 @@ func TestDB_CPut(t *testing.T) {
}
checkResult(t, []byte("2"), result.ValueBytes())

if err = db.CPut(ctx, "aa", "3", "1"); err == nil {
if err = db.CPut(ctx, "aa", "3", strToValue("1")); err == nil {
t.Fatal("expected error from conditional put")
}
result, err = db.Get(ctx, "aa")
Expand All @@ -121,7 +127,7 @@ func TestDB_CPut(t *testing.T) {
}
checkResult(t, []byte("2"), result.ValueBytes())

if err = db.CPut(ctx, "bb", "4", "1"); err == nil {
if err = db.CPut(ctx, "bb", "4", strToValue("1")); err == nil {
t.Fatal("expected error from conditional put")
}
result, err = db.Get(ctx, "bb")
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/client/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (m *LeaseManager) ExtendLease(ctx context.Context, l *Lease) error {
Expiration: m.clock.Now().Add(m.leaseDuration.Nanoseconds(), 0),
}

if err := m.db.CPut(ctx, l.key, newVal, l.val.lease); err != nil {
if err := m.db.CPutDeprecated(ctx, l.key, newVal, l.val.lease); err != nil {
if _, ok := err.(*roachpb.ConditionFailedError); ok {
// Something is wrong - immediately expire the local lease state.
l.val.lease.Expiration = hlc.Timestamp{}
Expand All @@ -173,5 +173,5 @@ func (m *LeaseManager) ReleaseLease(ctx context.Context, l *Lease) error {
}
defer func() { <-l.val.sem }()

return m.db.CPut(ctx, l.key, nil, l.val.lease)
return m.db.CPutDeprecated(ctx, l.key, nil, l.val.lease)
}
17 changes: 16 additions & 1 deletion pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,27 @@ func (txn *Txn) Put(ctx context.Context, key, value interface{}) error {
//
// key can be either a byte slice or a string. value can be any key type, a
// protoutil.Message or any Go primitive type (bool, int, etc).
func (txn *Txn) CPut(ctx context.Context, key, value, expValue interface{}) error {
func (txn *Txn) CPut(ctx context.Context, key, value interface{}, expValue *roachpb.Value) error {
b := txn.NewBatch()
b.CPut(key, value, expValue)
return getOneErr(txn.Run(ctx, b), b)
}

// CPutDeprecated conditionally sets the value for a key if the existing value is equal
// to expValue. To conditionally set a value only if there is no existing entry
// pass nil for expValue. Note that this must be an interface{}(nil), not a
// typed nil value (e.g. []byte(nil)).
//
// Returns an error if the existing value is not equal to expValue.
//
// key can be either a byte slice or a string. value can be any key type, a
// protoutil.Message or any Go primitive type (bool, int, etc).
func (txn *Txn) CPutDeprecated(ctx context.Context, key, value, expValue interface{}) error {
b := txn.NewBatch()
b.CPutDeprecated(key, value, expValue)
return getOneErr(txn.Run(ctx, b), b)
}

// InitPut sets the first value for a key to value. An error is reported if a
// value already exists for the key and it's not equal to the value passed in.
// If failOnTombstones is set to true, tombstones count as mismatched values
Expand Down
Loading

0 comments on commit dc70bfd

Please sign in to comment.