Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow upgrade action to signal retry #1887

Merged
merged 12 commits into from
Oct 24, 2022
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
@@ -27,3 +27,4 @@
- Fleet Server now allows setting transaction sample rate on APM instrumentation {pull}1681[1681]
- Log redacted config when config updates. {issue}1626[1626] {pull}1668[1668]
- Storing checkin message in last_checkin_message {pull}1932[1932]
- Allow upgrade actions to signal that they will be retried. {pull}1887[1887]
54 changes: 40 additions & 14 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
@@ -265,17 +265,17 @@ func (ack *AckT) handleAckEvents(ctx context.Context, zlog zerolog.Logger, agent
// The unenroll and upgrade acks might overwrite it later
setResult(n, http.StatusOK)

if ev.Error == "" {
if action.Type == TypeUnenroll {
unenrollIdxs = append(unenrollIdxs, n)
} else if action.Type == TypeUpgrade {
if err := ack.handleUpgrade(ctx, zlog, agent); err != nil {
setError(n, err)
log.Error().Err(err).Msg("handle upgrade event")
continue
}
if action.Type == TypeUpgrade {
if err := ack.handleUpgrade(ctx, zlog, agent, ev); err != nil {
setError(n, err)
log.Error().Err(err).Msg("handle upgrade event")
continue
}
}

if ev.Error == "" && action.Type == TypeUnenroll {
unenrollIdxs = append(unenrollIdxs, n)
}
}

// Process policy acks
@@ -503,12 +503,38 @@ func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent
return nil
}

func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error {

func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, event Event) error {
now := time.Now().UTC().Format(time.RFC3339)
doc := bulk.UpdateFields{
dl.FieldUpgradeStartedAt: nil,
dl.FieldUpgradedAt: now,
doc := bulk.UpdateFields{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does an empty UpdateFields do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a nop

Copy link
Contributor

@juliaElastic juliaElastic Oct 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we remove it then?

if event.Error != "" {
// unmarshal event payload
var pl struct {
Retry bool `json:"retry"`
Attempt int `json:"retry_attempt"`
}
err := json.Unmarshal(event.Payload, &pl)
if err != nil {
zlog.Error().Err(err).Msg("unable to unmarshal upgrade event payload")
}

// if the payload indicates a retry, mark change the upgrade status to retrying.
if pl.Retry {
zlog.Info().Int("retry_attempt", pl.Attempt).Msg("marking agent upgrade as retrying")
doc[dl.FieldUpgradeStatus] = "retrying" // Keep FieldUpgradeStatedAt abd FieldUpgradeded at to original values
} else {
zlog.Info().Int("retry_attempt", pl.Attempt).Msg("Agent upgrade failed, marking agent as healthy, agent logs contain failure message")
doc = bulk.UpdateFields{
dl.FieldUpgradeStartedAt: nil,
dl.FieldUpgradeStatus: nil,
michel-laterman marked this conversation as resolved.
Show resolved Hide resolved
dl.FieldUpgradedAt: now,
michel-laterman marked this conversation as resolved.
Show resolved Hide resolved
}
}
} else {
doc = bulk.UpdateFields{
dl.FieldUpgradeStartedAt: nil,
dl.FieldUpgradeStatus: nil,
dl.FieldUpgradedAt: now,
}
}

body, err := doc.Marshal()
136 changes: 136 additions & 0 deletions internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
@@ -406,6 +406,61 @@ func TestHandleAckEvents(t *testing.T) {
},
err: &HTTPError{Status: http.StatusNotFound},
},
{
name: "upgrade action failed",
events: []Event{
{
ActionID: "ab12dcd8-bde0-4045-92dc-c4b27668d73a",
Type: "UPGRADE",
Error: "Error with no payload",
},
},
res: newAckResponse(false, []AckResponseItem{
{
Status: http.StatusOK,
Message: http.StatusText(http.StatusOK),
},
}),
bulker: func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Search", mock.Anything, mock.Anything, mock.MatchedBy(matchAction(t, "ab12dcd8-bde0-4045-92dc-c4b27668d73a")), mock.Anything).Return(&es.ResultT{HitsT: es.HitsT{
Hits: []es.HitT{{
Source: []byte(`{"action_id":"ab12dcd8-bde0-4045-92dc-c4b27668d73a","type":"UPGRADE"}`),
}},
}}, nil).Once()
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("", nil).Once()
m.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
return m
},
},
{
name: "upgrade action retrying",
events: []Event{
{
ActionID: "ab12dcd8-bde0-4045-92dc-c4b27668d73a",
Type: "UPGRADE",
Error: "Error with payload",
Payload: json.RawMessage(`{"retry":true,"retry_attempt":1}`),
},
},
res: newAckResponse(false, []AckResponseItem{
{
Status: http.StatusOK,
Message: http.StatusText(http.StatusOK),
},
}),
bulker: func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Search", mock.Anything, mock.Anything, mock.MatchedBy(matchAction(t, "ab12dcd8-bde0-4045-92dc-c4b27668d73a")), mock.Anything).Return(&es.ResultT{HitsT: es.HitsT{
Hits: []es.HitT{{
Source: []byte(`{"action_id":"ab12dcd8-bde0-4045-92dc-c4b27668d73a","type":"UPGRADE"}`),
}},
}}, nil).Once()
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("", nil).Once()
m.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
return m
},
},
}

for _, tc := range tests {
@@ -492,3 +547,84 @@ func TestInvalidateAPIKeys(t *testing.T) {
bulker.AssertExpectations(t)
}
}

func TestAckHandleUpgrade(t *testing.T) {
tests := []struct {
name string
event Event
bulker func(t *testing.T) *ftesting.MockBulk
}{{
name: "ok",
event: Event{},
bulker: func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
return m
},
}, {
name: "retry signaled",
event: Event{
Error: "upgrade error",
Payload: json.RawMessage(`{"retry":true,"retry_attempt":1}`),
},
bulker: func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.MatchedBy(func(p []byte) bool {
var body struct {
Doc struct {
Status string `json:"upgrade_status"`
} `json:"doc"`
}
if err := json.Unmarshal(p, &body); err != nil {
t.Fatal(err)
}
return body.Doc.Status == "retrying"
}), mock.Anything).Return(nil).Once()
return m
},
}, {
name: "no more retries",
event: Event{
Error: "upgrade error",
Payload: json.RawMessage(`{"retry":false}`),
},
bulker: func(t *testing.T) *ftesting.MockBulk {
m := ftesting.NewMockBulk()
m.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.MatchedBy(func(p []byte) bool {
var body struct {
Doc struct {
Status string `json:"upgrade_status"`
} `json:"doc"`
}
if err := json.Unmarshal(p, &body); err != nil {
t.Fatal(err)
}
return body.Doc.Status == ""
}), mock.Anything).Return(nil).Once()
return m
},
}}
cfg := &config.Server{
Limits: config.ServerLimits{},
}
agent := &model.Agent{
ESDocument: model.ESDocument{Id: "ab12dcd8-bde0-4045-92dc-c4b27668d735"},
Agent: &model.AgentMetadata{Version: "8.0.0"},
}
ctx := context.Background()
cache, err := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000})
if err != nil {
t.Fatal(err)
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
logger := testlog.SetLogger(t)
bulker := tc.bulker(t)
ack := NewAckT(cfg, bulker, cache)

err := ack.handleUpgrade(ctx, logger, agent, tc.event)
assert.NoError(t, err)
bulker.AssertExpectations(t)
})
}
}