Merge pull request #211 from gamoutatsumi/add_exponential_backoff
add exponential backoff in delete runner
whywaita authored Sep 12, 2024
2 parents b773820 + 57258ea commit a7e9db7
Showing 6 changed files with 79 additions and 12 deletions.
1 change: 1 addition & 0 deletions go.mod
@@ -18,6 +18,7 @@ require (
	github.com/r3labs/diff/v2 v2.15.1
	github.com/satori/go.uuid v1.2.0
	goji.io v2.0.2+incompatible
	golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
	golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5
	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
	google.golang.org/grpc v1.45.0
1 change: 1 addition & 0 deletions go.sum
@@ -357,6 +357,7 @@ golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
19 changes: 19 additions & 0 deletions internal/util/util.go
@@ -0,0 +1,19 @@
package util

import (
	"time"

	"golang.org/x/exp/rand"
)

// CalcRetryTime calculates the retry wait time using exponential backoff with jitter.
func CalcRetryTime(count int) time.Duration {
	if count == 0 {
		return 0
	}

	backoff := 1 << count // 2^count seconds
	jitter := time.Duration(rand.Intn(1000)) * time.Millisecond

	return time.Duration(backoff)*time.Second + jitter
}
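As a sanity check on the schedule this produces: count 0 returns immediately, and retry n waits roughly 2^n seconds (2s, 4s, 8s, ...) plus up to one second of random jitter, topping out near 17 minutes at the MaxDeleteRetry of 10. A minimal sketch that prints the first few waits — it must live inside the myshoes module, since internal/ packages are not importable from outside:

```go
package main

import (
	"fmt"

	"github.com/whywaita/myshoes/internal/util"
)

func main() {
	// count 0 => 0s; count n => 2^n seconds plus up to 1s of jitter.
	for count := 0; count <= 5; count++ {
		fmt.Printf("retry %d: wait %v\n", count, util.CalcRetryTime(count))
	}
}
```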
39 changes: 29 additions & 10 deletions pkg/metric/scrape_memory.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"

uuid "github.com/satori/go.uuid"

"github.com/prometheus/client_golang/prometheus"
"github.com/whywaita/myshoes/pkg/config"
"github.com/whywaita/myshoes/pkg/datastore"
@@ -34,7 +36,7 @@ var (
	memoryStarterRecoveredRuns = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, memoryName, "starter_recovered_runs"),
		"recovered runs in starter",
		[]string{"starter"}, nil,
		[]string{"starter", "target"}, nil,
	)
	memoryGitHubRateLimitRemaining = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, memoryName, "github_rate_limit_remaining"),
@@ -66,6 +68,16 @@ var (
"deleting concurrency in runner",
[]string{"runner"}, nil,
)
memoryRunnerDeleteRetryCount = prometheus.NewDesc(
prometheus.BuildFQName(namespace, memoryName, "runner_delete_retry_count"),
"retry count of deleting in runner",
[]string{"runner"}, nil,
)
memoryRunnerCreateRetryCount = prometheus.NewDesc(
prometheus.BuildFQName(namespace, memoryName, "runner_create_retry_count"),
"retry count of creating in runner",
[]string{"runner"}, nil,
)
)

// ScraperMemory is a scraper implementation for memory
@@ -89,9 +101,6 @@ func (ScraperMemory) Scrape(ctx context.Context, ds datastore.Datastore, ch chan
	if err := scrapeGitHubValues(ch); err != nil {
		return fmt.Errorf("failed to scrape GitHub values: %w", err)
	}
	if err := scrapeRecoveredRuns(ch); err != nil {
		return fmt.Errorf("failed to scrape recovered runs: %w", err)
	}
	if config.Config.ProvideDockerHubMetrics {
		if err := scrapeDockerValues(ch); err != nil {
			return fmt.Errorf("failed to scrape Docker values: %w", err)
@@ -116,7 +125,15 @@ func scrapeStarterValues(ch chan<- prometheus.Metric) error {
	ch <- prometheus.MustNewConstMetric(
		memoryStarterQueueWaiting, prometheus.GaugeValue, float64(countWaiting), labelStarter)

	starter.CountRecovered.Range(func(key, value interface{}) bool {
		ch <- prometheus.MustNewConstMetric(
			memoryStarterRecoveredRuns, prometheus.GaugeValue, float64(value.(int)), labelStarter, key.(string),
		)
		return true
	})

	const labelRunner = "runner"

	configRunnerDeletingMax := config.Config.MaxConcurrencyDeleting
	countRunnerDeletingNow := runner.ConcurrencyDeleting.Load()
@@ -125,16 +142,18 @@ func scrapeStarterValues(ch chan<- prometheus.Metric) error {
	ch <- prometheus.MustNewConstMetric(
		memoryRunnerQueueConcurrencyDeleting, prometheus.GaugeValue, float64(countRunnerDeletingNow), labelRunner)

	return nil
}

	runner.DeleteRetryCount.Range(func(key, value any) bool {
		ch <- prometheus.MustNewConstMetric(
			memoryRunnerDeleteRetryCount, prometheus.GaugeValue, float64(value.(int)), key.(uuid.UUID).String())
		return true
	})

func scrapeRecoveredRuns(ch chan<- prometheus.Metric) error {
	starter.CountRecovered.Range(func(key, value interface{}) bool {
	starter.AddInstanceRetryCount.Range(func(key, value any) bool {
		ch <- prometheus.MustNewConstMetric(
			memoryStarterRecoveredRuns, prometheus.GaugeValue, float64(value.(int)), key.(string),
		)
			memoryRunnerCreateRetryCount, prometheus.GaugeValue, float64(value.(int)), key.(uuid.UUID).String())
		return true
	})

	return nil
}

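The new per-runner gauges come from walking a sync.Map and emitting one sample per key. A self-contained sketch of that Range pattern, using a hypothetical string-keyed map and hand-formatted output in place of the real collectors:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	// retryCounts stands in for runner.DeleteRetryCount: each key names a
	// runner and each value holds its current retry count.
	var retryCounts sync.Map
	retryCounts.Store("runner-a", 3)
	retryCounts.Store("runner-b", 1)

	// Range visits every entry; returning true continues the iteration,
	// mirroring how scrapeStarterValues turns the map into metric samples.
	retryCounts.Range(func(key, value any) bool {
		fmt.Printf("myshoes_memory_runner_delete_retry_count{runner=%q} %d\n",
			key.(string), value.(int))
		return true
	})
}
```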
16 changes: 16 additions & 0 deletions pkg/runner/runner_delete.go
@@ -5,10 +5,12 @@ import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/go-github/v47/github"
"github.com/whywaita/myshoes/internal/util"
"github.com/whywaita/myshoes/pkg/config"
"github.com/whywaita/myshoes/pkg/datastore"
"github.com/whywaita/myshoes/pkg/gh"
@@ -25,6 +27,10 @@ var (
	ConcurrencyDeleting atomic.Int64
	// DeletingTimeout is the timeout for deleting a runner
	DeletingTimeout = 3 * time.Minute
	// DeleteRetryCount tracks the retry count of deleting each runner
	DeleteRetryCount = sync.Map{} // key: runner.UUID
	// MaxDeleteRetry is the maximum retry count for deleting a runner
	MaxDeleteRetry = 10
)

func (m *Manager) do(ctx context.Context) error {
@@ -92,6 +98,12 @@ func (m *Manager) removeRunners(ctx context.Context, t datastore.Target) error {

	for _, runner := range runners {
		runner := runner
		c, _ := DeleteRetryCount.LoadOrStore(runner.UUID, 0)
		count, _ := c.(int)
		if count > MaxDeleteRetry {
			logger.Logf(false, "runner %s exceeded the retry limit (%d), ignoring", runner.UUID, MaxDeleteRetry)
			continue
		}

		if err := sem.Acquire(ctx, 1); err != nil {
			return fmt.Errorf("failed to Acquire: %w", err)
@@ -106,9 +118,13 @@ func (m *Manager) removeRunners(ctx context.Context, t datastore.Target) error {
			sem.Release(1)
			ConcurrencyDeleting.Add(-1)
		}()
		time.Sleep(util.CalcRetryTime(count))

		if err := m.removeRunner(cctx, t, runner, ghRunners); err != nil {
			DeleteRetryCount.Store(runner.UUID, count+1)
			logger.Logf(false, "failed to delete runner: %+v", err)
		} else {
			DeleteRetryCount.Delete(runner.UUID)
		}
		return nil
	})
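Condensed, the delete path loads each runner's count with LoadOrStore, skips runners past MaxDeleteRetry, sleeps out the backoff, then bumps the counter on failure or clears it on success. A standalone sketch of that control flow — deleteRunner and the millisecond-scale backoff are stand-ins, not the myshoes implementation:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var (
	deleteRetryCount sync.Map // key: runner ID, value: int
	maxDeleteRetry   = 10
)

// deleteRunner is a stand-in for Manager.removeRunner.
func deleteRunner(id string) error { return errors.New("instance still busy") }

func tryRemove(id string) {
	c, _ := deleteRetryCount.LoadOrStore(id, 0)
	count, _ := c.(int)
	if count > maxDeleteRetry {
		fmt.Printf("runner %s exceeded the retry limit (%d), ignoring\n", id, maxDeleteRetry)
		return
	}

	// Wait out the backoff window before trying again (milliseconds here,
	// seconds in the real CalcRetryTime).
	time.Sleep(time.Duration(1<<count) * time.Millisecond)

	if err := deleteRunner(id); err != nil {
		deleteRetryCount.Store(id, count+1) // longer wait on the next pass
		fmt.Printf("delete %s failed (attempt %d): %v\n", id, count+1, err)
		return
	}
	deleteRetryCount.Delete(id) // success resets the schedule
}

func main() {
	for i := 0; i < 3; i++ {
		tryRemove("runner-a")
	}
}
```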
15 changes: 13 additions & 2 deletions pkg/starter/starter.go
@@ -19,6 +19,7 @@ import (

	"github.com/google/go-github/v47/github"
	uuid "github.com/satori/go.uuid"
	"github.com/whywaita/myshoes/internal/util"
	"github.com/whywaita/myshoes/pkg/config"
	"github.com/whywaita/myshoes/pkg/datastore"
	"github.com/whywaita/myshoes/pkg/gh"
@@ -40,6 +41,9 @@ var (
	inProgress = sync.Map{}

	reQueuedJobs = sync.Map{}

	// AddInstanceRetryCount is the retry count of adding an instance
	AddInstanceRetryCount = sync.Map{}
)

// Starter is a dispatcher for running jobs
@@ -142,6 +146,8 @@ func (s *Starter) run(ctx context.Context, ch chan datastore.Job) error {
			// this job is in progress, skip
			continue
		}
		c, _ := AddInstanceRetryCount.LoadOrStore(job.UUID, 0)
		count, _ := c.(int)

		logger.Logf(true, "found new job: %s", job.UUID)
		CountWaiting.Add(1)
@@ -153,17 +159,22 @@ func (s *Starter) run(ctx context.Context, ch chan datastore.Job) error {

		inProgress.Store(job.UUID, struct{}{})

		go func(job datastore.Job) {
		sleep := util.CalcRetryTime(count)
		go func(job datastore.Job, sleep time.Duration) {
			defer func() {
				sem.Release(1)
				inProgress.Delete(job.UUID)
				CountRunning.Add(-1)
			}()

			time.Sleep(sleep)
			if err := s.ProcessJob(ctx, job); err != nil {
				AddInstanceRetryCount.Store(job.UUID, count+1)
				logger.Logf(false, "failed to process job: %+v\n", err)
			} else {
				AddInstanceRetryCount.Delete(job.UUID)
			}
		}(job)
		}(job, sleep)

		case <-ctx.Done():
			return nil
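One detail worth noting in the starter hunk: the backoff is computed before the goroutine is spawned and handed in as a parameter, so each worker sleeps on the value pinned at launch time rather than on a captured loop variable. A minimal illustration with toy job IDs and durations:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	for id := 1; id <= 3; id++ {
		sleep := time.Duration(id) * 10 * time.Millisecond
		wg.Add(1)
		// Passing id and sleep as arguments pins their values at spawn
		// time, mirroring go func(job datastore.Job, sleep time.Duration).
		go func(id int, sleep time.Duration) {
			defer wg.Done()
			time.Sleep(sleep)
			fmt.Printf("job %d ran after %v\n", id, sleep)
		}(id, sleep)
	}
	wg.Wait()
}
```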
