diff --git a/reconciler/metrics.go b/reconciler/metrics.go index f30984a..e53b3b1 100644 --- a/reconciler/metrics.go +++ b/reconciler/metrics.go @@ -24,6 +24,8 @@ const ( ) type ExpVarMetrics struct { + root *expvar.Map + ReconciliationCountVar *expvar.Map ReconciliationDurationVar *expvar.Map ReconciliationTotalErrorsVar *expvar.Map @@ -73,14 +75,22 @@ func NewUnpublishedExpVarMetrics() *ExpVarMetrics { return newExpVarMetrics(false) } +func (m *ExpVarMetrics) Map() *expvar.Map { + return m.root +} + func newExpVarMetrics(publish bool) *ExpVarMetrics { + root := new(expvar.Map).Init() newMap := func(name string) *expvar.Map { if publish { return expvar.NewMap(name) } - return new(expvar.Map).Init() + m := new(expvar.Map).Init() + root.Set(name, m) + return m } return &ExpVarMetrics{ + root: root, ReconciliationCountVar: newMap("reconciliation_count"), ReconciliationDurationVar: newMap("reconciliation_duration"), ReconciliationTotalErrorsVar: newMap("reconciliation_total_errors"), diff --git a/reconciler/multi_test.go b/reconciler/multi_test.go index 3fc1917..0b7bd70 100644 --- a/reconciler/multi_test.go +++ b/reconciler/multi_test.go @@ -83,6 +83,11 @@ func TestMultipleReconcilers(t *testing.T) { cell.Provide( cell.NewSimpleHealth, reconciler.NewExpVarMetrics, + func(r job.Registry, h cell.Health, lc cell.Lifecycle) job.Group { + g := r.NewGroup(h) + lc.Append(g) + return g + }, ), cell.Invoke(func(db_ *statedb.DB) error { db = db_ diff --git a/reconciler/reconciler_test.go b/reconciler/reconciler_test.go deleted file mode 100644 index 038b10c..0000000 --- a/reconciler/reconciler_test.go +++ /dev/null @@ -1,678 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// Copyright Authors of Cilium - -package reconciler_test - -import ( - "context" - "errors" - "expvar" - "fmt" - "iter" - "log/slog" - "slices" - "sort" - "strings" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/goleak" - "golang.org/x/time/rate" - - "github.com/cilium/hive" - "github.com/cilium/hive/cell" - "github.com/cilium/hive/hivetest" - "github.com/cilium/hive/job" - "github.com/cilium/statedb" - "github.com/cilium/statedb/index" - "github.com/cilium/statedb/reconciler" -) - -// Some constants so we don't use mysterious numbers in the test steps. -const ( - ID_1 = uint64(1) - ID_2 = uint64(2) - ID_3 = uint64(3) -) - -func TestReconciler(t *testing.T) { - testReconciler(t, false) -} - -func TestReconciler_Batch(t *testing.T) { - testReconciler(t, true) -} - -func testReconciler(t *testing.T, batchOps bool) { - defer goleak.VerifyNone(t, - goleak.IgnoreCurrent(), - ) - - getInt := func(v expvar.Var) int64 { - if v, ok := v.(*expvar.Int); ok && v != nil { - return v.Value() - } - return -1 - } - - getFloat := func(v expvar.Var) float64 { - if v, ok := v.(*expvar.Float); ok && v != nil { - return v.Value() - } - return -1 - } - - runTest := func(name string, opts []reconciler.Option, run func(testHelper)) { - var ( - ops = &mockOps{} - db *statedb.DB - r reconciler.Reconciler[*testObject] - fakeHealth *cell.SimpleHealth - markInit func() - ) - - expVarMetrics := reconciler.NewUnpublishedExpVarMetrics() - - testObjects, err := statedb.NewTable[*testObject]("test-objects", idIndex, statusIndex) - require.NoError(t, err, "NewTable") - - hive := hive.New( - statedb.Cell, - job.Cell, - - cell.Provide(cell.NewSimpleHealth), - - cell.Module( - "test", - "Test", - - cell.Provide(func() reconciler.Metrics { - return expVarMetrics - }), - - cell.Invoke(func(db_ *statedb.DB) error { - db = db_ - return db.RegisterTable(testObjects) - }), - cell.Provide(func(p reconciler.Params) (reconciler.Reconciler[*testObject], error) { - var bops reconciler.BatchOperations[*testObject] - if batchOps { - bops = ops - } - return reconciler.Register( - p, - testObjects, - (*testObject).Clone, - (*testObject).SetStatus, - (*testObject).GetStatus, - ops, - bops, - append( - []reconciler.Option{ - // Speed things up a bit. - reconciler.WithRetry(5*time.Millisecond, 5*time.Millisecond), - reconciler.WithRoundLimits(1000, rate.NewLimiter(1000.0, 10)), - }, - // Add the override options last. - opts..., - )..., - ) - }), - - cell.Invoke(func(r_ reconciler.Reconciler[*testObject], h *cell.SimpleHealth) { - r = r_ - fakeHealth = h - wtxn := db.WriteTxn(testObjects) - done := testObjects.RegisterInitializer(wtxn, "test") - wtxn.Commit() - markInit = func() { - wtxn := db.WriteTxn(testObjects) - done(wtxn) - wtxn.Commit() - } - }), - ), - ) - - t.Run(name, func(t *testing.T) { - log := hivetest.Logger(t, hivetest.LogLevel(slog.LevelError)) - require.NoError(t, hive.Start(log, context.TODO()), "Start") - t.Cleanup(func() { - assert.NoError(t, hive.Stop(log, context.TODO()), "Stop") - }) - run(testHelper{ - t: t, - db: db, - tbl: testObjects, - ops: ops, - r: r, - health: fakeHealth, - m: expVarMetrics, - markInit: markInit, - }) - - }) - } - - numIterations := 3 - - runTest("incremental", - []reconciler.Option{ - reconciler.WithPruning(0), // Disable - }, - func(h testHelper) { - h.markInitialized() - for i := 0; i < numIterations; i++ { - t.Logf("Iteration %d", i) - - // Insert some test objects and check that they're reconciled - t.Logf("Inserting test objects 1, 2 & 3") - h.insert(ID_1, NonFaulty, reconciler.StatusPending()) - h.expectOp(opUpdate(ID_1)) - h.expectStatus(ID_1, reconciler.StatusKindDone, "") - - h.insert(ID_2, NonFaulty, reconciler.StatusPending()) - h.expectOp(opUpdate(ID_2)) - h.expectStatus(ID_2, reconciler.StatusKindDone, "") - - h.insert(ID_3, NonFaulty, reconciler.StatusPending()) - h.expectOp(opUpdate(ID_3)) - h.expectStatus(ID_3, reconciler.StatusKindDone, "") - - h.expectHealth(cell.StatusOK, "OK, 3 object(s)", "") - h.waitForReconciliation() - - // Set one to be faulty => object will error - t.Log("Setting '1' faulty") - h.insert(ID_1, Faulty, reconciler.StatusPending()) - h.expectOp(opFail(opUpdate(ID_1))) - h.expectStatus(ID_1, reconciler.StatusKindError, "update fail") - h.expectRetried(ID_1) - h.expectHealth(cell.StatusDegraded, "1 error(s)", "update fail") - - // Fix the object => object will reconcile again. - t.Log("Setting '1' non-faulty") - h.insert(ID_1, NonFaulty, reconciler.StatusPending()) - h.expectOp(opUpdate(ID_1)) - h.expectStatus(ID_1, reconciler.StatusKindDone, "") - h.expectHealth(cell.StatusOK, "OK, 3 object(s)", "") - - t.Log("Delete 1 & 2") - h.markForDelete(ID_1) - h.expectOp(opDelete(1)) - h.expectNotFound(ID_1) - - h.markForDelete(ID_2) - h.expectOp(opDelete(2)) - h.expectNotFound(ID_2) - - t.Log("Try to delete '3' with faulty ops") - h.setTargetFaulty(true) - h.markForDelete(ID_3) - h.expectOp(opFail(opDelete(3))) - h.expectHealth(cell.StatusDegraded, "1 error(s)", "delete fail") - - t.Log("Set the target non-faulty to delete '3'") - h.setTargetFaulty(false) - h.expectOp(opDelete(3)) - h.expectHealth(cell.StatusOK, "OK, 0 object(s)", "") - - h.waitForReconciliation() - - assert.Greater(t, getInt(h.m.ReconciliationCountVar.Get("test")), int64(0), "ReconciliationCount") - assert.Greater(t, getFloat(h.m.ReconciliationDurationVar.Get("test/update")), float64(0), "ReconciliationDuration/update") - assert.Greater(t, getFloat(h.m.ReconciliationDurationVar.Get("test/delete")), float64(0), "ReconciliationDuration/delete") - assert.Greater(t, getInt(h.m.ReconciliationTotalErrorsVar.Get("test")), int64(0), "ReconciliationTotalErrors") - assert.Equal(t, getInt(h.m.ReconciliationCurrentErrorsVar.Get("test")), int64(0), "ReconciliationCurrentErrors") - } - }) - - runTest("pruning", nil, func(h testHelper) { - // Without any objects, we should not be able to see a prune, - // even when triggered. - t.Log("Try to prune without objects and uninitialized table") - h.triggerPrune() - h.expectHealth(cell.StatusOK, "OK, 0 object(s)", "") - - // With table not initialized, we should not see the prune even - // when triggered. - t.Log("Try to prune with object and uninitialized table") - h.insert(ID_1, NonFaulty, reconciler.StatusPending()) - h.triggerPrune() - h.expectOp(opUpdate(ID_1)) - h.expectHealth(cell.StatusOK, "OK, 1 object(s)", "") - - // Marking the table initialized prunes immediately. - h.markInitialized() - h.expectOps(opPrune(1), opUpdate(ID_1)) - - h.insert(ID_2, NonFaulty, reconciler.StatusPending()) - h.expectOp(opUpdate(ID_2)) - h.expectHealth(cell.StatusOK, "OK, 2 object(s)", "") - - // Pruning can be now triggered at will. - h.triggerPrune() - h.expectOp(opPrune(2)) - - // Add few objects and wait until incremental reconciliation is done. - t.Log("Insert more objects") - h.insert(ID_1, NonFaulty, reconciler.StatusPending()) - h.insert(ID_2, NonFaulty, reconciler.StatusPending()) - h.insert(ID_3, NonFaulty, reconciler.StatusPending()) - h.expectStatus(ID_1, reconciler.StatusKindDone, "") - h.expectStatus(ID_2, reconciler.StatusKindDone, "") - h.expectStatus(ID_3, reconciler.StatusKindDone, "") - h.expectNumUpdates(ID_1, 1) - h.expectNumUpdates(ID_2, 1) - h.expectNumUpdates(ID_3, 1) - h.expectHealth(cell.StatusOK, "OK, 3 object(s)", "") - - // Pruning with functioning ops. - t.Log("Prune with non-faulty ops") - h.triggerPrune() - h.expectOps(opPrune(3)) - h.expectHealth(cell.StatusOK, "OK, 3 object(s)", "") - - // Make the ops faulty and trigger the pruning. - t.Log("Prune with faulty ops") - h.setTargetFaulty(true) - h.triggerPrune() - h.expectOps(opPrune(3)) - h.expectHealth(cell.StatusDegraded, "1 error(s)", "prune: prune fail") - - // Make the ops healthy again and try pruning again. - t.Log("Prune again with non-faulty ops") - h.setTargetFaulty(false) - h.triggerPrune() - h.expectHealth(cell.StatusOK, "OK, 3 object(s)", "") - - // Cleanup. - h.markForDelete(ID_1) - h.markForDelete(ID_2) - h.markForDelete(ID_3) - h.expectNotFound(ID_1) - h.expectNotFound(ID_2) - h.expectNotFound(ID_3) - h.triggerPrune() - h.expectOps(opDelete(1), opDelete(2), opDelete(3), opPrune(0)) - h.waitForReconciliation() - - // Validate metrics. - assert.Greater(t, getInt(h.m.PruneCountVar.Get("test")), int64(0), "PruneCount") - assert.Greater(t, getFloat(h.m.PruneDurationVar.Get("test")), float64(0), "PruneDuration") - assert.Equal(t, getInt(h.m.PruneCurrentErrorsVar.Get("test")), int64(0), "PruneCurrentErrors") - }) - - runTest("pruning-empty-table", - []reconciler.Option{ - reconciler.WithPruning(100 * time.Millisecond), - }, - func(h testHelper) { - // Mark the table initialized. This should trigger the pruning - // even when there are no objects. - h.markInitialized() - - // Expect to see the initial pruning and then periodic pruning. - h.expectOps(opPrune(0), opPrune(0)) - }) - - runTest("refreshing", - []reconciler.Option{ - reconciler.WithPruning(0), // Disable - reconciler.WithRefreshing(500*time.Millisecond, rate.NewLimiter(100.0, 1)), - }, - func(h testHelper) { - t.Logf("Inserting test object 1") - h.insert(ID_1, NonFaulty, reconciler.StatusPending()) - h.expectOp(opUpdate(ID_1)) - h.expectStatus(ID_1, reconciler.StatusKindDone, "") - - t.Logf("Setting UpdatedAt to be in past to force refresh") - status := reconciler.StatusDone() - status.UpdatedAt = status.UpdatedAt.Add(-2 * time.Minute) - h.insert(ID_1, NonFaulty, status) - - h.expectOps( - opUpdate(ID_1), // Initial insert - opUpdateRefresh(ID_1), // The refresh - ) - - t.Logf("Setting target faulty and forcing refresh") - h.setTargetFaulty(true) - status.UpdatedAt = status.UpdatedAt.Add(-time.Minute) - h.insert(ID_1, NonFaulty, status) - h.expectOp(opFail(opUpdateRefresh(ID_1))) - h.expectStatus(ID_1, reconciler.StatusKindError, "update fail") - h.expectRetried(ID_1) - h.expectHealth(cell.StatusDegraded, "1 error(s)", "update fail") - - t.Logf("Setting target healthy") - h.setTargetFaulty(false) - h.insert(ID_1, NonFaulty, status) - h.expectOp(opUpdateRefresh(ID_1)) - h.expectStatus(ID_1, reconciler.StatusKindDone, "") - h.expectHealth(cell.StatusOK, "OK, 1 object(s)", "") - - }) -} - -type testObject struct { - id uint64 - faulty bool - updates int - status reconciler.Status -} - -var idIndex = statedb.Index[*testObject, uint64]{ - Name: "id", - FromObject: func(t *testObject) index.KeySet { - return index.NewKeySet(index.Uint64(t.id)) - }, - FromKey: index.Uint64, - Unique: true, -} - -var statusIndex = reconciler.NewStatusIndex((*testObject).GetStatus) - -func (t *testObject) GetStatus() reconciler.Status { - return t.status -} - -func (t *testObject) SetStatus(status reconciler.Status) *testObject { - t.status = status - return t -} - -func (t *testObject) Clone() *testObject { - t2 := *t - return &t2 -} - -type opHistory struct { - mu sync.Mutex - history []opHistoryItem -} - -type opHistoryItem = string - -func opUpdate(id uint64) opHistoryItem { - return opHistoryItem(fmt.Sprintf("update(%d)", id)) -} -func opUpdateRefresh(id uint64) opHistoryItem { - return opHistoryItem(fmt.Sprintf("update-refresh(%d)", id)) -} -func opDelete(id uint64) opHistoryItem { - return opHistoryItem(fmt.Sprintf("delete(%d)", id)) -} -func opPrune(numDesiredObjects int) opHistoryItem { - return opHistoryItem(fmt.Sprintf("prune(n=%d)", numDesiredObjects)) -} -func opFail(item opHistoryItem) opHistoryItem { - return item + " fail" -} - -func (o *opHistory) add(item opHistoryItem) { - o.mu.Lock() - o.history = append(o.history, item) - o.mu.Unlock() -} - -func (o *opHistory) latest() opHistoryItem { - o.mu.Lock() - defer o.mu.Unlock() - if len(o.history) > 0 { - return o.history[len(o.history)-1] - } - return "" -} - -func (o *opHistory) take(n int) []opHistoryItem { - o.mu.Lock() - defer o.mu.Unlock() - - out := []opHistoryItem{} - for n > 0 { - idx := len(o.history) - n - if idx >= 0 { - out = append(out, o.history[idx]) - } - n-- - } - return out -} - -type intMap struct { - sync.Map -} - -func (m *intMap) incr(key uint64) { - if n, ok := m.Load(key); ok { - m.Store(key, n.(int)+1) - } else { - m.Store(key, 1) - } -} - -func (m *intMap) get(key uint64) int { - if n, ok := m.Load(key); ok { - return n.(int) - } - return 0 -} - -type mockOps struct { - history opHistory - faulty atomic.Bool - updates intMap -} - -// DeleteBatch implements recogciler.BatchOperations. -func (mt *mockOps) DeleteBatch(ctx context.Context, txn statedb.ReadTxn, batch []reconciler.BatchEntry[*testObject]) { - for i := range batch { - batch[i].Result = mt.Delete(ctx, txn, batch[i].Object) - } -} - -// UpdateBatch implements reconciler.BatchOperations. -func (mt *mockOps) UpdateBatch(ctx context.Context, txn statedb.ReadTxn, batch []reconciler.BatchEntry[*testObject]) { - for i := range batch { - batch[i].Result = mt.Update(ctx, txn, batch[i].Object) - } -} - -// Delete implements reconciler.Operations. -func (mt *mockOps) Delete(ctx context.Context, txn statedb.ReadTxn, obj *testObject) error { - if mt.faulty.Load() || obj.faulty { - mt.history.add(opFail(opDelete(obj.id))) - return errors.New("delete fail") - } - mt.history.add(opDelete(obj.id)) - - return nil -} - -// Prune implements reconciler.Operations. -func (mt *mockOps) Prune(ctx context.Context, txn statedb.ReadTxn, objects iter.Seq2[*testObject, statedb.Revision]) error { - if mt.faulty.Load() { - return errors.New("prune fail") - } - objs := statedb.Collect(objects) - mt.history.add(opPrune(len(objs))) - return nil -} - -// Update implements reconciler.Operations. -func (mt *mockOps) Update(ctx context.Context, txn statedb.ReadTxn, obj *testObject) error { - mt.updates.incr(obj.id) - - op := opUpdate(obj.id) - if obj.status.Kind == reconciler.StatusKindRefreshing { - op = opUpdateRefresh(obj.id) - } - if mt.faulty.Load() || obj.faulty { - mt.history.add(opFail(op)) - return errors.New("update fail") - } - mt.history.add(op) - obj.updates += 1 - - return nil -} - -var _ reconciler.Operations[*testObject] = &mockOps{} -var _ reconciler.BatchOperations[*testObject] = &mockOps{} - -// testHelper defines a sort of mini-language for writing the test steps. -type testHelper struct { - t testing.TB - db *statedb.DB - tbl statedb.RWTable[*testObject] - ops *mockOps - r reconciler.Reconciler[*testObject] - health *cell.SimpleHealth - m *reconciler.ExpVarMetrics - markInit func() -} - -const ( - Faulty = true - NonFaulty = false -) - -func (h testHelper) markInitialized() { - h.markInit() - h.markInit = nil -} - -func (h testHelper) insert(id uint64, faulty bool, status reconciler.Status) { - wtxn := h.db.WriteTxn(h.tbl) - _, _, err := h.tbl.Insert(wtxn, &testObject{ - id: id, - faulty: faulty, - status: status, - }) - require.NoError(h.t, err, "Insert failed") - wtxn.Commit() -} - -func (h testHelper) markForDelete(id uint64) { - wtxn := h.db.WriteTxn(h.tbl) - _, _, err := h.tbl.Delete(wtxn, &testObject{id: id}) - require.NoError(h.t, err, "Delete failed") - wtxn.Commit() -} - -func (h testHelper) expectStatus(id uint64, kind reconciler.StatusKind, err string) { - cond := func() bool { - obj, _, ok := h.tbl.Get(h.db.ReadTxn(), idIndex.Query(id)) - return ok && obj.status.Kind == kind && obj.status.Error == err - } - if !assert.Eventually(h.t, cond, time.Second, time.Millisecond) { - actual := "" - obj, _, ok := h.tbl.Get(h.db.ReadTxn(), idIndex.Query(id)) - if ok { - actual = string(obj.status.Kind) - } - require.Failf(h.t, "status mismatch", "expected object %d to be marked with status %q, but it was %q", - id, kind, actual) - } -} - -func (h testHelper) expectNumUpdates(id uint64, n int) { - cond := func() bool { - obj, _, ok := h.tbl.Get(h.db.ReadTxn(), idIndex.Query(id)) - return ok && obj.updates == n - } - if !assert.Eventually(h.t, cond, time.Second, time.Millisecond) { - actual := "" - obj, _, ok := h.tbl.Get(h.db.ReadTxn(), idIndex.Query(id)) - if ok { - actual = fmt.Sprintf("%d", obj.updates) - } - require.Failf(h.t, "updates mismatch", "expected object %d to be have %d updates but it had %q", - id, n, actual) - } -} - -func (h testHelper) expectNotFound(id uint64) { - h.t.Helper() - cond := func() bool { - _, _, ok := h.tbl.Get(h.db.ReadTxn(), idIndex.Query(id)) - return !ok - } - require.Eventually(h.t, cond, time.Second, time.Millisecond, "expected object %d to not be found", id) -} - -func (h testHelper) expectOp(op opHistoryItem) { - h.t.Helper() - cond := func() bool { - return h.ops.history.latest() == op - } - if !assert.Eventually(h.t, cond, time.Second, time.Millisecond) { - require.Failf(h.t, "operation mismatch", "expected last operation to be %q, it was %q", op, h.ops.history.latest()) - } -} - -func (h testHelper) expectOps(ops ...opHistoryItem) { - h.t.Helper() - sort.Strings(ops) - cond := func() bool { - actual := h.ops.history.take(len(ops)) - sort.Strings(actual) - return slices.Equal(ops, actual) - } - if !assert.Eventually(h.t, cond, time.Second, time.Millisecond) { - actual := h.ops.history.take(len(ops)) - sort.Strings(actual) - require.Failf(h.t, "operations mismatch", "expected operations to be %v, but they were %v", ops, actual) - } -} - -func (h testHelper) expectRetried(id uint64) { - h.t.Helper() - old := h.ops.updates.get(id) - cond := func() bool { - new := h.ops.updates.get(id) - return new > old - } - require.Eventually(h.t, cond, time.Second, time.Millisecond, "expected %d to be retried", id) -} - -func (h testHelper) expectHealth(level cell.Level, statusSubString string, errSubString string) { - h.t.Helper() - cond := func() bool { - health := h.health.GetChild("job-reconcile") - require.NotNil(h.t, health, "GetChild") - health.Lock() - defer health.Unlock() - errStr := "" - if health.Error != nil { - errStr = health.Error.Error() - } - return level == health.Level && strings.Contains(health.Status, statusSubString) && strings.Contains(errStr, errSubString) - } - if !assert.Eventually(h.t, cond, time.Second, time.Millisecond) { - hc := h.health.GetChild("job-reconcile") - require.NotNil(h.t, hc, "GetChild") - hc.Lock() - defer hc.Unlock() - require.Failf(h.t, "health mismatch", "expected health level %q, status %q, error %q, got: %q, %q, %q", level, statusSubString, errSubString, hc.Level, hc.Status, hc.Error) - } -} - -func (h testHelper) setTargetFaulty(faulty bool) { - h.ops.faulty.Store(faulty) -} - -func (h testHelper) triggerPrune() { - h.r.Prune() -} - -func (h testHelper) waitForReconciliation() { - err := reconciler.WaitForReconciliation(context.TODO(), h.db, h.tbl, statusIndex) - require.NoError(h.t, err, "expected WaitForReconciliation to succeed") -} diff --git a/reconciler/script_test.go b/reconciler/script_test.go new file mode 100644 index 0000000..bf9029b --- /dev/null +++ b/reconciler/script_test.go @@ -0,0 +1,398 @@ +package reconciler_test + +import ( + "context" + "errors" + "expvar" + "fmt" + "iter" + "maps" + "slices" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/cilium/hive" + "github.com/cilium/hive/cell" + "github.com/cilium/hive/hivetest" + "github.com/cilium/hive/job" + "github.com/cilium/hive/script" + "github.com/cilium/hive/script/scripttest" + "github.com/cilium/statedb" + "github.com/cilium/statedb/index" + "github.com/cilium/statedb/reconciler" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" +) + +func newScriptTest(t *testing.T) *script.Engine { + log := hivetest.Logger(t) + + var ( + ops = &mockOps{} + db *statedb.DB + r reconciler.Reconciler[*testObject] + reconcilerParams reconciler.Params + reconcilerLifecycle = &cell.DefaultLifecycle{} + markInit func() + ) + + expVarMetrics := reconciler.NewUnpublishedExpVarMetrics() + + testObjects, err := statedb.NewTable("test-objects", idIndex) + require.NoError(t, err, "NewTable") + + hive := hive.New( + statedb.Cell, + job.Cell, + + cell.Provide( + cell.NewSimpleHealth, + func(h *cell.SimpleHealth) hive.ScriptCmdOut { + return hive.NewScriptCmd( + "health", + cell.SimpleHealthCmd(h)) + }, + ), + + cell.Module( + "test", + "Test", + + cell.Provide( + func() reconciler.Metrics { + return expVarMetrics + }), + + cell.Invoke( + func(db_ *statedb.DB, p_ reconciler.Params) error { + db = db_ + reconcilerParams = p_ + return db.RegisterTable(testObjects) + }, + + func(lc cell.Lifecycle) { + lc.Append(cell.Hook{ + OnStop: func(ctx cell.HookContext) error { return reconcilerLifecycle.Stop(log, ctx) }, + }) + }, + + func(h *cell.SimpleHealth) { + wtxn := db.WriteTxn(testObjects) + done := testObjects.RegisterInitializer(wtxn, "test") + wtxn.Commit() + markInit = func() { + wtxn := db.WriteTxn(testObjects) + done(wtxn) + wtxn.Commit() + } + }), + ), + ) + + cmds, err := hive.ScriptCommands(log) + require.NoError(t, err) + + cmds["mark-init"] = script.Command( + script.CmdUsage{Summary: "Mark table as initialized"}, + func(s *script.State, args ...string) (script.WaitFunc, error) { + markInit() + return nil, nil + }, + ) + + cmds["start-reconciler"] = script.Command( + script.CmdUsage{Summary: "Mark table as initialized"}, + func(s *script.State, args ...string) (script.WaitFunc, error) { + opts := []reconciler.Option{ + // Speed things up a bit. Quick retry interval does mean we can't + // assert the metrics exactly (e.g. error count depends on how + // many retries happened). + reconciler.WithRetry(50*time.Millisecond, 50*time.Millisecond), + reconciler.WithRoundLimits(1000, rate.NewLimiter(1000.0, 10)), + } + var bops reconciler.BatchOperations[*testObject] + for _, arg := range args { + switch arg { + case "with-prune": + opts = append(opts, reconciler.WithPruning(time.Hour)) + case "with-refresh": + opts = append(opts, reconciler.WithRefreshing(50*time.Millisecond, rate.NewLimiter(100.0, 1))) + case "with-batchops": + bops = ops + default: + return nil, fmt.Errorf("unexpected arg, expected 'with-prune', 'with-batchops' or 'with-refresh'") + } + } + reconcilerParams.Lifecycle = reconcilerLifecycle + r, err = reconciler.Register( + reconcilerParams, + testObjects, + (*testObject).Clone, + (*testObject).SetStatus, + (*testObject).GetStatus, + ops, + bops, + opts...) + if err != nil { + return nil, err + } + return nil, reconcilerLifecycle.Start(log, context.TODO()) + }, + ) + + cmds["prune"] = script.Command( + script.CmdUsage{Summary: "Trigger pruning"}, + func(s *script.State, args ...string) (script.WaitFunc, error) { + r.Prune() + return nil, nil + }, + ) + + cmds["set-faulty"] = script.Command( + script.CmdUsage{Summary: "Mark target faulty or not"}, + func(s *script.State, args ...string) (script.WaitFunc, error) { + if args[0] == "true" { + t.Logf("Marked target faulty") + ops.faulty.Store(true) + } else { + t.Logf("Marked target healthy") + ops.faulty.Store(false) + } + return nil, nil + }, + ) + + cmds["expect-ops"] = script.Command( + script.CmdUsage{Summary: "Assert ops"}, + func(s *script.State, args ...string) (script.WaitFunc, error) { + sort.Strings(args) + var actual []string + cond := func() bool { + actual = ops.history.take(len(args)) + sort.Strings(actual) + return slices.Equal(args, actual) + } + for s.Context().Err() == nil { + if cond() { + return nil, nil + } + } + return nil, fmt.Errorf("operations mismatch, expected %v, got %v", args, actual) + }, + ) + + cmds["expvar"] = script.Command( + script.CmdUsage{Summary: "Print expvars to stdout"}, + func(s *script.State, args ...string) (script.WaitFunc, error) { + return func(*script.State) (stdout, stderr string, err error) { + var buf strings.Builder + expVarMetrics.Map().Do(func(kv expvar.KeyValue) { + switch v := kv.Value.(type) { + case expvar.Func: + // skip + case *expvar.Map: + v.Do(func(kv2 expvar.KeyValue) { + fmt.Fprintf(&buf, "%s.%s: %s\n", kv.Key, kv2.Key, kv2.Value) + }) + default: + fmt.Fprintf(&buf, "%s: %s\n", kv.Key, kv.Value) + } + }) + return buf.String(), "", nil + }, nil + }, + ) + + require.NoError(t, err, "ScriptCommands") + maps.Insert(cmds, maps.All(script.DefaultCmds())) + + t.Cleanup(func() { + assert.NoError(t, hive.Stop(log, context.TODO())) + }) + + return &script.Engine{ + Cmds: cmds, + } +} + +func TestScript(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + scripttest.Test(t, + ctx, func() *script.Engine { + return newScriptTest(t) + }, []string{}, "testdata/*.txtar") +} + +type testObject struct { + ID uint64 + Faulty bool + Updates int + Status reconciler.Status +} + +var idIndex = statedb.Index[*testObject, uint64]{ + Name: "id", + FromObject: func(t *testObject) index.KeySet { + return index.NewKeySet(index.Uint64(t.ID)) + }, + FromKey: index.Uint64, + Unique: true, +} + +func (t *testObject) GetStatus() reconciler.Status { + return t.Status +} + +func (t *testObject) SetStatus(status reconciler.Status) *testObject { + t.Status = status + return t +} + +func (t *testObject) Clone() *testObject { + t2 := *t + return &t2 +} + +func (t *testObject) TableHeader() []string { + return []string{ + "ID", + "Faulty", + "StatusKind", + "StatusError", + } +} + +func (t *testObject) TableRow() []string { + return []string{ + strconv.FormatUint(t.ID, 10), + strconv.FormatBool(t.Faulty), + string(t.Status.Kind), + t.Status.Error, + } +} + +type opHistory struct { + mu sync.Mutex + history []opHistoryItem +} + +type opHistoryItem = string + +func opUpdate(id uint64) opHistoryItem { + return opHistoryItem(fmt.Sprintf("update(%d)", id)) +} +func opUpdateRefresh(id uint64) opHistoryItem { + return opHistoryItem(fmt.Sprintf("update-refresh(%d)", id)) +} +func opDelete(id uint64) opHistoryItem { + return opHistoryItem(fmt.Sprintf("delete(%d)", id)) +} +func opPrune(numDesiredObjects int) opHistoryItem { + return opHistoryItem(fmt.Sprintf("prune(n=%d)", numDesiredObjects)) +} +func opFail(item opHistoryItem) opHistoryItem { + return item + " fail" +} + +func (o *opHistory) add(item opHistoryItem) { + o.mu.Lock() + o.history = append(o.history, item) + o.mu.Unlock() +} + +func (o *opHistory) take(n int) []opHistoryItem { + o.mu.Lock() + defer o.mu.Unlock() + + out := []opHistoryItem{} + for n > 0 { + idx := len(o.history) - n + if idx >= 0 { + out = append(out, o.history[idx]) + } + n-- + } + return out +} + +type intMap struct { + sync.Map +} + +func (m *intMap) incr(key uint64) { + if n, ok := m.Load(key); ok { + m.Store(key, n.(int)+1) + } else { + m.Store(key, 1) + } +} + +type mockOps struct { + history opHistory + faulty atomic.Bool + updates intMap +} + +// DeleteBatch implements recogciler.BatchOperations. +func (mt *mockOps) DeleteBatch(ctx context.Context, txn statedb.ReadTxn, batch []reconciler.BatchEntry[*testObject]) { + for i := range batch { + batch[i].Result = mt.Delete(ctx, txn, batch[i].Object) + } +} + +// UpdateBatch implements reconciler.BatchOperations. +func (mt *mockOps) UpdateBatch(ctx context.Context, txn statedb.ReadTxn, batch []reconciler.BatchEntry[*testObject]) { + for i := range batch { + batch[i].Result = mt.Update(ctx, txn, batch[i].Object) + } +} + +// Delete implements reconciler.Operations. +func (mt *mockOps) Delete(ctx context.Context, txn statedb.ReadTxn, obj *testObject) error { + if mt.faulty.Load() || obj.Faulty { + mt.history.add(opFail(opDelete(obj.ID))) + return errors.New("delete fail") + } + mt.history.add(opDelete(obj.ID)) + + return nil +} + +// Prune implements reconciler.Operations. +func (mt *mockOps) Prune(ctx context.Context, txn statedb.ReadTxn, objects iter.Seq2[*testObject, statedb.Revision]) error { + objs := statedb.Collect(objects) + if mt.faulty.Load() { + mt.history.add(opFail(opPrune(len(objs)))) + return errors.New("prune fail") + } + mt.history.add(opPrune(len(objs))) + return nil +} + +// Update implements reconciler.Operations. +func (mt *mockOps) Update(ctx context.Context, txn statedb.ReadTxn, obj *testObject) error { + mt.updates.incr(obj.ID) + + op := opUpdate(obj.ID) + if obj.Status.Kind == reconciler.StatusKindRefreshing { + op = opUpdateRefresh(obj.ID) + } + if mt.faulty.Load() || obj.Faulty { + mt.history.add(opFail(op)) + return errors.New("update fail") + } + mt.history.add(op) + obj.Updates += 1 + + return nil +} + +var _ reconciler.Operations[*testObject] = &mockOps{} +var _ reconciler.BatchOperations[*testObject] = &mockOps{} diff --git a/reconciler/testdata/batching.txtar b/reconciler/testdata/batching.txtar new file mode 100644 index 0000000..b1920fc --- /dev/null +++ b/reconciler/testdata/batching.txtar @@ -0,0 +1,126 @@ +# Test the incremental reconciliation with +# batching. + +hive start +start-reconciler with-batchops + +# From here this is the same as incremental.txtar. + +# Step 1: Insert non-faulty objects +db insert test-objects obj1.yaml +db insert test-objects obj2.yaml +db insert test-objects obj3.yaml +db cmp test-objects step1+3.table +expect-ops update(1) update(2) update(3) + +# Reconciler should be running and reporting health +health 'job-reconcile.*level=OK.*message=OK, 3 object' + +# Step 2: Update object '1' to be faulty and check that it fails and is being +# retried. +db insert test-objects obj1_faulty.yaml +expect-ops 'update(1) fail' 'update(1) fail' +db cmp test-objects step2.table +health 'job-reconcile.*level=Degraded.*1 error' + +# Step 3: Set object '1' back to healthy state +db insert test-objects obj1.yaml +expect-ops 'update(1)' +db cmp test-objects step1+3.table +health 'job-reconcile.*level=OK' + +# Step 4: Delete '1' and '2' +db delete test-objects obj1.yaml +db delete test-objects obj2.yaml +db cmp test-objects step4.table +expect-ops 'delete(1)' 'delete(2)' + +# Step 5: Try to delete '3' with faulty target +set-faulty true +db delete test-objects obj3.yaml +db cmp test-objects empty.table +expect-ops 'delete(3) fail' +health 'job-reconcile.*level=Degraded.*1 error' + +# Step 6: Set the target back to healthy +set-faulty false +expect-ops 'delete(3)' +health 'job-reconcile.*level=OK.*message=OK, 0 object' + +# Check metrics +expvar +! grep 'reconciliation_count.test: 0$' +grep 'reconciliation_current_errors.test: 0$' +! grep 'reconciliation_total_errors.test: 0$' +! grep 'reconciliation_duration.test/update: 0$' +! grep 'reconciliation_duration.test/delete: 0$' + +# ------------ + +-- empty.table -- +ID StatusKind + +-- step1+3.table -- +ID StatusKind StatusError +1 Done +2 Done +3 Done + +-- step2.table -- +ID StatusKind StatusError +1 Error update fail +2 Done +3 Done + +-- step4.table -- +ID StatusKind +3 Done + +-- step7.table -- +ID Faulty StatusKind StatusError +4 true Error update fail +5 false Done + + +-- step8.table -- +ID Faulty StatusKind +4 false Done +5 false Done + + +-- obj1.yaml -- +id: 1 +faulty: false +status: + kind: Pending + +-- obj1_faulty.yaml -- +id: 1 +faulty: true +status: + kind: Pending + +-- obj2.yaml -- +id: 2 +faulty: false +status: + kind: Pending + +-- obj2_faulty.yaml -- +id: 2 +faulty: true +status: + kind: Pending + +-- obj3.yaml -- +id: 3 +faulty: false +status: + kind: Pending + +-- obj3_faulty.yaml -- +id: 3 +faulty: true +status: + kind: Pending + diff --git a/reconciler/testdata/incremental.txtar b/reconciler/testdata/incremental.txtar new file mode 100644 index 0000000..32ce940 --- /dev/null +++ b/reconciler/testdata/incremental.txtar @@ -0,0 +1,123 @@ +# Test the incremental reconciliation with non-batched operations +# and without pruning. + +hive start +start-reconciler + +# Step 1: Insert non-faulty objects +db insert test-objects obj1.yaml +db insert test-objects obj2.yaml +db insert test-objects obj3.yaml +db cmp test-objects step1+3.table +expect-ops update(1) update(2) update(3) + +# Reconciler should be running and reporting health +health 'job-reconcile.*level=OK.*message=OK, 3 object' + +# Step 2: Update object '1' to be faulty and check that it fails and is being +# retried. +db insert test-objects obj1_faulty.yaml +db cmp test-objects step2.table +expect-ops 'update(1) fail' 'update(1) fail' +health 'job-reconcile.*level=Degraded.*1 error' + +# Step 3: Set object '1' back to healthy state +db insert test-objects obj1.yaml +db show test-objects +db cmp test-objects step1+3.table +expect-ops 'update(1)' +health 'job-reconcile.*level=OK' + +# Step 4: Delete '1' and '2' +db delete test-objects obj1.yaml +db delete test-objects obj2.yaml +db cmp test-objects step4.table +expect-ops 'delete(1)' 'delete(2)' + +# Step 5: Try to delete '3' with faulty target +set-faulty true +db delete test-objects obj3.yaml +db cmp test-objects empty.table +expect-ops 'delete(3) fail' +health 'job-reconcile.*level=Degraded.*1 error' + +# Step 6: Set the target back to healthy +set-faulty false +expect-ops 'delete(3)' +health 'job-reconcile.*level=OK.*message=OK, 0 object' + +# Check metrics +expvar +! grep 'reconciliation_count.test: 0$' +grep 'reconciliation_current_errors.test: 0$' +! grep 'reconciliation_total_errors.test: 0$' +! grep 'reconciliation_duration.test/update: 0$' +! grep 'reconciliation_duration.test/delete: 0$' + +# ------------ + +-- empty.table -- +ID StatusKind + +-- step1+3.table -- +ID StatusKind StatusError +1 Done +2 Done +3 Done + +-- step2.table -- +ID StatusKind StatusError +1 Error update fail +2 Done +3 Done + +-- step4.table -- +ID StatusKind +3 Done + +-- step7.table -- +ID Faulty StatusKind StatusError +4 true Error update fail +5 false Done + +-- step8.table -- +ID Faulty StatusKind +4 false Done +5 false Done + +-- obj1.yaml -- +id: 1 +faulty: false +status: + kind: Pending + +-- obj1_faulty.yaml -- +id: 1 +faulty: true +status: + kind: Pending + +-- obj2.yaml -- +id: 2 +faulty: false +status: + kind: Pending + +-- obj2_faulty.yaml -- +id: 2 +faulty: true +status: + kind: Pending + +-- obj3.yaml -- +id: 3 +faulty: false +status: + kind: Pending + +-- obj3_faulty.yaml -- +id: 3 +faulty: true +status: + kind: Pending + diff --git a/reconciler/testdata/prune_empty.txtar b/reconciler/testdata/prune_empty.txtar new file mode 100644 index 0000000..64bd299 --- /dev/null +++ b/reconciler/testdata/prune_empty.txtar @@ -0,0 +1,7 @@ +hive start +start-reconciler with-prune + +# Pruning happens when table is initialized even without any objects. +mark-init +expect-ops prune(n=0) +health 'job-reconcile.*level=OK' diff --git a/reconciler/testdata/pruning.txtar b/reconciler/testdata/pruning.txtar new file mode 100644 index 0000000..81c7b40 --- /dev/null +++ b/reconciler/testdata/pruning.txtar @@ -0,0 +1,68 @@ +hive start +start-reconciler with-prune + +# Pruning without table being initialized does nothing. +db insert test-objects obj1.yaml +expect-ops update(1) +prune +db insert test-objects obj2.yaml +expect-ops update(2) update(1) +health 'job-reconcile.*level=OK' + +# After init pruning happens immediately +mark-init +expect-ops prune(n=2) +health 'job-reconcile.*level=OK' +expvar +! grep 'prune_count.test: 0' + +# Pruning with faulty ops will mark status as degraded +set-faulty true +prune +expect-ops 'prune(n=2) fail' +health 'job-reconcile.*level=Degraded.*message=.*prune fail' +expvar +grep 'prune_current_errors.test: 1' + +# Pruning again with healthy ops fixes the status. +set-faulty false +prune +expect-ops 'prune(n=2)' +health 'job-reconcile.*level=OK' +expvar +grep 'prune_current_errors.test: 0' + +# Delete an object and check pruning happens without it +db delete test-objects obj1.yaml +prune +expect-ops 'prune(n=1)' delete(1) + +# Prune without objects +db delete test-objects obj2.yaml +prune +expect-ops prune(n=0) delete(2) prune(n=1) + +# Check metrics +expvar +! grep 'prune_count.test: 0' +grep 'prune_current_errors.test: 0' +grep 'prune_total_errors.test: 1' +! grep 'prune_duration.test: 0$' +! grep 'reconciliation_count.test: 0$' +grep 'reconciliation_current_errors.test: 0$' +grep 'reconciliation_total_errors.test: 0$' +! grep 'reconciliation_duration.test/update: 0$' +! grep 'reconciliation_duration.test/delete: 0$' + +-- obj1.yaml -- +id: 1 +faulty: false +status: + kind: Pending + +-- obj2.yaml -- +id: 2 +faulty: false +status: + kind: Pending + diff --git a/reconciler/testdata/refresh.txtar b/reconciler/testdata/refresh.txtar new file mode 100644 index 0000000..6076543 --- /dev/null +++ b/reconciler/testdata/refresh.txtar @@ -0,0 +1,58 @@ +hive start +start-reconciler with-refresh + +# Step 1: Add a test object. +db insert test-objects obj1.yaml +expect-ops 'update(1)' +db cmp test-objects step1.table + +# Step 2: Set the object as updated in the past to force refresh +db insert test-objects obj1_old.yaml +expect-ops 'update-refresh(1)' + +# Step 3: Refresh with faulty target, should see fail & retries +set-faulty true +db insert test-objects obj1_old.yaml +expect-ops 'update-refresh(1) fail' 'update-refresh(1) fail' +db cmp test-objects step3.table +health +health 'job-reconcile.*Degraded' + +# Step 4: Back to health +set-faulty false +db insert test-objects obj1_old.yaml +expect-ops 'update-refresh(1)' +db cmp test-objects step4.table +health 'job-reconcile.*OK, 1 object' + +# ----- +-- step1.table -- +ID StatusKind +1 Done + +-- step3.table -- +ID StatusKind +1 Error + +-- step4.table -- +ID StatusKind +1 Done + +-- obj1.yaml -- +id: 1 +faulty: false +updates: 1 +status: + kind: Pending + updatedat: 2024-01-01T10:10:10.0+02:00 + error: "" + +-- obj1_old.yaml -- +id: 1 +faulty: false +updates: 1 +status: + kind: Done + updatedat: 2000-01-01T10:10:10.0+02:00 + error: "" + diff --git a/reconciler/types.go b/reconciler/types.go index 6d6342d..01b9fa8 100644 --- a/reconciler/types.go +++ b/reconciler/types.go @@ -20,6 +20,7 @@ import ( "github.com/cilium/statedb" "github.com/cilium/statedb/index" "github.com/cilium/statedb/internal" + "gopkg.in/yaml.v3" ) type Reconciler[Obj any] interface { @@ -143,6 +144,39 @@ type Status struct { id uint64 } +// statusJSON defines the JSON/YAML format for [Status]. Separate to +// [Status] to allow custom unmarshalling that fills in [id]. +type statusJSON struct { + Kind string `json:"kind" yaml:"kind"` + UpdatedAt time.Time `json:"updated-at" yaml:"updated-at"` + Error string `json:"error,omitempty" yaml:"error,omitempty"` +} + +func (sj *statusJSON) fill(s *Status) { + s.Kind = StatusKind(sj.Kind) + s.UpdatedAt = sj.UpdatedAt + s.Error = sj.Error + s.id = nextID() +} + +func (s *Status) UnmarshalYAML(value *yaml.Node) error { + var sj statusJSON + if err := value.Decode(&sj); err != nil { + return err + } + sj.fill(s) + return nil +} + +func (s *Status) UnmarshalJSON(data []byte) error { + var sj statusJSON + if err := json.Unmarshal(data, &sj); err != nil { + return err + } + sj.fill(s) + return nil +} + func (s Status) IsPendingOrRefreshing() bool { return s.Kind == StatusKindPending || s.Kind == StatusKindRefreshing } @@ -186,6 +220,7 @@ func StatusRefreshing() Status { Kind: StatusKindRefreshing, UpdatedAt: time.Now(), Error: "", + id: nextID(), } } @@ -196,6 +231,7 @@ func StatusDone() Status { Kind: StatusKindDone, UpdatedAt: time.Now(), Error: "", + id: nextID(), } } @@ -206,6 +242,7 @@ func StatusError(err error) Status { Kind: StatusKindError, UpdatedAt: time.Now(), Error: err.Error(), + id: nextID(), } }