Skip to content

Commit

Permalink
[8.0](backport #28700) Fix discovery of Nomad allocations (#29079)
Browse files Browse the repository at this point in the history
In some cases there can be multiple events during the startup of Nomad
allocations. These updates were being ignored, so the allocation was not
discovered, missing events.

(cherry picked from commit 389da94)

Co-authored-by: Jaime Soriano Pastor <[email protected]>
  • Loading branch information
mergify[bot] and jsoriano authored Nov 23, 2021
1 parent 5b7f202 commit 6caf4a7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Output errors when Kibana index pattern setup fails. {pull}20121[20121]
- Fix issue in autodiscover that kept inputs stopped after config updates. {pull}20305[20305]
- Add service resource in k8s cluster role. {pull}20546[20546]
- Fix discovery of Nomad allocations with multiple events during startup. {pull}28700[28700]
- Fix `fingerprint` processor to give it access to the `@timestamp` field. {issue}28683[28683]
- Fix the wrong beat name on monitoring and state endpoint {issue}27755[27755]

Expand Down
10 changes: 8 additions & 2 deletions x-pack/libbeat/common/nomad/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,13 @@ func (w *watcher) sync() error {
w.logger.Debugf("Found %d allocations", len(allocations))
for _, alloc := range allocations {
// the allocation has not changed since last seen, ignore
if w.waitIndex > alloc.AllocModifyIndex {
if w.waitIndex > alloc.ModifyIndex {
w.logger.Debugf(
"Skip allocation.id=%s ClientStatus=%s because w.waitIndex=%v > alloc.ModifyIndex=%v",
alloc.ID,
alloc.ClientStatus,
fmt.Sprint(w.waitIndex),
fmt.Sprint(alloc.ModifyIndex))
continue
}

Expand All @@ -156,7 +162,7 @@ func (w *watcher) sync() error {
case AllocClientStatusRunning:
// Handle in-place allocation updates (like adding tags to a service definition) that
// don't trigger a new allocation
updated := (w.waitIndex != 0) && (alloc.CreateIndex < w.waitIndex) && (alloc.AllocModifyIndex >= w.waitIndex)
updated := (w.waitIndex != 0) && (alloc.CreateIndex < w.waitIndex) && (alloc.ModifyIndex >= w.waitIndex)

w.logger.Debugf("allocation.id=%s waitIndex=%v CreateIndex=%v ModifyIndex=%v AllocModifyIndex=%v updated=%v",
alloc.ID, w.waitIndex, alloc.CreateIndex, alloc.ModifyIndex,
Expand Down
24 changes: 24 additions & 0 deletions x-pack/libbeat/common/nomad/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,30 @@ func TestAllocationWatcher(t *testing.T) {
deleted: nil,
},
},
{
name: "old allocation index new modify index should be detected",
node: api.Node{ID: uuid.Must(uuid.NewV4()).String(), Name: "nomad1"},
allocs: []api.Allocation{
{
ModifyIndex: 20, CreateIndex: 11,
AllocModifyIndex: 11, TaskGroup: "group1",
NodeName: "nomad1", ClientStatus: AllocClientStatusRunning,
},
},
waitIndex: 24,
initialWaitIndex: 17,
expected: watcherEvents{
added: nil,
updated: []api.Allocation{
{
ModifyIndex: 20, CreateIndex: 11,
AllocModifyIndex: 11, TaskGroup: "group1",
NodeName: "nomad1", ClientStatus: AllocClientStatusRunning,
},
},
deleted: nil,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit 6caf4a7

Please sign in to comment.