Skip to content

Commit

Permalink
instancestorage: clean up codec interface
Browse files Browse the repository at this point in the history
Refactor the codec interface. The new codec interface will be used to
support the legacy index format and the RBR compatible index format.

Part of cockroachdb#85736
  • Loading branch information
jeffswenson committed Nov 18, 2022
1 parent 3b802d3 commit 0755afa
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 103 deletions.
21 changes: 4 additions & 17 deletions pkg/sql/sqlinstance/instancestorage/instancereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand All @@ -38,7 +37,6 @@ type Reader struct {
slReader sqlliveness.Reader
f *rangefeed.Factory
codec keys.SQLCodec
tableID descpb.ID
clock *hlc.Clock
stopper *stop.Stopper
rowcodec rowCodec
Expand Down Expand Up @@ -67,9 +65,8 @@ func NewTestingReader(
slReader: slReader,
f: f,
codec: codec,
tableID: tableID,
clock: clock,
rowcodec: makeRowCodec(codec),
rowcodec: makeRowCodec(codec, tableID),
initialScanDone: make(chan struct{}),
stopper: stopper,
}
Expand Down Expand Up @@ -119,22 +116,12 @@ func (r *Reader) maybeStartRangeFeed(ctx context.Context) *rangefeed.RangeFeed {
updateCacheFn := func(
ctx context.Context, keyVal *roachpb.RangeFeedValue,
) {
instanceID, addr, sessionID, locality, timestamp, tombstone, err := r.rowcodec.decodeRow(kv.KeyValue{
Key: keyVal.Key,
Value: &keyVal.Value,
})
instance, err := r.rowcodec.decodeRow(keyVal.Key, &keyVal.Value)
if err != nil {
log.Ops.Warningf(ctx, "failed to decode settings row %v: %v", keyVal.Key, err)
return
}
instance := instancerow{
instanceID: instanceID,
addr: addr,
sessionID: sessionID,
timestamp: timestamp,
locality: locality,
}
r.updateInstanceMap(instance, tombstone)
r.updateInstanceMap(instance, !keyVal.Value.IsPresent())
}
initialScanDoneFn := func(_ context.Context) {
close(r.initialScanDone)
Expand All @@ -151,7 +138,7 @@ func (r *Reader) maybeStartRangeFeed(ctx context.Context) *rangefeed.RangeFeed {
return shouldFail
}

instancesTablePrefix := r.codec.TablePrefix(uint32(r.tableID))
instancesTablePrefix := r.rowcodec.makeIndexPrefix()
instancesTableSpan := roachpb.Span{
Key: instancesTablePrefix,
EndKey: instancesTablePrefix.PrefixEnd(),
Expand Down
34 changes: 12 additions & 22 deletions pkg/sql/sqlinstance/instancestorage/instancestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ var errNoPreallocatedRows = errors.New("no preallocated rows")
type Storage struct {
codec keys.SQLCodec
db *kv.DB
tableID descpb.ID
slReader sqlliveness.Reader
rowcodec rowCodec
settings *cluster.Settings
Expand Down Expand Up @@ -117,8 +116,7 @@ func NewTestingStorage(
s := &Storage{
db: db,
codec: codec,
tableID: sqlInstancesTableID,
rowcodec: makeRowCodec(codec),
rowcodec: makeRowCodec(codec, sqlInstancesTableID),
slReader: slReader,
settings: settings,
}
Expand Down Expand Up @@ -170,14 +168,15 @@ func (s *Storage) CreateInstance(
return err
}

row, err := s.rowcodec.encodeRow(availableID, addr, sessionID, locality, s.codec, s.tableID)
key := s.rowcodec.encodeKey(availableID)
value, err := s.rowcodec.encodeValue(addr, sessionID, locality)
if err != nil {
log.Warningf(ctx, "failed to encode row for instance id %d: %v", availableID, err)
return err
}

b := txn.NewBatch()
b.Put(row.Key, row.Value)
b.Put(key, value)
return txn.CommitInBatch(ctx, b)
}); err != nil {
return base.SQLInstanceID(0), err
Expand Down Expand Up @@ -354,7 +353,7 @@ func (s *Storage) getGlobalInstanceRows(
func (s *Storage) getRegionalInstanceRows(
ctx context.Context, db dbScan,
) (instances []instancerow, _ error) {
start := makeTablePrefix(s.codec, s.tableID)
start := s.rowcodec.makeIndexPrefix()
end := start.PrefixEnd()
// Fetch all rows. The expected data size is small, so it should
// be okay to fetch all rows together.
Expand All @@ -364,19 +363,11 @@ func (s *Storage) getRegionalInstanceRows(
return nil, err
}
for i := range rows {
instanceID, addr, sessionID, locality, timestamp, _, err := s.rowcodec.decodeRow(rows[i])
instance, err := s.rowcodec.decodeRow(rows[i].Key, rows[i].Value)
if err != nil {
log.Warningf(ctx, "failed to decode row %v: %v", rows[i].Key, err)
return nil, err
}
curInstance := instancerow{
instanceID: instanceID,
addr: addr,
sessionID: sessionID,
timestamp: timestamp,
locality: locality,
}
instances = append(instances, curInstance)
instances = append(instances, instance)
}
return instances, nil
}
Expand All @@ -386,7 +377,7 @@ func (s *Storage) getRegionalInstanceRows(
func (s *Storage) ReleaseInstanceID(ctx context.Context, id base.SQLInstanceID) error {
// TODO(andrei): Ensure that we do not delete an instance ID that we no longer
// own, instead of deleting blindly.
key := makeInstanceKey(s.codec, s.tableID, id)
key := s.rowcodec.encodeKey(id)
ctx = multitenant.WithTenantCostControlExemption(ctx)
if _, err := s.db.Del(ctx, key); err != nil {
return errors.Wrapf(err, "could not delete instance %d", id)
Expand Down Expand Up @@ -460,17 +451,16 @@ func (s *Storage) generateAvailableInstanceRows(

b := txn.NewBatch()
for _, instanceID := range toClaim {
row, err := s.rowcodec.encodeRow(
instanceID, "", sqlliveness.SessionID([]byte{}), roachpb.Locality{}, s.codec, s.tableID,
)
key := s.rowcodec.encodeKey(instanceID)
value, err := s.rowcodec.encodeValue("", sqlliveness.SessionID([]byte{}), roachpb.Locality{})
if err != nil {
log.Warningf(ctx, "failed to encode row for instance id %d: %v", instanceID, err)
return err
}
b.Put(row.Key, row.Value)
b.Put(key, value)
}
for _, instanceID := range toDelete {
key := makeInstanceKey(s.codec, s.tableID, instanceID)
key := s.rowcodec.encodeKey(instanceID)
b.Del(key)
}
return txn.CommitInBatch(ctx, b)
Expand Down
140 changes: 80 additions & 60 deletions pkg/sql/sqlinstance/instancestorage/row_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package instancestorage
import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand All @@ -34,46 +33,83 @@ type rowCodec struct {
codec keys.SQLCodec
columns []catalog.Column
decoder valueside.Decoder
tableID descpb.ID
}

// MakeRowCodec makes a new rowCodec for the sql_instances table.
func makeRowCodec(codec keys.SQLCodec) rowCodec {
func makeRowCodec(codec keys.SQLCodec, tableID descpb.ID) rowCodec {
columns := systemschema.SQLInstancesTable.PublicColumns()
return rowCodec{
codec: codec,
columns: columns,
decoder: valueside.MakeDecoder(columns),
tableID: tableID,
}
}

// encodeRow encodes a row of the sql_instances table.
func (d *rowCodec) encodeRow(
instanceID base.SQLInstanceID,
addr string,
sessionID sqlliveness.SessionID,
locality roachpb.Locality,
codec keys.SQLCodec,
tableID descpb.ID,
) (kv kv.KeyValue, err error) {
// decodeRow converts the key and value into an instancerow. value may be nil
// or uninitialized. If it is, the fields stored in the value will be left with
// their default values.
func (d *rowCodec) decodeRow(key roachpb.Key, value *roachpb.Value) (instancerow, error) {
instanceID, err := d.decodeKey(key)
if err != nil {
return instancerow{}, err
}

r := instancerow{
instanceID: instanceID,
}
if value == nil || !value.IsPresent() {
return r, nil
}

r.addr, r.sessionID, r.locality, r.timestamp, err = d.decodeValue(*value)
if err != nil {
return instancerow{}, errors.Wrapf(err, "failed to decode value for: %v", key)
}

return r, nil
}

// makeIndexPrefix returns a roachpb.Key that is the prefix for all encoded
// keys and can be used to scan the entire table.
func (d *rowCodec) makeIndexPrefix() roachpb.Key {
return d.codec.IndexPrefix(uint32(d.tableID), 1)
}

// encodeKey converts the instanceID into an encoded key for the table.
func (d *rowCodec) encodeKey(instanceID base.SQLInstanceID) roachpb.Key {
key := d.makeIndexPrefix()
key = encoding.EncodeVarintAscending(key, int64(instanceID))
return keys.MakeFamilyKey(key, 0)
}

// encodeValue encodes the sql_instance columns into a kv value.
func (d *rowCodec) encodeValue(
addr string, sessionID sqlliveness.SessionID, locality roachpb.Locality,
) (*roachpb.Value, error) {
var valueBuf []byte

addrDatum := tree.DNull
if addr != "" {
addrDatum = tree.NewDString(addr)
}
var valueBuf []byte
valueBuf, err = valueside.Encode(
valueBuf, err := valueside.Encode(
[]byte(nil), valueside.MakeColumnIDDelta(0, d.columns[1].GetID()), addrDatum, []byte(nil))
if err != nil {
return kv, err
return nil, err
}

sessionDatum := tree.DNull
if len(sessionID) > 0 {
sessionDatum = tree.NewDBytes(tree.DBytes(sessionID.UnsafeBytes()))
}
sessionColDiff := valueside.MakeColumnIDDelta(d.columns[1].GetID(), d.columns[2].GetID())
valueBuf, err = valueside.Encode(valueBuf, sessionColDiff, sessionDatum, []byte(nil))
if err != nil {
return kv, err
return nil, err
}

// Preserve the ordering of locality.Tiers, even though we convert it to json.
localityDatum := tree.DNull
if len(locality.Tiers) > 0 {
Expand All @@ -84,54 +120,48 @@ func (d *rowCodec) encodeRow(
localityColDiff := valueside.MakeColumnIDDelta(d.columns[2].GetID(), d.columns[3].GetID())
valueBuf, err = valueside.Encode(valueBuf, localityColDiff, localityDatum, []byte(nil))
if err != nil {
return kv, err
return nil, err
}
var v roachpb.Value

v := &roachpb.Value{}
v.SetTuple(valueBuf)
kv.Value = &v
kv.Key = makeInstanceKey(codec, tableID, instanceID)
return kv, nil
return v, nil
}

// decodeKey decodes a sql_instance key into its logical components.
func (d *rowCodec) decodeKey(key roachpb.Key) (base.SQLInstanceID, error) {
types := []*types.T{d.columns[0].GetType()}
row := make([]rowenc.EncDatum, 1)
_, _, err := rowenc.DecodeIndexKey(d.codec, types, row, nil, key)
if err != nil {
return base.SQLInstanceID(0), errors.Wrap(err, "failed to decode key")
}
var alloc tree.DatumAlloc
if err := row[0].EnsureDecoded(types[0], &alloc); err != nil {
return base.SQLInstanceID(0), err
}
return base.SQLInstanceID(tree.MustBeDInt(row[0].Datum)), nil
}

// decodeRow decodes a row of the sql_instances table.
func (d *rowCodec) decodeRow(
kv kv.KeyValue,
func (d *rowCodec) decodeValue(
value roachpb.Value,
) (
instanceID base.SQLInstanceID,
addr string,
sessionID sqlliveness.SessionID,
locality roachpb.Locality,
timestamp hlc.Timestamp,
tombstone bool,
_ error,
) {
var alloc tree.DatumAlloc
// First, decode the id field from the index key.
{
types := []*types.T{d.columns[0].GetType()}
row := make([]rowenc.EncDatum, 1)
_, _, err := rowenc.DecodeIndexKey(d.codec, types, row, nil, kv.Key)
if err != nil {
return base.SQLInstanceID(0), "", "", roachpb.Locality{}, hlc.Timestamp{}, false, errors.Wrap(err, "failed to decode key")
}
if err := row[0].EnsureDecoded(types[0], &alloc); err != nil {
return base.SQLInstanceID(0), "", "", roachpb.Locality{}, hlc.Timestamp{}, false, err
}
instanceID = base.SQLInstanceID(tree.MustBeDInt(row[0].Datum))
}
if !kv.Value.IsPresent() {
return instanceID, "", "", roachpb.Locality{}, hlc.Timestamp{}, true, nil
}
timestamp = kv.Value.Timestamp
// The rest of the columns are stored as a family.
bytes, err := kv.Value.GetTuple()
bytes, err := value.GetTuple()
if err != nil {
return instanceID, "", "", roachpb.Locality{}, hlc.Timestamp{}, false, err
return "", "", roachpb.Locality{}, hlc.Timestamp{}, err
}

datums, err := d.decoder.Decode(&alloc, bytes)
datums, err := d.decoder.Decode(&tree.DatumAlloc{}, bytes)
if err != nil {
return instanceID, "", "", roachpb.Locality{}, hlc.Timestamp{}, false, err
return "", "", roachpb.Locality{}, hlc.Timestamp{}, err
}

if addrVal := datums[1]; addrVal != tree.DNull {
Expand All @@ -144,30 +174,20 @@ func (d *rowCodec) decodeRow(
localityJ := tree.MustBeDJSON(localityVal)
v, err := localityJ.FetchValKey("Tiers")
if err != nil {
return instanceID, "", "", roachpb.Locality{}, hlc.Timestamp{}, false, errors.Wrap(err, "failed to find Tiers attribute in locality")
return "", "", roachpb.Locality{}, hlc.Timestamp{}, errors.Wrap(err, "failed to find Tiers attribute in locality")
}
if v != nil {
vStr, err := v.AsText()
if err != nil {
return instanceID, "", "", roachpb.Locality{}, hlc.Timestamp{}, false, err
return "", "", roachpb.Locality{}, hlc.Timestamp{}, err
}
if len(*vStr) > 0 {
if err := locality.Set(*vStr); err != nil {
return instanceID, "", "", roachpb.Locality{}, hlc.Timestamp{}, false, err
return "", "", roachpb.Locality{}, hlc.Timestamp{}, err
}
}
}
}

return instanceID, addr, sessionID, locality, timestamp, false, nil
}

func makeTablePrefix(codec keys.SQLCodec, tableID descpb.ID) roachpb.Key {
return codec.IndexPrefix(uint32(tableID), 1)
}

func makeInstanceKey(
codec keys.SQLCodec, tableID descpb.ID, instanceID base.SQLInstanceID,
) roachpb.Key {
return keys.MakeFamilyKey(encoding.EncodeVarintAscending(makeTablePrefix(codec, tableID), int64(instanceID)), 0)
return addr, sessionID, locality, value.Timestamp, nil
}
Loading

0 comments on commit 0755afa

Please sign in to comment.