Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storing checkin message in last_checkin_message #1932

Merged
merged 4 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
- Fleet Server now allows setting global labels on APM instrumentation. {pull}1649[1649]
- Fleet Server now allows setting transaction sample rate on APM instrumentation {pull}1681[1681]
- Log redacted config when config updates. {issue}1626[1626] {pull}1668[1668]
- Storing checkin message in last_checkin_message {pull}1932[1932]
4 changes: 2 additions & 2 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r
defer longPoll.Stop()

// Initial update on checkin, and any user fields that might have changed
err = ct.bc.CheckIn(agent.Id, req.Status, rawMeta, seqno, ver)
err = ct.bc.CheckIn(agent.Id, req.Status, req.Message, rawMeta, seqno, ver)
if err != nil {
zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed")
}
Expand Down Expand Up @@ -277,7 +277,7 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r
zlog.Trace().Msg("fire long poll")
break LOOP
case <-tick.C:
err := ct.bc.CheckIn(agent.Id, req.Status, nil, nil, ver)
err := ct.bc.CheckIn(agent.Id, req.Status, req.Message, nil, nil, ver)
if err != nil {
zlog.Error().Err(err).Str("agent_id", agent.Id).Msg("checkin failed")
}
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/api/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type EnrollResponse struct {

type CheckinRequest struct {
Status string `json:"status"`
Message string `json:"message"`
AckToken string `json:"ack_token,omitempty"`
LocalMeta json.RawMessage `json:"local_metadata"`
}
Expand Down
30 changes: 17 additions & 13 deletions internal/pkg/checkin/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ type extraT struct {
// There will be 10's of thousands of items
// in the map at any point.
type pendingT struct {
ts string
status string
extra *extraT
ts string
status string
message string
extra *extraT
}

// Bulk will batch pending checkins and update elasticsearch at a set interval.
Expand Down Expand Up @@ -98,7 +99,7 @@ func (bc *Bulk) timestamp() string {
// CheckIn will add the agent (identified by id) to the pending set.
// The pending agents are sent to elasticsearch as a bulk update at each flush interval.
// WARNING: Bulk will take ownership of fields, so do not use after passing in.
func (bc *Bulk) CheckIn(id string, status string, meta []byte, seqno sqn.SeqNo, newVer string) error {
func (bc *Bulk) CheckIn(id string, status string, message string, meta []byte, seqno sqn.SeqNo, newVer string) error {
// Separate out the extra data to minimize
// the memory footprint of the 90% case of just
// updating the timestamp.
Expand All @@ -114,9 +115,10 @@ func (bc *Bulk) CheckIn(id string, status string, meta []byte, seqno sqn.SeqNo,
bc.mut.Lock()

bc.pending[id] = pendingT{
ts: bc.timestamp(),
status: status,
extra: extra,
ts: bc.timestamp(),
status: status,
message: message,
extra: extra,
}

bc.mut.Unlock()
Expand Down Expand Up @@ -180,9 +182,10 @@ func (bc *Bulk) flush(ctx context.Context) error {
body, ok = simpleCache[pendingData]
if !ok {
fields := bulk.UpdateFields{
dl.FieldLastCheckin: pendingData.ts,
dl.FieldUpdatedAt: nowTimestamp,
dl.FieldLastCheckinStatus: pendingData.status,
dl.FieldLastCheckin: pendingData.ts,
dl.FieldUpdatedAt: nowTimestamp,
dl.FieldLastCheckinStatus: pendingData.status,
dl.FieldLastCheckinMessage: pendingData.message,
}
if body, err = fields.Marshal(); err != nil {
return err
Expand All @@ -192,9 +195,10 @@ func (bc *Bulk) flush(ctx context.Context) error {
} else {

fields := bulk.UpdateFields{
dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp
dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp
dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status
dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp
dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp
dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status
dl.FieldLastCheckinMessage: pendingData.message, // Set the status message
}

// If the agent version is not empty it needs to be updated
Expand Down
25 changes: 17 additions & 8 deletions internal/pkg/checkin/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ func matchOp(tb testing.TB, c bulkcase, ts time.Time) func(ops []bulk.MultiOp) b
}

type bulkcase struct {
desc string
id string
status string
meta []byte
seqno sqn.SeqNo
ver string
desc string
id string
status string
message string
meta []byte
seqno sqn.SeqNo
ver string
}

func TestBulkSimple(t *testing.T) {
Expand All @@ -96,6 +97,7 @@ func TestBulkSimple(t *testing.T) {
"Simple case",
"simpleId",
"online",
"message",
nil,
nil,
"",
Expand All @@ -104,6 +106,7 @@ func TestBulkSimple(t *testing.T) {
"Singled field case",
"singleFieldId",
"online",
"message",
[]byte(`{"hey":"now"}`),
nil,
"",
Expand All @@ -112,6 +115,7 @@ func TestBulkSimple(t *testing.T) {
"Multi field case",
"multiFieldId",
"online",
"message",
[]byte(`{"hey":"now","brown":"cow"}`),
nil,
ver,
Expand All @@ -120,6 +124,7 @@ func TestBulkSimple(t *testing.T) {
"Multi field nested case",
"multiFieldNestedId",
"online",
"message",
[]byte(`{"hey":"now","wee":{"little":"doggie"}}`),
nil,
"",
Expand All @@ -128,6 +133,7 @@ func TestBulkSimple(t *testing.T) {
"Simple case with seqNo",
"simpleseqno",
"online",
"message",
nil,
sqn.SeqNo{1, 2, 3, 4},
ver,
Expand All @@ -136,6 +142,7 @@ func TestBulkSimple(t *testing.T) {
"Field case with seqNo",
"simpleseqno",
"online",
"message",
[]byte(`{"uncle":"fester"}`),
sqn.SeqNo{5, 6, 7, 8},
ver,
Expand All @@ -144,6 +151,7 @@ func TestBulkSimple(t *testing.T) {
"Unusual status",
"singleFieldId",
"unusual",
"message",
nil,
nil,
"",
Expand All @@ -152,6 +160,7 @@ func TestBulkSimple(t *testing.T) {
"Empty status",
"singleFieldId",
"",
"message",
nil,
nil,
"",
Expand All @@ -165,7 +174,7 @@ func TestBulkSimple(t *testing.T) {
mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchOp(t, c, start)), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once()
bc := NewBulk(mockBulk)

if err := bc.CheckIn(c.id, c.status, c.meta, c.seqno, c.ver); err != nil {
if err := bc.CheckIn(c.id, c.status, c.message, c.meta, c.seqno, c.ver); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -203,7 +212,7 @@ func benchmarkBulk(n int, flush bool, b *testing.B) {
for i := 0; i < b.N; i++ {

for _, id := range ids {
err := bc.CheckIn(id, "", nil, nil, "")
err := bc.CheckIn(id, "", "", nil, nil, "")
if err != nil {
b.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/dl/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
FieldCoordinatorIdx = "coordinator_idx"
FieldLastCheckin = "last_checkin"
FieldLastCheckinStatus = "last_checkin_status"
FieldLastCheckinMessage = "last_checkin_message"
FieldLocalMetadata = "local_metadata"
FieldPolicyCoordinatorIdx = "policy_coordinator_idx"
FieldPolicyID = "policy_id"
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/model/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions model/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,10 @@
"description": "Last checkin status",
"type": "string"
},
"last_checkin_message": {
"description": "Last checkin message",
"type": "string"
},
"default_api_key_id": {
"description": "Deprecated. Use Outputs instead. ID of the API key the Elastic Agent uses to authenticate with elasticsearch",
"type": "string"
Expand Down