From a5878706f731a56a99f97ce53d2421270bd226d5 Mon Sep 17 00:00:00 2001 From: Alvar Penning Date: Fri, 25 Oct 2024 10:24:19 +0200 Subject: [PATCH 1/6] icingadb: Unify select cases for derived contexts The main loop select cases for hactx.Done() and ctx.Done() were unified, as hactx is a derived ctx. A closed ctx case may be lost as the hactx case could have been chosen. --- cmd/icingadb/main.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index de2cbdcfc..4422ac29d 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -326,7 +326,10 @@ func run() int { cancelHactx() case <-hactx.Done(): - // Nothing to do here, surrounding loop will terminate now. + if ctx.Err() != nil { + logger.Fatalf("%+v", errors.New("main context closed unexpectedly")) + } + // Otherwise, there is nothing to do here, surrounding loop will terminate now. case <-ha.Done(): if err := ha.Err(); err != nil { logger.Fatalf("%+v", errors.Wrap(err, "HA exited with an error")) @@ -338,8 +341,6 @@ func run() int { cancelHactx() return ExitFailure - case <-ctx.Done(): - logger.Fatalf("%+v", errors.New("main context closed unexpectedly")) case s := <-sig: logger.Infow("Exiting due to signal", zap.String("signal", s.String())) cancelHactx() From 3f69f986bac1f646aa2a04be7b0c270845d4cf92 Mon Sep 17 00:00:00 2001 From: Alvar Penning Date: Fri, 25 Oct 2024 10:29:52 +0200 Subject: [PATCH 2/6] HA: Increase log level for heartbeats from the future Timing issues may be the root of future failures. Thus, it is important to be aware if the timing seems to be out of sync. --- pkg/icingadb/ha.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 1bb04b17c..8b95055fa 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -173,7 +173,7 @@ func (h *HA) controller() { } tt := t.Time() if tt.After(now.Add(1 * time.Second)) { - h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt)) + h.logger.Warnw("Received heartbeat from the future", zap.Time("time", tt)) } if tt.Before(now.Add(-1 * peerTimeout)) { h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) From f8819208ce4856a83d627d703f749e040c4d2888 Mon Sep 17 00:00:00 2001 From: Alvar Penning Date: Fri, 25 Oct 2024 10:32:38 +0200 Subject: [PATCH 3/6] HA: Deferred SQL Transaction Rollback Each transaction is created within the retryable function, but this function may be exited prematurely before committing. A deferred rollback ensures that the transaction will be rolled back and cleaned up in this case, or will be a noop when performed after the commit. --- pkg/icingadb/ha.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 8b95055fa..ad135296f 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -303,6 +303,7 @@ func (h *HA) realize( if errBegin != nil { return errors.Wrap(errBegin, "can't start transaction") } + defer func() { _ = tx.Rollback() }() query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+ "WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock From dd0ca8fb072ae8713b7e3f32df5b6417aa0fb7b6 Mon Sep 17 00:00:00 2001 From: Alvar Penning Date: Fri, 25 Oct 2024 10:39:05 +0200 Subject: [PATCH 4/6] HA: Insert environment within retryable function The HA.insertEnvironment() method was inlined into the retryable function to use the deadlined context. Otherwise, this might block afterwards, as it was used within HA.realize(), but without the passed context. --- pkg/icingadb/ha.go | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index ad135296f..764057183 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -374,9 +374,14 @@ func (h *HA) realize( if takeover != "" { stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?") - _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId) + if _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId); err != nil { + return database.CantPerformQuery(err, stmt) + } - if err != nil { + // Insert the environment after each heartbeat takeover if it does not already exist in the database + // as the environment may have changed, although this is likely to happen very rarely. + stmt, _ = h.db.BuildInsertIgnoreStmt(h.environment) + if _, err := h.db.NamedExecContext(ctx, stmt, h.environment); err != nil { return database.CantPerformQuery(err, stmt) } } @@ -424,12 +429,6 @@ func (h *HA) realize( } if takeover != "" { - // Insert the environment after each heartbeat takeover if it does not already exist in the database - // as the environment may have changed, although this is likely to happen very rarely. - if err := h.insertEnvironment(); err != nil { - return errors.Wrap(err, "can't insert environment") - } - h.signalTakeover(takeover) } else if otherResponsible { if state := h.state.Load(); !state.otherResponsible { @@ -452,18 +451,6 @@ func (h *HA) realizeLostHeartbeat() { } } -// insertEnvironment inserts the environment from the specified state into the database if it does not already exist. -func (h *HA) insertEnvironment() error { - // Instead of checking whether the environment already exists, use an INSERT statement that does nothing if it does. - stmt, _ := h.db.BuildInsertIgnoreStmt(h.environment) - - if _, err := h.db.NamedExecContext(h.ctx, stmt, h.environment); err != nil { - return database.CantPerformQuery(err, stmt) - } - - return nil -} - func (h *HA) removeInstance(ctx context.Context) { h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId))) // Intentionally not using h.ctx here as it's already cancelled. From c2d8bd6f476a1aa38e24c2d1dee8270b9db05320 Mon Sep 17 00:00:00 2001 From: Alvar Penning Date: Fri, 25 Oct 2024 10:42:49 +0200 Subject: [PATCH 5/6] HA/Heartbeat: Use last message's timestamp Since the retryable HA function may be executed a few times before succeeding, the inserted heartbeat value will be directly outdated. The heartbeat logic was slightly altered to always use the latest heartbeat time value. --- pkg/icingadb/ha.go | 8 +++++--- pkg/icingaredis/heartbeat.go | 16 ++++++++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 764057183..92fd0bb76 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -221,7 +221,7 @@ func (h *HA) controller() { // Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time. realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime()) - err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents) + err = h.realize(realizeCtx, s, envId, shouldLogRoutineEvents) cancelRealizeCtx() if errors.Is(err, context.DeadlineExceeded) { h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time") @@ -268,10 +268,12 @@ func (h *HA) controller() { // realize a HA cycle triggered by a heartbeat event. // // shouldLogRoutineEvents indicates if recurrent events should be logged. +// +// The internal, retryable function always fetches the last received heartbeat's timestamp instead of reusing the one +// from the calling controller loop. Doing so results in inserting a more accurate timestamp if a retry happens. func (h *HA) realize( ctx context.Context, s *icingaredisv1.IcingaStatus, - t *types.UnixMilli, envId types.Binary, shouldLogRoutineEvents bool, ) error { @@ -354,7 +356,7 @@ func (h *HA) realize( EnvironmentMeta: v1.EnvironmentMeta{ EnvironmentId: envId, }, - Heartbeat: *t, + Heartbeat: types.UnixMilli(time.UnixMilli(h.heartbeat.LastMessageTime())), Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true}, EndpointId: s.EndpointId, Icinga2Version: s.Version, diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index bb1e83796..7473659fb 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -25,6 +25,7 @@ type Heartbeat struct { active bool events chan *HeartbeatMessage lastReceivedMs atomic.Int64 + lastMessageMs atomic.Int64 cancelCtx context.CancelFunc client *redis.Client done chan struct{} @@ -62,6 +63,11 @@ func (h *Heartbeat) LastReceived() int64 { return h.lastReceivedMs.Load() } +// LastMessageTime returns the last message's time in ms. +func (h *Heartbeat) LastMessageTime() int64 { + return h.lastMessageMs.Load() +} + // Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. // Implements the io.Closer interface. func (h *Heartbeat) Close() error { @@ -139,6 +145,15 @@ func (h *Heartbeat) controller(ctx context.Context) { } h.lastReceivedMs.Store(m.received.UnixMilli()) + + statsT, err := m.stats.Time() + if err != nil { + h.logger.Warnw("Received Icinga heartbeat with invalid stats time", zap.Error(err)) + h.lastMessageMs.Store(0) + } else { + h.lastMessageMs.Store(statsT.Time().UnixMilli()) + } + h.sendEvent(m) case <-time.After(Timeout): if h.active { @@ -150,6 +165,7 @@ func (h *Heartbeat) controller(ctx context.Context) { } h.lastReceivedMs.Store(0) + h.lastMessageMs.Store(0) case <-ctx.Done(): return ctx.Err() } From 8b95d25ed928dd7de433ff6cb37c819b7c247a2e Mon Sep 17 00:00:00 2001 From: Alvar Penning Date: Fri, 25 Oct 2024 10:45:51 +0200 Subject: [PATCH 6/6] HA: Abort Transaction Commit after Timeout A strange HA behavior was reported in #787, resulting in both instances being active. The logs contained an entry of the previous active instance exiting the HA.realize() method successfully after 1m9s. This, however, should not be possible as the method's context is deadlined to a minute after the heartbeat was received. However, as it turns out, executing COMMIT on a database transaction is not bound to the transaction's context, allowing to survive longer. To mitigate this, another context watch was introduced. Doing so allows directly handing over, while the other instance can now take over due to the expired heartbeat in the database. --- pkg/icingadb/ha.go | 38 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 92fd0bb76..758df2d4d 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -267,6 +267,9 @@ func (h *HA) controller() { // realize a HA cycle triggered by a heartbeat event. // +// The context passed is expected to have a deadline, otherwise the method will panic. This deadline is strictly +// enforced to abort the realization logic the moment the context expires. +// // shouldLogRoutineEvents indicates if recurrent events should be logged. // // The internal, retryable function always fetches the last received heartbeat's timestamp instead of reusing the one @@ -388,8 +391,39 @@ func (h *HA) realize( } } - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "can't commit transaction") + // In general, cancellation does not work for COMMIT and ROLLBACK. Some database drivers may support a + // context-based abort, but only if the DBMS allows it. This was also discussed in the initial issue about + // context support to Go's sql package: https://github.com/golang/go/issues/15123#issuecomment-245882486 + // + // This paragraph is implementation knowledge, not covered by the API specification. Go's sql.Tx.Commit() - + // which is not being overridden by sqlx.Tx - performs a preflight check on the context before handing over + // to the driver's Commit() method. Drivers may behave differently. For example, the used + // github.com/go-sql-driver/mysql package calls its internal exec() method with a COMMIT query, writing and + // reading packets without honoring the context. + // + // In a nutshell, one cannot expect a Tx.Commit() call to be covered by the transaction context. For this + // reason, the following Commit() call has been moved to its own goroutine, which communicates back via a + // channel selected along with the context. If the context ends before Commit(), this retryable function + // returns with a non-retryable error. + // + // However, while the COMMIT continues in the background, it may still succeed. In this case, the state of + // the database does not match the state of Icinga DB, specifically the database says that this instance is + // active while this instance thinks otherwise. Fortunately, this mismatch is not critical because when this + // function is re-entered, the initial SELECT query would be empty for this Icinga DB node and imply the + // presence of another active instance for the other node. Effectively, this could result in a single HA + // cycle with no active node. Afterwards, either this instance takes over due to the false impression that + // no other node is active, or the other instances does so as the inserted heartbeat has already expired. + // Not great, not terrible. + commitErrCh := make(chan error, 1) + go func() { commitErrCh <- tx.Commit() }() + + select { + case err := <-commitErrCh: + if err != nil { + return errors.Wrap(err, "can't commit transaction") + } + case <-ctx.Done(): + return ctx.Err() } return nil