diff --git a/.golangci.yml b/.golangci.yml index b6a66780..43cb95a2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -21,6 +21,12 @@ linters-settings: section-separators: - newLine + gomnd: + ignored-functions: + - strings.SplitN + - strconv.ParseUint + - strconv.ParseInt + gosec: excludes: - G404 # Cryptographically secure random not required across the whole project diff --git a/main.go b/main.go index 51894a8d..c1f7a62f 100644 --- a/main.go +++ b/main.go @@ -100,24 +100,25 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - country := utils.CheckCountryOrFail(ctx, logger, countryCheckerConfig, jobsGlobalConfig.GetProxyParams(logger, nil)) - - metrics.InitOrFail(ctx, logger, *prometheusOn, *prometheusListenAddress, jobsGlobalConfig.ClientID, country) - - reporter := newReporter(*logFormat, *lessStats, logger) - job.NewRunner(runnerConfigOptions, jobsGlobalConfig, reporter).Run(ctx, logger) + metrics.InitOrFail(ctx, logger, *prometheusOn, *prometheusListenAddress, jobsGlobalConfig.ClientID, + utils.CheckCountryOrFail(ctx, logger, countryCheckerConfig, jobsGlobalConfig.GetProxyParams(logger, nil))) + job.NewRunner(runnerConfigOptions, jobsGlobalConfig, newReporter(*logFormat, *lessStats, logger)).Run(ctx, logger) } func periodicGC(enabled *bool, period time.Duration, log *zap.Logger) { if !*enabled { return } + var m runtime.MemStats + for { <-time.After(period) runtime.ReadMemStats(&m) + memBefore := m.Alloc start := time.Now() + runtime.GC() runtime.ReadMemStats(&m) log.Info("GC finished", @@ -177,8 +178,15 @@ func setUpPprof(logger *zap.Logger, pprof string, debug bool) { mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprofhttp.Symbol)) mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprofhttp.Trace)) + server := &http.Server{ + Addr: pprof, + Handler: mux, + ReadTimeout: time.Second, + WriteTimeout: time.Second, + } + // this has to be wrapped into a lambda bc otherwise it blocks when evaluating argument for zap.Error - go func() { logger.Warn("pprof server", zap.Error(http.ListenAndServe(pprof, mux))) }() + go func() { logger.Warn("pprof server", zap.Error(server.ListenAndServe())) }() } func newReporter(logFormat string, groupTargets bool, logger *zap.Logger) metrics.Reporter { diff --git a/src/core/http/http.go b/src/core/http/http.go index 1f38720e..65d4dc52 100644 --- a/src/core/http/http.go +++ b/src/core/http/http.go @@ -113,7 +113,7 @@ func NewClient(ctx context.Context, clientConfig ClientConfig, logger *zap.Logge tlsConfig := utils.NonNilOrDefault(clientConfig.TLSClientConfig, tls.Config{ InsecureSkipVerify: true, //nolint:gosec // This is intentional }) - proxyFunc := utils.GetProxyFunc(*clientConfig.Proxy, "http") + proxyFunc := utils.GetProxyFunc(ctx, *clientConfig.Proxy, "http") if clientConfig.StaticHost != nil { makeHostClient := func(tls bool) *fasthttp.HostClient { diff --git a/src/core/packetgen/connection.go b/src/core/packetgen/connection.go index 5a7fd6f8..3b966993 100644 --- a/src/core/packetgen/connection.go +++ b/src/core/packetgen/connection.go @@ -23,6 +23,7 @@ package packetgen import ( + "context" "crypto/tls" "fmt" "net" @@ -41,7 +42,7 @@ type ConnectionConfig struct { Proxy *utils.ProxyParams } -func OpenConnection(c ConnectionConfig) (Connection, error) { +func OpenConnection(ctx context.Context, c ConnectionConfig) (Connection, error) { switch c.Type { case "raw": var cfg rawConnConfig @@ -56,7 +57,7 @@ func OpenConnection(c ConnectionConfig) (Connection, error) { return nil, fmt.Errorf("error decoding connection config: %w", err) } - return openNetConn(cfg, c.Proxy) + return openNetConn(ctx, cfg, c.Proxy) default: return nil, fmt.Errorf("unknown connection type: %v", c.Type) } @@ -128,8 +129,8 @@ type netConn struct { target string } -func openNetConn(c netConnConfig, proxyParams *utils.ProxyParams) (*netConn, error) { - conn, err := utils.GetProxyFunc(utils.NonNilOrDefault(proxyParams, utils.ProxyParams{}), c.Protocol)(c.Protocol, c.Address) +func openNetConn(ctx context.Context, c netConnConfig, proxyParams *utils.ProxyParams) (*netConn, error) { + conn, err := utils.GetProxyFunc(ctx, utils.NonNilOrDefault(proxyParams, utils.ProxyParams{}), c.Protocol)(c.Protocol, c.Address) switch { case err != nil: diff --git a/src/job/base.go b/src/job/base.go index 7e75515d..38a83e70 100644 --- a/src/job/base.go +++ b/src/job/base.go @@ -41,6 +41,7 @@ import ( // GlobalConfig passes commandline arguments to every job. type GlobalConfig struct { ClientID string + UserID string ProxyURLs string LocalAddr string @@ -59,6 +60,8 @@ func NewGlobalConfigWithFlags() *GlobalConfig { ClientID: uuid.NewString(), } + flag.StringVar(&res.UserID, "user-id", utils.GetEnvStringDefault("USER_ID", ""), + "user id for optional metrics") flag.StringVar(&res.ProxyURLs, "proxy", utils.GetEnvStringDefault("SYSTEM_PROXY", ""), "system proxy to set by default (can be a comma-separated list or a template)") flag.StringVar(&res.LocalAddr, "local-address", utils.GetEnvStringDefault("LOCAL_ADDRESS", ""), diff --git a/src/job/config/config.go b/src/job/config/config.go index 8e983594..8c4b8888 100644 --- a/src/job/config/config.go +++ b/src/job/config/config.go @@ -63,9 +63,9 @@ type RawMultiConfig struct { } // fetch tries to read a config from the list of mirrors until it succeeds -func fetch(logger *zap.Logger, paths []string, lastKnownConfig *RawMultiConfig, skipEncrypted bool) *RawMultiConfig { +func fetch(ctx context.Context, logger *zap.Logger, paths []string, lastKnownConfig *RawMultiConfig, skipEncrypted bool) *RawMultiConfig { for i := range paths { - config, err := fetchAndDecrypt(logger, paths[i], lastKnownConfig, skipEncrypted) + config, err := fetchAndDecrypt(ctx, logger, paths[i], lastKnownConfig, skipEncrypted) if err != nil { continue } @@ -78,8 +78,8 @@ func fetch(logger *zap.Logger, paths []string, lastKnownConfig *RawMultiConfig, return lastKnownConfig } -func fetchAndDecrypt(logger *zap.Logger, path string, lastKnownConfig *RawMultiConfig, skipEncrypted bool) (*RawMultiConfig, error) { - config, err := fetchSingle(path, lastKnownConfig) +func fetchAndDecrypt(ctx context.Context, logger *zap.Logger, path string, lastKnownConfig *RawMultiConfig, skipEncrypted bool) (*RawMultiConfig, error) { + config, err := fetchSingle(ctx, path, lastKnownConfig) if err != nil { logger.Warn("failed to fetch config", zap.String("path", path), zap.Error(err)) @@ -110,7 +110,7 @@ func fetchAndDecrypt(logger *zap.Logger, path string, lastKnownConfig *RawMultiC } // fetchSingle reads a config from a single source -func fetchSingle(path string, lastKnownConfig *RawMultiConfig) (*RawMultiConfig, error) { +func fetchSingle(ctx context.Context, path string, lastKnownConfig *RawMultiConfig) (*RawMultiConfig, error) { configURL, err := url.ParseRequestURI(path) // absolute paths can be interpreted as a URL with no schema, need to check for that explicitly if err != nil || filepath.IsAbs(path) { @@ -122,13 +122,13 @@ func fetchSingle(path string, lastKnownConfig *RawMultiConfig) (*RawMultiConfig, return &RawMultiConfig{Body: res, lastModified: "", etag: ""}, nil } - return fetchURL(configURL, lastKnownConfig) + return fetchURL(ctx, configURL, lastKnownConfig) } -func fetchURL(configURL *url.URL, lastKnownConfig *RawMultiConfig) (*RawMultiConfig, error) { +func fetchURL(ctx context.Context, configURL *url.URL, lastKnownConfig *RawMultiConfig) (*RawMultiConfig, error) { const requestTimeout = 20 * time.Second - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + ctx, cancel := context.WithTimeout(ctx, requestTimeout) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodGet, configURL.String(), nil) @@ -171,8 +171,8 @@ func fetchURL(configURL *url.URL, lastKnownConfig *RawMultiConfig) (*RawMultiCon } // FetchRawMultiConfig retrieves the current config using a list of paths. Falls back to the last known config in case of errors. -func FetchRawMultiConfig(logger *zap.Logger, paths []string, lastKnownConfig *RawMultiConfig, skipEncrypted bool) *RawMultiConfig { - return fetch(logger, paths, lastKnownConfig, skipEncrypted) +func FetchRawMultiConfig(ctx context.Context, logger *zap.Logger, paths []string, lastKnownConfig *RawMultiConfig, skipEncrypted bool) *RawMultiConfig { + return fetch(ctx, logger, paths, lastKnownConfig, skipEncrypted) } // Unmarshal config encoded with the given format. diff --git a/src/job/config/updater.go b/src/job/config/updater.go index 97554fc9..486d7a07 100644 --- a/src/job/config/updater.go +++ b/src/job/config/updater.go @@ -2,6 +2,7 @@ package config import ( "bytes" + "context" "flag" "os" "time" @@ -22,7 +23,8 @@ func UpdateLocal(logger *zap.Logger, destinationPath string, configPaths []strin lastKnownConfig := &RawMultiConfig{Body: backupConfig} for { - if rawConfig := FetchRawMultiConfig(logger, configPaths, lastKnownConfig, skipEncrypted); !bytes.Equal(lastKnownConfig.Body, rawConfig.Body) { + rawConfig := FetchRawMultiConfig(context.Background(), logger, configPaths, lastKnownConfig, skipEncrypted) + if !bytes.Equal(lastKnownConfig.Body, rawConfig.Body) { if err := writeConfig(logger, rawConfig.Body, destinationPath); err != nil { logger.Error("error writing config", zap.Error(err)) diff --git a/src/job/packetgen.go b/src/job/packetgen.go index a507fa38..62372619 100644 --- a/src/job/packetgen.go +++ b/src/job/packetgen.go @@ -72,7 +72,7 @@ func sendPacket(ctx context.Context, logger *zap.Logger, jobConfig *packetgenJob ctx, cancel := context.WithCancel(ctx) defer cancel() - conn, err := packetgen.OpenConnection(jobConfig.Connection) + conn, err := packetgen.OpenConnection(ctx, jobConfig.Connection) if err != nil { return err } diff --git a/src/job/runner.go b/src/job/runner.go index 8fd05eda..b69ea08b 100644 --- a/src/job/runner.go +++ b/src/job/runner.go @@ -84,6 +84,9 @@ func NewRunner(cfgOptions *ConfigOptions, globalJobsCfg *GlobalConfig, reporter // Run the runner and block until Stop() is called func (r *Runner) Run(ctx context.Context, logger *zap.Logger) { + ctx = context.WithValue(ctx, templates.ContextKey("goos"), runtime.GOOS) + ctx = context.WithValue(ctx, templates.ContextKey("goarch"), runtime.GOARCH) + ctx = context.WithValue(ctx, templates.ContextKey("version"), ota.Version) ctx = context.WithValue(ctx, templates.ContextKey("global"), r.globalJobsCfg) lastKnownConfig := &config.RawMultiConfig{} refreshTimer := time.NewTicker(r.cfgOptions.RefreshTimeout) @@ -92,12 +95,12 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) { metrics.IncClient() var ( - cancel context.CancelFunc + cancel context.CancelFunc = func() {} tracker *metrics.StatsTracker ) for { - rawConfig := config.FetchRawMultiConfig(logger, strings.Split(r.cfgOptions.PathsCSV, ","), + rawConfig := config.FetchRawMultiConfig(ctx, logger, strings.Split(r.cfgOptions.PathsCSV, ","), nonNilConfigOrDefault(lastKnownConfig, &config.RawMultiConfig{ Body: []byte(nonEmptyStringOrDefault(r.cfgOptions.BackupConfig, config.DefaultConfig)), }), r.globalJobsCfg.SkipEncrypted) @@ -105,13 +108,9 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) { if !bytes.Equal(lastKnownConfig.Body, rawConfig.Body) && cfg != nil { // Only restart jobs if the new config differs from the current one logger.Info("new config received, applying") + cancel() lastKnownConfig = rawConfig - - if cancel != nil { - cancel() - } - metric := &metrics.Metrics{} // clear info about previous targets and avoid old jobs from dumping old info to new metrics tracker = metrics.NewStatsTracker(metric) @@ -130,14 +129,14 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) { select { case <-refreshTimer.C: case <-ctx.Done(): - if cancel != nil { - cancel() - } + cancel() return } - reportMetrics(r.reporter, tracker, r.globalJobsCfg.ClientID, logger) + if r.reporter != nil && tracker != nil { + r.reporter.WriteSummary(tracker) + } } } @@ -173,9 +172,6 @@ func computeCount(count int, scaleFactor float64) int { func (r *Runner) runJobs(ctx context.Context, cfg *config.MultiConfig, metric *metrics.Metrics, logger *zap.Logger) (cancel context.CancelFunc) { ctx, cancel = context.WithCancel(ctx) - ctx = context.WithValue(ctx, templates.ContextKey("goos"), runtime.GOOS) - ctx = context.WithValue(ctx, templates.ContextKey("goarch"), runtime.GOARCH) - ctx = context.WithValue(ctx, templates.ContextKey("version"), ota.Version) var jobInstancesCount int @@ -229,14 +225,3 @@ func (r *Runner) runJobs(ctx context.Context, cfg *config.MultiConfig, metric *m return cancel } - -func reportMetrics(reporter metrics.Reporter, tracker *metrics.StatsTracker, clientID string, logger *zap.Logger) { - if reporter != nil && tracker != nil { - reporter.WriteSummary(tracker) - - // TODO: get rid of this - if err := metrics.ReportStatistics(0, clientID); err != nil { - logger.Debug("error reporting statistics", zap.Error(err)) - } - } -} diff --git a/src/job/utils.go b/src/job/utils.go index 0c519554..d3da53d9 100644 --- a/src/job/utils.go +++ b/src/job/utils.go @@ -74,6 +74,7 @@ func setVarJob(ctx context.Context, args config.Args, globalConfig *GlobalConfig return strconv.Atoi(templates.ParseAndExecute(logger, jobConfig.Value, ctx)) case "uint": val, err := strconv.ParseUint(templates.ParseAndExecute(logger, jobConfig.Value, ctx), 10, 32) + return uint(val), err case "int64": return strconv.ParseInt(templates.ParseAndExecute(logger, jobConfig.Value, ctx), 10, 64) @@ -122,6 +123,8 @@ func sleepJob(ctx context.Context, args config.Args, globalConfig *GlobalConfig, func discardErrorJob(ctx context.Context, args config.Args, globalConfig *GlobalConfig, a *metrics.Accumulator, logger *zap.Logger) ( data any, err error, //nolint:unparam // data is here to match Job ) { + defer utils.PanicHandler(logger) + var jobConfig struct { BasicJobConfig diff --git a/src/utils/countrychecker.go b/src/utils/countrychecker.go index 0c06a55b..0f3050c0 100644 --- a/src/utils/countrychecker.go +++ b/src/utils/countrychecker.go @@ -48,17 +48,17 @@ func CheckCountryOrFail(ctx context.Context, logger *zap.Logger, cfg *CountryChe case <-ctx.Done(): return case <-ticker.C: - _ = ckeckCountryOnce(logger, cfg, proxyParams) + _ = ckeckCountryOnce(ctx, logger, cfg, proxyParams) } } }() } - return ckeckCountryOnce(logger, cfg, proxyParams) + return ckeckCountryOnce(ctx, logger, cfg, proxyParams) } -func ckeckCountryOnce(logger *zap.Logger, cfg *CountryCheckerConfig, proxyParams ProxyParams) string { - country, ip, err := getCountry(logger, proxyParams, cfg.maxRetries) +func ckeckCountryOnce(ctx context.Context, logger *zap.Logger, cfg *CountryCheckerConfig, proxyParams ProxyParams) string { + country, ip, err := getCountry(ctx, logger, proxyParams, cfg.maxRetries) if err != nil { if cfg.strict { logger.Fatal("country strict check failed", zap.Error(err)) @@ -80,16 +80,16 @@ func ckeckCountryOnce(logger *zap.Logger, cfg *CountryCheckerConfig, proxyParams return country } -func getCountry(logger *zap.Logger, proxyParams ProxyParams, maxFetchRetries int) (country, ip string, err error) { +func getCountry(ctx context.Context, logger *zap.Logger, proxyParams ProxyParams, maxFetchRetries int) (country, ip string, err error) { counter := Counter{Count: maxFetchRetries} backoffController := BackoffController{BackoffConfig: DefaultBackoffConfig()} for counter.Next() { logger.Info("checking IP address,", zap.Int("iter", counter.iter)) - if country, ip, err = fetchLocationInfo(proxyParams); err != nil { + if country, ip, err = fetchLocationInfo(ctx, proxyParams); err != nil { logger.Warn("error fetching location info", zap.Error(err)) - Sleep(context.Background(), backoffController.Increment().GetTimeout()) + Sleep(ctx, backoffController.Increment().GetTimeout()) } else { return } @@ -98,13 +98,13 @@ func getCountry(logger *zap.Logger, proxyParams ProxyParams, maxFetchRetries int return "", "", fmt.Errorf("couldn't get location info in %d tries", maxFetchRetries) } -func fetchLocationInfo(proxyParams ProxyParams) (country, ip string, err error) { +func fetchLocationInfo(ctx context.Context, proxyParams ProxyParams) (country, ip string, err error) { const ( ipCheckerURI = "https://api.myip.com/" requestTimeout = 3 * time.Second ) - proxyFunc := GetProxyFunc(proxyParams, "http") + proxyFunc := GetProxyFunc(ctx, proxyParams, "http") client := &fasthttp.Client{ MaxConnDuration: requestTimeout, diff --git a/src/utils/crypto.go b/src/utils/crypto.go index c09d65d7..7cdca53d 100644 --- a/src/utils/crypto.go +++ b/src/utils/crypto.go @@ -26,11 +26,11 @@ const ( type encryptionKey struct { key string - protected bool //indicates that the content encrypted by this key shouldn't be logged anywhere + protected bool // indicates that the content encrypted by this key shouldn't be logged anywhere } -// GetEncryptionKeys returns list of encryption keys from ENCRYPTION_KEYS env variable name or default value -func GetEncryptionKeys() ([]encryptionKey, error) { +// getEncryptionKeys returns list of encryption keys from ENCRYPTION_KEYS env variable name or default value +func getEncryptionKeys() []encryptionKey { keysString := GetEnvStringDefault(encryptionKeyEnvName, EncryptionKeys) if keysString != EncryptionKeys { // if user specified own keys, add default at end to be sure that it always used too @@ -55,7 +55,7 @@ func GetEncryptionKeys() ([]encryptionKey, error) { } } - return output, nil + return output } // IsEncrypted returns true if cfg encrypted with age tool (https://github.com/FiloSottile/age) @@ -65,17 +65,13 @@ func IsEncrypted(cfg []byte) bool { // Decrypt decrypts config using EncryptionKeys func Decrypt(cfg []byte) (result []byte, protected bool, err error) { - keys, err := GetEncryptionKeys() - if err != nil { - return nil, false, err - } - decryptMutex.Lock() defer decryptMutex.Unlock() // iterate over all keys and return on first success decryption - for _, key := range keys { + for _, key := range getEncryptionKeys() { result, err = decrypt(cfg, key.key) + runtime.GC() // force GC to decrease memory usage if err != nil { diff --git a/src/utils/metrics/ga.go b/src/utils/metrics/ga.go deleted file mode 100644 index c9a9cb84..00000000 --- a/src/utils/metrics/ga.go +++ /dev/null @@ -1,35 +0,0 @@ -package metrics - -import ( - v1 "github.com/mjpitz/go-ga/client/v1" - "github.com/mjpitz/go-ga/client/v1/gatypes" -) - -var ( - previousTraffic int64 - client = v1.NewClient("UA-222030361-1", "customUserAgent") -) - -// ReportStatistics sends basic usage events to google analytics -func ReportStatistics(traffic int64, clientID string) error { - delta := traffic - previousTraffic - previousTraffic = traffic - - return trackEvent(delta, clientID) -} - -func trackEvent(traffic int64, clientID string) error { - const kb = 1024 - - return client.SendPost(&gatypes.Payload{ - HitType: "event", - NonInteractionHit: true, - DisableAdvertisingPersonalization: true, - Users: gatypes.Users{ClientID: clientID}, - Event: gatypes.Event{ - EventCategory: "statistics", - EventAction: "heartbeat", - EventValue: traffic / kb, - }, - }) -} diff --git a/src/utils/proxy.go b/src/utils/proxy.go index d22bb818..9d758633 100644 --- a/src/utils/proxy.go +++ b/src/utils/proxy.go @@ -1,6 +1,7 @@ package utils import ( + "context" "fmt" "math/rand" "net" @@ -24,7 +25,7 @@ type ProxyParams struct { // this won't work for udp payloads but if people use proxies they might not want to have their ip exposed // so it's probably better to fail instead of routing the traffic directly -func GetProxyFunc(params ProxyParams, protocol string) ProxyFunc { +func GetProxyFunc(ctx context.Context, params ProxyParams, protocol string) ProxyFunc { direct := &net.Dialer{Timeout: params.Timeout, LocalAddr: resolveAddr(protocol, params.LocalAddr), Control: BindToInterface(params.Interface)} if params.URLs == "" { return proxy.FromEnvironmentUsing(direct).Dial diff --git a/src/utils/utils.go b/src/utils/utils.go index 19b1d4c8..52dc67de 100644 --- a/src/utils/utils.go +++ b/src/utils/utils.go @@ -178,5 +178,7 @@ func Unmarshal(input []byte, output any, format string) error { } func ToMiB(bytes uint64) uint64 { - return bytes / 1048576 + const bytesInMiB = 1048576 + + return bytes / bytesInMiB }