Skip to content

Commit

Permalink
Fix discovery of Nomad allocations (elastic#28700)
Browse files Browse the repository at this point in the history
In some cases there can be multiple events during the startup of Nomad
allocations. These updates were being ignored, so the allocation was not
discovered, missing events.
  • Loading branch information
jsoriano authored Nov 22, 2021
1 parent 668da78 commit 389da94
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 3 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Output errors when Kibana index pattern setup fails. {pull}20121[20121]
- Fix issue in autodiscover that kept inputs stopped after config updates. {pull}20305[20305]
- Add service resource in k8s cluster role. {pull}20546[20546]
- Periodic metrics in logs will now report `libbeat.output.events.active` and `beat.memstats.rss`
- Periodic metrics in logs will now report `libbeat.output.events.active` and `beat.memstats.rss` as gauges (rather than counters). {pull}22877[22877]
- Fix discovery of Nomad allocations with multiple events during startup. {pull}28700[28700]
- Allows disable pod events enrichment with deployment name {pull}28521[28521]
- Fix `fingerprint` processor to give it access to the `@timestamp` field. {issue}28683[28683]
- Fix the wrong beat name on monitoring and state endpoint {issue}27755[27755]
Expand Down
10 changes: 8 additions & 2 deletions x-pack/libbeat/common/nomad/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,13 @@ func (w *watcher) sync() error {
w.logger.Debugf("Found %d allocations", len(allocations))
for _, alloc := range allocations {
// the allocation has not changed since last seen, ignore
if w.waitIndex > alloc.AllocModifyIndex {
if w.waitIndex > alloc.ModifyIndex {
w.logger.Debugf(
"Skip allocation.id=%s ClientStatus=%s because w.waitIndex=%v > alloc.ModifyIndex=%v",
alloc.ID,
alloc.ClientStatus,
fmt.Sprint(w.waitIndex),
fmt.Sprint(alloc.ModifyIndex))
continue
}

Expand All @@ -156,7 +162,7 @@ func (w *watcher) sync() error {
case AllocClientStatusRunning:
// Handle in-place allocation updates (like adding tags to a service definition) that
// don't trigger a new allocation
updated := (w.waitIndex != 0) && (alloc.CreateIndex < w.waitIndex) && (alloc.AllocModifyIndex >= w.waitIndex)
updated := (w.waitIndex != 0) && (alloc.CreateIndex < w.waitIndex) && (alloc.ModifyIndex >= w.waitIndex)

w.logger.Debugf("allocation.id=%s waitIndex=%v CreateIndex=%v ModifyIndex=%v AllocModifyIndex=%v updated=%v",
alloc.ID, w.waitIndex, alloc.CreateIndex, alloc.ModifyIndex,
Expand Down
24 changes: 24 additions & 0 deletions x-pack/libbeat/common/nomad/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,30 @@ func TestAllocationWatcher(t *testing.T) {
deleted: nil,
},
},
{
name: "old allocation index new modify index should be detected",
node: api.Node{ID: uuid.Must(uuid.NewV4()).String(), Name: "nomad1"},
allocs: []api.Allocation{
{
ModifyIndex: 20, CreateIndex: 11,
AllocModifyIndex: 11, TaskGroup: "group1",
NodeName: "nomad1", ClientStatus: AllocClientStatusRunning,
},
},
waitIndex: 24,
initialWaitIndex: 17,
expected: watcherEvents{
added: nil,
updated: []api.Allocation{
{
ModifyIndex: 20, CreateIndex: 11,
AllocModifyIndex: 11, TaskGroup: "group1",
NodeName: "nomad1", ClientStatus: AllocClientStatusRunning,
},
},
deleted: nil,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit 389da94

Please sign in to comment.