Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Osquerybeat: Improve osquery client connect code. Update config_refresh to 60 seconds. #28848

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion x-pack/osquerybeat/internal/osqd/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ var protectedFlags = Flags{

// The delimiter for a full query name that is concatenated as "pack_" + {{pack name}} + "_" + {{query name}} by default
"pack_delimiter": "_",
"config_refresh": 10,

// Refresh config every 60 seconds
// The previous setting was 10 seconds which is unnecessary frequent.
// Osquery does not expect that frequent policy/configuration changes
// and can tolerate non real-time configuration change application.
"config_refresh": 60,
}

func init() {
Expand Down
41 changes: 12 additions & 29 deletions x-pack/osquerybeat/internal/osqdcli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"golang.org/x/sync/semaphore"
"gotest.tools/gotestsum/log"

"github.com/osquery/osquery-go"
genosquery "github.com/osquery/osquery-go/gen/osquery"
Expand Down Expand Up @@ -125,28 +126,21 @@ func (c *Client) Connect(ctx context.Context) error {
func (c *Client) reconnect(ctx context.Context) error {
c.close()

for i := 0; i < c.connectRetries; i++ {
attempt := i + 1
llog := c.log.With("attempt", attempt)
llog.Debug("connecting")
r := retry{
maxRetry: c.connectRetries,
retryWait: retryWait,
log: c.log.With("context", "osquery client connect"),
}

return r.Run(ctx, func(ctx context.Context) error {
cli, err := osquery.NewClient(c.socketPath, c.timeout)
if err != nil {
llog.Errorf("failed to connect: %v", err)
if i < c.connectRetries-1 {
llog.Infof("wait before next connect attempt: retry_wait: %v", retryWait)
if werr := waitWithContext(ctx, retryWait); werr != nil {
err = werr
break // Context cancelled, exit loop
}
} else {
return err
}
continue
log.Errorf("failed to connect: %v", err)
return err
}
c.cli = cli
break
}
return nil
return nil
})
}

func (c *Client) Close() {
Expand Down Expand Up @@ -287,17 +281,6 @@ func (c *Client) queryColumnTypes(ctx context.Context, sql string) (map[string]s
return colTypes, nil
}

func waitWithContext(ctx context.Context, to time.Duration) error {
t := time.NewTimer(to)
defer t.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-t.C:
}
return nil
}

func resolveTypes(hits []map[string]string, colTypes map[string]string) []map[string]interface{} {
resolved := make([]map[string]interface{}, 0, len(hits))
for _, hit := range hits {
Expand Down
62 changes: 62 additions & 0 deletions x-pack/osquerybeat/internal/osqdcli/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package osqdcli

import (
"context"
"time"

"github.com/elastic/beats/v7/libbeat/logp"
)

type retry struct {
maxRetry int
retryWait time.Duration
log *logp.Logger
}

type tryFunc func(ctx context.Context) error

func (r *retry) Run(ctx context.Context, fn tryFunc) (err error) {
maxAttempts := r.maxRetry + 1
for i := 0; i < maxAttempts; i++ {
attempt := i + 1
r.log.Debugf("attempt %v out of %v", attempt, maxAttempts)

err = fn(ctx)

if err != nil {
r.log.Debugf("attempt %v out of %v failed, err: %v", attempt, maxAttempts, err)
if i != maxAttempts {
if r.retryWait > 0 {
r.log.Debugf("wait for %v before next retry", r.retryWait)
err = waitWithContext(ctx, retryWait)
if err != nil {
r.log.Debugf("wait returned err: %v", err)
return err
}
}
} else {
r.log.Debugf("no more attempts, return err: %v", err)
return err
}
} else {
r.log.Debugf("attempt %v out of %v succeeded", attempt, maxAttempts)
return nil
}
}
return err
}

func waitWithContext(ctx context.Context, to time.Duration) error {
t := time.NewTimer(to)
defer t.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-t.C:
}
return nil
}
139 changes: 139 additions & 0 deletions x-pack/osquerybeat/internal/osqdcli/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package osqdcli

import (
"context"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)

func TestRetryRun(t *testing.T) {
logp.Configure(logp.Config{
Level: logp.DebugLevel,
ToStderr: true,
Selectors: []string{"*"},
})

log := logp.NewLogger("retry_test").With("context", "osquery client connect")
ctx := context.Background()

type fields struct {
maxRetry int
retryWait time.Duration
log *logp.Logger
}

type args struct {
ctx context.Context
fn tryFunc
}

argsWithFunc := func(fn tryFunc) args {
return args{
ctx: ctx,
fn: fn,
}
}

funcSucceedsOnNAttempt := func(attempt int) func(context.Context) error {
curAttempt := 1
return func(ctx context.Context) error {
if curAttempt == attempt {
return nil
}
curAttempt++
return ErrAlreadyConnected
}
}

tests := []struct {
name string
fields fields
args args
wantErr error
}{
{
name: "no retries, no wait, success",
fields: fields{
log: log,
},
args: argsWithFunc(func(ctx context.Context) error {
return nil
}),
},
{
name: "no retries, no wait, error",
fields: fields{
log: log,
},
args: argsWithFunc(func(ctx context.Context) error {
return ErrAlreadyConnected
}),
wantErr: ErrAlreadyConnected,
},
{
name: "retries, no wait, no more retries fails",
fields: fields{
maxRetry: 3,
log: log,
},
args: argsWithFunc(funcSucceedsOnNAttempt(8)),
wantErr: ErrAlreadyConnected,
},
{
name: "retries, no wait, success",
fields: fields{
maxRetry: 3,
log: log,
},
args: argsWithFunc(funcSucceedsOnNAttempt(4)),
},
{
name: "retries, with wait, success",
fields: fields{
maxRetry: 3,
retryWait: 1 * time.Millisecond,
log: log,
},
args: argsWithFunc(funcSucceedsOnNAttempt(4)),
},
{
name: "retries, with wait, success sooner",
fields: fields{
maxRetry: 3,
retryWait: 1 * time.Millisecond,
log: log,
},
args: argsWithFunc(funcSucceedsOnNAttempt(2)),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &retry{
maxRetry: tt.fields.maxRetry,
retryWait: tt.fields.retryWait,
log: tt.fields.log,
}
err := r.Run(tt.args.ctx, tt.args.fn)
if err != nil {
if tt.wantErr != nil {
diff := cmp.Diff(tt.wantErr, err, cmpopts.EquateErrors())
if diff != "" {
t.Error(diff)
}
} else {
t.Errorf("got err: %v, wantErr: nil", err)
}
} else if tt.wantErr != nil {
t.Errorf("got err: nil, wantErr: %v", tt.wantErr)
}
})
}
}