-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Ingest Manager] Support for upgrade rollback #22537
Merged
Merged
Changes from 41 commits
Commits
Show all changes
43 commits
Select commit
Hold shift + click to select a range
d350d91
skeleton
michalpristas f4b9bf7
Merge branch 'master' of github.com:elastic/beats into agent-watcher
michalpristas 7957f1a
step 1
michalpristas f19380f
Merge branch 'master' into agent-watcher
michalpristas 05b1811
untested complete
michalpristas 07454c2
Merge branch 'master' into agent-watcher
michalpristas 0a1387f
consider application errors
michalpristas 228c620
Merge branch 'master' of github.com:elastic/beats into agent-watcher
michalpristas 7295dae
testing
michalpristas e55b925
macos watching + removeme
michalpristas 2a16639
linux service
michalpristas 4c1e819
linux service
michalpristas 5665d41
linux service
michalpristas cfb3b02
linux service
michalpristas 764b977
fix
michalpristas 4a27cdf
demonise
michalpristas 8cc38c0
sig play
michalpristas dca4f7e
comment
michalpristas 46ed099
os specific invoke
michalpristas 913f2b4
windows
michalpristas e390ae9
windows
michalpristas 4a9e82f
teardown delay
michalpristas 7f9e422
test
michalpristas 7d2f10b
return ok
michalpristas f4b01fe
log error
michalpristas 59ccd59
cleanup windows on new cycle
michalpristas 3e55660
error expected
michalpristas 81664a6
formatting
michalpristas 7ab12fc
Merge branch 'master' of github.com:elastic/beats into agent-watcher
michalpristas ef96264
logs cleanup
michalpristas bd566c2
fixed log
michalpristas ce360fa
crash checker tests
michalpristas 628c1ab
fmt
michalpristas cdfbfa0
support for other sc
michalpristas 6e0a679
status check counter
michalpristas a653e7b
comments
michalpristas 581142d
Merge branch 'master' of github.com:elastic/beats into agent-watcher
michalpristas ea2b917
Merge branch 'master' of github.com:elastic/beats into agent-watcher
michalpristas d1703f2
sudo only for DEV
michalpristas 549358b
Merge branch 'master' of github.com:elastic/beats into agent-watcher
michalpristas 7fe9b02
Merge branch 'master' of github.com:elastic/beats into agent-watcher
michalpristas 3b83ccd
check connection errors
michalpristas a09f28a
Merge branch 'master' of github.com:elastic/beats into agent-watcher
michalpristas File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
126 changes: 126 additions & 0 deletions
126
x-pack/elastic-agent/pkg/agent/application/upgrade/crash_checker.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package upgrade | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"os" | ||
"sync" | ||
"time" | ||
|
||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" | ||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" | ||
) | ||
|
||
const (
	// defaultCheckPeriod is the pause between two consecutive PID lookups.
	defaultCheckPeriod = 10 * time.Second

	// evaluatedPeriods is the number of most recent lookups that are kept
	// and evaluated together; with a 10s period this covers 60s of agent run.
	evaluatedPeriods = 6

	// crashesAllowed is the highest number of distinct PIDs tolerated within
	// the evaluated lookups: one restart (two PIDs) is allowed, any
	// additional one is considered a crash.
	crashesAllowed = 2
)
|
||
// serviceHandler is the abstraction the crash checker uses to talk to
// whatever controls the watched service.
type serviceHandler interface {
	// Name identifies the handler; it is only used for logging.
	Name() string

	// PID reports the process ID the service is currently running under.
	PID(ctx context.Context) (int, error)

	// Close is called once the checking loop exits.
	Close()
}
|
||
// CrashChecker checks agent for crash pattern in Elastic Agent lifecycle. | ||
type CrashChecker struct { | ||
notifyChan chan error | ||
q *disctintQueue | ||
log *logger.Logger | ||
sc serviceHandler | ||
checkPeriod time.Duration | ||
} | ||
|
||
// NewCrashChecker creates a new crash checker. | ||
func NewCrashChecker(ctx context.Context, ch chan error, log *logger.Logger) (*CrashChecker, error) { | ||
q, err := newDistinctQueue(evaluatedPeriods) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
c := &CrashChecker{ | ||
notifyChan: ch, | ||
q: q, | ||
log: log, | ||
checkPeriod: defaultCheckPeriod, | ||
} | ||
|
||
if err := c.Init(ctx, log); err != nil { | ||
return nil, err | ||
} | ||
|
||
log.Debugf("running checks using '%s' controller", c.sc.Name()) | ||
|
||
return c, nil | ||
} | ||
|
||
// Run runs the checking loop. | ||
func (ch *CrashChecker) Run(ctx context.Context) { | ||
defer ch.sc.Close() | ||
|
||
ch.log.Debug("Crash checker started") | ||
for { | ||
ch.log.Debugf("watcher having PID: %d", os.Getpid()) | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-time.After(ch.checkPeriod): | ||
pid, err := ch.sc.PID(ctx) | ||
if err != nil { | ||
ch.log.Error(err) | ||
} | ||
|
||
ch.q.Push(pid) | ||
restarts := ch.q.Distinct() | ||
ch.log.Debugf("retrieved service PID [%d] changed %d times within %d", pid, restarts, evaluatedPeriods) | ||
if restarts > crashesAllowed { | ||
ch.notifyChan <- errors.New(fmt.Sprintf("service restarted '%d' times within '%v' seconds", restarts, ch.checkPeriod.Seconds())) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// disctintQueue is a bounded list of the most recently pushed ids that can
// report how many different ids it currently holds.
type disctintQueue struct {
	// lock serialises access to q.
	lock sync.Mutex
	// q stores the ids, newest first.
	q []int
	// size is the maximum number of ids kept in q.
	size int
}
|
||
func newDistinctQueue(size int) (*disctintQueue, error) { | ||
if size < 1 { | ||
return nil, errors.New("invalid size", errors.TypeUnexpected) | ||
} | ||
return &disctintQueue{ | ||
q: make([]int, 0, size), | ||
size: size, | ||
}, nil | ||
} | ||
|
||
func (dq *disctintQueue) Push(id int) { | ||
dq.lock.Lock() | ||
defer dq.lock.Unlock() | ||
|
||
cutIdx := len(dq.q) | ||
if dq.size-1 < len(dq.q) { | ||
cutIdx = dq.size - 1 | ||
} | ||
dq.q = append([]int{id}, dq.q[:cutIdx]...) | ||
} | ||
|
||
func (dq *disctintQueue) Distinct() int { | ||
dq.lock.Lock() | ||
defer dq.lock.Unlock() | ||
|
||
dm := make(map[int]int) | ||
|
||
for _, id := range dq.q { | ||
dm[id] = 1 | ||
} | ||
|
||
return len(dm) | ||
} |
162 changes: 162 additions & 0 deletions
162
x-pack/elastic-agent/pkg/agent/application/upgrade/crash_checker_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package upgrade | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" | ||
) | ||
|
||
// testCheckPeriod is the shortened check period the tests install on the
// checker so that they do not have to wait for the production default.
var testCheckPeriod = 100 * time.Millisecond
|
||
func TestChecker(t *testing.T) { | ||
t.Run("no failure when no change", func(t *testing.T) { | ||
pider := &testPider{} | ||
ch, errChan := testableChecker(t, pider) | ||
ctx, canc := context.WithCancel(context.Background()) | ||
|
||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
wg.Done() | ||
ch.Run(ctx) | ||
}() | ||
|
||
wg.Wait() | ||
<-time.After(6 * testCheckPeriod) | ||
|
||
var err error | ||
select { | ||
case err = <-errChan: | ||
default: | ||
} | ||
|
||
canc() | ||
require.NoError(t, err) | ||
}) | ||
|
||
t.Run("no failure when unfrequent change", func(t *testing.T) { | ||
pider := &testPider{} | ||
ch, errChan := testableChecker(t, pider) | ||
ctx, canc := context.WithCancel(context.Background()) | ||
|
||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
wg.Done() | ||
ch.Run(ctx) | ||
}() | ||
|
||
wg.Wait() | ||
for i := 0; i < 2; i++ { | ||
<-time.After(3 * testCheckPeriod) | ||
pider.Change(i) | ||
} | ||
var err error | ||
select { | ||
case err = <-errChan: | ||
default: | ||
} | ||
|
||
canc() | ||
require.NoError(t, err) | ||
}) | ||
|
||
t.Run("no failure when change lower than limit", func(t *testing.T) { | ||
pider := &testPider{} | ||
ch, errChan := testableChecker(t, pider) | ||
ctx, canc := context.WithCancel(context.Background()) | ||
|
||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
wg.Done() | ||
ch.Run(ctx) | ||
}() | ||
|
||
wg.Wait() | ||
for i := 0; i < 3; i++ { | ||
<-time.After(7 * testCheckPeriod) | ||
pider.Change(i) | ||
} | ||
var err error | ||
select { | ||
case err = <-errChan: | ||
default: | ||
} | ||
|
||
canc() | ||
require.NoError(t, err) | ||
}) | ||
|
||
t.Run("fails when pid changes frequently", func(t *testing.T) { | ||
pider := &testPider{} | ||
ch, errChan := testableChecker(t, pider) | ||
ctx, canc := context.WithCancel(context.Background()) | ||
|
||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
wg.Done() | ||
ch.Run(ctx) | ||
}() | ||
|
||
wg.Wait() | ||
for i := 0; i < 12; i++ { | ||
<-time.After(testCheckPeriod / 2) | ||
pider.Change(i) | ||
} | ||
var err error | ||
select { | ||
case err = <-errChan: | ||
default: | ||
} | ||
|
||
canc() | ||
require.Error(t, err) | ||
}) | ||
} | ||
|
||
func testableChecker(t *testing.T, pider *testPider) (*CrashChecker, chan error) { | ||
errChan := make(chan error, 1) | ||
l, _ := logger.New("") | ||
ch, err := NewCrashChecker(context.Background(), errChan, l) | ||
require.NoError(t, err) | ||
|
||
ch.checkPeriod = testCheckPeriod | ||
ch.sc.Close() | ||
ch.sc = pider | ||
|
||
return ch, errChan | ||
} | ||
|
||
// testPider is a fake service handler whose reported PID can be switched
// from the test while the checker is running.
type testPider struct {
	// The embedded mutex serialises access to pid.
	sync.Mutex
	pid int
}

// Change sets the PID that subsequent PID calls report.
func (p *testPider) Change(pid int) {
	p.Lock()
	p.pid = pid
	p.Unlock()
}

// PID returns the PID most recently set through Change (0 initially).
// It never fails and ignores ctx.
func (p *testPider) PID(ctx context.Context) (int, error) {
	p.Lock()
	current := p.pid
	p.Unlock()

	return current, nil
}

// Close does nothing; the fake holds no resources.
func (p *testPider) Close() {
}

// Name returns the fixed name of the fake handler.
func (p *testPider) Name() string {
	return "testPider"
}
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! Really like the unit testing on this.