Skip to content

Commit

Permalink
Cherry-pick #24480 to 7.x: Improve ILM policy and alias setup log out…
Browse files Browse the repository at this point in the history
…put (#24794)
  • Loading branch information
Steffen Siering authored Mar 29, 2021
1 parent defdfe1 commit e4bc061
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 36 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix panic due to unhandled DeletedFinalStateUnknown in k8s OnDelete {pull}23419[23419]
- Fix error loop with runaway CPU use when the Kafka output encounters some connection errors {pull}23484[23484]
- Allow configuring credential_profile_name and shared_credential_file when using role_arn. {pull}24174[24174]
- Fix ILM setup log reporting that a policy or an alias was created, even though the creation of any resource was disabled. {issue}24046[24046] {pull}24480[24480]
- Fix ILM alias not being created if `setup.ilm.check_exists: false` and `setup.ilm.overwrite: true` has been configured. {pull}24480[24480]
- Allow cgroup self-monitoring to see alternate `hostfs` paths {pull}24334[24334]


Expand Down
25 changes: 24 additions & 1 deletion libbeat/idxmgmt/ilm/ilm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,27 @@ func TestDefaultSupport_Manager_EnsureAlias(t *testing.T) {
},
fail: ErrRequestFailed,
},
"overwrite non-existent": {
calls: []onCall{
onCreateAlias(alias).Return(nil),
},
fail: nil,
cfg: map[string]interface{}{"check_exists": false, "overwrite": true},
},
"try overwrite existing": {
calls: []onCall{
onCreateAlias(alias).Return(errOf(ErrAliasAlreadyExists)),
},
fail: nil, // we detect that that the alias exists, and call it a day.
cfg: map[string]interface{}{"check_exists": false, "overwrite": true},
},
"fail to overwrite": {
calls: []onCall{
onCreateAlias(alias).Return(errOf(ErrAliasCreateFailed)),
},
fail: ErrAliasCreateFailed,
cfg: map[string]interface{}{"check_exists": false, "overwrite": true},
},
}

for name, test := range cases {
Expand Down Expand Up @@ -283,12 +304,14 @@ func TestDefaultSupport_Manager_EnsurePolicy(t *testing.T) {
},
},
"policy already exists": {
create: false,
calls: []onCall{
onHasILMPolicy(testPolicy.Name).Return(true, nil),
},
},
"overwrite existing": {
"overwrite": {
overwrite: true,
create: true,
calls: []onCall{
onCreateILMPolicy(testPolicy).Return(nil),
},
Expand Down
73 changes: 54 additions & 19 deletions libbeat/idxmgmt/ilm/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,42 +103,77 @@ func (m *stdManager) CheckEnabled() (bool, error) {
}

func (m *stdManager) EnsureAlias() error {
if !m.checkExists {
return nil
}
log := m.log
overwrite := m.Overwrite()
name := m.alias.Name

b, err := m.client.HasAlias(m.alias.Name)
if err != nil {
return err
var exists bool
if m.checkExists && !overwrite {
var err error
exists, err = m.client.HasAlias(name)
if err != nil {
return err
}
}
if b {

switch {
case exists && !overwrite:
log.Infof("Index Alias %v exists already.", name)
return nil
}

// This always assume it's a date pattern by sourrounding it by <...>
return m.client.CreateAlias(m.alias)
case !exists || overwrite:
err := m.client.CreateAlias(m.alias)
if err != nil {
if ErrReason(err) != ErrAliasAlreadyExists {
log.Errorf("Index Alias %v setup failed: %v.", name, err)
return err
}
log.Infof("Index Alias %v exists already.", name)
return nil
}

log.Infof("Index Alias %v successfully created.", name)
return nil

default:
m.log.Infof("ILM index alias not created: exists=%v, overwrite=%v", exists, overwrite)
return nil
}
}

func (m *stdManager) EnsurePolicy(overwrite bool) (bool, error) {
log := m.log
overwrite = overwrite || m.Overwrite()
name := m.policy.Name

exists := true
var exists bool
if m.checkExists && !overwrite {
b, err := m.client.HasILMPolicy(m.policy.Name)
var err error
exists, err = m.client.HasILMPolicy(name)
if err != nil {
return false, err
}
exists = b
}

if !exists || overwrite {
return !exists, m.client.CreateILMPolicy(m.policy)
}
switch {
case exists && !overwrite:
log.Infof("ILM policy %v exists already.", name)
return false, nil

case !exists || overwrite:
err := m.client.CreateILMPolicy(m.policy)
if err != nil {
log.Errorf("ILM policy %v creation failed: %v", name, err)
return false, err
}

log.Infof("do not generate ilm policy: exists=%v, overwrite=%v",
exists, overwrite)
return false, nil
log.Infof("ILM policy %v successfully created.", name)
return true, err

default:
log.Infof("ILM policy not created: exists=%v, overwrite=%v.", exists, overwrite)
return false, nil
}
}

func (c *infoCache) Valid() bool {
Expand Down
12 changes: 3 additions & 9 deletions libbeat/idxmgmt/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ func (m *indexManager) Setup(loadTemplate, loadILM LoadMode) error {
if err != nil {
return err
}
log.Info("ILM policy successfully loaded.")

// The template should be updated if a new policy is created.
if policyCreated && templateComponent.enabled {
Expand Down Expand Up @@ -299,14 +298,9 @@ func (m *indexManager) Setup(loadTemplate, loadILM LoadMode) error {
}

if ilmComponent.load {
// ensure alias is created after the template is created
if err := m.ilm.EnsureAlias(); err != nil {
if ilm.ErrReason(err) != ilm.ErrAliasAlreadyExists {
return err
}
log.Info("Write alias exists already")
} else {
log.Info("Write alias successfully generated.")
err := m.ilm.EnsureAlias()
if err != nil {
return err
}
}

Expand Down
18 changes: 11 additions & 7 deletions libbeat/tests/system/test_ilm.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
import pytest
import re
import shutil
import unittest

Expand All @@ -12,6 +13,9 @@
INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False)


MSG_ILM_POLICY_LOADED = re.compile('ILM policy .* successfully created.')


class TestRunILM(BaseTest):

def setUp(self):
Expand Down Expand Up @@ -46,7 +50,7 @@ def test_ilm_default(self):
self.render_config()
proc = self.start_beat()
self.wait_until(lambda: self.log_contains("mockbeat start running."))
self.wait_until(lambda: self.log_contains("ILM policy successfully loaded"))
self.wait_until(lambda: self.log_contains(MSG_ILM_POLICY_LOADED))
self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published"))
proc.check_kill_and_wait()

Expand Down Expand Up @@ -84,7 +88,7 @@ def test_policy_name(self):

proc = self.start_beat()
self.wait_until(lambda: self.log_contains("mockbeat start running."))
self.wait_until(lambda: self.log_contains("ILM policy successfully loaded"))
self.wait_until(lambda: self.log_contains(MSG_ILM_POLICY_LOADED))
self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published"))
proc.check_kill_and_wait()

Expand All @@ -103,7 +107,7 @@ def test_rollover_alias(self):

proc = self.start_beat()
self.wait_until(lambda: self.log_contains("mockbeat start running."))
self.wait_until(lambda: self.log_contains("ILM policy successfully loaded"))
self.wait_until(lambda: self.log_contains(MSG_ILM_POLICY_LOADED))
self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published"))
proc.check_kill_and_wait()

Expand All @@ -123,7 +127,7 @@ def test_pattern(self):

proc = self.start_beat()
self.wait_until(lambda: self.log_contains("mockbeat start running."))
self.wait_until(lambda: self.log_contains("ILM policy successfully loaded"))
self.wait_until(lambda: self.log_contains(MSG_ILM_POLICY_LOADED))
self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published"))
proc.check_kill_and_wait()

Expand All @@ -143,7 +147,7 @@ def test_pattern_date(self):

proc = self.start_beat()
self.wait_until(lambda: self.log_contains("mockbeat start running."))
self.wait_until(lambda: self.log_contains("ILM policy successfully loaded"))
self.wait_until(lambda: self.log_contains(MSG_ILM_POLICY_LOADED))
self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published"))
proc.check_kill_and_wait()

Expand Down Expand Up @@ -286,12 +290,12 @@ def setUp(self):
self.cmd = "ilm-policy"

def assert_log_contains_policy(self):
assert self.log_contains('ILM policy successfully loaded.')
assert self.log_contains(MSG_ILM_POLICY_LOADED)
assert self.log_contains('"max_age": "30d"')
assert self.log_contains('"max_size": "50gb"')

def assert_log_contains_write_alias(self):
assert self.log_contains('Write alias successfully generated.')
assert self.log_contains(re.compile('Index Alias .* successfully created.'))

def test_default(self):
"""
Expand Down

0 comments on commit e4bc061

Please sign in to comment.