Skip to content

Commit

Permalink
Merge pull request #800 from Icinga/ha-desperate-timing-fixes
Browse files Browse the repository at this point in the history
Abort HA Realization Logic After Timeout
  • Loading branch information
lippserd authored Dec 16, 2024
2 parents f923d6f + 8b95d25 commit 63d51df
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 29 deletions.
7 changes: 4 additions & 3 deletions cmd/icingadb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,10 @@ func run() int {

cancelHactx()
case <-hactx.Done():
// Nothing to do here, surrounding loop will terminate now.
if ctx.Err() != nil {
logger.Fatalf("%+v", errors.New("main context closed unexpectedly"))
}
// Otherwise, there is nothing to do here, surrounding loop will terminate now.
case <-ha.Done():
if err := ha.Err(); err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "HA exited with an error"))
Expand All @@ -338,8 +341,6 @@ func run() int {
cancelHactx()

return ExitFailure
case <-ctx.Done():
logger.Fatalf("%+v", errors.New("main context closed unexpectedly"))
case s := <-sig:
logger.Infow("Exiting due to signal", zap.String("signal", s.String()))
cancelHactx()
Expand Down
76 changes: 50 additions & 26 deletions pkg/icingadb/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (h *HA) controller() {
}
tt := t.Time()
if tt.After(now.Add(1 * time.Second)) {
h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
h.logger.Warnw("Received heartbeat from the future", zap.Time("time", tt))
}
if tt.Before(now.Add(-1 * peerTimeout)) {
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
Expand Down Expand Up @@ -221,7 +221,7 @@ func (h *HA) controller() {

// Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time.
realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime())
err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents)
err = h.realize(realizeCtx, s, envId, shouldLogRoutineEvents)
cancelRealizeCtx()
if errors.Is(err, context.DeadlineExceeded) {
h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time")
Expand Down Expand Up @@ -267,11 +267,16 @@ func (h *HA) controller() {

// realize a HA cycle triggered by a heartbeat event.
//
// The context passed is expected to have a deadline, otherwise the method will panic. This deadline is strictly
// enforced to abort the realization logic the moment the context expires.
//
// shouldLogRoutineEvents indicates if recurrent events should be logged.
//
// The internal, retryable function always fetches the last received heartbeat's timestamp instead of reusing the one
// from the calling controller loop. Doing so results in inserting a more accurate timestamp if a retry happens.
func (h *HA) realize(
ctx context.Context,
s *icingaredisv1.IcingaStatus,
t *types.UnixMilli,
envId types.Binary,
shouldLogRoutineEvents bool,
) error {
Expand Down Expand Up @@ -303,6 +308,7 @@ func (h *HA) realize(
if errBegin != nil {
return errors.Wrap(errBegin, "can't start transaction")
}
defer func() { _ = tx.Rollback() }()

query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+
"WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock
Expand Down Expand Up @@ -353,7 +359,7 @@ func (h *HA) realize(
EnvironmentMeta: v1.EnvironmentMeta{
EnvironmentId: envId,
},
Heartbeat: *t,
Heartbeat: types.UnixMilli(time.UnixMilli(h.heartbeat.LastMessageTime())),
Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true},
EndpointId: s.EndpointId,
Icinga2Version: s.Version,
Expand All @@ -373,15 +379,51 @@ func (h *HA) realize(

if takeover != "" {
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?")
_, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId)
if _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId); err != nil {
return database.CantPerformQuery(err, stmt)
}

if err != nil {
// Insert the environment after each heartbeat takeover if it does not already exist in the database
// as the environment may have changed, although this is likely to happen very rarely.
stmt, _ = h.db.BuildInsertIgnoreStmt(h.environment)
if _, err := h.db.NamedExecContext(ctx, stmt, h.environment); err != nil {
return database.CantPerformQuery(err, stmt)
}
}

if err := tx.Commit(); err != nil {
return errors.Wrap(err, "can't commit transaction")
// In general, cancellation does not work for COMMIT and ROLLBACK. Some database drivers may support a
// context-based abort, but only if the DBMS allows it. This was also discussed in the initial issue about
// context support to Go's sql package: https://github.com/golang/go/issues/15123#issuecomment-245882486
//
// This paragraph is implementation knowledge, not covered by the API specification. Go's sql.Tx.Commit() -
// which is not being overridden by sqlx.Tx - performs a preflight check on the context before handing over
// to the driver's Commit() method. Drivers may behave differently. For example, the used
// github.com/go-sql-driver/mysql package calls its internal exec() method with a COMMIT query, writing and
// reading packets without honoring the context.
//
// In a nutshell, one cannot expect a Tx.Commit() call to be covered by the transaction context. For this
// reason, the following Commit() call has been moved to its own goroutine, which communicates back via a
// channel selected along with the context. If the context ends before Commit(), this retryable function
// returns with a non-retryable error.
//
// However, while the COMMIT continues in the background, it may still succeed. In this case, the state of
// the database does not match the state of Icinga DB, specifically the database says that this instance is
// active while this instance thinks otherwise. Fortunately, this mismatch is not critical because when this
// function is re-entered, the initial SELECT query would be empty for this Icinga DB node and imply the
// presence of another active instance for the other node. Effectively, this could result in a single HA
// cycle with no active node. Afterwards, either this instance takes over due to the false impression that
// no other node is active, or the other instances does so as the inserted heartbeat has already expired.
// Not great, not terrible.
commitErrCh := make(chan error, 1)
go func() { commitErrCh <- tx.Commit() }()

select {
case err := <-commitErrCh:
if err != nil {
return errors.Wrap(err, "can't commit transaction")
}
case <-ctx.Done():
return ctx.Err()
}

return nil
Expand Down Expand Up @@ -423,12 +465,6 @@ func (h *HA) realize(
}

if takeover != "" {
// Insert the environment after each heartbeat takeover if it does not already exist in the database
// as the environment may have changed, although this is likely to happen very rarely.
if err := h.insertEnvironment(); err != nil {
return errors.Wrap(err, "can't insert environment")
}

h.signalTakeover(takeover)
} else if otherResponsible {
if state := h.state.Load(); !state.otherResponsible {
Expand All @@ -451,18 +487,6 @@ func (h *HA) realizeLostHeartbeat() {
}
}

// insertEnvironment inserts the environment from the specified state into the database if it does not already exist.
func (h *HA) insertEnvironment() error {
// Instead of checking whether the environment already exists, use an INSERT statement that does nothing if it does.
stmt, _ := h.db.BuildInsertIgnoreStmt(h.environment)

if _, err := h.db.NamedExecContext(h.ctx, stmt, h.environment); err != nil {
return database.CantPerformQuery(err, stmt)
}

return nil
}

func (h *HA) removeInstance(ctx context.Context) {
h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId)))
// Intentionally not using h.ctx here as it's already cancelled.
Expand Down
16 changes: 16 additions & 0 deletions pkg/icingaredis/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Heartbeat struct {
active bool
events chan *HeartbeatMessage
lastReceivedMs atomic.Int64
lastMessageMs atomic.Int64
cancelCtx context.CancelFunc
client *redis.Client
done chan struct{}
Expand Down Expand Up @@ -62,6 +63,11 @@ func (h *Heartbeat) LastReceived() int64 {
return h.lastReceivedMs.Load()
}

// LastMessageTime returns the last message's time in ms.
func (h *Heartbeat) LastMessageTime() int64 {
return h.lastMessageMs.Load()
}

// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
// Implements the io.Closer interface.
func (h *Heartbeat) Close() error {
Expand Down Expand Up @@ -139,6 +145,15 @@ func (h *Heartbeat) controller(ctx context.Context) {
}

h.lastReceivedMs.Store(m.received.UnixMilli())

statsT, err := m.stats.Time()
if err != nil {
h.logger.Warnw("Received Icinga heartbeat with invalid stats time", zap.Error(err))
h.lastMessageMs.Store(0)
} else {
h.lastMessageMs.Store(statsT.Time().UnixMilli())
}

h.sendEvent(m)
case <-time.After(Timeout):
if h.active {
Expand All @@ -150,6 +165,7 @@ func (h *Heartbeat) controller(ctx context.Context) {
}

h.lastReceivedMs.Store(0)
h.lastMessageMs.Store(0)
case <-ctx.Done():
return ctx.Err()
}
Expand Down

0 comments on commit 63d51df

Please sign in to comment.