Rework Changes() API
Having Watch() and Changes() as separate methods in ChangeIterator can lead to
inconsistent usage when change iteration is combined with a write transaction:

  var changes statedb.ChangeIterator[someObject]
  wtxn := db.WriteTxn(someTable)
  for change := range changes.Changes() {
    // These changes reflect the snapshot from the previous
    // Watch(db.ReadTxn()) call below, not the state of the objects in "wtxn".
    someTable.Insert(...)
  }
  select {
  case <-changes.Watch(db.ReadTxn()):
  }

The new API makes it easier to stay consistent and to use the same transaction
both for reading the changes and for making the writes:

  var changeIterator statedb.ChangeIterator[someObject]
  wtxn := db.WriteTxn(someTable)
  // Give me the unobserved changes up to the "snapshot" of wtxn
  changes, watch := changeIterator.Next(wtxn)
  for change := range changes {
    // These changes reflect the committed snapshot as of "wtxn";
    // writes made within "wtxn" itself are not visible here.
    someTable.Insert(...)
  }
  select {
  case <-watch:
  }

If the transaction given to Next() is a WriteTxn, the iterator is still
produced from the snapshot of committed data and the uncommitted data in
the transaction is ignored. This is needed because the transaction might
be aborted, which would roll the revision back and mess up the graveyard
watermarks, as the marked revision would then lie in the future.
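
For illustration only, a minimal sketch of that rule, reusing the placeholder
names from the examples above (db, someTable, changeIterator; "obj" is
likewise just a placeholder):

  wtxn := db.WriteTxn(someTable)
  someTable.Insert(wtxn, obj)             // uncommitted write in "wtxn"
  changes, _ := changeIterator.Next(wtxn) // snapshot of committed data only
  for change := range changes {
    // The Insert() above is not visible here: only changes committed
    // before "wtxn" are produced, so aborting "wtxn" cannot leave the
    // iterator's watermark at a future revision.
    _ = change
  }
  wtxn.Abort()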

Signed-off-by: Jussi Maki <[email protected]>
joamaki committed Sep 3, 2024
1 parent 3aed513 commit a201670
Showing 13 changed files with 242 additions and 164 deletions.
44 changes: 23 additions & 21 deletions benchmarks_test.go
@@ -252,14 +252,16 @@ func BenchmarkDB_Changes_Baseline(b *testing.B) {

func BenchmarkDB_Changes(b *testing.B) {
db, table := newTestDBWithMetrics(b, &NopMetrics{})

// Create the change iterator.
txn := db.WriteTxn(table)
require.Zero(b, table.NumObjects(txn))
iter, err := table.Changes(txn)
txn.Commit()
require.NoError(b, err)

b.ResetTimer()
for n := 0; n < b.N; n++ {
// Create the change iterator.
txn := db.WriteTxn(table)
iter, err := table.Changes(txn)
txn.Commit()
require.NoError(b, err)

// Create objects
txn = db.WriteTxn(table)
for i := 0; i < numObjectsToInsert; i++ {
@@ -270,42 +272,42 @@ func BenchmarkDB_Changes(b *testing.B) {
}
txn.Commit()

// Iterator created before insertions should be empty.
for change := range iter.Changes() {
b.Fatalf("did not expect change: %v", change)
}

// Refresh to observe the insertions.
<-iter.Watch(db.ReadTxn())
// Observe the creations.
changes, watch := iter.Next(db.ReadTxn())
nDeleted := 0
nExists := 0

for change := range iter.Changes() {
for change := range changes {
if change.Deleted {
b.Fatalf("expected create for %v", change)
}
nExists++
}
require.EqualValues(b, numObjectsToInsert, nExists)
if numObjectsToInsert != nExists {
b.Fatalf("expected to observe %d, got %d", numObjectsToInsert, nExists)
}

// Delete all objects to time the cost for deletion tracking.
txn = db.WriteTxn(table)
table.DeleteAll(txn)
txn.Commit()

// Refresh to observe the deletions.
<-iter.Watch(db.ReadTxn())
for change := range iter.Changes() {
// Watch channel should be closed now.
<-watch

// Observe the deletions.
changes, watch = iter.Next(db.ReadTxn())
for change := range changes {
if change.Deleted {
nDeleted++
nExists--
} else {
b.Fatalf("expected deleted for %v", change)
}
}
require.EqualValues(b, numObjectsToInsert, nDeleted)
require.EqualValues(b, 0, nExists)
iter.Close()
if numObjectsToInsert != nDeleted {
b.Fatalf("expected to see %d deleted, got %d", numObjectsToInsert, nDeleted)
}
}
b.StopTimer()
eventuallyGraveyardIsEmpty(b, db)
112 changes: 80 additions & 32 deletions db_test.go
@@ -10,6 +10,7 @@ import (
"fmt"
"log/slog"
"os"
"runtime"
"slices"
"testing"
"time"
@@ -307,12 +308,12 @@ func TestDB_Changes(t *testing.T) {
require.NoError(t, err, "failed to create ChangeIterator")
iter2, err := table.Changes(wtxn)
require.NoError(t, err, "failed to create ChangeIterator")
wtxn.Commit()
txn0 := wtxn.Commit()

assert.EqualValues(t, 2, expvarInt(metrics.DeleteTrackerCountVar.Get("test")), "DeleteTrackerCount")

// The initial watch channel is closed, so users can either iterate first or watch first.
<-iter.Watch(db.ReadTxn())
changes, watch := iter.Next(db.ReadTxn())

// Delete 2/3 objects
{
@@ -354,7 +355,9 @@ func TestDB_Changes(t *testing.T) {
nDeleted := 0

// Observe the objects that existed when the tracker was created.
for change := range iter.Changes() {
<-watch
changes, watch = iter.Next(txn0)
for change := range changes {
if change.Deleted {
nDeleted++
} else {
@@ -365,16 +368,33 @@ func TestDB_Changes(t *testing.T) {
assert.Equal(t, 3, nExist)

// Wait for the new changes.
<-iter.Watch(txn)
<-watch

changes, watch = iter.Next(txn)

// Consume one change, leaving a partially consumed sequence.
for change := range changes {
if change.Deleted {
nDeleted++
nExist--
} else {
nExist++
}
break
}

for change := range iter.Changes() {
// The iterator can be refreshed with new snapshot without having consumed
// the previous sequence fully. No changes are missed.
changes, watch = iter.Next(db.ReadTxn())
for change := range changes {
if change.Deleted {
nDeleted++
nExist--
} else {
nExist++
}
}

assert.Equal(t, 2, nDeleted)
assert.Equal(t, 1, nExist)

@@ -386,27 +406,24 @@ func TestDB_Changes(t *testing.T) {
nExist = 0
nDeleted = 0

for change := range iter2.Changes() {
changes, watch = iter2.Next(txn)
for change := range changes {
if change.Deleted {
nDeleted++
} else {
nExist++
}
}
<-iter2.Watch(txn)

for change := range iter2.Changes() {
if change.Deleted {
nDeleted++
nExist--
} else {
nExist++
}
}

assert.Equal(t, 1, nExist)
assert.Equal(t, 2, nDeleted)

// Refreshing with the same snapshot yields no new changes.
changes, watch = iter2.Next(txn)
for change := range changes {
t.Fatalf("unexpected change: %v", change)
}

// Graveyard will now be GCd.
eventuallyGraveyardIsEmpty(t, db)

@@ -422,43 +439,48 @@ func TestDB_Changes(t *testing.T) {
wtxn.Commit()
}

<-watch

txn = db.ReadTxn()
<-iter.Watch(txn)
<-iter2.Watch(txn)
changes, watch = iter.Next(txn)
changes1 := Collect(changes)
changes, _ = iter2.Next(txn)
changes2 := Collect(changes)

changes := Collect(iter.Changes())
changes2 := Collect(iter2.Changes())
assert.Equal(t, len(changes), len(changes2), "expected same number of changes from both iterators")
assert.Equal(t, len(changes1), len(changes2),
"expected same number of changes from both iterators")

if assert.Len(t, changes, 1, "expected one change") {
change := changes[0]
if assert.Len(t, changes1, 1, "expected one change") {
change := changes1[0]
change2 := changes2[0]
assert.EqualValues(t, 88, change.Object.ID)
assert.EqualValues(t, 88, change2.Object.ID)
assert.False(t, change.Deleted)
assert.False(t, change2.Deleted)
}

// After closing the first iterator, deletes are still tracked for second one.
// After dropping the first iterator, deletes are still tracked for second one.
// Delete the remaining objects.
iter.Close()

iter = nil
{
txn := db.WriteTxn(table)
require.NoError(t, table.DeleteAll(txn), "DeleteAll failed")
txn.Commit()
}

require.False(t, db.graveyardIsEmpty())

assert.EqualValues(t, 0, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount")
assert.EqualValues(t, 2, expvarInt(metrics.GraveyardObjectCountVar.Get("test")), "GraveyardObjectCount")

// Consume the deletions using the second iterator.
txn = db.ReadTxn()
<-iter2.Watch(txn)

<-watch
changes, watch = iter2.Next(txn)

count := 0
for change := range iter2.Changes() {
for change := range changes {
count++
assert.True(t, change.Deleted, "expected object %d to be deleted", change.Object.ID)
}
Expand All @@ -469,8 +491,8 @@ func TestDB_Changes(t *testing.T) {
assert.EqualValues(t, 0, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount")
assert.EqualValues(t, 0, expvarInt(metrics.GraveyardObjectCountVar.Get("test")), "GraveyardObjectCount")

// After closing the second iterator the deletions no longer go into graveyard.
iter2.Close()
// After dropping the second iterator the deletions no longer go into graveyard.
iter2 = nil
{
txn := db.WriteTxn(table)
_, _, err := table.Insert(txn, testObject{ID: 78, Tags: part.NewSet("world")})
@@ -480,10 +502,33 @@ func TestDB_Changes(t *testing.T) {
require.NoError(t, table.DeleteAll(txn), "DeleteAll failed")
txn.Commit()
}
require.True(t, db.graveyardIsEmpty())
// Eventually GC drops the second iterator and the delete tracker is closed.
eventuallyGraveyardIsEmpty(t, db)

assert.EqualValues(t, 0, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount")
assert.EqualValues(t, 0, expvarInt(metrics.GraveyardObjectCountVar.Get("test")), "GraveyardObjectCount")

// Create another iterator and test observing changes using a WriteTxn
// that is mutating the table.
wtxn = db.WriteTxn(table)
iter3, err := table.Changes(wtxn)
require.NoError(t, err, "failed to create ChangeIterator")
_, _, err = table.Insert(wtxn, testObject{ID: 1})
require.NoError(t, err, "Insert failed")
wtxn.Commit()

wtxn = db.WriteTxn(table)
_, _, err = table.Insert(wtxn, testObject{ID: 2})
require.NoError(t, err, "Insert failed")
changes, _ = iter3.Next(wtxn)
// We don't observe the insert of ID 2
count = 0
for change := range changes {
require.EqualValues(t, 1, change.Object.ID)
count++
}
require.Equal(t, 1, count)
wtxn.Abort()
}

func TestDB_Observable(t *testing.T) {
@@ -1041,7 +1086,10 @@ func Test_nonUniqueKey(t *testing.T) {

func eventuallyGraveyardIsEmpty(t testing.TB, db *DB) {
require.Eventually(t,
db.graveyardIsEmpty,
func() bool {
runtime.GC() // force changeIterator finalizers
return db.graveyardIsEmpty()
},
5*time.Second,
100*time.Millisecond,
"graveyard not garbage collected")
7 changes: 3 additions & 4 deletions deletetracker.go
@@ -4,7 +4,6 @@
package statedb

import (
"runtime"
"sync/atomic"

"github.com/cilium/statedb/index"
@@ -36,8 +35,9 @@ func (dt *deleteTracker[Obj]) getRevision() uint64 {
// Deleted returns an iterator for deleted objects in this table starting from
// 'minRevision'. The deleted objects are not garbage-collected unless 'Mark' is
// called!
func (dt *deleteTracker[Obj]) deleted(txn ReadTxn, minRevision Revision) Iterator[Obj] {
indexTxn := txn.getTxn().mustIndexReadTxn(dt.table, GraveyardRevisionIndexPos)
func (dt *deleteTracker[Obj]) deleted(txn *txn, minRevision Revision) Iterator[Obj] {
indexEntry := txn.root[dt.table.tablePos()].indexes[GraveyardRevisionIndexPos]
indexTxn := indexReadTxn{indexEntry.tree, indexEntry.unique}
iter := indexTxn.LowerBound(index.Uint64(minRevision))
return &iterator[Obj]{iter}
}
@@ -57,7 +57,6 @@ func (dt *deleteTracker[Obj]) close() {
if dt.db == nil {
return
}
runtime.SetFinalizer(dt, nil)

// Remove the delete tracker from the table.
txn := dt.db.WriteTxn(dt.table).getTxn()
6 changes: 3 additions & 3 deletions derive.go
@@ -76,10 +76,10 @@ func (d derive[In, Out]) loop(ctx context.Context, _ cell.Health) error {
if err != nil {
return err
}
defer iter.Close()
for {
wtxn := d.DB.WriteTxn(out)
for change := range iter.Changes() {
changes, watch := iter.Next(wtxn)
for change := range changes {
outObj, result := d.transform(change.Object, change.Deleted)
switch result {
case DeriveInsert:
@@ -101,7 +101,7 @@ func (d derive[In, Out]) loop(ctx context.Context, _ cell.Health) error {
wtxn.Commit()

select {
case <-iter.Watch(d.DB.ReadTxn()):
case <-watch:
case <-ctx.Done():
return nil
}
8 changes: 4 additions & 4 deletions fuzz_test.go
@@ -372,7 +372,6 @@ func trackerWorker(i int, stop <-chan struct{}) {
if err != nil {
panic(err)
}
defer iter.Close()

// Keep track of what state the changes lead us to in order to validate it.
state := map[string]*statedb.Change[fuzzObj]{}
@@ -381,7 +380,9 @@ func trackerWorker(i int, stop <-chan struct{}) {
var prevRev statedb.Revision
for {
newChanges := false
for change, rev := range iter.Changes() {
txn = fuzzDB.ReadTxn()
changes, watch := iter.Next(txn)
for change, rev := range changes {
newChanges = true
log.log("%d: %v", rev, change)

@@ -435,9 +436,8 @@ func trackerWorker(i int, stop <-chan struct{}) {
}
}

txn = fuzzDB.ReadTxn()
select {
case <-iter.Watch(txn):
case <-watch:
case <-stop:
log.log("final object count %d", len(state))
return