Skip to content

Commit

Permalink
storage: CPut bytes instead of a proto when updating RangeDescriptors
Browse files Browse the repository at this point in the history
In cockroachdb#38147, a new non-nullable field was added to the ReplicaDescriptor
proto that is serialized inside RangeDescriptors. RangeDescriptors are
updated using CPut to detect races. This means that if a RangeDescriptor
had been written by an old version of cockroach and we then attempt to
update it, the CPut will fail because the encoded version of it is
different (non-nullable proto2 fields are always included).

A similar issue was introduced in cockroachdb#38004 which made the StickyBit field
on RangeDescriptor non-nullable.

We could keep the fields as nullable, but this has allocation costs (and
is also more annoying to work with).

Worse, the proto spec is pretty explicit about not relying on
serialization of a given message to always produce exactly the same
bytes:

    From https://developers.google.com/protocol-buffers/docs/encoding#implications
    Do not assume the byte output of a serialized message is stable.

So instead, we've decided to stop CPut-ing protos and to change the
interface of CPut to take the expected value as bytes. To CPut a proto,
the encoded value will be read from kv, passed around with the decoded
struct, and used in the eventual CPut. This work is upcoming, but the
above two PRs are real breakages, so this commit fixes those first.

Neither of these PRs made the last alpha so no release note.

Touches cockroachdb#38308

Release note: None
  • Loading branch information
danhhz committed Jun 19, 2019
1 parent f6d75a5 commit 614906f
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 39 deletions.
118 changes: 80 additions & 38 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,14 @@ func (r *Replica) adminSplitWithDescriptor(
newDesc := *desc
newDesc.StickyBit = args.ExpirationTime
err := r.store.DB().Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
dbDescValue, err := conditionalGetDescValueFromDB(ctx, txn, desc)
if err != nil {
return err
}

b := txn.NewBatch()
descKey := keys.RangeDescriptorKey(desc.StartKey)
if err := updateRangeDescriptor(b, descKey, desc, &newDesc); err != nil {
if err := updateRangeDescriptor(b, descKey, dbDescValue, &newDesc); err != nil {
return err
}
if err := updateRangeAddressing(b, &newDesc); err != nil {
Expand Down Expand Up @@ -264,9 +269,14 @@ func (r *Replica) adminSplitWithDescriptor(
// split. Note that we mutate the descriptor for the left hand
// side of the split first to locate the txn record there.
{
dbDescValue, err := conditionalGetDescValueFromDB(ctx, txn, desc)
if err != nil {
return err
}

b := txn.NewBatch()
leftDescKey := keys.RangeDescriptorKey(leftDesc.StartKey)
if err := updateRangeDescriptor(b, leftDescKey, desc, &leftDesc); err != nil {
if err := updateRangeDescriptor(b, leftDescKey, dbDescValue, &leftDesc); err != nil {
return err
}
// Commit this batch first to ensure that the transaction record
Expand Down Expand Up @@ -374,12 +384,16 @@ func (r *Replica) adminUnsplitWithDescriptor(
}

if err := r.store.DB().Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
dbDescValue, err := conditionalGetDescValueFromDB(ctx, txn, desc)
if err != nil {
return err
}

b := txn.NewBatch()
newDesc := *desc
newDesc.StickyBit = hlc.Timestamp{}
descKey := keys.RangeDescriptorKey(newDesc.StartKey)

if err := updateRangeDescriptor(b, descKey, desc, &newDesc); err != nil {
if err := updateRangeDescriptor(b, descKey, dbDescValue, &newDesc); err != nil {
return err
}
if err := updateRangeAddressing(b, &newDesc); err != nil {
Expand Down Expand Up @@ -515,7 +529,11 @@ func (r *Replica) AdminMerge(
// NB: this read does NOT impact transaction record placement.
var rightDesc roachpb.RangeDescriptor
rightDescKey := keys.RangeDescriptorKey(origLeftDesc.EndKey)
if err := txn.GetProto(ctx, rightDescKey, &rightDesc); err != nil {
dbRightDescKV, err := txn.Get(ctx, rightDescKey)
if err != nil {
return err
}
if err := dbRightDescKV.Value.GetProto(&rightDesc); err != nil {
return err
}

Expand Down Expand Up @@ -543,9 +561,16 @@ func (r *Replica) AdminMerge(
// transaction is this conditional put to change the left hand side's
// descriptor end key.
{
dbOrigLeftDescValue, err := conditionalGetDescValueFromDB(ctx, txn, origLeftDesc)
if err != nil {
return err
}

b := txn.NewBatch()
leftDescKey := keys.RangeDescriptorKey(updatedLeftDesc.StartKey)
if err := updateRangeDescriptor(b, leftDescKey, origLeftDesc, &updatedLeftDesc); err != nil {
if err := updateRangeDescriptor(
b, leftDescKey, dbOrigLeftDescValue, &updatedLeftDesc,
); err != nil {
return err
}
// Commit this batch on its own to ensure that the transaction record
Expand Down Expand Up @@ -573,7 +598,7 @@ func (r *Replica) AdminMerge(
}

// Remove the range descriptor for the deleted range.
if err := updateRangeDescriptor(b, rightDescKey, &rightDesc, nil); err != nil {
if err := updateRangeDescriptor(b, rightDescKey, dbRightDescKV.Value, nil); err != nil {
return err
}

Expand All @@ -600,7 +625,7 @@ func (r *Replica) AdminMerge(
}
rhsSnapshotRes := br.(*roachpb.SubsumeResponse)

err := waitForApplication(ctx, r.store.cfg.NodeDialer, rightDesc, rhsSnapshotRes.LeaseAppliedIndex)
err = waitForApplication(ctx, r.store.cfg.NodeDialer, rightDesc, rhsSnapshotRes.LeaseAppliedIndex)
if err != nil {
return errors.Wrap(err, "waiting for all right-hand replicas to catch up")
}
Expand Down Expand Up @@ -824,6 +849,7 @@ func (r *Replica) changeReplicas(
if desc == nil {
return nil, errors.Errorf("%s: the current RangeDescriptor must not be nil", r)
}

repDesc := roachpb.ReplicaDescriptor{
NodeID: target.NodeID,
StoreID: target.StoreID,
Expand Down Expand Up @@ -902,24 +928,18 @@ func (r *Replica) changeReplicas(
if err := r.store.DB().Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
log.Event(ctx, "attempting txn")
txn.SetDebugName(replicaChangeTxnName)
// TODO(tschottdorf): oldDesc is used for sanity checks related to #7224.
// Remove when that has been solved. The failure mode is likely based on
// prior divergence of the Replica (in which case the check below does not
// fire because everything reads from the local, diverged, set of data),
// so we don't expect to see this fail in practice ever.
oldDesc := new(roachpb.RangeDescriptor)
if err := txn.GetProto(ctx, descKey, oldDesc); err != nil {
dbDescValue, err := conditionalGetDescValueFromDB(ctx, txn, desc)
if err != nil {
return err
}
log.Infof(ctx, "change replicas (%v %s): read existing descriptor %s",
changeType, repDesc, oldDesc)
log.Infof(ctx, "change replicas (%v %s): existing descriptor %s", changeType, repDesc, desc)

{
b := txn.NewBatch()

// Important: the range descriptor must be the first thing touched in the transaction
// so the transaction record is co-located with the range being modified.
if err := updateRangeDescriptor(b, descKey, desc, &updatedDesc); err != nil {
if err := updateRangeDescriptor(b, descKey, dbDescValue, &updatedDesc); err != nil {
return err
}

Expand Down Expand Up @@ -964,12 +984,6 @@ func (r *Replica) changeReplicas(
return err
}

if oldDesc.RangeID != 0 && !oldDesc.Equal(desc) {
// We read the previous value, it wasn't what we supposedly used in
// the CPut, but we still overwrote in the CPut above.
panic(fmt.Sprintf("committed replica change, but oldDesc != assumedOldDesc:\n%+v\n%+v\nnew desc:\n%+v",
oldDesc, desc, updatedDesc))
}
return nil
}); err != nil {
log.Event(ctx, err.Error())
Expand Down Expand Up @@ -1112,6 +1126,37 @@ func replicaSetsEqual(a, b []roachpb.ReplicaDescriptor) bool {
return true
}

// conditionalGetDescValueFromDB fetches an encoded RangeDescriptor from kv,
// checks that it matches the given expectation using proto Equals, and returns
// the raw fetched roachpb.Value. If the fetched value doesn't match the
// expectation, a ConditionFailedError is returned.
//
// This ConditionFailedError is a historical artifact. We used to pass the
// parsed RangeDescriptor directly as the expected value in a CPut, but proto
// message encodings aren't stable so this was fragile. Calling this method and
// then passing the returned *roachpb.Value as the expected value in a CPut does
// the same thing, but also correctly handles proto equality. See #38308.
func conditionalGetDescValueFromDB(
ctx context.Context, txn *client.Txn, expectation *roachpb.RangeDescriptor,
) (*roachpb.Value, error) {
descKey := keys.RangeDescriptorKey(expectation.StartKey)
existingDescKV, err := txn.Get(ctx, descKey)
if err != nil {
return nil, errors.Wrap(err, "fetching current range descriptor value")
}
var existingDesc *roachpb.RangeDescriptor
if existingDescKV.Value != nil {
existingDesc = &roachpb.RangeDescriptor{}
if err := existingDescKV.Value.GetProto(existingDesc); err != nil {
return nil, errors.Wrap(err, "decoding current range descriptor value")
}
}
if !existingDesc.Equal(expectation) {
return nil, &roachpb.ConditionFailedError{ActualValue: existingDescKV.Value}
}
return existingDescKV.Value, nil
}

// updateRangeDescriptor adds a ConditionalPut on the range descriptor. The
// conditional put verifies that changes to the range descriptor are made in a
// well-defined order, preventing a scenario where a wayward replica which is
Expand All @@ -1120,40 +1165,37 @@ func replicaSetsEqual(a, b []roachpb.ReplicaDescriptor) bool {
// range descriptor. This is a last line of defense; other mechanisms should
// prevent rogue replicas from getting this far (see #768).
//
// oldDesc can be nil, meaning that the key is expected to not exist.
// oldValue can be nil, meaning that the key is expected to not exist.
//
// Note that in addition to using this method to update the on-disk range
// descriptor, a CommitTrigger must be used to update the in-memory
// descriptor; it will not automatically be copied from newDesc.
func updateRangeDescriptor(
b *client.Batch,
descKey roachpb.Key,
oldDesc *roachpb.RangeDescriptor,
newDesc *roachpb.RangeDescriptor,
b *client.Batch, descKey roachpb.Key, oldValue *roachpb.Value, newDesc *roachpb.RangeDescriptor,
) error {
// This is subtle: []byte(nil) != interface{}(nil). A []byte(nil) refers to
// an empty value. An interface{}(nil) refers to a non-existent value. So
// we're careful to construct interface{}(nil)s when newDesc/oldDesc are nil.
var newValue interface{}
if newDesc != nil {
if err := newDesc.Validate(); err != nil {
return errors.Wrapf(err, "validating new descriptor %+v (old descriptor is %+v)", newDesc, oldDesc)
return errors.Wrapf(err, "validating new descriptor %+v (old descriptor is %+v)",
newDesc, oldValue)
}
newBytes, err := protoutil.Marshal(newDesc)
if err != nil {
return err
}
newValue = newBytes
}
var oldValue interface{}
if oldDesc != nil {
oldBytes, err := protoutil.Marshal(oldDesc)
if err != nil {
return err
}
oldValue = oldBytes
var ov interface{}
if oldValue != nil {
// If the old value was fetched from kv, it may have a checksum set. This
// panics CPut, so clear it.
oldValue.ClearChecksum()
ov = oldValue
}
b.CPut(descKey, newValue, oldValue)
b.CPut(descKey, newValue, ov)
return nil
}

Expand Down
88 changes: 88 additions & 0 deletions pkg/storage/replica_command_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL.txt and at www.mariadb.com/bsl11.
//
// Change Date: 2022-10-01
//
// On the date above, in accordance with the Business Source License, use
// of this software will be governed by the Apache License, Version 2.0,
// included in the file licenses/APL.txt and at
// https://www.apache.org/licenses/LICENSE-2.0

package storage

import (
"context"
"encoding/binary"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// Regression test for #38308. Summary: a non-nullable field was added to
// RangeDescriptor which broke splits, merges, and replica changes if the
// cluster had been upgraded from a previous version of cockroach.
func TestRangeDescriptorUpdateProtoChangedAcrossVersions(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

bKey := roachpb.Key("b")
if err := kvDB.AdminSplit(ctx, bKey, bKey, hlc.MaxTimestamp /* expirationTime */); err != nil {
t.Fatal(err)
}

// protoVarintField returns an encoded proto field of type varint with the
// given id.
protoVarintField := func(fieldID int) []byte {
var scratch [binary.MaxVarintLen64]byte
const typ = 0 // varint type field
tag := uint64(fieldID<<3) | typ
tagLen := binary.PutUvarint(scratch[:], tag)
// A proto message is a series of <tag><data> where <tag> is a varint
// including the field id and the data type and <data> depends on the type.
buf := append([]byte(nil), scratch[:tagLen]...)
// The test doesn't care what we use for the field data, so use the tag
// since the data is a varint and it's already an encoded varint.
buf = append(buf, scratch[:tagLen]...)
return buf
}

// Update the serialized RangeDescriptor proto for the b to max range to have
// an unknown proto field. Previously, this would break splits, merges,
// replica changes. The real regression was a missing field, but an extra
// unknown field tests the same thing.
{
bDescKey := keys.RangeDescriptorKey(roachpb.RKey(bKey))
bDescKV, err := kvDB.Get(ctx, bDescKey)
require.NoError(t, err)
require.NotNil(t, bDescKV.Value, `could not find "b" descriptor`)

// Update the serialized proto with a new field we don't know about. The
// proto encoding is just a series of these, so we can do this simply by
// appending it.
newBDescBytes, err := bDescKV.Value.GetBytes()
require.NoError(t, err)
newBDescBytes = append(newBDescBytes, protoVarintField(9999)...)

newBDescValue := roachpb.MakeValueFromBytes(newBDescBytes)
require.NoError(t, kvDB.Put(ctx, bDescKey, &newBDescValue))
}

// Verify that splits still work. We could also do a similar thing to test
// merges and replica changes, but they all go through updateRangeDescriptor
// so it's unnecessary.
cKey := roachpb.Key("c")
if err := kvDB.AdminSplit(ctx, cKey, cKey, hlc.MaxTimestamp /* expirationTime */); err != nil {
t.Fatal(err)
}
}
14 changes: 13 additions & 1 deletion pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,14 +347,26 @@ func (tc *testContext) addBogusReplicaToRangeDesc(
newDesc.InternalReplicas = append(newDesc.InternalReplicas, secondReplica)
newDesc.NextReplicaID = 3

dbDescKV, err := tc.store.DB().Get(ctx, keys.RangeDescriptorKey(oldDesc.StartKey))
if err != nil {
return roachpb.ReplicaDescriptor{}, err
}
var dbDesc roachpb.RangeDescriptor
if err := dbDescKV.Value.GetProto(&dbDesc); err != nil {
return roachpb.ReplicaDescriptor{}, err
}
if !oldDesc.Equal(&dbDesc) {
return roachpb.ReplicaDescriptor{}, errors.Errorf(`descs didn't match: %v vs %v`, oldDesc, dbDesc)
}

// Update the "on-disk" replica state, so that it doesn't diverge from what we
// have in memory. At the time of this writing, this is not actually required
// by the tests using this functionality, but it seems sane to do.
ba := client.Batch{
Header: roachpb.Header{Timestamp: tc.Clock().Now()},
}
descKey := keys.RangeDescriptorKey(oldDesc.StartKey)
if err := updateRangeDescriptor(&ba, descKey, &oldDesc, &newDesc); err != nil {
if err := updateRangeDescriptor(&ba, descKey, dbDescKV.Value, &newDesc); err != nil {
return roachpb.ReplicaDescriptor{}, err
}
if err := tc.store.DB().Run(ctx, &ba); err != nil {
Expand Down

0 comments on commit 614906f

Please sign in to comment.