Skip to content

Commit

Permalink
reconciler: Add StatusSet and optimize for multiple reconcilers
Browse files Browse the repository at this point in the history
In order to efficiently support multiple reconcilers per object we
need to make the reconciler aware of whether the object's data changed
versus whether the reconciliation status changed. The current implementation
only checks whether the object has a new revision when committing the
reconciliation status, but in the presence of multiple reconcilers per object
we need to actually check whether the object was asked to be re-reconciled.

This commit adds the notion of "pending id" to 'Status' which allows the
reconciler to compare the id of the object it reconciled with the id of the
current object in the database and update it if the ids still match, regardless
of the revision.

To make it easier to implement reconcilers for an object where the set of
reconcilers is unknown beforehand, implement 'StatusSet', a named set of statuses.

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Jul 3, 2024
1 parent 10fcaf7 commit 51dcd3c
Show file tree
Hide file tree
Showing 5 changed files with 474 additions and 39 deletions.
33 changes: 27 additions & 6 deletions reconciler/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package reconciler

import (
"context"
"errors"
"time"

"github.com/cilium/hive/cell"
Expand Down Expand Up @@ -42,6 +43,7 @@ type opResult struct {
original any // the original object
rev statedb.Revision // revision of the object
err error
id uint64 // the "pending" identifier
}

func (r *reconciler[Obj]) incremental(ctx context.Context, txn statedb.ReadTxn, changes statedb.ChangeIterator[Obj]) []error {
Expand Down Expand Up @@ -167,10 +169,11 @@ func (round *incrementalRound[Obj]) batch(changes statedb.ChangeIterator[Obj]) {
)

for _, entry := range updateBatch {
status := round.config.GetObjectStatus(entry.Object)
if entry.Result == nil {
round.retries.Clear(entry.Object)
}
round.results[entry.Object] = opResult{rev: entry.Revision, err: entry.Result, original: entry.original}
round.results[entry.Object] = opResult{rev: entry.Revision, id: status.id, err: entry.Result, original: entry.original}
}
}
}
Expand Down Expand Up @@ -208,7 +211,8 @@ func (round *incrementalRound[Obj]) processSingle(obj Obj, rev statedb.Revision,
obj = round.config.CloneObject(obj)
op = OpUpdate
err = round.config.Operations.Update(round.ctx, round.txn, obj)
round.results[obj] = opResult{original: orig, rev: rev, err: err}
status := round.config.GetObjectStatus(obj)
round.results[obj] = opResult{original: orig, id: status.id, rev: rev, err: err}
}
round.metrics.ReconciliationDuration(round.moduleID, op, time.Since(start))

Expand All @@ -229,17 +233,34 @@ func (round *incrementalRound[Obj]) commitStatus() (numErrors int) {
// Commit status for updated objects.
for obj, result := range round.results {
// Update the object if it is unchanged. It may happen that the object has
// been updated in the meanwhile, in which case we ignore the status as the
// update will be picked up by next reconciliation round.
// been updated in the meanwhile, in which case we skip updating the status
// and reprocess the object on the next round.

var status Status
if result.err == nil {
status = StatusDone()
} else {
status = StatusError(result.err)
numErrors++
}
_, _, err := round.table.CompareAndSwap(wtxn, result.rev,
round.config.SetObjectStatus(obj, status))

current, exists, err := round.table.CompareAndSwap(wtxn, result.rev, round.config.SetObjectStatus(obj, status))
if errors.Is(err, statedb.ErrRevisionNotEqual) && exists {
// The object had changed. Check if the pending status still carries the same
// identifier and if so update the object. This is an optimization for supporting
// multiple reconcilers per object to avoid repeating work when only the
// reconciliation status had changed.
//
// The limitation of this approach is that we cannot support the reconciler
// modifying the object during reconciliation as the following will forget
// the changes.
currentStatus := round.config.GetObjectStatus(current)
if currentStatus.Kind == StatusKindPending && currentStatus.id == result.id {
current = round.config.CloneObject(current)
current = round.config.SetObjectStatus(current, status)
round.table.Insert(wtxn, current)
}
}

if result.err != nil && err == nil {
// Reconciliation of the object had failed and the status was updated
Expand Down
187 changes: 187 additions & 0 deletions reconciler/multi_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package reconciler_test

import (
"context"
"errors"
"sync/atomic"
"testing"
"time"

"github.com/cilium/hive"
"github.com/cilium/hive/cell"
"github.com/cilium/hive/job"
"github.com/cilium/statedb"
"github.com/cilium/statedb/index"
"github.com/cilium/statedb/reconciler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// multiStatusObject is the object stored in the test table. It is indexed by
// ID and carries a named set of reconciliation statuses, one entry per
// reconciler that operates on the object.
type multiStatusObject struct {
// ID is the key used by multiStatusIndex.
ID uint64
// Statuses holds the per-reconciler statuses, looked up by reconciler name.
Statuses reconciler.StatusSet
}

// Clone returns a pointer to a shallow copy of the object so that the copy
// can be modified without touching the original.
func (m *multiStatusObject) Clone() *multiStatusObject {
	clone := new(multiStatusObject)
	*clone = *m
	return clone
}

// multiStatusIndex is the unique "id" index over multiStatusObject. Objects
// are keyed by their ID field and queried with a plain uint64.
var multiStatusIndex = statedb.Index[*multiStatusObject, uint64]{
Name: "id",
FromObject: func(t *multiStatusObject) index.KeySet {
return index.NewKeySet(index.Uint64(t.ID))
},
FromKey: index.Uint64,
Unique: true,
}

// multiMockOps is a mock reconciler.Operations implementation that counts
// Update calls and can be switched into a failing mode.
type multiMockOps struct {
// numUpdates is incremented on every call to Update.
numUpdates int
// faulty makes Update return an error while it is set to true.
faulty atomic.Bool
}

// Delete implements reconciler.Operations. The mock ignores its arguments
// and always reports success.
func (m *multiMockOps) Delete(_ context.Context, _ statedb.ReadTxn, _ *multiStatusObject) error {
	return nil
}

// Prune implements reconciler.Operations. The mock ignores its arguments
// and always reports success.
func (m *multiMockOps) Prune(_ context.Context, _ statedb.ReadTxn, _ statedb.Iterator[*multiStatusObject]) error {
	return nil
}

// Update implements reconciler.Operations. Every call is counted in
// numUpdates; the call fails with the error "fail" while the mock is
// marked faulty and succeeds otherwise.
func (m *multiMockOps) Update(ctx context.Context, txn statedb.ReadTxn, obj *multiStatusObject) error {
	m.numUpdates++
	if !m.faulty.Load() {
		return nil
	}
	return errors.New("fail")
}

// Compile-time check that *multiMockOps satisfies reconciler.Operations.
var _ reconciler.Operations[*multiStatusObject] = &multiMockOps{}

// TestMultipleReconcilers tests use of multiple reconcilers against
// a single object.
//
// Two reconcilers ("test1" and "test2") are registered against the same
// table, each tracking its own entry in the object's StatusSet. The test
// verifies that:
//   - a freshly inserted object is reconciled exactly once by each reconciler,
//   - marking the whole StatusSet pending again triggers exactly one more
//     update per reconciler, and
//   - a failure in one reconciler is reflected only in that reconciler's status.
func TestMultipleReconcilers(t *testing.T) {
	// Upper bound for each wait below, so that a regression fails the test
	// quickly instead of blocking until the global "go test" timeout.
	const waitTimeout = 10 * time.Second

	table, err := statedb.NewTable("objects", multiStatusIndex)
	require.NoError(t, err, "NewTable")

	var ops1, ops2 multiMockOps
	var db *statedb.DB

	// reconcilerModule builds a module that registers a reconciler which
	// stores its status under 'name' in the object's StatusSet.
	reconcilerModule := func(name, description string, ops *multiMockOps) cell.Cell {
		return cell.Module(name, description,
			cell.Invoke(func(params reconciler.Params) error {
				_, err := reconciler.Register(
					params,
					table,
					(*multiStatusObject).Clone,
					func(obj *multiStatusObject, s reconciler.Status) *multiStatusObject {
						obj.Statuses = obj.Statuses.Set(name, s)
						return obj
					},
					func(obj *multiStatusObject) reconciler.Status {
						return obj.Statuses.Get(name)
					},
					ops,
					nil,
					// Long retry interval so that retries never inflate numUpdates.
					reconciler.WithRetry(time.Hour, time.Hour),
				)
				return err
			}),
		)
	}

	// Named 'h' rather than 'hive' to avoid shadowing the imported package.
	h := hive.New(
		statedb.Cell,
		job.Cell,
		cell.Provide(
			cell.NewSimpleHealth,
			reconciler.NewExpVarMetrics,
		),
		cell.Invoke(func(db_ *statedb.DB) error {
			db = db_
			return db.RegisterTable(table)
		}),

		reconcilerModule("test1", "First reconciler", &ops1),
		reconcilerModule("test2", "Second reconciler", &ops2),
	)

	require.NoError(t, h.Start(context.TODO()), "Start")

	// waitForStatuses blocks until object 1 exists and its "test1" and "test2"
	// statuses have the wanted kinds, and returns the matching object. It
	// fails the test if this does not happen within waitTimeout.
	waitForStatuses := func(want1, want2 reconciler.StatusKind) *multiStatusObject {
		t.Helper()
		deadline := time.After(waitTimeout)
		for {
			obj, _, watch, found := table.GetWatch(db.ReadTxn(), multiStatusIndex.Query(1))
			if found &&
				obj.Statuses.Get("test1").Kind == want1 &&
				obj.Statuses.Get("test2").Kind == want2 {
				return obj
			}
			select {
			case <-watch:
			case <-deadline:
				t.Fatalf("timed out waiting for statuses test1=%v test2=%v", want1, want2)
			}
		}
	}

	wtxn := db.WriteTxn(table)
	table.Insert(wtxn, &multiStatusObject{
		ID:       1,
		Statuses: reconciler.NewStatusSet(),
	})
	wtxn.Commit()

	obj1 := waitForStatuses(reconciler.StatusKindDone, reconciler.StatusKindDone)

	// Check that both reconcilers performed the update only once.
	assert.Equal(t, 1, ops1.numUpdates)
	assert.Equal(t, 1, ops2.numUpdates)
	assert.Regexp(t, "^Done: test[12] test[12] \\(.* ago\\)", obj1.Statuses.String())

	// Make the second reconciler faulty.
	ops2.faulty.Store(true)

	// Mark the object pending again. Reuse the StatusSet.
	wtxn = db.WriteTxn(table)
	obj1 = obj1.Clone()
	obj1.Statuses = obj1.Statuses.Pending()
	assert.Regexp(t, "^Pending: test[12] test[12] \\(.* ago\\)", obj1.Statuses.String())
	table.Insert(wtxn, obj1)
	wtxn.Commit()

	// Wait for it to reconcile: the first reconciler succeeds, the second fails.
	obj2 := waitForStatuses(reconciler.StatusKindDone, reconciler.StatusKindError)

	assert.Equal(t, 2, ops1.numUpdates)
	assert.Equal(t, 2, ops2.numUpdates)
	assert.Regexp(t, "^Errored: test2 \\(fail\\), Done: test1 \\(.* ago\\)", obj2.Statuses.String())

	require.NoError(t, h.Stop(context.TODO()), "Stop")
}
27 changes: 0 additions & 27 deletions reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,30 +660,3 @@ func (h testHelper) waitForReconciliation() {
err := reconciler.WaitForReconciliation(context.TODO(), h.db, h.tbl, statusIndex)
require.NoError(h.t, err, "expected WaitForReconciliation to succeed")
}

// TestStatusString checks the textual form of reconciler.Status for each
// status kind, including the "ago" suffix derived from UpdatedAt.
func TestStatusString(t *testing.T) {
	now := time.Now()

	for _, tc := range []struct {
		status  reconciler.Status
		pattern string
	}{
		{
			status: reconciler.Status{
				Kind:      reconciler.StatusKindPending,
				UpdatedAt: now,
				Error:     "",
			},
			pattern: `Pending \([0-9]+\.[0-9]+m?s ago\)`,
		},
		{
			// Same pending status, but last updated an hour ago.
			status: reconciler.Status{
				Kind:      reconciler.StatusKindPending,
				UpdatedAt: now.Add(-time.Hour),
				Error:     "",
			},
			pattern: `Pending \([0-9]+\.[0-9]+h ago\)`,
		},
		{
			status: reconciler.Status{
				Kind:      reconciler.StatusKindDone,
				UpdatedAt: now,
				Error:     "",
			},
			pattern: `Done \([0-9]+\.[0-9]+m?s ago\)`,
		},
		{
			status: reconciler.Status{
				Kind:      reconciler.StatusKindError,
				UpdatedAt: now,
				Error:     "hey I'm an error",
			},
			pattern: `Error: hey I'm an error \([0-9]+\.[0-9]+m?s ago\)`,
		},
	} {
		assert.Regexp(t, tc.pattern, tc.status.String())
	}
}
75 changes: 75 additions & 0 deletions reconciler/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package reconciler

import (
"encoding/json"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestStatusString verifies how Status renders as a string for the pending,
// done and error kinds, and that the elapsed time since UpdatedAt is shown
// in a unit matching its magnitude.
func TestStatusString(t *testing.T) {
	now := time.Now()

	// expectString asserts that the string form of st matches pattern.
	expectString := func(pattern string, st Status) {
		t.Helper()
		assert.Regexp(t, pattern, st.String())
	}

	pending := Status{
		Kind:      StatusKindPending,
		UpdatedAt: now,
		Error:     "",
	}
	expectString(`Pending \([0-9]+\.[0-9]+m?s ago\)`, pending)

	// An hour-old status is reported in hours.
	pending.UpdatedAt = now.Add(-time.Hour)
	expectString(`Pending \([0-9]+\.[0-9]+h ago\)`, pending)

	expectString(`Done \([0-9]+\.[0-9]+m?s ago\)`, Status{
		Kind:      StatusKindDone,
		UpdatedAt: now,
		Error:     "",
	})

	expectString(`Error: hey I'm an error \([0-9]+\.[0-9]+m?s ago\)`, Status{
		Kind:      StatusKindError,
		UpdatedAt: now,
		Error:     "hey I'm an error",
	})
}

// TestStatusSet exercises the StatusSet API: the string form of an empty
// set, lookups of names that were never set, setting individual statuses,
// resetting the whole set to pending, and JSON round-tripping at each stage.
func TestStatusSet(t *testing.T) {
	// assertJSONRoundtrip checks that marshalling and unmarshalling a set
	// yields a set with the same string representation.
	assertJSONRoundtrip := func(s StatusSet) {
		t.Helper()
		data, err := json.Marshal(s)
		assert.NoError(t, err, "Marshal")
		var s2 StatusSet
		err = json.Unmarshal(data, &s2)
		assert.NoError(t, err, "Unmarshal")
		assert.Equal(t, s.String(), s2.String())
	}

	set := NewStatusSet()
	assert.Equal(t, "Pending", set.String())
	assertJSONRoundtrip(set)

	// A name that has not been set is reported as pending with a non-zero id.
	s := set.Get("foo")
	assert.Equal(t, StatusKindPending, s.Kind)
	assert.NotZero(t, s.id)

	set = set.Set("foo", StatusDone())
	set = set.Set("bar", StatusError(errors.New("fail")))
	assertJSONRoundtrip(set)

	assert.Equal(t, StatusKindDone, set.Get("foo").Kind)
	assert.Equal(t, StatusKindError, set.Get("bar").Kind)
	assert.Regexp(t, "^Errored: bar \\(fail\\), Done: foo \\(.* ago\\)", set.String())

	// Pending() marks every known status pending again; unknown names
	// ("baz") are pending as well.
	set = set.Pending()
	assert.NotZero(t, set.Get("foo").id)
	assert.Equal(t, StatusKindPending, set.Get("foo").Kind)
	assert.Equal(t, StatusKindPending, set.Get("bar").Kind)
	assert.Equal(t, StatusKindPending, set.Get("baz").Kind)
	assert.Regexp(t, "^Pending: bar foo \\(.* ago\\)", set.String())
	assertJSONRoundtrip(set)
}
Loading

0 comments on commit 51dcd3c

Please sign in to comment.