Skip to content

Commit

Permalink
cleanup linter
Browse files Browse the repository at this point in the history
  • Loading branch information
arriven committed Sep 24, 2022
1 parent e5e3b7e commit bd7ff16
Show file tree
Hide file tree
Showing 15 changed files with 77 additions and 105 deletions.
6 changes: 6 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ linters-settings:
section-separators:
- newLine

gomnd:
ignored-functions:
- strings.SplitN
- strconv.ParseUint
- strconv.ParseInt

gosec:
excludes:
- G404 # Cryptographically secure random not required across the whole project
Expand Down
22 changes: 15 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,24 +100,25 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

country := utils.CheckCountryOrFail(ctx, logger, countryCheckerConfig, jobsGlobalConfig.GetProxyParams(logger, nil))

metrics.InitOrFail(ctx, logger, *prometheusOn, *prometheusListenAddress, jobsGlobalConfig.ClientID, country)

reporter := newReporter(*logFormat, *lessStats, logger)
job.NewRunner(runnerConfigOptions, jobsGlobalConfig, reporter).Run(ctx, logger)
metrics.InitOrFail(ctx, logger, *prometheusOn, *prometheusListenAddress, jobsGlobalConfig.ClientID,
utils.CheckCountryOrFail(ctx, logger, countryCheckerConfig, jobsGlobalConfig.GetProxyParams(logger, nil)))
job.NewRunner(runnerConfigOptions, jobsGlobalConfig, newReporter(*logFormat, *lessStats, logger)).Run(ctx, logger)
}

func periodicGC(enabled *bool, period time.Duration, log *zap.Logger) {
if !*enabled {
return
}

var m runtime.MemStats

for {
<-time.After(period)
runtime.ReadMemStats(&m)

memBefore := m.Alloc
start := time.Now()

runtime.GC()
runtime.ReadMemStats(&m)
log.Info("GC finished",
Expand Down Expand Up @@ -177,8 +178,15 @@ func setUpPprof(logger *zap.Logger, pprof string, debug bool) {
mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprofhttp.Symbol))
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprofhttp.Trace))

server := &http.Server{
Addr: pprof,
Handler: mux,
ReadTimeout: time.Second,
WriteTimeout: time.Second,
}

// this has to be wrapped into a lambda bc otherwise it blocks when evaluating argument for zap.Error
go func() { logger.Warn("pprof server", zap.Error(http.ListenAndServe(pprof, mux))) }()
go func() { logger.Warn("pprof server", zap.Error(server.ListenAndServe())) }()
}

func newReporter(logFormat string, groupTargets bool, logger *zap.Logger) metrics.Reporter {
Expand Down
2 changes: 1 addition & 1 deletion src/core/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func NewClient(ctx context.Context, clientConfig ClientConfig, logger *zap.Logge
tlsConfig := utils.NonNilOrDefault(clientConfig.TLSClientConfig, tls.Config{
InsecureSkipVerify: true, //nolint:gosec // This is intentional
})
proxyFunc := utils.GetProxyFunc(*clientConfig.Proxy, "http")
proxyFunc := utils.GetProxyFunc(ctx, *clientConfig.Proxy, "http")

if clientConfig.StaticHost != nil {
makeHostClient := func(tls bool) *fasthttp.HostClient {
Expand Down
9 changes: 5 additions & 4 deletions src/core/packetgen/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package packetgen

import (
"context"
"crypto/tls"
"fmt"
"net"
Expand All @@ -41,7 +42,7 @@ type ConnectionConfig struct {
Proxy *utils.ProxyParams
}

func OpenConnection(c ConnectionConfig) (Connection, error) {
func OpenConnection(ctx context.Context, c ConnectionConfig) (Connection, error) {
switch c.Type {
case "raw":
var cfg rawConnConfig
Expand All @@ -56,7 +57,7 @@ func OpenConnection(c ConnectionConfig) (Connection, error) {
return nil, fmt.Errorf("error decoding connection config: %w", err)
}

return openNetConn(cfg, c.Proxy)
return openNetConn(ctx, cfg, c.Proxy)
default:
return nil, fmt.Errorf("unknown connection type: %v", c.Type)
}
Expand Down Expand Up @@ -128,8 +129,8 @@ type netConn struct {
target string
}

func openNetConn(c netConnConfig, proxyParams *utils.ProxyParams) (*netConn, error) {
conn, err := utils.GetProxyFunc(utils.NonNilOrDefault(proxyParams, utils.ProxyParams{}), c.Protocol)(c.Protocol, c.Address)
func openNetConn(ctx context.Context, c netConnConfig, proxyParams *utils.ProxyParams) (*netConn, error) {
conn, err := utils.GetProxyFunc(ctx, utils.NonNilOrDefault(proxyParams, utils.ProxyParams{}), c.Protocol)(c.Protocol, c.Address)

switch {
case err != nil:
Expand Down
3 changes: 3 additions & 0 deletions src/job/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
// GlobalConfig passes commandline arguments to every job.
type GlobalConfig struct {
ClientID string
UserID string

ProxyURLs string
LocalAddr string
Expand All @@ -59,6 +60,8 @@ func NewGlobalConfigWithFlags() *GlobalConfig {
ClientID: uuid.NewString(),
}

flag.StringVar(&res.UserID, "user-id", utils.GetEnvStringDefault("USER_ID", ""),
"user id for optional metrics")
flag.StringVar(&res.ProxyURLs, "proxy", utils.GetEnvStringDefault("SYSTEM_PROXY", ""),
"system proxy to set by default (can be a comma-separated list or a template)")
flag.StringVar(&res.LocalAddr, "local-address", utils.GetEnvStringDefault("LOCAL_ADDRESS", ""),
Expand Down
20 changes: 10 additions & 10 deletions src/job/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ type RawMultiConfig struct {
}

// fetch tries to read a config from the list of mirrors until it succeeds
func fetch(logger *zap.Logger, paths []string, lastKnownConfig *RawMultiConfig, skipEncrypted bool) *RawMultiConfig {
func fetch(ctx context.Context, logger *zap.Logger, paths []string, lastKnownConfig *RawMultiConfig, skipEncrypted bool) *RawMultiConfig {
for i := range paths {
config, err := fetchAndDecrypt(logger, paths[i], lastKnownConfig, skipEncrypted)
config, err := fetchAndDecrypt(ctx, logger, paths[i], lastKnownConfig, skipEncrypted)
if err != nil {
continue
}
Expand All @@ -78,8 +78,8 @@ func fetch(logger *zap.Logger, paths []string, lastKnownConfig *RawMultiConfig,
return lastKnownConfig
}

func fetchAndDecrypt(logger *zap.Logger, path string, lastKnownConfig *RawMultiConfig, skipEncrypted bool) (*RawMultiConfig, error) {
config, err := fetchSingle(path, lastKnownConfig)
func fetchAndDecrypt(ctx context.Context, logger *zap.Logger, path string, lastKnownConfig *RawMultiConfig, skipEncrypted bool) (*RawMultiConfig, error) {
config, err := fetchSingle(ctx, path, lastKnownConfig)
if err != nil {
logger.Warn("failed to fetch config", zap.String("path", path), zap.Error(err))

Expand Down Expand Up @@ -110,7 +110,7 @@ func fetchAndDecrypt(logger *zap.Logger, path string, lastKnownConfig *RawMultiC
}

// fetchSingle reads a config from a single source
func fetchSingle(path string, lastKnownConfig *RawMultiConfig) (*RawMultiConfig, error) {
func fetchSingle(ctx context.Context, path string, lastKnownConfig *RawMultiConfig) (*RawMultiConfig, error) {
configURL, err := url.ParseRequestURI(path)
// absolute paths can be interpreted as a URL with no schema, need to check for that explicitly
if err != nil || filepath.IsAbs(path) {
Expand All @@ -122,13 +122,13 @@ func fetchSingle(path string, lastKnownConfig *RawMultiConfig) (*RawMultiConfig,
return &RawMultiConfig{Body: res, lastModified: "", etag: ""}, nil
}

return fetchURL(configURL, lastKnownConfig)
return fetchURL(ctx, configURL, lastKnownConfig)
}

func fetchURL(configURL *url.URL, lastKnownConfig *RawMultiConfig) (*RawMultiConfig, error) {
func fetchURL(ctx context.Context, configURL *url.URL, lastKnownConfig *RawMultiConfig) (*RawMultiConfig, error) {
const requestTimeout = 20 * time.Second

ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
ctx, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, configURL.String(), nil)
Expand Down Expand Up @@ -171,8 +171,8 @@ func fetchURL(configURL *url.URL, lastKnownConfig *RawMultiConfig) (*RawMultiCon
}

// FetchRawMultiConfig retrieves the current config using a list of paths. Falls back to the last known config in case of errors.
func FetchRawMultiConfig(logger *zap.Logger, paths []string, lastKnownConfig *RawMultiConfig, skipEncrypted bool) *RawMultiConfig {
return fetch(logger, paths, lastKnownConfig, skipEncrypted)
func FetchRawMultiConfig(ctx context.Context, logger *zap.Logger, paths []string, lastKnownConfig *RawMultiConfig, skipEncrypted bool) *RawMultiConfig {
return fetch(ctx, logger, paths, lastKnownConfig, skipEncrypted)
}

// Unmarshal config encoded with the given format.
Expand Down
4 changes: 3 additions & 1 deletion src/job/config/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"bytes"
"context"
"flag"
"os"
"time"
Expand All @@ -22,7 +23,8 @@ func UpdateLocal(logger *zap.Logger, destinationPath string, configPaths []strin
lastKnownConfig := &RawMultiConfig{Body: backupConfig}

for {
if rawConfig := FetchRawMultiConfig(logger, configPaths, lastKnownConfig, skipEncrypted); !bytes.Equal(lastKnownConfig.Body, rawConfig.Body) {
rawConfig := FetchRawMultiConfig(context.Background(), logger, configPaths, lastKnownConfig, skipEncrypted)
if !bytes.Equal(lastKnownConfig.Body, rawConfig.Body) {
if err := writeConfig(logger, rawConfig.Body, destinationPath); err != nil {
logger.Error("error writing config", zap.Error(err))

Expand Down
2 changes: 1 addition & 1 deletion src/job/packetgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func sendPacket(ctx context.Context, logger *zap.Logger, jobConfig *packetgenJob
ctx, cancel := context.WithCancel(ctx)
defer cancel()

conn, err := packetgen.OpenConnection(jobConfig.Connection)
conn, err := packetgen.OpenConnection(ctx, jobConfig.Connection)
if err != nil {
return err
}
Expand Down
35 changes: 10 additions & 25 deletions src/job/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func NewRunner(cfgOptions *ConfigOptions, globalJobsCfg *GlobalConfig, reporter

// Run the runner and block until Stop() is called
func (r *Runner) Run(ctx context.Context, logger *zap.Logger) {
ctx = context.WithValue(ctx, templates.ContextKey("goos"), runtime.GOOS)
ctx = context.WithValue(ctx, templates.ContextKey("goarch"), runtime.GOARCH)
ctx = context.WithValue(ctx, templates.ContextKey("version"), ota.Version)
ctx = context.WithValue(ctx, templates.ContextKey("global"), r.globalJobsCfg)
lastKnownConfig := &config.RawMultiConfig{}
refreshTimer := time.NewTicker(r.cfgOptions.RefreshTimeout)
Expand All @@ -92,26 +95,22 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) {
metrics.IncClient()

var (
cancel context.CancelFunc
cancel context.CancelFunc = func() {}
tracker *metrics.StatsTracker
)

for {
rawConfig := config.FetchRawMultiConfig(logger, strings.Split(r.cfgOptions.PathsCSV, ","),
rawConfig := config.FetchRawMultiConfig(ctx, logger, strings.Split(r.cfgOptions.PathsCSV, ","),
nonNilConfigOrDefault(lastKnownConfig, &config.RawMultiConfig{
Body: []byte(nonEmptyStringOrDefault(r.cfgOptions.BackupConfig, config.DefaultConfig)),
}), r.globalJobsCfg.SkipEncrypted)
cfg := config.Unmarshal(rawConfig.Body, r.cfgOptions.Format)

if !bytes.Equal(lastKnownConfig.Body, rawConfig.Body) && cfg != nil { // Only restart jobs if the new config differs from the current one
logger.Info("new config received, applying")
cancel()

lastKnownConfig = rawConfig

if cancel != nil {
cancel()
}

metric := &metrics.Metrics{} // clear info about previous targets and avoid old jobs from dumping old info to new metrics
tracker = metrics.NewStatsTracker(metric)

Expand All @@ -130,14 +129,14 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) {
select {
case <-refreshTimer.C:
case <-ctx.Done():
if cancel != nil {
cancel()
}
cancel()

return
}

reportMetrics(r.reporter, tracker, r.globalJobsCfg.ClientID, logger)
if r.reporter != nil && tracker != nil {
r.reporter.WriteSummary(tracker)
}
}
}

Expand Down Expand Up @@ -173,9 +172,6 @@ func computeCount(count int, scaleFactor float64) int {

func (r *Runner) runJobs(ctx context.Context, cfg *config.MultiConfig, metric *metrics.Metrics, logger *zap.Logger) (cancel context.CancelFunc) {
ctx, cancel = context.WithCancel(ctx)
ctx = context.WithValue(ctx, templates.ContextKey("goos"), runtime.GOOS)
ctx = context.WithValue(ctx, templates.ContextKey("goarch"), runtime.GOARCH)
ctx = context.WithValue(ctx, templates.ContextKey("version"), ota.Version)

var jobInstancesCount int

Expand Down Expand Up @@ -229,14 +225,3 @@ func (r *Runner) runJobs(ctx context.Context, cfg *config.MultiConfig, metric *m

return cancel
}

func reportMetrics(reporter metrics.Reporter, tracker *metrics.StatsTracker, clientID string, logger *zap.Logger) {
if reporter != nil && tracker != nil {
reporter.WriteSummary(tracker)

// TODO: get rid of this
if err := metrics.ReportStatistics(0, clientID); err != nil {
logger.Debug("error reporting statistics", zap.Error(err))
}
}
}
3 changes: 3 additions & 0 deletions src/job/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func setVarJob(ctx context.Context, args config.Args, globalConfig *GlobalConfig
return strconv.Atoi(templates.ParseAndExecute(logger, jobConfig.Value, ctx))
case "uint":
val, err := strconv.ParseUint(templates.ParseAndExecute(logger, jobConfig.Value, ctx), 10, 32)

return uint(val), err
case "int64":
return strconv.ParseInt(templates.ParseAndExecute(logger, jobConfig.Value, ctx), 10, 64)
Expand Down Expand Up @@ -122,6 +123,8 @@ func sleepJob(ctx context.Context, args config.Args, globalConfig *GlobalConfig,
func discardErrorJob(ctx context.Context, args config.Args, globalConfig *GlobalConfig, a *metrics.Accumulator, logger *zap.Logger) (
data any, err error, //nolint:unparam // data is here to match Job
) {
defer utils.PanicHandler(logger)

var jobConfig struct {
BasicJobConfig

Expand Down
18 changes: 9 additions & 9 deletions src/utils/countrychecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ func CheckCountryOrFail(ctx context.Context, logger *zap.Logger, cfg *CountryChe
case <-ctx.Done():
return
case <-ticker.C:
_ = ckeckCountryOnce(logger, cfg, proxyParams)
_ = ckeckCountryOnce(ctx, logger, cfg, proxyParams)
}
}
}()
}

return ckeckCountryOnce(logger, cfg, proxyParams)
return ckeckCountryOnce(ctx, logger, cfg, proxyParams)
}

func ckeckCountryOnce(logger *zap.Logger, cfg *CountryCheckerConfig, proxyParams ProxyParams) string {
country, ip, err := getCountry(logger, proxyParams, cfg.maxRetries)
func ckeckCountryOnce(ctx context.Context, logger *zap.Logger, cfg *CountryCheckerConfig, proxyParams ProxyParams) string {
country, ip, err := getCountry(ctx, logger, proxyParams, cfg.maxRetries)
if err != nil {
if cfg.strict {
logger.Fatal("country strict check failed", zap.Error(err))
Expand All @@ -80,16 +80,16 @@ func ckeckCountryOnce(logger *zap.Logger, cfg *CountryCheckerConfig, proxyParams
return country
}

func getCountry(logger *zap.Logger, proxyParams ProxyParams, maxFetchRetries int) (country, ip string, err error) {
func getCountry(ctx context.Context, logger *zap.Logger, proxyParams ProxyParams, maxFetchRetries int) (country, ip string, err error) {
counter := Counter{Count: maxFetchRetries}
backoffController := BackoffController{BackoffConfig: DefaultBackoffConfig()}

for counter.Next() {
logger.Info("checking IP address,", zap.Int("iter", counter.iter))

if country, ip, err = fetchLocationInfo(proxyParams); err != nil {
if country, ip, err = fetchLocationInfo(ctx, proxyParams); err != nil {
logger.Warn("error fetching location info", zap.Error(err))
Sleep(context.Background(), backoffController.Increment().GetTimeout())
Sleep(ctx, backoffController.Increment().GetTimeout())
} else {
return
}
Expand All @@ -98,13 +98,13 @@ func getCountry(logger *zap.Logger, proxyParams ProxyParams, maxFetchRetries int
return "", "", fmt.Errorf("couldn't get location info in %d tries", maxFetchRetries)
}

func fetchLocationInfo(proxyParams ProxyParams) (country, ip string, err error) {
func fetchLocationInfo(ctx context.Context, proxyParams ProxyParams) (country, ip string, err error) {
const (
ipCheckerURI = "https://api.myip.com/"
requestTimeout = 3 * time.Second
)

proxyFunc := GetProxyFunc(proxyParams, "http")
proxyFunc := GetProxyFunc(ctx, proxyParams, "http")

client := &fasthttp.Client{
MaxConnDuration: requestTimeout,
Expand Down
Loading

0 comments on commit bd7ff16

Please sign in to comment.