Skip to content

Commit

Permalink
refactor commitToDB()
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Jan 8, 2025
1 parent 3655373 commit e923601
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 76 deletions.
144 changes: 68 additions & 76 deletions db/db_versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,25 +241,20 @@ func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve,
}
buckets[ns] = bucket
}
var vn *versionedNamespace
if val := bucket.Get(_minKey); val == nil {
val := bucket.Get(_minKey)
if val == nil {
// namespace not created yet
vn = &versionedNamespace{
keyLen: uint32(size),
}
ve = append(ve, batch.NewWriteInfo(
batch.Put, ns, _minKey, vn.serialize(),
fmt.Sprintf("failed to create metadata for namespace %s", ns),
))
} else {
if vn, err = deserializeVersionedNamespace(val); err != nil {
nonDBErr = true
return errors.Wrapf(err, "failed to get metadata of bucket %s", ns)
}
if vn.keyLen != uint32(size) {
nonDBErr = true
return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, size)
}
nonDBErr = true
return errors.Wrapf(ErrInvalid, "namespace %s has not been added", ns)
}
vn, err := deserializeVersionedNamespace(val)
if err != nil {
nonDBErr = true
return errors.Wrapf(err, "failed to get metadata of bucket %s", ns)
}
if vn.keyLen != uint32(size) {
nonDBErr = true
return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, size)
}
}
// keep order of the writes same as the original batch
Expand All @@ -268,71 +263,19 @@ func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve,
write = ve[i]
ns = write.Namespace()
key = write.Key()
val = write.Value()
)
// get bucket
bucket, ok := buckets[ns]
if !ok {
panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), vns = %s does not exist", ns))
}
// check key's last version
var (
last uint64
notexist bool
maxKey = keyForWrite(key, math.MaxUint64)
)
c := bucket.Cursor()
k, _ := c.Seek(maxKey)
if k == nil || bytes.Compare(k, maxKey) == 1 {
k, _ = c.Prev()
if k == nil || bytes.Compare(k, keyForDelete(key, 0)) <= 0 {
// cursor is at the beginning/end of the bucket or smaller than minimum key
notexist = true
}
// wrong-size key should be caught in dedup(), but check anyway
if vnsize[ns] != len(key) {
panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key)))
}
if !notexist {
_, last = parseKey(k)
}
switch write.WriteType() {
case batch.Put:
if bytes.Equal(key, _minKey) {
// create metadata for namespace
if err = bucket.Put(key, val); err != nil {
return errors.Wrap(err, write.Error())
}
} else {
// wrong-size key should be caught in dedup(), but check anyway
if vnsize[ns] != len(key) {
panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key)))
}
if !notexist && version <= last {
// not allowed to perform write on an earlier version
nonDBErr = true
return ErrInvalid
}
if err = bucket.Put(keyForWrite(key, version), val); err != nil {
return errors.Wrap(err, write.Error())
}
}
case batch.Delete:
if notexist {
continue
}
// wrong-size key should be caught in dedup(), but check anyway
if vnsize[ns] != len(key) {
panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key)))
}
if version < last {
// not allowed to perform delete on an earlier version
nonDBErr = true
return ErrInvalid
}
if err = bucket.Put(keyForDelete(key, version), nil); err != nil {
return errors.Wrap(err, write.Error())
}
if err = bucket.Delete(keyForWrite(key, version)); err != nil {
return errors.Wrap(err, write.Error())
}
nonDBErr, err = writeVersionedEntry(version, bucket, write)
if err != nil {
return err
}
}
// write non-versioned keys
Expand Down Expand Up @@ -382,6 +325,55 @@ func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve,
return nil
}

func writeVersionedEntry(version uint64, bucket *bolt.Bucket, ve *batch.WriteInfo) (bool, error) {
var (
key = ve.Key()
val = ve.Value()
last uint64
notexist bool
maxKey = keyForWrite(key, math.MaxUint64)
)
c := bucket.Cursor()
k, _ := c.Seek(maxKey)
if k == nil || bytes.Compare(k, maxKey) == 1 {
k, _ = c.Prev()
if k == nil || bytes.Compare(k, keyForDelete(key, 0)) <= 0 {
// cursor is at the beginning/end of the bucket or smaller than minimum key
notexist = true
}
}
if !notexist {
_, last = parseKey(k)
}
switch ve.WriteType() {
case batch.Put:
if !notexist && version <= last {
// not allowed to perform write on an earlier version
return true, ErrInvalid
}
if err := bucket.Put(keyForWrite(key, version), val); err != nil {
return false, errors.Wrap(err, ve.Error())
}
case batch.Delete:
if notexist {
return false, nil
}
if version < last {
// not allowed to perform delete on an earlier version
return true, ErrInvalid
}
if err := bucket.Put(keyForDelete(key, version), nil); err != nil {
return false, errors.Wrap(err, ve.Error())
}
if version == last {
if err := bucket.Delete(keyForWrite(key, version)); err != nil {
return false, errors.Wrap(err, ve.Error())
}
}
}
return false, nil
}

// dedup does 3 things:
// 1. deduplicate entries in the batch, only keep the last write for each key
// 2. splits entries into 2 slices according to the input namespace map
Expand Down
1 change: 1 addition & 0 deletions db/db_versioned_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ func TestCommitToDB(t *testing.T) {
} {
b.Put(e.ns, e.k, e.v, "test")
}
r.ErrorContains(db.CommitToDB(1, nil, b), "has not been added")

// create namespace
r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k1))))
Expand Down

0 comments on commit e923601

Please sign in to comment.