From fd070fb07126b04c17b5b18da6366e12c3f89c49 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Fri, 5 Nov 2021 10:42:59 -0400 Subject: [PATCH] Osquerybeat: Improve osquery client connect code --- x-pack/osquerybeat/internal/osqd/args.go | 7 +- x-pack/osquerybeat/internal/osqdcli/client.go | 41 ++---- x-pack/osquerybeat/internal/osqdcli/retry.go | 62 ++++++++ .../internal/osqdcli/retry_test.go | 139 ++++++++++++++++++ 4 files changed, 219 insertions(+), 30 deletions(-) create mode 100644 x-pack/osquerybeat/internal/osqdcli/retry.go create mode 100644 x-pack/osquerybeat/internal/osqdcli/retry_test.go diff --git a/x-pack/osquerybeat/internal/osqd/args.go b/x-pack/osquerybeat/internal/osqd/args.go index c9bc77ec51f..f74d295c8d8 100644 --- a/x-pack/osquerybeat/internal/osqd/args.go +++ b/x-pack/osquerybeat/internal/osqd/args.go @@ -79,7 +79,12 @@ var protectedFlags = Flags{ // The delimiter for a full query name that is concatenated as "pack_" + {{pack name}} + "_" + {{query name}} by default "pack_delimiter": "_", - "config_refresh": 10, + + // Refresh config every 60 seconds + // The previous setting was 10 seconds which is unnecessary frequent. + // Osquery does not expect that frequent policy/configuration changes + // and can tolerate non real-time configuration change application. + "config_refresh": 60, } func init() { diff --git a/x-pack/osquerybeat/internal/osqdcli/client.go b/x-pack/osquerybeat/internal/osqdcli/client.go index ee6e61e9eab..c7c4f8c50e9 100644 --- a/x-pack/osquerybeat/internal/osqdcli/client.go +++ b/x-pack/osquerybeat/internal/osqdcli/client.go @@ -16,6 +16,7 @@ import ( "time" "golang.org/x/sync/semaphore" + "gotest.tools/gotestsum/log" "github.com/osquery/osquery-go" genosquery "github.com/osquery/osquery-go/gen/osquery" @@ -125,28 +126,21 @@ func (c *Client) Connect(ctx context.Context) error { func (c *Client) reconnect(ctx context.Context) error { c.close() - for i := 0; i < c.connectRetries; i++ { - attempt := i + 1 - llog := c.log.With("attempt", attempt) - llog.Debug("connecting") + r := retry{ + maxRetry: c.connectRetries, + retryWait: retryWait, + log: c.log.With("context", "osquery client connect"), + } + + return r.Run(ctx, func(ctx context.Context) error { cli, err := osquery.NewClient(c.socketPath, c.timeout) if err != nil { - llog.Errorf("failed to connect: %v", err) - if i < c.connectRetries-1 { - llog.Infof("wait before next connect attempt: retry_wait: %v", retryWait) - if werr := waitWithContext(ctx, retryWait); werr != nil { - err = werr - break // Context cancelled, exit loop - } - } else { - return err - } - continue + log.Errorf("failed to connect: %v", err) + return err } c.cli = cli - break - } - return nil + return nil + }) } func (c *Client) Close() { @@ -287,17 +281,6 @@ func (c *Client) queryColumnTypes(ctx context.Context, sql string) (map[string]s return colTypes, nil } -func waitWithContext(ctx context.Context, to time.Duration) error { - t := time.NewTimer(to) - defer t.Stop() - select { - case <-ctx.Done(): - return ctx.Err() - case <-t.C: - } - return nil -} - func resolveTypes(hits []map[string]string, colTypes map[string]string) []map[string]interface{} { resolved := make([]map[string]interface{}, 0, len(hits)) for _, hit := range hits { diff --git a/x-pack/osquerybeat/internal/osqdcli/retry.go b/x-pack/osquerybeat/internal/osqdcli/retry.go new file mode 100644 index 00000000000..849ed83d397 --- /dev/null +++ b/x-pack/osquerybeat/internal/osqdcli/retry.go @@ -0,0 +1,62 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package osqdcli + +import ( + "context" + "time" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +type retry struct { + maxRetry int + retryWait time.Duration + log *logp.Logger +} + +type tryFunc func(ctx context.Context) error + +func (r *retry) Run(ctx context.Context, fn tryFunc) (err error) { + maxAttempts := r.maxRetry + 1 + for i := 0; i < maxAttempts; i++ { + attempt := i + 1 + r.log.Debugf("attempt %v out of %v", attempt, maxAttempts) + + err = fn(ctx) + + if err != nil { + r.log.Debugf("attempt %v out of %v failed, err: %v", attempt, maxAttempts, err) + if i != maxAttempts { + if r.retryWait > 0 { + r.log.Debugf("wait for %v before next retry", r.retryWait) + err = waitWithContext(ctx, retryWait) + if err != nil { + r.log.Debugf("wait returned err: %v", err) + return err + } + } + } else { + r.log.Debugf("no more attempts, return err: %v", err) + return err + } + } else { + r.log.Debugf("attempt %v out of %v succeeded", attempt, maxAttempts) + return nil + } + } + return err +} + +func waitWithContext(ctx context.Context, to time.Duration) error { + t := time.NewTimer(to) + defer t.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + } + return nil +} diff --git a/x-pack/osquerybeat/internal/osqdcli/retry_test.go b/x-pack/osquerybeat/internal/osqdcli/retry_test.go new file mode 100644 index 00000000000..19052c2e338 --- /dev/null +++ b/x-pack/osquerybeat/internal/osqdcli/retry_test.go @@ -0,0 +1,139 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package osqdcli + +import ( + "context" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +func TestRetryRun(t *testing.T) { + logp.Configure(logp.Config{ + Level: logp.DebugLevel, + ToStderr: true, + Selectors: []string{"*"}, + }) + + log := logp.NewLogger("retry_test").With("context", "osquery client connect") + ctx := context.Background() + + type fields struct { + maxRetry int + retryWait time.Duration + log *logp.Logger + } + + type args struct { + ctx context.Context + fn tryFunc + } + + argsWithFunc := func(fn tryFunc) args { + return args{ + ctx: ctx, + fn: fn, + } + } + + funcSucceedsOnNAttempt := func(attempt int) func(context.Context) error { + curAttempt := 1 + return func(ctx context.Context) error { + if curAttempt == attempt { + return nil + } + curAttempt++ + return ErrAlreadyConnected + } + } + + tests := []struct { + name string + fields fields + args args + wantErr error + }{ + { + name: "no retries, no wait, success", + fields: fields{ + log: log, + }, + args: argsWithFunc(func(ctx context.Context) error { + return nil + }), + }, + { + name: "no retries, no wait, error", + fields: fields{ + log: log, + }, + args: argsWithFunc(func(ctx context.Context) error { + return ErrAlreadyConnected + }), + wantErr: ErrAlreadyConnected, + }, + { + name: "retries, no wait, no more retries fails", + fields: fields{ + maxRetry: 3, + log: log, + }, + args: argsWithFunc(funcSucceedsOnNAttempt(8)), + wantErr: ErrAlreadyConnected, + }, + { + name: "retries, no wait, success", + fields: fields{ + maxRetry: 3, + log: log, + }, + args: argsWithFunc(funcSucceedsOnNAttempt(4)), + }, + { + name: "retries, with wait, success", + fields: fields{ + maxRetry: 3, + retryWait: 1 * time.Millisecond, + log: log, + }, + args: argsWithFunc(funcSucceedsOnNAttempt(4)), + }, + { + name: "retries, with wait, success sooner", + fields: fields{ + maxRetry: 3, + retryWait: 1 * time.Millisecond, + log: log, + }, + args: argsWithFunc(funcSucceedsOnNAttempt(2)), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &retry{ + maxRetry: tt.fields.maxRetry, + retryWait: tt.fields.retryWait, + log: tt.fields.log, + } + err := r.Run(tt.args.ctx, tt.args.fn) + if err != nil { + if tt.wantErr != nil { + diff := cmp.Diff(tt.wantErr, err, cmpopts.EquateErrors()) + if diff != "" { + t.Error(diff) + } + } else { + t.Errorf("got err: %v, wantErr: nil", err) + } + } else if tt.wantErr != nil { + t.Errorf("got err: nil, wantErr: %v", tt.wantErr) + } + }) + } +}