diff --git a/go.mod b/go.mod index 500bee1..ec8e573 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/r3labs/diff/v2 v2.15.1 github.com/satori/go.uuid v1.2.0 goji.io v2.0.2+incompatible + golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c google.golang.org/grpc v1.45.0 diff --git a/go.sum b/go.sum index c22ce51..a06b0c2 100644 --- a/go.sum +++ b/go.sum @@ -357,6 +357,7 @@ golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= +golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= diff --git a/internal/util/util.go b/internal/util/util.go new file mode 100644 index 0000000..838a9e4 --- /dev/null +++ b/internal/util/util.go @@ -0,0 +1,19 @@ +package util + +import ( + "time" + + "golang.org/x/exp/rand" +) + +// CalcRetryTime is caliculate retry time by exponential backoff and jitter +func CalcRetryTime(count int) time.Duration { + if count == 0 { + return 0 + } + + backoff := 1 << count + jitter := time.Duration(rand.Intn(1000)) * time.Millisecond + + return time.Duration(backoff)*time.Second + jitter +} diff --git a/pkg/metric/scrape_memory.go b/pkg/metric/scrape_memory.go index bd8ab56..8022d6b 100644 --- a/pkg/metric/scrape_memory.go +++ b/pkg/metric/scrape_memory.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + uuid "github.com/satori/go.uuid" + "github.com/prometheus/client_golang/prometheus" "github.com/whywaita/myshoes/pkg/config" "github.com/whywaita/myshoes/pkg/datastore" @@ -34,7 +36,7 @@ var ( memoryStarterRecoveredRuns = prometheus.NewDesc( prometheus.BuildFQName(namespace, memoryName, "starter_recovered_runs"), "recovered runs in starter", - []string{"starter"}, nil, + []string{"starter", "target"}, nil, ) memoryGitHubRateLimitRemaining = prometheus.NewDesc( prometheus.BuildFQName(namespace, memoryName, "github_rate_limit_remaining"), @@ -66,6 +68,16 @@ var ( "deleting concurrency in runner", []string{"runner"}, nil, ) + memoryRunnerDeleteRetryCount = prometheus.NewDesc( + prometheus.BuildFQName(namespace, memoryName, "runner_delete_retry_count"), + "retry count of deleting in runner", + []string{"runner"}, nil, + ) + memoryRunnerCreateRetryCount = prometheus.NewDesc( + prometheus.BuildFQName(namespace, memoryName, "runner_create_retry_count"), + "retry count of creating in runner", + []string{"runner"}, nil, + ) ) // ScraperMemory is scraper implement for memory @@ -89,9 +101,6 @@ func (ScraperMemory) Scrape(ctx context.Context, ds datastore.Datastore, ch chan if err := scrapeGitHubValues(ch); err != nil { return fmt.Errorf("failed to scrape GitHub values: %w", err) } - if err := scrapeRecoveredRuns(ch); err != nil { - return fmt.Errorf("failed to scrape recovered runs: %w", err) - } if config.Config.ProvideDockerHubMetrics { if err := scrapeDockerValues(ch); err != nil { return fmt.Errorf("failed to scrape Docker values: %w", err) @@ -116,7 +125,15 @@ func scrapeStarterValues(ch chan<- prometheus.Metric) error { ch <- prometheus.MustNewConstMetric( memoryStarterQueueWaiting, prometheus.GaugeValue, float64(countWaiting), labelStarter) + starter.CountRecovered.Range(func(key, value interface{}) bool { + ch <- prometheus.MustNewConstMetric( + memoryStarterRecoveredRuns, prometheus.GaugeValue, float64(value.(int)), labelStarter, key.(string), + ) + return true + }) + const labelRunner = "runner" + configRunnerDeletingMax := config.Config.MaxConcurrencyDeleting countRunnerDeletingNow := runner.ConcurrencyDeleting.Load() @@ -125,16 +142,18 @@ func scrapeStarterValues(ch chan<- prometheus.Metric) error { ch <- prometheus.MustNewConstMetric( memoryRunnerQueueConcurrencyDeleting, prometheus.GaugeValue, float64(countRunnerDeletingNow), labelRunner) - return nil -} + runner.DeleteRetryCount.Range(func(key, value any) bool { + ch <- prometheus.MustNewConstMetric( + memoryRunnerDeleteRetryCount, prometheus.GaugeValue, float64(value.(int)), key.(uuid.UUID).String()) + return true + }) -func scrapeRecoveredRuns(ch chan<- prometheus.Metric) error { - starter.CountRecovered.Range(func(key, value interface{}) bool { + starter.AddInstanceRetryCount.Range(func(key, value any) bool { ch <- prometheus.MustNewConstMetric( - memoryStarterRecoveredRuns, prometheus.GaugeValue, float64(value.(int)), key.(string), - ) + memoryRunnerCreateRetryCount, prometheus.GaugeValue, float64(value.(int)), key.(uuid.UUID).String()) return true }) + return nil } diff --git a/pkg/runner/runner_delete.go b/pkg/runner/runner_delete.go index b2b55dd..7cd3f30 100644 --- a/pkg/runner/runner_delete.go +++ b/pkg/runner/runner_delete.go @@ -5,10 +5,12 @@ import ( "errors" "fmt" "strings" + "sync" "sync/atomic" "time" "github.com/google/go-github/v47/github" + "github.com/whywaita/myshoes/internal/util" "github.com/whywaita/myshoes/pkg/config" "github.com/whywaita/myshoes/pkg/datastore" "github.com/whywaita/myshoes/pkg/gh" @@ -25,6 +27,10 @@ var ( ConcurrencyDeleting atomic.Int64 // DeletingTimeout is timeout of deleting runner DeletingTimeout = 3 * time.Minute + // DeleteRetryCount is retry count of deleting runner + DeleteRetryCount = sync.Map{} // key: runner.UUID + // MaxDeleteRetry is max retry count of delete runner + MaxDeleteRetry = 10 ) func (m *Manager) do(ctx context.Context) error { @@ -92,6 +98,12 @@ func (m *Manager) removeRunners(ctx context.Context, t datastore.Target) error { for _, runner := range runners { runner := runner + c, _ := DeleteRetryCount.LoadOrStore(runner.UUID, 0) + count, _ := c.(int) + if count > MaxDeleteRetry { + logger.Logf(false, "runner %s is retry count over %d, so will ignore", runner.UUID, MaxDeleteRetry) + continue + } if err := sem.Acquire(ctx, 1); err != nil { return fmt.Errorf("failed to Acquire: %w", err) @@ -106,9 +118,13 @@ func (m *Manager) removeRunners(ctx context.Context, t datastore.Target) error { sem.Release(1) ConcurrencyDeleting.Add(-1) }() + time.Sleep(util.CalcRetryTime(count)) if err := m.removeRunner(cctx, t, runner, ghRunners); err != nil { + DeleteRetryCount.Store(runner.UUID, count+1) logger.Logf(false, "failed to delete runner: %+v", err) + } else { + DeleteRetryCount.Delete(runner.UUID) } return nil }) diff --git a/pkg/starter/starter.go b/pkg/starter/starter.go index a778914..fed2de4 100644 --- a/pkg/starter/starter.go +++ b/pkg/starter/starter.go @@ -19,6 +19,7 @@ import ( "github.com/google/go-github/v47/github" uuid "github.com/satori/go.uuid" + "github.com/whywaita/myshoes/internal/util" "github.com/whywaita/myshoes/pkg/config" "github.com/whywaita/myshoes/pkg/datastore" "github.com/whywaita/myshoes/pkg/gh" @@ -40,6 +41,9 @@ var ( inProgress = sync.Map{} reQueuedJobs = sync.Map{} + + // AddInstanceRetryCount is count of retry to add instance + AddInstanceRetryCount = sync.Map{} ) // Starter is dispatcher for running job @@ -142,6 +146,8 @@ func (s *Starter) run(ctx context.Context, ch chan datastore.Job) error { // this job is in progress, skip continue } + c, _ := AddInstanceRetryCount.LoadOrStore(job.UUID, 0) + count, _ := c.(int) logger.Logf(true, "found new job: %s", job.UUID) CountWaiting.Add(1) @@ -153,17 +159,22 @@ func (s *Starter) run(ctx context.Context, ch chan datastore.Job) error { inProgress.Store(job.UUID, struct{}{}) - go func(job datastore.Job) { + sleep := util.CalcRetryTime(count) + go func(job datastore.Job, sleep time.Duration) { defer func() { sem.Release(1) inProgress.Delete(job.UUID) CountRunning.Add(-1) }() + time.Sleep(sleep) if err := s.ProcessJob(ctx, job); err != nil { + AddInstanceRetryCount.Store(job.UUID, count+1) logger.Logf(false, "failed to process job: %+v\n", err) + } else { + AddInstanceRetryCount.Delete(job.UUID) } - }(job) + }(job, sleep) case <-ctx.Done(): return nil