Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename First to Get and Get to List/ListWatch #26

Merged
merged 1 commit into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ func example() {
myObjects.Insert(wtxn, &MyObject{3, "c"})

// Modify an object
if obj, _, found := myObjects.First(wtxn, IDIndex.Query(1)); found {
if obj, _, found := myObjects.Get(wtxn, IDIndex.Query(1)); found {
objCopy := *obj
objCopy.Foo = "d"
myObjects.Insert(wtxn, &objCopy)
}

// Delete an object
if obj, _, found := myObjects.First(wtxn, IDIndex.Query(2)); found {
if obj, _, found := myObjects.Get(wtxn, IDIndex.Query(2)); found {
myObjects.Delete(wtxn, obj)
}

Expand All @@ -111,7 +111,7 @@ func example() {
// Query the objects with a snapshot of the database.
txn := db.ReadTxn()

if obj, _, found := myObjects.First(wtxn, IDIndex.Query(1)); found {
if obj, _, found := myObjects.Get(wtxn, IDIndex.Query(1)); found {
...
}

Expand Down Expand Up @@ -314,33 +314,39 @@ var (
found bool
watch <-chan struct{}
)
// First returns the first matching object in the query.
obj, revision, found = myObjects.First(txn, IDIndex.Query(42))
// Get returns the first matching object in the query.
obj, revision, found = myObjects.Get(txn, IDIndex.Query(42))
if found {
// obj points to the object we inserted earlier.
// revision is the "table revision" for the object. Revisions are
// incremented for a table on every insertion or deletion.
}
// FirstWatch is the same as First, but also gives us a watch
// GetWatch is the same as Get, but also gives us a watch
// channel that we can use to wait on the object to appear or to
// change.
obj, revision, watch, found = myObjects.FirstWatch(txn, IDIndex.Query(42))
obj, revision, watch, found = myObjects.GetWatch(txn, IDIndex.Query(42))
<-watch // closes when object with ID '42' is inserted or deleted
```

### Iterating

`Get` can be used to iterate over all objects that match the query.
`List` can be used to iterate over all objects that match the query.

```go
var iter statedb.Iterator[*MyObject]
// Get returns all matching objects as an iterator. The iterator is lazy
// List returns all matching objects as an iterator. The iterator is lazy
// and one can stop reading at any time without worrying about the rest.
iter, watch = myObjects.Get(txn, TagsIndex.Query("hello"))
iter := myObjects.List(txn, TagsIndex.Query("hello"))
for obj, revision, ok := iter.Next(); ok; obj, revision, ok = iter.Next() {
// ...
}
<-watch // closes when an object with tag "hello" is inserted or deleted

// ListWatch is like List, but also returns a watch channel.
iter, watch := myObjects.ListWatch(txn, TagsIndex.Query("hello"))
for obj, revision, ok := iter.Next(); ok; obj, revision, ok = iter.Next() { ... }

// closes when an object with tag "hello" is inserted or deleted
<-watch
```

`Prefix` can be used to iterate over objects that match a given prefix.
Expand Down Expand Up @@ -461,7 +467,7 @@ wtxn := db.WriteTxn(myObjects)

// Now that we have the table written we can retrieve an object and none will
// be able to modify it until we commit.
obj, revision, found := myObjects.First(wtxn, IDIndex.Query(42))
obj, revision, found := myObjects.Get(wtxn, IDIndex.Query(42))
if !found { panic("it should be there, I swear!") }

// We cannot just straight up modify 'obj' since someone might be reading it.
Expand Down Expand Up @@ -498,7 +504,7 @@ txn := db.ReadTxn()

// Look up the object we want to update and perform some slow calculation
// to produce the desired new object.
obj, revision, found := myObjects.First(txn, IDIndex.Query(42))
obj, revision, found := myObjects.Get(txn, IDIndex.Query(42))
obj = veryExpensiveCalculation(obj)

// Now that we're ready to insert we can grab a WriteTxn.
Expand Down
6 changes: 3 additions & 3 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func BenchmarkDB_RandomLookup(b *testing.B) {
for j := 0; j < b.N; j++ {
txn := db.ReadTxn()
for _, q := range queries {
_, _, ok := table.First(txn, q)
_, _, ok := table.Get(txn, q)
if !ok {
b.Fatal("object not found")
}
Expand All @@ -319,7 +319,7 @@ func BenchmarkDB_SequentialLookup(b *testing.B) {
txn := db.ReadTxn()
for n := 0; n < b.N; n++ {
for _, q := range queries {
_, _, ok := table.First(txn, q)
_, _, ok := table.Get(txn, q)
if !ok {
b.Fatalf("Object not found")
}
Expand Down Expand Up @@ -374,7 +374,7 @@ func BenchmarkDB_FullIteration_Get(b *testing.B) {
txn := db.ReadTxn()
for n := 0; n < b.N; n++ {
for _, q := range queries {
_, _, ok := table.First(txn, q)
_, _, ok := table.Get(txn, q)
if !ok {
b.Fatalf("Object not found")
}
Expand Down
66 changes: 33 additions & 33 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ func TestDB_Revision(t *testing.T) {
require.Equal(t, writeRevision, readRevision, "committed transaction changed revision")
}

func TestDB_GetFirstLast(t *testing.T) {
func TestDB_GetList(t *testing.T) {
t.Parallel()

db, table, _ := newTestDB(t, tagsIndex)
Expand All @@ -597,49 +597,49 @@ func TestDB_GetFirstLast(t *testing.T) {
require.NoError(t, err)
}
// Check that we can query the not-yet-committed write transaction.
obj, rev, ok := table.First(txn, idIndex.Query(1))
require.True(t, ok, "expected First(1) to return result")
obj, rev, ok := table.Get(txn, idIndex.Query(1))
require.True(t, ok, "expected Get(1) to return result")
require.NotZero(t, rev, "expected non-zero revision")
require.EqualValues(t, obj.ID, 1, "expected first obj.ID to equal 1")
txn.Commit()
}

txn := db.ReadTxn()

// Test Get against the ID index.
iter, _ := table.Get(txn, idIndex.Query(0))
// Test List against the ID index.
iter := table.List(txn, idIndex.Query(0))
items := Collect(iter)
require.Len(t, items, 0, "expected Get(0) to not return results")

iter, _ = table.Get(txn, idIndex.Query(1))
iter = table.List(txn, idIndex.Query(1))
items = Collect(iter)
require.Len(t, items, 1, "expected Get(1) to return result")
require.EqualValues(t, items[0].ID, 1, "expected items[0].ID to equal 1")

iter, getWatch := table.Get(txn, idIndex.Query(2))
iter, listWatch := table.ListWatch(txn, idIndex.Query(2))
items = Collect(iter)
require.Len(t, items, 1, "expected Get(2) to return result")
require.EqualValues(t, items[0].ID, 2, "expected items[0].ID to equal 2")

// Test First/FirstWatch against the ID index.
_, _, ok := table.First(txn, idIndex.Query(0))
require.False(t, ok, "expected First(0) to not return result")
// Test Get/GetWatch against the ID index.
_, _, ok := table.Get(txn, idIndex.Query(0))
require.False(t, ok, "expected Get(0) to not return result")

obj, rev, ok := table.First(txn, idIndex.Query(1))
require.True(t, ok, "expected First(1) to return result")
obj, rev, ok := table.Get(txn, idIndex.Query(1))
require.True(t, ok, "expected Get(1) to return result")
require.NotZero(t, rev, "expected non-zero revision")
require.EqualValues(t, obj.ID, 1, "expected first obj.ID to equal 1")

obj, rev, firstWatch, ok := table.FirstWatch(txn, idIndex.Query(2))
require.True(t, ok, "expected FirstWatch(2) to return result")
obj, rev, getWatch, ok := table.GetWatch(txn, idIndex.Query(2))
require.True(t, ok, "expected GetWatch(2) to return result")
require.NotZero(t, rev, "expected non-zero revision")
require.EqualValues(t, obj.ID, 2, "expected obj.ID to equal 2")

select {
case <-firstWatch:
t.Fatalf("FirstWatch channel closed before changes")
case <-getWatch:
t.Fatalf("Get channel closed before changes")
t.Fatalf("GetWatch channel closed before changes")
case <-listWatch:
t.Fatalf("List channel closed before changes")
default:
}

Expand All @@ -651,28 +651,28 @@ func TestDB_GetFirstLast(t *testing.T) {
wtxn.Commit()

select {
case <-firstWatch:
case <-getWatch:
case <-time.After(watchCloseTimeout):
t.Fatalf("FirstWatch channel not closed after change")
t.Fatalf("GetWatch channel not closed after change")
}
select {
case <-getWatch:
case <-listWatch:
case <-time.After(watchCloseTimeout):
t.Fatalf("Get channel not closed after change")
t.Fatalf("List channel not closed after change")
}

// Since we modified the database, grab a fresh read transaction.
txn = db.ReadTxn()

// Test First and Last against the tags multi-index which will
// Test Get and Last against the tags multi-index which will
// return multiple results.
obj, rev, _, ok = table.FirstWatch(txn, tagsIndex.Query("even"))
require.True(t, ok, "expected First(even) to return result")
obj, rev, _, ok = table.GetWatch(txn, tagsIndex.Query("even"))
require.True(t, ok, "expected Get(even) to return result")
require.NotZero(t, rev, "expected non-zero revision")
require.ElementsMatch(t, obj.Tags.Slice(), []string{"even", "modified"})
require.EqualValues(t, 2, obj.ID)

iter, _ = table.Get(txn, tagsIndex.Query("odd"))
iter = table.List(txn, tagsIndex.Query("odd"))
items = Collect(iter)
require.Len(t, items, 5, "expected Get(odd) to return 5 items")
for i, item := range items {
Expand All @@ -696,8 +696,8 @@ func TestDB_CommitAbort(t *testing.T) {
assert.Greater(t, expvarFloat(metrics.WriteTxnAcquisitionVar.Get("test-handle/test")), 0.0, "WriteTxnAcquisition")
assert.Greater(t, expvarFloat(metrics.WriteTxnDurationVar.Get("test-handle/test")), 0.0, "WriteTxnDuration")

obj, rev, ok := table.First(db.ReadTxn(), idIndex.Query(123))
require.True(t, ok, "expected First(1) to return result")
obj, rev, ok := table.Get(db.ReadTxn(), idIndex.Query(123))
require.True(t, ok, "expected Get(1) to return result")
require.NotZero(t, rev, "expected non-zero revision")
require.EqualValues(t, obj.ID, 123, "expected obj.ID to equal 123")
require.Zero(t, obj.Tags.Len(), "expected no tags")
Expand All @@ -715,7 +715,7 @@ func TestDB_CommitAbort(t *testing.T) {

// Check that insert after commit and insert after abort do not change the
// table.
obj, newRev, ok := table.First(db.ReadTxn(), idIndex.Query(123))
obj, newRev, ok := table.Get(db.ReadTxn(), idIndex.Query(123))
require.True(t, ok, "expected object to exist")
require.Equal(t, rev, newRev, "expected unchanged revision")
require.EqualValues(t, obj.ID, 123, "expected obj.ID to equal 123")
Expand Down Expand Up @@ -745,7 +745,7 @@ func TestDB_CompareAndSwap_CompareAndDelete(t *testing.T) {
table.Insert(wtxn, testObject{ID: 1})
wtxn.Commit()

obj, rev1, ok := table.First(db.ReadTxn(), idIndex.Query(1))
obj, rev1, ok := table.Get(db.ReadTxn(), idIndex.Query(1))
require.True(t, ok)

// Updating an object with matching revision number works
Expand All @@ -757,7 +757,7 @@ func TestDB_CompareAndSwap_CompareAndDelete(t *testing.T) {
require.EqualValues(t, 1, oldObj.ID)
wtxn.Commit()

obj, _, ok = table.First(db.ReadTxn(), idIndex.Query(1))
obj, _, ok = table.Get(db.ReadTxn(), idIndex.Query(1))
require.True(t, ok)
require.Equal(t, 1, obj.Tags.Len())
v, _ := obj.Tags.All().Next()
Expand All @@ -772,7 +772,7 @@ func TestDB_CompareAndSwap_CompareAndDelete(t *testing.T) {
require.EqualValues(t, 1, oldObj.ID)
wtxn.Commit()

obj, _, ok = table.First(db.ReadTxn(), idIndex.Query(1))
obj, _, ok = table.Get(db.ReadTxn(), idIndex.Query(1))
require.True(t, ok)
require.Equal(t, 1, obj.Tags.Len())
v, _ = obj.Tags.All().Next()
Expand All @@ -787,7 +787,7 @@ func TestDB_CompareAndSwap_CompareAndDelete(t *testing.T) {
require.EqualValues(t, 1, oldObj.ID)
wtxn.Commit()

obj, rev2, ok := table.First(db.ReadTxn(), idIndex.Query(1))
obj, rev2, ok := table.Get(db.ReadTxn(), idIndex.Query(1))
require.True(t, ok)
require.Equal(t, 1, obj.Tags.Len())
v, _ = obj.Tags.All().Next()
Expand All @@ -802,7 +802,7 @@ func TestDB_CompareAndSwap_CompareAndDelete(t *testing.T) {
require.EqualValues(t, 1, oldObj.ID)
wtxn.Commit()

_, _, ok = table.First(db.ReadTxn(), idIndex.Query(1))
_, _, ok = table.Get(db.ReadTxn(), idIndex.Query(1))
require.False(t, ok)

// Deleting non-existing object yields not found
Expand Down
4 changes: 2 additions & 2 deletions derive.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type derive[In, Out any] struct {
transform func(obj In, deleted bool) (Out, DeriveResult)
}

func (d derive[In, Out]) loop(ctx context.Context, health cell.Health) error {
func (d derive[In, Out]) loop(ctx context.Context, _ cell.Health) error {
out := d.OutTable
txn := d.DB.WriteTxn(d.InTable)
iter, err := d.InTable.Changes(txn)
Expand All @@ -85,7 +85,7 @@ func (d derive[In, Out]) loop(ctx context.Context, health cell.Health) error {
case DeriveInsert:
_, _, err = out.Insert(wtxn, outObj)
case DeriveUpdate:
_, _, found := out.First(wtxn, out.PrimaryIndexer().QueryFromObject(outObj))
_, _, found := out.Get(wtxn, out.PrimaryIndexer().QueryFromObject(outObj))
if found {
_, _, err = out.Insert(wtxn, outObj)
}
Expand Down
22 changes: 11 additions & 11 deletions fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,9 @@ func allAction(ctx actionContext) {
ctx.log.log("%s: All => %d found", ctx.table.Name(), len(statedb.Collect(iter)))
}

func getAction(ctx actionContext) {
func listAction(ctx actionContext) {
value := mkValue()
iter, _ := ctx.table.Get(ctx.txn, valueIndex.Query(value))
iter := ctx.table.List(ctx.txn, valueIndex.Query(value))
ctx.log.log("%s: Get(%d)", ctx.table.Name(), value)
for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() {
if e, ok2 := ctx.txnLog.latest[tableAndID{ctx.table.Name(), obj.id}]; ok2 {
Expand All @@ -296,25 +296,25 @@ func getAction(ctx actionContext) {
}
}

func firstAction(ctx actionContext) {
func getAction(ctx actionContext) {
id := mkID()
obj, rev, ok := ctx.table.First(ctx.txn, idIndex.Query(id))
obj, rev, ok := ctx.table.Get(ctx.txn, idIndex.Query(id))

if e, ok2 := ctx.txnLog.latest[tableAndID{ctx.table.Name(), id}]; ok2 {
if e.act == actInsert {
if !ok {
panic("First() returned not found, expected last inserted value")
panic("Get() returned not found, expected last inserted value")
}
if e.value != obj.value {
panic("First() did not return the last write")
panic("Get() did not return the last write")
}
} else if e.act == actDelete {
if ok {
panic("First() returned value even though it was deleted")
panic("Get() returned value even though it was deleted")
}
}
}
ctx.log.log("%s: First(%s) => rev=%d, ok=%v", ctx.table.Name(), id, rev, ok)
ctx.log.log("%s: Get(%s) => rev=%d, ok=%v", ctx.table.Name(), id, rev, ok)
}

func lowerboundAction(ctx actionContext) {
Expand Down Expand Up @@ -358,10 +358,10 @@ var actions = []action{
deleteAction, deleteAction, deleteAction,
deleteManyAction, deleteAllAction,

firstAction, firstAction, firstAction, firstAction, firstAction,
firstAction, firstAction, firstAction, firstAction, firstAction,
firstAction, firstAction, firstAction, firstAction, firstAction,
getAction, getAction, getAction, getAction, getAction,
getAction, getAction, getAction, getAction, getAction,
getAction, getAction, getAction, getAction, getAction,
listAction, listAction, listAction, listAction, listAction,
allAction, allAction,
lowerboundAction, lowerboundAction, lowerboundAction,
prefixAction, prefixAction, prefixAction,
Expand Down
2 changes: 1 addition & 1 deletion reconciler/benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func main() {
// Wait for all to be reconciled by waiting for the last added objects to be marked
// reconciled. This only works here since none of the operations fail.
for {
obj, _, watch, ok := testObjects.FirstWatch(db.ReadTxn(), idIndex.Query(id-1))
obj, _, watch, ok := testObjects.GetWatch(db.ReadTxn(), idIndex.Query(id-1))
if ok && obj.status.Kind == reconciler.StatusKindDone {
break
}
Expand Down
2 changes: 1 addition & 1 deletion reconciler/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func registerHTTPServer(
w.WriteHeader(http.StatusOK)

case "DELETE":
memo, _, ok := memos.First(txn, MemoNameIndex.Query(name))
memo, _, ok := memos.Get(txn, MemoNameIndex.Query(name))
if !ok {
w.WriteHeader(http.StatusNotFound)
return
Expand Down
2 changes: 1 addition & 1 deletion reconciler/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (round *incrementalRound[Obj]) processRetries() {
}
round.retries.Pop()

obj, rev, found := round.table.First(round.txn, round.primaryIndexer.QueryFromObject(robj.(Obj)))
obj, rev, found := round.table.Get(round.txn, round.primaryIndexer.QueryFromObject(robj.(Obj)))
if found {
status := round.config.GetObjectStatus(obj)
if status.Kind != StatusKindError {
Expand Down
Loading
Loading