Skip to content

Commit

Permalink
configure min interval for jobs via cmdline
Browse files Browse the repository at this point in the history
  • Loading branch information
arriven committed Mar 29, 2022
1 parent fa2b529 commit 1d24834
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 24 deletions.
36 changes: 34 additions & 2 deletions src/job/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type GlobalConfig struct {
SkipEncrypted bool
EnablePrimitiveJobs bool
ScaleFactor int
MinInterval time.Duration
Backoff utils.BackoffConfig
}

Expand All @@ -60,6 +61,8 @@ func NewGlobalConfigWithFlags() *GlobalConfig {
"set to true if you want to run primitive jobs that are less resource-efficient")
flag.IntVar(&res.ScaleFactor, "scale", utils.GetEnvIntDefault("SCALE_FACTOR", 1),
"used to scale the amount of jobs being launched, effect is similar to launching multiple instances at once")
flag.DurationVar(&res.MinInterval, "min-interval", utils.GetEnvDurationDefault("MIN_INTERVAL", 0),
"minimum interval between job iterations")

flag.IntVar(&res.Backoff.Limit, "backoff-limit", utils.GetEnvIntDefault("BACKOFF_LIMIT", utils.DefaultBackoffConfig().Limit),
"how much exponential backoff can be scaled")
Expand Down Expand Up @@ -111,14 +114,43 @@ func Get(t string) Job {
}
}

type JobConfig interface {
FromGlobal(GlobalConfig)
}

func ParseConfig(c JobConfig, args config.Args, global GlobalConfig) error {
if err := utils.Decode(args, c); err != nil {
return err
}

c.FromGlobal(global)

return nil
}

// BasicJobConfig comment for linter
type BasicJobConfig struct {
IntervalMs int `mapstructure:"interval_ms,omitempty"`
IntervalMs int `mapstructure:"interval_ms,omitempty"`
Interval *time.Duration `mapstructure:"interval"`
utils.Counter
*utils.BackoffConfig
}

func (c *BasicJobConfig) FromGlobal(global GlobalConfig) {
if c.GetInterval() < global.MinInterval {
c.Interval = &global.MinInterval
}

if c.BackoffConfig == nil {
c.BackoffConfig = &global.Backoff
}
}

func (c BasicJobConfig) GetInterval() time.Duration {
return utils.NonNilDurationOrDefault(c.Interval, time.Duration(c.IntervalMs)*time.Millisecond)
}

// Next comment for linter
func (c *BasicJobConfig) Next(ctx context.Context) bool {
return utils.Sleep(ctx, time.Duration(c.IntervalMs)*time.Millisecond) && c.Counter.Next()
return utils.Sleep(ctx, c.GetInterval()) && c.Counter.Next()
}
5 changes: 2 additions & 3 deletions src/job/complex.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"fmt"
"sync"

"github.com/mitchellh/mapstructure"
"go.uber.org/zap"

"github.com/Arriven/db1000n/src/job/config"
Expand All @@ -44,7 +43,7 @@ func sequenceJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalCo
Jobs []config.Config
}

if err := mapstructure.Decode(args, &jobConfig); err != nil {
if err := ParseConfig(&jobConfig, args, *globalConfig); err != nil {
return nil, fmt.Errorf("error parsing job config: %w", err)
}

Expand Down Expand Up @@ -75,7 +74,7 @@ func parallelJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalCo
Jobs []config.Config
}

if err := mapstructure.Decode(args, &jobConfig); err != nil {
if err := ParseConfig(&jobConfig, args, *globalConfig); err != nil {
return nil, fmt.Errorf("error parsing job config: %w", err)
}

Expand Down
7 changes: 3 additions & 4 deletions src/job/dnsblast.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (

"github.com/Arriven/db1000n/src/core/dnsblast"
"github.com/Arriven/db1000n/src/job/config"
"github.com/Arriven/db1000n/src/utils"
)

type dnsBlastConfig struct {
Expand All @@ -45,7 +44,7 @@ type dnsBlastConfig struct {
}

func dnsBlastJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalConfig, args config.Args) (data interface{}, err error) {
jobConfig, err := getDNSBlastConfig(args)
jobConfig, err := getDNSBlastConfig(args, globalConfig)
if err != nil {
return nil, err
}
Expand All @@ -66,14 +65,14 @@ func dnsBlastJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalCo
return nil, err
}

func getDNSBlastConfig(args config.Args) (*dnsBlastConfig, error) {
func getDNSBlastConfig(args config.Args, globalConfig *GlobalConfig) (*dnsBlastConfig, error) {
const (
defaultParallelQueriesPerCycle = 5
defaultIntervalMS = 10
)

var jobConfig dnsBlastConfig
if err := utils.Decode(args, &jobConfig); err != nil {
if err := ParseConfig(&jobConfig, args, *globalConfig); err != nil {
return nil, fmt.Errorf("failed to parse DNS Blast job configurations: %w", err)
}

Expand Down
12 changes: 6 additions & 6 deletions src/job/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func singleRequestJob(ctx context.Context, logger *zap.Logger, globalConfig *Glo
ctx, cancel := context.WithCancel(ctx)
defer cancel()

_, clientConfig, requestTpl, err := getHTTPJobConfigs(ctx, args, globalConfig.ProxyURLs, logger)
_, clientConfig, requestTpl, err := getHTTPJobConfigs(ctx, args, *globalConfig, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -125,7 +125,7 @@ func fastHTTPJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalCo
ctx, cancel := context.WithCancel(ctx)
defer cancel()

jobConfig, clientConfig, requestTpl, err := getHTTPJobConfigs(ctx, args, globalConfig.ProxyURLs, logger)
jobConfig, clientConfig, requestTpl, err := getHTTPJobConfigs(ctx, args, *globalConfig, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -169,11 +169,11 @@ func fastHTTPJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalCo
return nil, nil
}

func getHTTPJobConfigs(ctx context.Context, args config.Args, globalProxyURLs string, logger *zap.Logger) (
func getHTTPJobConfigs(ctx context.Context, args config.Args, global GlobalConfig, logger *zap.Logger) (
cfg *httpJobConfig, clientCfg *http.ClientConfig, requestTpl *templates.MapStruct, err error,
) {
var jobConfig httpJobConfig
if err := utils.Decode(args, &jobConfig); err != nil {
if err := ParseConfig(&jobConfig, args, global); err != nil {
return nil, nil, nil, fmt.Errorf("error parsing job config: %w", err)
}

Expand All @@ -182,8 +182,8 @@ func getHTTPJobConfigs(ctx context.Context, args config.Args, globalProxyURLs st
return nil, nil, nil, fmt.Errorf("error parsing client config: %w", err)
}

if globalProxyURLs != "" {
clientConfig.ProxyURLs = templates.ParseAndExecute(logger, globalProxyURLs, ctx)
if global.ProxyURLs != "" {
clientConfig.ProxyURLs = templates.ParseAndExecute(logger, global.ProxyURLs, ctx)
}

requestTpl, err = templates.ParseMapStruct(jobConfig.Request)
Expand Down
4 changes: 2 additions & 2 deletions src/job/packetgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/Arriven/db1000n/src/utils/templates"
)

func packetgenJob(ctx context.Context, logger *zap.Logger, _ *GlobalConfig, args config.Args) (data interface{}, err error) {
func packetgenJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalConfig, args config.Args) (data interface{}, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -49,7 +49,7 @@ func packetgenJob(ctx context.Context, logger *zap.Logger, _ *GlobalConfig, args
Connection packetgen.ConnectionConfig
}

if err := utils.Decode(args, &jobConfig); err != nil {
if err := ParseConfig(&jobConfig, args, *globalConfig); err != nil {
return nil, fmt.Errorf("error parsing job config: %w", err)
}

Expand Down
10 changes: 5 additions & 5 deletions src/job/rawnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func tcpJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalConfig,
ctx, cancel := context.WithCancel(ctx)
defer cancel()

jobConfig, err := parseRawNetJobArgs(ctx, logger, args)
jobConfig, err := parseRawNetJobArgs(ctx, logger, globalConfig, args)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -125,11 +125,11 @@ func sendTCP(ctx context.Context, logger *zap.Logger, jobConfig *rawnetConfig, t
return nil
}

func udpJob(ctx context.Context, logger *zap.Logger, _ *GlobalConfig, args config.Args) (data interface{}, err error) {
func udpJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalConfig, args config.Args) (data interface{}, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

jobConfig, err := parseRawNetJobArgs(ctx, logger, args)
jobConfig, err := parseRawNetJobArgs(ctx, logger, globalConfig, args)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func sendUDP(ctx context.Context, logger *zap.Logger, a *net.UDPAddr, conn *net.
metrics.IncRawnetUDP(a.String(), metrics.StatusSuccess)
}

func parseRawNetJobArgs(ctx context.Context, logger *zap.Logger, args config.Args) (tpl *rawnetConfig, err error) {
func parseRawNetJobArgs(ctx context.Context, logger *zap.Logger, globalConfig *GlobalConfig, args config.Args) (tpl *rawnetConfig, err error) {
var jobConfig struct {
BasicJobConfig

Expand All @@ -185,7 +185,7 @@ func parseRawNetJobArgs(ctx context.Context, logger *zap.Logger, args config.Arg
Timeout *time.Duration
}

if err := utils.Decode(args, &jobConfig); err != nil {
if err := ParseConfig(&jobConfig, args, *globalConfig); err != nil {
return nil, fmt.Errorf("error decoding rawnet job config: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions src/job/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func loopJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalConfig
Job config.Config
}

if err := mapstructure.Decode(args, &jobConfig); err != nil {
if err := ParseConfig(&jobConfig, args, *globalConfig); err != nil {
return nil, fmt.Errorf("error parsing job config: %w", err)
}

Expand Down Expand Up @@ -137,7 +137,7 @@ func encryptedJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalC
Data string
}

if err := mapstructure.Decode(args, &jobConfig); err != nil {
if err := ParseConfig(&jobConfig, args, *globalConfig); err != nil {
return nil, fmt.Errorf("error parsing job config: %w", err)
}

Expand Down

0 comments on commit 1d24834

Please sign in to comment.