Skip to content

Commit

Permalink
cleanup logs and move non-critical ones to debug
Browse files Browse the repository at this point in the history
  • Loading branch information
arriven committed Mar 29, 2022
1 parent 88121b2 commit 15880ff
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 29 deletions.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

metrics.InitOrFail(ctx, *prometheusOn, *prometheusPushGateways, jobsGlobalConfig.ClientID, country)
metrics.InitOrFail(ctx, logger, *prometheusOn, *prometheusPushGateways, jobsGlobalConfig.ClientID, country)

r, err := job.NewRunner(runnerConfigOptions, jobsGlobalConfig)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion src/job/complex.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func parallelJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalCo

go func(i int) {
if _, err := job(ctx, logger, globalConfig, jobConfig.Jobs[i].Args); err != nil {
logger.Error("error running job", zap.Error(err))
logger.Error("error running one of the jobs", zap.Error(err))
}

wg.Done()
Expand Down
14 changes: 5 additions & 9 deletions src/job/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) {
return
}

if err := dumpMetrics(r.globalJobsCfg.ClientID); err != nil {
if err := dumpMetrics(logger, r.globalJobsCfg.ClientID); err != nil {
logger.Debug("error reporting statistics", zap.Error(err))
}
}
Expand Down Expand Up @@ -168,7 +168,7 @@ func (r *Runner) runJobs(ctx context.Context, logger *zap.Logger, cfg *config.Mu

job := Get(cfg.Jobs[i].Type)
if job == nil {
logger.Error("unknown job", zap.String("type", cfg.Jobs[i].Type))
logger.Warn("unknown job", zap.String("type", cfg.Jobs[i].Type))

continue
}
Expand All @@ -194,7 +194,7 @@ func (r *Runner) runJobs(ctx context.Context, logger *zap.Logger, cfg *config.Mu

_, err := job(ctx, logger, r.globalJobsCfg, cfg.Jobs[i].Args)
if err != nil {
logger.Error("error running job",
logger.Error("error running job one of the jobs",
zap.String("name", cfg.Jobs[i].Name),
zap.String("type", cfg.Jobs[i].Type),
zap.Error(err))
Expand All @@ -210,12 +210,8 @@ func (r *Runner) runJobs(ctx context.Context, logger *zap.Logger, cfg *config.Mu
return cancel
}

func dumpMetrics(clientID string) error {
defer func() {
if err := recover(); err != nil {
log.Printf("caught panic: %v", err)
}
}()
func dumpMetrics(logger *zap.Logger, clientID string) error {
defer utils.PanicHandler(logger)

bytesGenerated := metrics.Default.Read(metrics.Traffic)
bytesProcessed := metrics.Default.Read(metrics.ProcessedTraffic)
Expand Down
111 changes: 111 additions & 0 deletions src/jobs/complex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// MIT License

// Copyright (c) [2022] [Bohdan Ivashko (https://github.com/Arriven)]

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package jobs

import (
"context"
"fmt"
"sync"

"github.com/mitchellh/mapstructure"
"go.uber.org/zap"

"github.com/Arriven/db1000n/src/utils/templates"
)

func sequenceJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalConfig, args Args) (data interface{}, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var jobConfig struct {
BasicJobConfig

Jobs []Config
}

if err := mapstructure.Decode(args, &jobConfig); err != nil {
return nil, fmt.Errorf("error parsing job config: %w", err)
}

for _, cfg := range jobConfig.Jobs {
job := Get(cfg.Type)
if job == nil {
return nil, fmt.Errorf("unknown job %q", cfg.Type)
}

data, err := job(ctx, logger, globalConfig, cfg.Args)
if err != nil {
return nil, fmt.Errorf("error running job: %w", err)
}

ctx = context.WithValue(ctx, templates.ContextKey("data."+cfg.Name), data)
}

return nil, nil
}

func parallelJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalConfig, args Args) (data interface{}, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var jobConfig struct {
BasicJobConfig

Jobs []Config
}

if err := mapstructure.Decode(args, &jobConfig); err != nil {
return nil, fmt.Errorf("error parsing job config: %w", err)
}

var wg sync.WaitGroup

for i := range jobConfig.Jobs {
job := Get(jobConfig.Jobs[i].Type)
if job == nil {
logger.Warn("Unknown job", zap.String("type", jobConfig.Jobs[i].Type))

continue
}

if jobConfig.Jobs[i].Count < 1 {
jobConfig.Jobs[i].Count = 1
}

for j := 0; j < jobConfig.Jobs[i].Count; j++ {
wg.Add(1)

go func(i int) {
if _, err := job(ctx, logger, globalConfig, jobConfig.Jobs[i].Args); err != nil {
logger.Error("error running one of the jobs", zap.Error(err))
}

wg.Done()
}(i)
}
}

wg.Wait()

return nil, nil
}
17 changes: 9 additions & 8 deletions src/utils/metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
"go.uber.org/zap"

"github.com/Arriven/db1000n/src/utils"
)
Expand Down Expand Up @@ -107,15 +108,15 @@ func NewOptionsWithFlags() (prometheusOn *bool, prometheusPushGateways *string)
"Comma separated list of prometheus push gateways")
}

func InitOrFail(ctx context.Context, prometheusOn bool, prometheusPushGateways, clientID, country string) {
func InitOrFail(ctx context.Context, logger *zap.Logger, prometheusOn bool, prometheusPushGateways, clientID, country string) {
if !ValidatePrometheusPushGateways(prometheusPushGateways) {
log.Fatal("Invalid value for --prometheus_gateways")
}

if prometheusOn {
Init(clientID, country)

go ExportPrometheusMetrics(ctx, clientID, prometheusPushGateways)
go ExportPrometheusMetrics(ctx, logger, clientID, prometheusPushGateways)
}
}

Expand Down Expand Up @@ -192,11 +193,11 @@ func ValidatePrometheusPushGateways(gatewayURLsCSV string) bool {

// ExportPrometheusMetrics starts http server and export metrics at address <ip>:9090/metrics, also pushes metrics
// to gateways randomly
func ExportPrometheusMetrics(ctx context.Context, clientID, gateways string) {
func ExportPrometheusMetrics(ctx context.Context, logger *zap.Logger, clientID, gateways string) {
registerMetrics()

if gateways != "" {
go pushMetrics(ctx, clientID, strings.Split(gateways, ","))
go pushMetrics(ctx, logger, clientID, strings.Split(gateways, ","))
}

serveMetrics(ctx)
Expand Down Expand Up @@ -261,15 +262,15 @@ func getTLSConfig() (*tls.Config, error) {
}, nil
}

func pushMetrics(ctx context.Context, clientID string, gateways []string) {
func pushMetrics(ctx context.Context, logger *zap.Logger, clientID string, gateways []string) {
jobName := utils.GetEnvStringDefault("PROMETHEUS_JOB_NAME", "db1000n_default_add")
gateway := gateways[rand.Intn(len(gateways))] //nolint:gosec // Cryptographically secure random not required
tickerPeriod := utils.GetEnvDurationDefault("PROMETHEUS_PUSH_PERIOD", time.Minute)
ticker := time.NewTicker(tickerPeriod)

tlsConfig, err := getTLSConfig()
if err != nil {
log.Println("Can't get tls config")
logger.Debug("Can't get tls config", zap.Error(err))

return
}
Expand All @@ -282,7 +283,7 @@ func pushMetrics(ctx context.Context, clientID string, gateways []string) {

user, password, err := getBasicAuth()
if err != nil {
log.Println("Can't fetch basic auth credentials")
logger.Debug("Can't fetch basic auth credentials", zap.Error(err))

return
}
Expand All @@ -295,7 +296,7 @@ func pushMetrics(ctx context.Context, clientID string, gateways []string) {
return
case <-ticker.C:
if err := pusher.Add(); err != nil {
log.Println("Can't push metrics to gateway, trying to change gateway")
logger.Debug("Can't push metrics to gateway, changing gateway", zap.Error(err))

gateway = gateways[rand.Intn(len(gateways))] //nolint:gosec // Cryptographically secure random not required
pusher = setupPusher(push.New(gateway, jobName), clientID, httpClient, user, password)
Expand Down
4 changes: 1 addition & 3 deletions src/utils/metrics/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,5 @@ func serveMetrics(ctx context.Context) {
}
}(ctx, server)

if err := server.ListenAndServe(); err != nil {
log.Fatal()
}
log.Println(server.ListenAndServe())
}
8 changes: 4 additions & 4 deletions src/utils/templates/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func Parse(input string) (*template.Template, error) {
func Execute(logger *zap.Logger, tpl *template.Template, data interface{}) string {
var res strings.Builder
if err := tpl.Execute(&res, data); err != nil {
logger.Error("error executing template", zap.Error(err))
logger.Warn("error executing template", zap.Error(err))

return ""
}
Expand All @@ -165,14 +165,14 @@ func Execute(logger *zap.Logger, tpl *template.Template, data interface{}) strin
func ParseAndExecute(logger *zap.Logger, input string, data interface{}) string {
tpl, err := Parse(input)
if err != nil {
logger.Error("error parsing template", zap.Error(err))
logger.Warn("error parsing template", zap.Error(err))

return input
}

var output strings.Builder
if err = tpl.Execute(&output, data); err != nil {
logger.Error("error executing template", zap.Error(err))
logger.Warn("error executing template", zap.Error(err))

return input
}
Expand All @@ -184,7 +184,7 @@ func ParseAndExecute(logger *zap.Logger, input string, data interface{}) string
func ParseAndExecuteMapStruct(logger *zap.Logger, input map[string]interface{}, data interface{}) map[string]interface{} {
tpl, err := ParseMapStruct(input)
if err != nil {
logger.Error("error parsing template", zap.Error(err))
logger.Warn("error parsing template", zap.Error(err))

return input
}
Expand Down
4 changes: 1 addition & 3 deletions src/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
// PanicHandler just stub it in the beginning of every major module invocation to prevent single module failure from crashing the whole app
func PanicHandler(logger *zap.Logger) {
if err := recover(); err != nil {
logger.Error("caught panic", zap.Any("err", err))
logger.Error("caught panic, recovering", zap.Any("err", err))
}
}

Expand Down Expand Up @@ -103,8 +103,6 @@ func NonNilIntOrDefault(i *int, dflt int) int {
func Decode(input interface{}, output interface{}) error {
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{Squash: true, WeaklyTypedInput: true, Result: output})
if err != nil {
log.Printf("Error parsing job config: %v", err)

return err
}

Expand Down

0 comments on commit 15880ff

Please sign in to comment.