Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reconciler: Drop out-of-sync metric and changed parameter from Update #28

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions reconciler/benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,8 @@ func (mt *mockOps) Prune(ctx context.Context, txn statedb.ReadTxn, iter statedb.
}

// Update implements reconciler.Operations.
func (mt *mockOps) Update(ctx context.Context, txn statedb.ReadTxn, obj *testObject, changed *bool) error {
func (mt *mockOps) Update(ctx context.Context, txn statedb.ReadTxn, obj *testObject) error {
mt.numUpdates.Add(1)
if changed != nil {
*changed = true
}
return nil
}

Expand Down
18 changes: 2 additions & 16 deletions reconciler/example/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package main

import (
"bytes"
"context"
"errors"
"log/slog"
Expand Down Expand Up @@ -75,22 +74,9 @@ func (ops *MemoOps) Prune(ctx context.Context, txn statedb.ReadTxn, iter statedb
}

// Update a memo.
func (ops *MemoOps) Update(ctx context.Context, txn statedb.ReadTxn, memo *Memo, changed *bool) error {
func (ops *MemoOps) Update(ctx context.Context, txn statedb.ReadTxn, memo *Memo) error {
filename := path.Join(ops.directory, memo.Name)

// Read the old file to figure out if it had changed.
// The 'changed' boolean is used by full reconciliation to keep track of when the target
// has gone out-of-sync (e.g. there has been some outside influence to it).
old, err := os.ReadFile(filename)
if err == nil && bytes.Equal(old, []byte(memo.Content)) {

// Nothing to do.
return nil
}
if changed != nil {
*changed = true
}
err = os.WriteFile(filename, []byte(memo.Content), 0644)
err := os.WriteFile(filename, []byte(memo.Content), 0644)
ops.log.Info("Update", "filename", filename, "error", err)
return err
}
Expand Down
12 changes: 1 addition & 11 deletions reconciler/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ import (
// Update() is called for each object. Full reconciliation is used to recover from unexpected outside modifications.
func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn) []error {
var errs []error
outOfSync := false
ops := r.Config.Operations

// First perform pruning to make room in the target.
iter, _ := r.Config.Table.All(txn)
start := time.Now()
if err := ops.Prune(ctx, txn, iter); err != nil {
outOfSync = true
errs = append(errs, fmt.Errorf("pruning failed: %w", err))
}
r.metrics.FullReconciliationDuration(r.ModuleID, OpPrune, time.Since(start))
Expand All @@ -32,12 +30,10 @@ func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn) []error
iter, _ = r.Config.Table.All(txn) // Grab a new iterator as Prune() may have consumed it.
for obj, rev, ok := iter.Next(); ok; obj, rev, ok = iter.Next() {
start := time.Now()
var changed bool
obj = r.Config.CloneObject(obj)
err := ops.Update(ctx, txn, obj, &changed)
err := ops.Update(ctx, txn, obj)
r.metrics.FullReconciliationDuration(r.ModuleID, OpUpdate, time.Since(start))

outOfSync = outOfSync || changed
if err == nil {
updateResults[obj] = opResult{rev: rev, status: StatusDone()}
r.retries.Clear(obj)
Expand All @@ -47,12 +43,6 @@ func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn) []error
}
}

// Increment the out-of-sync counter if full reconciliation catched any out-of-sync
// objects.
if outOfSync {
r.metrics.FullReconciliationOutOfSync(r.ModuleID)
}

// Commit the new desired object status. This is performed separately in order
// to not lock the table when performing long-running target operations.
// If the desired object has been updated in the meanwhile the status update is dropped.
Expand Down
2 changes: 1 addition & 1 deletion reconciler/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (round *incrementalRound[Obj]) processSingle(obj Obj, rev statedb.Revision,
// Clone the object so it can be mutated by Update()
obj = round.config.CloneObject(obj)
op = OpUpdate
err = round.config.Operations.Update(round.ctx, round.txn, obj, nil /* changed */)
err = round.config.Operations.Update(round.ctx, round.txn, obj)
if err == nil {
round.results[obj] = opResult{rev: rev, status: StatusDone()}
} else {
Expand Down
15 changes: 4 additions & 11 deletions reconciler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ type Metrics interface {
IncrementalReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration)
IncrementalReconciliationErrors(moduleID cell.FullModuleID, errs []error)

FullReconciliationOutOfSync(moduleID cell.FullModuleID)
FullReconciliationErrors(moduleID cell.FullModuleID, errs []error)
FullReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration)
}
Expand All @@ -31,21 +30,16 @@ type ExpVarMetrics struct {
IncrementalReconciliationTotalErrorsVar *expvar.Map
IncrementalReconciliationCurrentErrorsVar *expvar.Map

FullReconciliationCountVar *expvar.Map
FullReconciliationDurationVar *expvar.Map
FullReconciliationTotalErrorsVar *expvar.Map
FullReconciliationCurrentErrorsVar *expvar.Map
FullReconciliationOutOfSyncCountVar *expvar.Map
FullReconciliationCountVar *expvar.Map
FullReconciliationDurationVar *expvar.Map
FullReconciliationTotalErrorsVar *expvar.Map
FullReconciliationCurrentErrorsVar *expvar.Map
}

func (m *ExpVarMetrics) FullReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration) {
m.FullReconciliationDurationVar.AddFloat(moduleID.String()+"/"+operation, duration.Seconds())
}

func (m *ExpVarMetrics) FullReconciliationOutOfSync(moduleID cell.FullModuleID) {
m.FullReconciliationOutOfSyncCountVar.Add(moduleID.String(), 1)
}

func (m *ExpVarMetrics) FullReconciliationErrors(moduleID cell.FullModuleID, errs []error) {
m.FullReconciliationCountVar.Add(moduleID.String(), 1)
m.FullReconciliationTotalErrorsVar.Add(moduleID.String(), int64(len(errs)))
Expand Down Expand Up @@ -94,6 +88,5 @@ func newExpVarMetrics(publish bool) *ExpVarMetrics {
FullReconciliationDurationVar: newMap("full_reconciliation_duration"),
FullReconciliationTotalErrorsVar: newMap("full_reconciliation_total_errors"),
FullReconciliationCurrentErrorsVar: newMap("full_reconciliation_current_errors"),
FullReconciliationOutOfSyncCountVar: newMap("full_reconciliation_out_of_sync_count"),
}
}
8 changes: 2 additions & 6 deletions reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ func testReconciler(t *testing.T, batchOps bool) {
}

assert.Greater(t, getInt(expVarMetrics.FullReconciliationCountVar.Get("test")), int64(0), "FullReconciliationCount")
assert.Greater(t, getInt(expVarMetrics.FullReconciliationOutOfSyncCountVar.Get("test")), int64(0), "FullReconciliationOutOfSyncCount")
assert.Greater(t, getFloat(expVarMetrics.FullReconciliationDurationVar.Get("test/prune")), float64(0), "FullReconciliationDuration/prune")
assert.Greater(t, getFloat(expVarMetrics.FullReconciliationDurationVar.Get("test/update")), float64(0), "FullReconciliationDuration/update")
assert.Equal(t, getInt(expVarMetrics.FullReconciliationCurrentErrorsVar.Get("test")), int64(0), "FullReconciliationCurrentErrors")
Expand Down Expand Up @@ -421,7 +420,7 @@ func (mt *mockOps) DeleteBatch(ctx context.Context, txn statedb.ReadTxn, batch [
// UpdateBatch implements reconciler.BatchOperations.
func (mt *mockOps) UpdateBatch(ctx context.Context, txn statedb.ReadTxn, batch []reconciler.BatchEntry[*testObject]) {
for i := range batch {
batch[i].Result = mt.Update(ctx, txn, batch[i].Object, nil)
batch[i].Result = mt.Update(ctx, txn, batch[i].Object)
}
}

Expand All @@ -444,10 +443,7 @@ func (mt *mockOps) Prune(ctx context.Context, txn statedb.ReadTxn, iter statedb.
}

// Update implements reconciler.Operations.
func (mt *mockOps) Update(ctx context.Context, txn statedb.ReadTxn, obj *testObject, changed *bool) error {
if changed != nil {
*changed = true
}
func (mt *mockOps) Update(ctx context.Context, txn statedb.ReadTxn, obj *testObject) error {
mt.updates.incr(obj.id)
if mt.faulty.Load() || obj.faulty {
mt.history.add(opFail(opUpdate(obj.id)))
Expand Down
9 changes: 1 addition & 8 deletions reconciler/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,9 @@ type Operations[Obj any] interface {
// reconciliation is performed when the desired state is updated. A full
// reconciliation is done periodically by calling 'Update' on all objects.
//
// If 'changed' is non-nil then the Update must compare the realized state
// with the desired state and set it to true if they differ, e.g. whether
// the operation resulted in a change to the realized state. This is used
// during full reconciliation to catch cases where the realized state has
// gone out of sync due to outside influence. This is tracked in the
// "full_out_of_sync_total" metric.
//
// The object handed to Update is a clone produced by Config.CloneObject
// and thus Update can mutate the object.
Update(ctx context.Context, txn statedb.ReadTxn, obj Obj, changed *bool) error
Update(ctx context.Context, txn statedb.ReadTxn, obj Obj) error

// Delete the object in the target. Same semantics as with Update.
// Deleting a non-existing object is not an error and returns nil.
Expand Down
Loading