Skip to content

Commit

Permalink
Propagate checkin status to agent record
Browse files Browse the repository at this point in the history
  • Loading branch information
Sean Cunningham committed Jun 3, 2021
1 parent c3e73ee commit 73fcdb4
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 20 deletions.
13 changes: 11 additions & 2 deletions cmd/fleet/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ func (ct *CheckinT) _handleCheckin(w http.ResponseWriter, r *http.Request, id st
return err
}

log.Debug().
Str("agentId", id).
Str("reqId", reqId).
Str("status", req.Status).
Str("seqNo", seqno.String()).
RawJSON("meta", rawMeta).
Uint64("bodyCount", readCounter.Count()).
Msg("checkin start long poll")

// Subscribe to actions dispatcher
aSub := ct.ad.Subscribe(agent.Id, seqno)
defer ct.ad.Unsubscribe(aSub)
Expand All @@ -197,7 +206,7 @@ func (ct *CheckinT) _handleCheckin(w http.ResponseWriter, r *http.Request, id st
defer longPoll.Stop()

// Intial update on checkin, and any user fields that might have changed
ct.bc.CheckIn(agent.Id, rawMeta, seqno)
ct.bc.CheckIn(agent.Id, req.Status, rawMeta, seqno)

// Initial fetch for pending actions
var (
Expand Down Expand Up @@ -234,7 +243,7 @@ func (ct *CheckinT) _handleCheckin(w http.ResponseWriter, r *http.Request, id st
log.Trace().Str(EcsHttpRequestId, reqId).Str("agentId", agent.Id).Msg("fire long poll")
break LOOP
case <-tick.C:
ct.bc.CheckIn(agent.Id, nil, nil)
ct.bc.CheckIn(agent.Id, req.Status, nil, nil)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions cmd/fleet/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type EnrollResponse struct {
}

type CheckinRequest struct {
Status string `json:"status"`
AckToken string `json:"ack_token,omitempty"`
Events []Event `json:"events"`
LocalMeta json.RawMessage `json:"local_metadata"`
Expand Down
29 changes: 17 additions & 12 deletions internal/pkg/checkin/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ type extraT struct {
// There will be 10's of thousands of items
// in the map at any point.
type pendingT struct {
ts string
extra *extraT
ts string
status string
extra *extraT
}

type Bulk struct {
Expand Down Expand Up @@ -93,7 +94,7 @@ func (bc *Bulk) timestamp() string {

// WARNING: Bulk will take ownership of fields,
// so do not use after passing in.
func (bc *Bulk) CheckIn(id string, meta []byte, seqno sqn.SeqNo) error {
func (bc *Bulk) CheckIn(id string, status string, meta []byte, seqno sqn.SeqNo) error {

// Separate out the extra data to minimize
// the memory footprint of the 90% case of just
Expand All @@ -109,8 +110,9 @@ func (bc *Bulk) CheckIn(id string, meta []byte, seqno sqn.SeqNo) error {
bc.mut.Lock()

bc.pending[id] = pendingT{
ts: bc.timestamp(),
extra: extra,
ts: bc.timestamp(),
status: status,
extra: extra,
}

bc.mut.Unlock()
Expand Down Expand Up @@ -155,7 +157,7 @@ func (bc *Bulk) flush(ctx context.Context) error {

updates := make([]bulk.MultiOp, 0, len(pending))

simpleCache := make(map[string][]byte)
simpleCache := make(map[pendingT][]byte)

nowTimestamp := start.UTC().Format(time.RFC3339)

Expand All @@ -168,23 +170,26 @@ func (bc *Bulk) flush(ctx context.Context) error {
// JSON body containing just the timestamp updates.
var body []byte
if pendingData.extra == nil {

var ok bool
body, ok = simpleCache[pendingData.ts]
body, ok = simpleCache[pendingData]
if !ok {
fields := bulk.UpdateFields{
dl.FieldLastCheckin: pendingData.ts,
dl.FieldUpdatedAt: nowTimestamp,
dl.FieldLastCheckin: pendingData.ts,
dl.FieldUpdatedAt: nowTimestamp,
dl.FieldLastCheckinStatus: pendingData.status,
}
if body, err = fields.Marshal(); err != nil {
return err
}
simpleCache[pendingData.ts] = body
simpleCache[pendingData] = body
}
} else {

fields := bulk.UpdateFields{
dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp
dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp
dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp
dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp
dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status
}

// Update local metadata if provided
Expand Down
38 changes: 32 additions & 6 deletions internal/pkg/checkin/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,53 +44,74 @@ func TestBulkSimple(t *testing.T) {
bc := NewBulk(&mockBulk)

cases := []struct {
desc string
id string
meta []byte
seqno sqn.SeqNo
desc string
id string
status string
meta []byte
seqno sqn.SeqNo
}{
{
"Simple case",
"simpleId",
"online",
nil,
nil,
},
{
"Singled field case",
"singleFieldId",
"online",
[]byte(`{"hey":"now"}`),
nil,
},
{
"Multi field case",
"multiFieldId",
"online",
[]byte(`{"hey":"now","brown":"cow"}`),
nil,
},
{
"Multi field nested case",
"multiFieldNestedId",
"online",
[]byte(`{"hey":"now","wee":{"little":"doggie"}}`),
nil,
},
{
"Simple case with seqNo",
"simpleseqno",
"online",
nil,
sqn.SeqNo{1, 2, 3, 4},
},
{
"Field case with seqNo",
"simpleseqno",
"online",
[]byte(`{"uncle":"fester"}`),
sqn.SeqNo{5, 6, 7, 8},
},
{
"Unusual status",
"singleFieldId",
"unusual",
nil,
nil,
},
{
"Empty status",
"singleFieldId",
"",
nil,
nil,
},
}

for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {

if err := bc.CheckIn(c.id, c.meta, c.seqno); err != nil {
if err := bc.CheckIn(c.id, c.status, c.meta, c.seqno); err != nil {
t.Fatal(err)
}

Expand All @@ -117,6 +138,7 @@ func TestBulkSimple(t *testing.T) {

type updateT struct {
LastCheckin string `json:"last_checkin"`
Status string `json:"last_checkin_status"`
UpdatedAt string `json:"updated_at"`
Meta json.RawMessage `json:"local_metadata"`
SeqNo sqn.SeqNo `json:"action_seq_no"`
Expand Down Expand Up @@ -145,6 +167,10 @@ func TestBulkSimple(t *testing.T) {
t.Error("meta doesn't match up")
}

if c.status != sub.Status {
t.Error("status mismatch")
}

})
}
}
Expand Down Expand Up @@ -179,7 +205,7 @@ func benchmarkBulk(n int, flush bool, b *testing.B) {
for i := 0; i < b.N; i++ {

for _, id := range ids {
err := bc.CheckIn(id, nil, nil)
err := bc.CheckIn(id, "", nil, nil)
if err != nil {
b.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/dl/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
FieldRevisionIdx = "revision_idx"
FieldCoordinatorIdx = "coordinator_idx"
FieldLastCheckin = "last_checkin"
FieldLastCheckinStatus = "last_checkin_status"
FieldLocalMetadata = "local_metadata"
FieldPolicyRevisionIdx = "policy_revision_idx"
FieldPolicyCoordinatorIdx = "policy_coordinator_idx"
Expand Down

0 comments on commit 73fcdb4

Please sign in to comment.