Skip to content

Commit

Permalink
Storing checkin message in last_checkin_message (#1932)
Browse files Browse the repository at this point in the history
* Storing checkin message in last_checkin_message

* added changelog

* fixed tests
  • Loading branch information
juliaElastic authored Sep 29, 2022
1 parent 69687f3 commit 129ea1c
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
- Fleet Server now allows setting global labels on APM instrumentation. {pull}1649[1649]
- Fleet Server now allows setting transaction sample rate on APM instrumentation {pull}1681[1681]
- Log redacted config when config updates. {issue}1626[1626] {pull}1668[1668]
- Storing checkin message in last_checkin_message {pull}1932[1932]
4 changes: 2 additions & 2 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r
defer longPoll.Stop()

// Initial update on checkin, and any user fields that might have changed
err = ct.bc.CheckIn(agent.Id, req.Status, rawMeta, seqno, ver)
err = ct.bc.CheckIn(agent.Id, req.Status, req.Message, rawMeta, seqno, ver)
if err != nil {
zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed")
}
Expand Down Expand Up @@ -277,7 +277,7 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r
zlog.Trace().Msg("fire long poll")
break LOOP
case <-tick.C:
err := ct.bc.CheckIn(agent.Id, req.Status, nil, nil, ver)
err := ct.bc.CheckIn(agent.Id, req.Status, req.Message, nil, nil, ver)
if err != nil {
zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed")
}
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/api/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type EnrollResponse struct {

type CheckinRequest struct {
Status string `json:"status"`
Message string `json:"message"`
AckToken string `json:"ack_token,omitempty"`
LocalMeta json.RawMessage `json:"local_metadata"`
}
Expand Down
30 changes: 17 additions & 13 deletions internal/pkg/checkin/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ type extraT struct {
// There will be 10's of thousands of items
// in the map at any point.
type pendingT struct {
ts string
status string
extra *extraT
ts string
status string
message string
extra *extraT
}

// Bulk will batch pending checkins and update elasticsearch at a set interval.
Expand Down Expand Up @@ -98,7 +99,7 @@ func (bc *Bulk) timestamp() string {
// CheckIn will add the agent (identified by id) to the pending set.
// The pending agents are sent to elasticsearch as a bulk update at each flush interval.
// WARNING: Bulk will take ownership of fields, so do not use after passing in.
func (bc *Bulk) CheckIn(id string, status string, meta []byte, seqno sqn.SeqNo, newVer string) error {
func (bc *Bulk) CheckIn(id string, status string, message string, meta []byte, seqno sqn.SeqNo, newVer string) error {
// Separate out the extra data to minimize
// the memory footprint of the 90% case of just
// updating the timestamp.
Expand All @@ -114,9 +115,10 @@ func (bc *Bulk) CheckIn(id string, status string, meta []byte, seqno sqn.SeqNo,
bc.mut.Lock()

bc.pending[id] = pendingT{
ts: bc.timestamp(),
status: status,
extra: extra,
ts: bc.timestamp(),
status: status,
message: message,
extra: extra,
}

bc.mut.Unlock()
Expand Down Expand Up @@ -180,9 +182,10 @@ func (bc *Bulk) flush(ctx context.Context) error {
body, ok = simpleCache[pendingData]
if !ok {
fields := bulk.UpdateFields{
dl.FieldLastCheckin: pendingData.ts,
dl.FieldUpdatedAt: nowTimestamp,
dl.FieldLastCheckinStatus: pendingData.status,
dl.FieldLastCheckin: pendingData.ts,
dl.FieldUpdatedAt: nowTimestamp,
dl.FieldLastCheckinStatus: pendingData.status,
dl.FieldLastCheckinMessage: pendingData.message,
}
if body, err = fields.Marshal(); err != nil {
return err
Expand All @@ -192,9 +195,10 @@ func (bc *Bulk) flush(ctx context.Context) error {
} else {

fields := bulk.UpdateFields{
dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp
dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp
dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status
dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp
dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp
dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status
dl.FieldLastCheckinMessage: pendingData.message, // Set the status message
}

// If the agent version is not empty it needs to be updated
Expand Down
25 changes: 17 additions & 8 deletions internal/pkg/checkin/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ func matchOp(tb testing.TB, c bulkcase, ts time.Time) func(ops []bulk.MultiOp) b
}

type bulkcase struct {
desc string
id string
status string
meta []byte
seqno sqn.SeqNo
ver string
desc string
id string
status string
message string
meta []byte
seqno sqn.SeqNo
ver string
}

func TestBulkSimple(t *testing.T) {
Expand All @@ -96,6 +97,7 @@ func TestBulkSimple(t *testing.T) {
"Simple case",
"simpleId",
"online",
"message",
nil,
nil,
"",
Expand All @@ -104,6 +106,7 @@ func TestBulkSimple(t *testing.T) {
"Singled field case",
"singleFieldId",
"online",
"message",
[]byte(`{"hey":"now"}`),
nil,
"",
Expand All @@ -112,6 +115,7 @@ func TestBulkSimple(t *testing.T) {
"Multi field case",
"multiFieldId",
"online",
"message",
[]byte(`{"hey":"now","brown":"cow"}`),
nil,
ver,
Expand All @@ -120,6 +124,7 @@ func TestBulkSimple(t *testing.T) {
"Multi field nested case",
"multiFieldNestedId",
"online",
"message",
[]byte(`{"hey":"now","wee":{"little":"doggie"}}`),
nil,
"",
Expand All @@ -128,6 +133,7 @@ func TestBulkSimple(t *testing.T) {
"Simple case with seqNo",
"simpleseqno",
"online",
"message",
nil,
sqn.SeqNo{1, 2, 3, 4},
ver,
Expand All @@ -136,6 +142,7 @@ func TestBulkSimple(t *testing.T) {
"Field case with seqNo",
"simpleseqno",
"online",
"message",
[]byte(`{"uncle":"fester"}`),
sqn.SeqNo{5, 6, 7, 8},
ver,
Expand All @@ -144,6 +151,7 @@ func TestBulkSimple(t *testing.T) {
"Unusual status",
"singleFieldId",
"unusual",
"message",
nil,
nil,
"",
Expand All @@ -152,6 +160,7 @@ func TestBulkSimple(t *testing.T) {
"Empty status",
"singleFieldId",
"",
"message",
nil,
nil,
"",
Expand All @@ -165,7 +174,7 @@ func TestBulkSimple(t *testing.T) {
mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchOp(t, c, start)), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once()
bc := NewBulk(mockBulk)

if err := bc.CheckIn(c.id, c.status, c.meta, c.seqno, c.ver); err != nil {
if err := bc.CheckIn(c.id, c.status, c.message, c.meta, c.seqno, c.ver); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -203,7 +212,7 @@ func benchmarkBulk(n int, flush bool, b *testing.B) {
for i := 0; i < b.N; i++ {

for _, id := range ids {
err := bc.CheckIn(id, "", nil, nil, "")
err := bc.CheckIn(id, "", "", nil, nil, "")
if err != nil {
b.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/dl/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
FieldCoordinatorIdx = "coordinator_idx"
FieldLastCheckin = "last_checkin"
FieldLastCheckinStatus = "last_checkin_status"
FieldLastCheckinMessage = "last_checkin_message"
FieldLocalMetadata = "local_metadata"
FieldPolicyCoordinatorIdx = "policy_coordinator_idx"
FieldPolicyID = "policy_id"
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/model/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions model/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,10 @@
"description": "Last checkin status",
"type": "string"
},
"last_checkin_message": {
"description": "Last checkin message",
"type": "string"
},
"default_api_key_id": {
"description": "Deprecated. Use Outputs instead. ID of the API key the Elastic Agent uses to authenticate with elasticsearch",
"type": "string"
Expand Down

0 comments on commit 129ea1c

Please sign in to comment.