[Ingest Manager] Support for upgrade rollback #22537
Changes from 33 commits
@@ -0,0 +1,123 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package upgrade

import (
	"context"
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

const (
	defaultCheckPeriod = 10 * time.Second
	evaluatedPeriods   = 6 // with a 10s period this means we evaluate 60s of agent run
	crashesAllowed     = 2 // within 60s one restart is allowed; an additional one is considered a crash
)

type serviceHandler interface {
	PID(ctx context.Context) (int, error)
	Close()
}

// CrashChecker checks the Elastic Agent lifecycle for a crash pattern.
type CrashChecker struct {
	notifyChan  chan error
	q           *distinctQueue
	log         *logger.Logger
	sc          serviceHandler
	checkPeriod time.Duration
}

// NewCrashChecker creates a new crash checker.
func NewCrashChecker(ctx context.Context, ch chan error, log *logger.Logger) (*CrashChecker, error) {
	q, err := newDistinctQueue(evaluatedPeriods)
	if err != nil {
		return nil, err
	}

	c := &CrashChecker{
		notifyChan:  ch,
		q:           q,
		log:         log,
		checkPeriod: defaultCheckPeriod,
	}

	if err := c.Init(ctx); err != nil {
		return nil, err
	}

	return c, nil
}

// Run runs the checking loop.
func (ch *CrashChecker) Run(ctx context.Context) {
	defer ch.sc.Close()

	ch.log.Debug("Crash checker started")
	for {
		ch.log.Debugf("watcher having PID: %d", os.Getpid())
		select {
		case <-ctx.Done():
			return
		case <-time.After(ch.checkPeriod):
			pid, err := ch.sc.PID(ctx)
			if err != nil {
				ch.log.Error(err)
			}

			// Each distinct PID observed within the window means the service
			// was restarted; too many distinct PIDs indicate a crash loop.
			ch.q.Push(pid)
			restarts := ch.q.Distinct()
			ch.log.Debugf("retrieved service PID [%d] changed %d times within %d checks", pid, restarts, evaluatedPeriods)
			if restarts > crashesAllowed {
				ch.notifyChan <- errors.New(fmt.Sprintf("service restarted '%d' times within '%v' seconds", restarts, evaluatedPeriods*ch.checkPeriod.Seconds()))
			}
		}
	}
}

// distinctQueue is a fixed-size queue of the most recently observed PIDs,
// newest first.
type distinctQueue struct {
	q    []int
	size int
	lock sync.Mutex
}

func newDistinctQueue(size int) (*distinctQueue, error) {
	if size < 1 {
		return nil, errors.New("invalid size", errors.TypeUnexpected)
	}
	return &distinctQueue{
		q:    make([]int, 0, size),
		size: size,
	}, nil
}

// Push prepends id, dropping the oldest entry once the queue is full.
func (dq *distinctQueue) Push(id int) {
	dq.lock.Lock()
	defer dq.lock.Unlock()

	cutIdx := len(dq.q)
	if dq.size-1 < len(dq.q) {
		cutIdx = dq.size - 1
	}
	dq.q = append([]int{id}, dq.q[:cutIdx]...)
}

// Distinct returns the number of unique PIDs currently in the queue.
func (dq *distinctQueue) Distinct() int {
	dq.lock.Lock()
	defer dq.lock.Unlock()

	dm := make(map[int]int)
	for _, id := range dq.q {
		dm[id] = 1
	}

	return len(dm)
}
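
To make the thresholds above concrete, here is a minimal, self-contained sketch of the sliding-window heuristic. The queue is re-declared locally for illustration (pidWindow and its methods are hypothetical names, not part of this PR): with evaluatedPeriods = 6 and crashesAllowed = 2, one restart inside the window is tolerated and a second is treated as a crash loop.

package main

import "fmt"

// pidWindow keeps the last n observed PIDs, newest first (a local stand-in
// for the PR's distinct queue).
type pidWindow struct {
	q    []int
	size int
}

func (w *pidWindow) push(pid int) {
	cut := len(w.q)
	if w.size-1 < cut {
		cut = w.size - 1
	}
	w.q = append([]int{pid}, w.q[:cut]...)
}

func (w *pidWindow) distinct() int {
	seen := make(map[int]struct{})
	for _, pid := range w.q {
		seen[pid] = struct{}{}
	}
	return len(seen)
}

func main() {
	const evaluatedPeriods, crashesAllowed = 6, 2
	w := &pidWindow{size: evaluatedPeriods}

	// One restart within the window: two distinct PIDs, still allowed.
	for _, pid := range []int{100, 100, 100, 200, 200, 200} {
		w.push(pid)
	}
	fmt.Println(w.distinct() > crashesAllowed) // false

	// A second restart brings the window to three distinct PIDs: crash.
	w.push(300)
	fmt.Println(w.distinct() > crashesAllowed) // true
}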
@@ -0,0 +1,160 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package upgrade

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

var (
	testCheckPeriod = 100 * time.Millisecond
)

func TestChecker(t *testing.T) {
	t.Run("no failure when no change", func(t *testing.T) {
		pider := &testPider{}
		ch, errChan := testableChecker(t, pider)
		ctx, canc := context.WithCancel(context.Background())

		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			// Done is called before Run, so Wait only guarantees the goroutine
			// has started; Run itself blocks until the context is canceled.
			wg.Done()
			ch.Run(ctx)
		}()

		wg.Wait()
		<-time.After(6 * testCheckPeriod)

		var err error
		select {
		case err = <-errChan:
		default:
		}

		canc()
		require.NoError(t, err)
	})

	t.Run("no failure when infrequent change", func(t *testing.T) {
		pider := &testPider{}
		ch, errChan := testableChecker(t, pider)
		ctx, canc := context.WithCancel(context.Background())

		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			wg.Done()
			ch.Run(ctx)
		}()

		wg.Wait()
		for i := 0; i < 2; i++ {
			<-time.After(3 * testCheckPeriod)
			pider.Change(i)
		}

		var err error
		select {
		case err = <-errChan:
		default:
		}

		canc()
		require.NoError(t, err)
	})

	t.Run("no failure when change lower than limit", func(t *testing.T) {
		pider := &testPider{}
		ch, errChan := testableChecker(t, pider)
		ctx, canc := context.WithCancel(context.Background())

		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			wg.Done()
			ch.Run(ctx)
		}()

		wg.Wait()
		for i := 0; i < 3; i++ {
			<-time.After(7 * testCheckPeriod)
			pider.Change(i)
		}

		var err error
		select {
		case err = <-errChan:
		default:
		}

		canc()
		require.NoError(t, err)
	})

	t.Run("fails when pid changes frequently", func(t *testing.T) {
		pider := &testPider{}
		ch, errChan := testableChecker(t, pider)
		ctx, canc := context.WithCancel(context.Background())

		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			wg.Done()
			ch.Run(ctx)
		}()

		wg.Wait()
		for i := 0; i < 12; i++ {
			<-time.After(testCheckPeriod / 2)
			pider.Change(i)
		}

		var err error
		select {
		case err = <-errChan:
		default:
		}

		canc()
		require.Error(t, err)
	})
}

func testableChecker(t *testing.T, pider *testPider) (*CrashChecker, chan error) {
	errChan := make(chan error, 1)
	l, _ := logger.New("")
	ch, err := NewCrashChecker(context.Background(), errChan, l)
	require.NoError(t, err)

	// Swap the real service handler for the test double and shorten the
	// check period so the tests run quickly.
	ch.checkPeriod = testCheckPeriod
	ch.sc.Close()
	ch.sc = pider

	return ch, errChan
}

// testPider is a serviceHandler test double whose reported PID can be
// changed at will to simulate service restarts.
type testPider struct {
	sync.Mutex
	pid int
}

func (p *testPider) Change(pid int) {
	p.Lock()
	defer p.Unlock()
	p.pid = pid
}

func (p *testPider) PID(ctx context.Context) (int, error) {
	p.Lock()
	defer p.Unlock()
	return p.pid, nil
}

func (p *testPider) Close() {}
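
For context, a sketch of how a caller might wire the checker outside of tests. The watcher that consumes errChan is not part of this diff, and the import path for the upgrade package is assumed here; NewCrashChecker's Init also connects to the platform service manager, so this only runs in a real agent install.

package main

import (
	"context"
	"log"

	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/upgrade" // assumed path
	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	l, err := logger.New("")
	if err != nil {
		log.Fatal(err)
	}

	// Buffered, mirroring the tests, so a single notification never blocks.
	errChan := make(chan error, 1)
	checker, err := upgrade.NewCrashChecker(ctx, errChan, l)
	if err != nil {
		log.Fatal(err)
	}
	go checker.Run(ctx)

	// A real watcher would trigger a rollback here instead of exiting.
	if err := <-errChan; err != nil {
		log.Printf("agent crash detected: %v", err)
	}
}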
@@ -0,0 +1,85 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package upgrade

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/go-multierror"

	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"
	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client"
	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

const (
	statusCheckPeriod = 30 * time.Second
)

// ErrAgentStatusFailed is returned when the agent reports FAILED status.
var ErrAgentStatusFailed = errors.New("agent in a failed state", errors.TypeApplication)

// ErrorChecker checks the agent for status changes and sends an error to a
// channel if one is found.
type ErrorChecker struct {
	notifyChan  chan error
	log         *logger.Logger
	agentClient client.Client
}

// NewErrorChecker creates a new error checker.
func NewErrorChecker(ch chan error, log *logger.Logger) (*ErrorChecker, error) {
	c := client.New()
	ec := &ErrorChecker{
		notifyChan:  ch,
		agentClient: c,
		log:         log,
	}

	return ec, nil
}

// Run runs the checking loop.
func (ch ErrorChecker) Run(ctx context.Context) {
	ch.log.Debug("Error checker started")
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(statusCheckPeriod):
			err := ch.agentClient.Connect(ctx)
			if err != nil {
				ch.log.Error(err, "failed communicating with running daemon", errors.TypeNetwork, errors.M("socket", control.Address()))
				continue
			}

Review comment: What happens if the new Agent starts running but there is a bug that never brings up the control socket? This case would not be detected by the PID watcher. I think the inability to communicate with the Agent is something the error checker should report. Maybe we give it more chances to fail, as the Agent could still be starting, but I think it needs to be reported.

Review comment: Was this solved? It doesn't seem like the failure to connect is handled.

			status, err := ch.agentClient.Status(ctx)
			ch.agentClient.Disconnect()
			if err != nil {
				ch.log.Error("failed retrieving agent status", err)
				// agent is probably not running and this will be detected by pid watcher
				continue
			}

Review comment: This falls under the comment above.

Review comment: Nicely done!

			if status.Status == client.Failed {
				ch.log.Error("error checker notifying failure of agent")
				ch.notifyChan <- ErrAgentStatusFailed
			}

			// err is nil at this point; collect any failed applications into a multierror.
			for _, app := range status.Applications {
				if app.Status == client.Failed {
					err = multierror.Append(err, errors.New(fmt.Sprintf("application %s[%v] failed: %s", app.Name, app.ID, app.Message)))
				}
			}

			if err != nil {
				ch.log.Error("error checker notifying failure of applications")
				ch.notifyChan <- errors.New(err, "applications in a failed state", errors.TypeApplication)
			}
		}
	}
}
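
One way to address the review thread above would be to count consecutive connect failures and escalate once the agent has had a reasonable number of periods to bring the control socket up. A hedged sketch of that suggestion (connFailureTracker is a hypothetical helper, not part of this PR):

package upgrade

import "fmt"

// connFailureTracker escalates once the control socket has been unreachable
// for a number of consecutive checks (hypothetical helper, not in this PR).
type connFailureTracker struct {
	failures int
	limit    int
}

// record resets the count on success; on failure it returns a non-nil error
// once the limit is reached, suitable for sending to notifyChan.
func (t *connFailureTracker) record(connErr error) error {
	if connErr == nil {
		t.failures = 0
		return nil
	}
	t.failures++
	if t.failures >= t.limit {
		return fmt.Errorf("control socket unreachable for %d consecutive checks: %w", t.failures, connErr)
	}
	return nil
}

ErrorChecker.Run could then call record after each Connect attempt and send any returned error to notifyChan before continuing, so a socket that never comes up is reported instead of only logged.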
Review comment: Nice! Really like the unit testing on this.