Skip to content

Commit

Permalink
Merge pull request #24 from lucab/ups/runtime-consistency
Browse files Browse the repository at this point in the history
cli/serve: check for runtime inconsistencies
  • Loading branch information
Luca Bruno authored Sep 18, 2019
2 parents 2f6e29a + f0b65fa commit 87737ca
Showing 1 changed file with 97 additions and 0 deletions.
97 changes: 97 additions & 0 deletions internal/cli/serve.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package cli

import (
"context"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/coreos/airlock/internal/lock"
"github.com/coreos/airlock/internal/server"
"github.com/coreos/airlock/internal/status"
)
Expand All @@ -20,6 +24,23 @@ var (
Use: "serve",
RunE: runServe,
}

configGroups = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "airlock_config_groups",
Help: "Total number of configured groups.",
})
configSlots = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "airlock_config_semaphore_slots",
Help: "Total number of configured slots per group.",
}, []string{"group"})
databaseSlots = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "airlock_database_semaphore_slots",
Help: "Total number of slots per group, in the database.",
}, []string{"group"})
databaseLocks = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "airlock_database_semaphore_lock_holders",
Help: "Total number of locked slots per group, in the database.",
}, []string{"group"})
)

// runServe runs the main HTTP service
Expand All @@ -36,6 +57,10 @@ func runServe(cmd *cobra.Command, cmdArgs []string) error {
stopCh := make(chan os.Signal)
signal.Notify(stopCh, os.Interrupt, syscall.SIGTERM)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go checkConsistency(ctx, airlock)

if runSettings.StatusEnabled {
statusMux := http.NewServeMux()
statusMux.Handle(status.MetricsEndpoint, status.Metrics())
Expand Down Expand Up @@ -81,3 +106,75 @@ func runService(stopCh chan os.Signal, service http.Server, airlock server.Airlo
}
stopCh <- os.Interrupt
}

// checkConsistency continuously checks for consistency between configuration and remote state.
//
// It takes care of polling etcd, exposing the shared state as metrics, and warning if
// it detects a mismatch with the service configuration.
func checkConsistency(ctx context.Context, service server.Airlock) {
prometheus.MustRegister(configGroups)
prometheus.MustRegister(configSlots)
prometheus.MustRegister(databaseLocks)
prometheus.MustRegister(databaseSlots)

configGroups.Set(float64(len(service.LockGroups)))
for group, maxSlots := range service.LockGroups {
configSlots.WithLabelValues(group).Set(float64(maxSlots))
}

// Consistency-checking logic, with its own scope for defers.
checkAndLog := func() {
for group, maxSlots := range service.LockGroups {
innerCtx, cancel := context.WithTimeout(ctx, service.EtcdTxnTimeout)
defer cancel()

// TODO(lucab): re-arrange so that the manager can be re-used.
manager, err := lock.NewManager(innerCtx, service.EtcdEndpoints, group, maxSlots)
if err != nil {
logrus.WithFields(logrus.Fields{
"reason": err.Error(),
}).Warn("consistency check, manager creation failed")
continue
}
semaphore, err := manager.FetchSemaphore(innerCtx)
if err != nil {
logrus.WithFields(logrus.Fields{
"reason": err.Error(),
}).Warn("consistency check, semaphore fetch failed")
continue
}

// Update metrics.
databaseSlots.WithLabelValues(group).Set(float64(semaphore.TotalSlots))
databaseLocks.WithLabelValues(group).Set(float64(len(semaphore.Holders)))

// Log any inconsistencies.
if semaphore.TotalSlots != maxSlots {
logrus.WithFields(logrus.Fields{
"config": maxSlots,
"database": semaphore.TotalSlots,
"group": group,
}).Warn("semaphore max slots consistency check failed")
}
if semaphore.TotalSlots < uint64(len(semaphore.Holders)) {
logrus.WithFields(logrus.Fields{
"group": group,
"holder": len(semaphore.Holders),
"slots": semaphore.TotalSlots,
}).Warn("semaphore locks consistency check failed")
}
}
}

for {
checkAndLog()

pause := time.NewTimer(time.Minute)
select {
case <-ctx.Done():
break
case <-pause.C:
continue
}
}
}

0 comments on commit 87737ca

Please sign in to comment.