From c6d44a19e1af8679ac465c2bb0267e67fab8f17e Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 14 Jul 2020 17:00:54 +0200 Subject: [PATCH] [Ingest Manager] Unroll on unauthorised (#19722) (#19888) [Ingest Manager] Unroll on unauthorised (#19722) --- NOTICE.txt | 46 ++++++++--------- go.mod | 1 + .../pkg/agent/application/action_store.go | 1 + .../pkg/agent/application/fleet_gateway.go | 50 ++++++++++++++----- .../application/handler_action_unenroll.go | 35 +++++++------ .../pkg/agent/application/managed_mode.go | 9 ++-- .../elastic-agent/pkg/agent/errors/error.go | 47 ++++++++++++++++- .../pkg/agent/errors/error_test.go | 48 ++++++++++++++++++ x-pack/elastic-agent/pkg/fleetapi/action.go | 1 + 9 files changed, 182 insertions(+), 56 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index 1c51ea96d16..a0668081b55 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -16506,6 +16506,29 @@ Contents of probable licence file $GOMODCACHE/gopkg.in/yaml.v2@v2.3.0/LICENSE: limitations under the License. +-------------------------------------------------------------------------------- +Dependency : gotest.tools +Version: v2.2.0+incompatible +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/gotest.tools@v2.2.0+incompatible/LICENSE: + +Copyright 2018 gotest.tools authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + -------------------------------------------------------------------------------- Dependency : howett.net/plist Version: v0.0.0-20181124034731-591f970eefbb @@ -39672,29 +39695,6 @@ See the License for the specific language governing permissions and limitations under the License. --------------------------------------------------------------------------------- -Dependency : gotest.tools -Version: v2.2.0+incompatible -Licence type (autodetected): Apache-2.0 --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/gotest.tools@v2.2.0+incompatible/LICENSE: - -Copyright 2018 gotest.tools authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - - -------------------------------------------------------------------------------- Dependency : honnef.co/go/tools Version: v0.0.1-2019.2.3 diff --git a/go.mod b/go.mod index 0a7e675f6c2..e602b70c2f8 100644 --- a/go.mod +++ b/go.mod @@ -174,6 +174,7 @@ require ( gopkg.in/jcmturner/gokrb5.v7 v7.5.0 gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528 gopkg.in/yaml.v2 v2.3.0 + gotest.tools v2.2.0+incompatible howett.net/plist v0.0.0-20181124034731-591f970eefbb k8s.io/api v0.18.3 k8s.io/apimachinery v0.18.3 diff --git a/x-pack/elastic-agent/pkg/agent/application/action_store.go b/x-pack/elastic-agent/pkg/agent/application/action_store.go index cc7e6b6e665..a0b008d9623 100644 --- a/x-pack/elastic-agent/pkg/agent/application/action_store.go +++ b/x-pack/elastic-agent/pkg/agent/application/action_store.go @@ -142,6 +142,7 @@ var _ actionConfigChangeSerializer = actionConfigChangeSerializer(fleetapi.Actio type actionUnenrollSerializer struct { ActionID string `yaml:"action_id"` ActionType string `yaml:"action_type"` + IsDetected bool `yaml:"is_detected"` } // Add a guards between the serializer structs and the original struct. diff --git a/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go b/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go index e45c2f4ecc0..2856cd83abf 100644 --- a/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go +++ b/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go @@ -16,6 +16,8 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/scheduler" ) +const maxUnauthCounter int = 6 + // Default Configuration for the Fleet Gateway. var defaultGatewaySettings = &fleetGatewaySettings{ Duration: 1 * time.Second, // time between successful calls @@ -59,18 +61,19 @@ type fleetAcker interface { // call the API to send the events and will receive actions to be executed locally. // The only supported action for now is a "ActionPolicyChange". type fleetGateway struct { - bgContext context.Context - log *logger.Logger - dispatcher dispatcher - client clienter - scheduler scheduler.Scheduler - backoff backoff.Backoff - settings *fleetGatewaySettings - agentInfo agentInfo - reporter fleetReporter - done chan struct{} - wg sync.WaitGroup - acker fleetAcker + bgContext context.Context + log *logger.Logger + dispatcher dispatcher + client clienter + scheduler scheduler.Scheduler + backoff backoff.Backoff + settings *fleetGatewaySettings + agentInfo agentInfo + reporter fleetReporter + done chan struct{} + wg sync.WaitGroup + acker fleetAcker + unauthCounter int } func newFleetGateway( @@ -203,6 +206,21 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, } resp, err := cmd.Execute(ctx, req) + if isUnauth(err) { + f.unauthCounter++ + + if f.shouldUnroll() { + f.log.Warnf("retrieved unauthorized for '%d' times. Unrolling.", f.unauthCounter) + return &fleetapi.CheckinResponse{ + Actions: []fleetapi.Action{&fleetapi.ActionUnenroll{ActionID: "", ActionType: "UNENROLL", IsDetected: true}}, + Success: true, + }, nil + } + + return nil, err + } + + f.unauthCounter = 0 if err != nil { return nil, err } @@ -212,6 +230,14 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, return resp, nil } +func (f *fleetGateway) shouldUnroll() bool { + return f.unauthCounter >= maxUnauthCounter +} + +func isUnauth(err error) bool { + return errors.Is(err, fleetapi.ErrInvalidAPIKey) +} + func (f *fleetGateway) Start() { f.wg.Add(1) go func(wg *sync.WaitGroup) { diff --git a/x-pack/elastic-agent/pkg/agent/application/handler_action_unenroll.go b/x-pack/elastic-agent/pkg/agent/application/handler_action_unenroll.go index 5ef04947be9..da33f2001ff 100644 --- a/x-pack/elastic-agent/pkg/agent/application/handler_action_unenroll.go +++ b/x-pack/elastic-agent/pkg/agent/application/handler_action_unenroll.go @@ -16,10 +16,11 @@ import ( // After running Unenroll agent is in idle state, non managed non standalone. // For it to be operational again it needs to be either enrolled or reconfigured. type handlerUnenroll struct { - log *logger.Logger - emitter emitterFunc - dispatcher programsDispatcher - closers []context.CancelFunc + log *logger.Logger + emitter emitterFunc + dispatcher programsDispatcher + closers []context.CancelFunc + actionStore *actionStore } func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker) error { @@ -33,13 +34,20 @@ func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker noPrograms := make(map[routingKey][]program.Program) h.dispatcher.Dispatch(a.ID(), noPrograms) - if err := acker.Ack(ctx, action); err != nil { - return err - } - - // commit all acks before quitting. - if err := acker.Commit(ctx); err != nil { - return err + if !action.IsDetected { + // ACK only events comming from fleet + if err := acker.Ack(ctx, action); err != nil { + return err + } + + // commit all acks before quitting. + if err := acker.Commit(ctx); err != nil { + return err + } + } else if h.actionStore != nil { + // backup action for future start to avoid starting fleet gateway loop + h.actionStore.Add(a) + h.actionStore.Save() } // close fleet gateway loop @@ -47,10 +55,5 @@ func (h *handlerUnenroll) Handle(ctx context.Context, a action, acker fleetAcker c() } - // clean action store - // if err := os.Remove(info.AgentActionStoreFile()); err != nil && !os.IsNotExist(err) { - // return errors.New(err, "failed to clear action store") - // } - return nil } diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index 54e30242f23..b4e85f596bd 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -185,10 +185,11 @@ func newManaged( actionDispatcher.MustRegister( &fleetapi.ActionUnenroll{}, &handlerUnenroll{ - log: log, - emitter: emit, - dispatcher: router, - closers: []context.CancelFunc{managedApplication.cancelCtxFn}, + log: log, + emitter: emit, + dispatcher: router, + closers: []context.CancelFunc{managedApplication.cancelCtxFn}, + actionStore: actionStore, }, ) diff --git a/x-pack/elastic-agent/pkg/agent/errors/error.go b/x-pack/elastic-agent/pkg/agent/errors/error.go index c3c1d6a5ddb..7ce5c770349 100644 --- a/x-pack/elastic-agent/pkg/agent/errors/error.go +++ b/x-pack/elastic-agent/pkg/agent/errors/error.go @@ -4,7 +4,27 @@ package errors -import "github.com/pkg/errors" +import ( + goerrors "errors" + "reflect" + + "github.com/pkg/errors" +) + +// As is just a helper so user dont have to use multiple imports for errors. +func As(err error, target interface{}) bool { + return goerrors.As(err, target) +} + +// Is is just a helper so user dont have to use multiple imports for errors. +func Is(err, target error) bool { + return goerrors.Is(err, target) +} + +// Unwrap is just a helper so user dont have to use multiple imports for errors. +func Unwrap(err error) error { + return goerrors.Unwrap(err) +} // MetaRecord is a entry of metadata enhancing an error. type MetaRecord struct { @@ -101,6 +121,31 @@ func (e agentError) Meta() map[string]interface{} { return resultingMeta } +// Equal compares errors and evaluates if they are the same or not. +// Agent error is not comparable due to included map so we need to +// do the heavy lifting ourselves. +func (e agentError) Equal(target error) bool { + targetErr, ok := target.(agentError) + if !ok { + return false + } + + return errors.Is(e.err, targetErr.err) && + e.errType == targetErr.errType && + e.msg == targetErr.msg && + reflect.DeepEqual(e.meta, targetErr.meta) + +} + +// Is checks whether agent err is an err. +func (e agentError) Is(target error) bool { + if agentErr, ok := target.(agentError); ok { + return e.Equal(agentErr) + } + + return goerrors.Is(e.err, target) +} + // Check it implements Error var _ Error = agentError{} diff --git a/x-pack/elastic-agent/pkg/agent/errors/error_test.go b/x-pack/elastic-agent/pkg/agent/errors/error_test.go index 8b764f48ee5..faee302b8a0 100644 --- a/x-pack/elastic-agent/pkg/agent/errors/error_test.go +++ b/x-pack/elastic-agent/pkg/agent/errors/error_test.go @@ -6,12 +6,60 @@ package errors import ( "fmt" + "io" "strings" "testing" "github.com/pkg/errors" + "gotest.tools/assert" ) +func TestErrorsIs(t *testing.T) { + type testCase struct { + id string + actualErr error + expectedErr error + expectedMatch bool + } + + simpleErr := io.ErrNoProgress + simpleWrap := errors.Wrap(simpleErr, "wrapping %w") + agentErr := New() + nestedSimple := New(simpleErr) + nestedWrap := New(simpleWrap) + agentInErr := errors.Wrap(nestedWrap, "wrapping %w") + + tt := []testCase{ + {"simple wrap", simpleWrap, simpleErr, true}, + {"simple mismatch", simpleWrap, errors.New("sample"), false}, + + {"direct nested - root check", nestedSimple, simpleErr, true}, + {"direct nested - mismatch", nestedSimple, errors.New("sample"), false}, + {"direct nested - comparing agent errors", nestedSimple, agentErr, false}, + + {"deep nested - root check", New(nestedSimple), simpleErr, true}, + {"deep nested - mismatch", New(nestedSimple), errors.New("sample"), false}, + {"deep nested - comparing agent errors", New(nestedSimple), agentErr, false}, + + {"nested wrap - wrap check", New(nestedWrap), simpleWrap, true}, + {"nested wrap - root", New(nestedWrap), simpleErr, true}, + + {"comparing agent errors", New(agentErr), agentErr, true}, + + {"agent in error", agentInErr, nestedWrap, true}, + {"agent in error wrap", agentInErr, simpleWrap, true}, + {"agent in error root", agentInErr, simpleErr, true}, + {"agent in error nil check", agentInErr, nil, false}, + } + + for _, tc := range tt { + t.Run(tc.id, func(t *testing.T) { + match := Is(tc.actualErr, tc.expectedErr) + assert.Equal(t, tc.expectedMatch, match) + }) + } +} + func TestErrorsWrap(t *testing.T) { ce := New("custom error", TypePath, M("k", "v")) ew := errors.Wrap(ce, "wrapper") diff --git a/x-pack/elastic-agent/pkg/fleetapi/action.go b/x-pack/elastic-agent/pkg/fleetapi/action.go index 83d53eb9d06..bf7024e304f 100644 --- a/x-pack/elastic-agent/pkg/fleetapi/action.go +++ b/x-pack/elastic-agent/pkg/fleetapi/action.go @@ -87,6 +87,7 @@ func (a *ActionConfigChange) ID() string { type ActionUnenroll struct { ActionID string ActionType string + IsDetected bool } func (a *ActionUnenroll) String() string {