Skip to content

Commit

Permalink
Fix IndexAuditTrail rolling upgrade on rollover edge 2 (#38286) (#38381)
Browse files Browse the repository at this point in the history
Fixes a race during the rolling upgrade with the index audit output enabled.

The race is that after the upgraded node is restarted, it installs the audit template
and updates the mapping of the "current" (from his perspective) audit index. But
the template might be installed after a new daily rolled-over index has been
created by the other old nodes, using the old templates.

However, the new node, even if it installs the template after the rollover edge,
can accumulate audit events before the edge, and will correctly try to update the
mapping of the audit index before the edge. But this way, the mapping of the index
after the edge remains un-updated, because only the master node does the
mapping updates.

The fix keeps the design of only allowing the master to update the mapping, but
the master will try, on a best effort policy, to also possibly update the mapping of
the next rollover audit index.
  • Loading branch information
albertzaharovits authored Feb 5, 2019
1 parent 2937151 commit f1aac27
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import static org.elasticsearch.xpack.security.audit.AuditUtil.indices;
import static org.elasticsearch.xpack.security.audit.AuditUtil.restRequestContent;
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.resolve;
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.resolveNext;
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.SECURITY_VERSION_STRING;

/**
Expand Down Expand Up @@ -308,6 +309,17 @@ private String getIndexName() {
return index;
}

private String getNextIndexName() {
final Message first = peek();
final String index;
if (first == null) {
index = resolveNext(IndexAuditTrailField.INDEX_NAME_PREFIX, DateTime.now(DateTimeZone.UTC), rollover);
} else {
index = resolveNext(IndexAuditTrailField.INDEX_NAME_PREFIX, first.timestamp, rollover);
}
return index;
}

private boolean hasStaleMessage() {
final Message first = peek();
if (first == null) {
Expand Down Expand Up @@ -337,7 +349,7 @@ public void onResponse(ClusterStateResponse clusterStateResponse) {
updateCurrentIndexMappingsIfNecessary(clusterStateResponse.getState());
} else if (TemplateUtils.checkTemplateExistsAndVersionMatches(INDEX_TEMPLATE_NAME,
SECURITY_VERSION_STRING, clusterStateResponse.getState(), logger,
Version.CURRENT::onOrAfter) == false) {
Version.CURRENT::onOrBefore) == false) {
putTemplate(customAuditIndexSettings(settings, logger),
e -> {
logger.error("failed to put audit trail template", e);
Expand Down Expand Up @@ -377,6 +389,7 @@ public void onFailure(Exception e) {

// pkg private for tests
void updateCurrentIndexMappingsIfNecessary(ClusterState state) {
final String nextIndex = getNextIndexName();
final String index = getIndexName();

AliasOrIndex aliasOrIndex = state.getMetaData().getAliasAndIndexLookup().get(index);
Expand All @@ -391,48 +404,60 @@ void updateCurrentIndexMappingsIfNecessary(ClusterState state) {
MappingMetaData docMapping = indexMetaData.mapping("doc");
if (docMapping == null) {
if (indexToRemoteCluster || state.nodes().isLocalNodeElectedMaster() || hasStaleMessage()) {
putAuditIndexMappingsAndStart(index);
putAuditIndexMappingsAndStart(index, nextIndex);
} else {
logger.trace("audit index [{}] is missing mapping for type [{}]", index, DOC_TYPE);
logger.debug("audit index [{}] is missing mapping for type [{}]", index, DOC_TYPE);
transitionStartingToInitialized();
}
} else {
@SuppressWarnings("unchecked")
Map<String, Object> meta = (Map<String, Object>) docMapping.sourceAsMap().get("_meta");
if (meta == null) {
logger.info("Missing _meta field in mapping [{}] of index [{}]", docMapping.type(), index);
throw new IllegalStateException("Cannot read security-version string in index " + index);
}

final String versionString = (String) meta.get(SECURITY_VERSION_STRING);
if (versionString != null && Version.fromString(versionString).onOrAfter(Version.CURRENT)) {
innerStart();
} else {
logger.warn("Missing _meta field in mapping [{}] of index [{}]", docMapping.type(), index);
if (indexToRemoteCluster || state.nodes().isLocalNodeElectedMaster() || hasStaleMessage()) {
putAuditIndexMappingsAndStart(index);
} else if (versionString == null) {
logger.trace("audit index [{}] mapping is missing meta field [{}]", index, SECURITY_VERSION_STRING);
transitionStartingToInitialized();
putAuditIndexMappingsAndStart(index, nextIndex);
} else {
logger.trace("audit index [{}] has the incorrect version [{}]", index, versionString);
logger.debug("audit index [{}] is missing _meta for type [{}]", index, DOC_TYPE);
transitionStartingToInitialized();
}
} else {
final String versionString = (String) meta.get(SECURITY_VERSION_STRING);
if (versionString != null && Version.fromString(versionString).onOrAfter(Version.CURRENT)) {
innerStart();
} else {
if (indexToRemoteCluster || state.nodes().isLocalNodeElectedMaster() || hasStaleMessage()) {
putAuditIndexMappingsAndStart(index, nextIndex);
} else if (versionString == null) {
logger.debug("audit index [{}] mapping is missing meta field [{}]", index, SECURITY_VERSION_STRING);
transitionStartingToInitialized();
} else {
logger.debug("audit index [{}] has the incorrect version [{}]", index, versionString);
transitionStartingToInitialized();
}
}
}
}
} else {
innerStart();
}
}

private void putAuditIndexMappingsAndStart(String index) {
putAuditIndexMappings(index, getPutIndexTemplateRequest(Settings.EMPTY).mappings().get(DOC_TYPE),
ActionListener.wrap(ignore -> {
logger.trace("updated mappings on audit index [{}]", index);
private void putAuditIndexMappingsAndStart(String index, String nextIndex) {
final String docMapping = getPutIndexTemplateRequest(Settings.EMPTY).mappings().get(DOC_TYPE);
putAuditIndexMappings(index, docMapping, ActionListener.wrap(ignore -> {
logger.debug("updated mappings on audit index [{}]", index);
putAuditIndexMappings(nextIndex, docMapping, ActionListener.wrap(ignoreToo -> {
logger.debug("updated mappings on next audit index [{}]", nextIndex);
innerStart();
}, e2 -> {
// best effort only
logger.debug("Failed to update mappings on next audit index [{}]", nextIndex);
innerStart();
}, e -> {
logger.error(new ParameterizedMessage("failed to update mappings on audit index [{}]", index), e);
transitionStartingToInitialized(); // reset to initialized so we can retry
}));
}, e -> {
logger.error(new ParameterizedMessage("failed to update mappings on audit index [{}]", index), e);
transitionStartingToInitialized(); // reset to initialized so we can retry
}));
}

private void transitionStartingToInitialized() {
Expand All @@ -451,7 +476,7 @@ void innerStart() {
assert false : message;
logger.error(message);
} else {
logger.trace("successful state transition from starting to started, current value: [{}]", state.get());
logger.debug("successful state transition from starting to started, current value: [{}]", state.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,31 @@
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.util.function.Function;

public class IndexNameResolver {

public enum Rollover {
HOURLY ("-yyyy.MM.dd.HH"),
DAILY ("-yyyy.MM.dd"),
WEEKLY ("-yyyy.w"),
MONTHLY ("-yyyy.MM");
HOURLY ("-yyyy.MM.dd.HH", d -> d.plusHours(1)),
DAILY ("-yyyy.MM.dd", d -> d.plusDays(1)),
WEEKLY ("-yyyy.w", d -> d.plusWeeks(1)),
MONTHLY ("-yyyy.MM", d -> d.plusMonths(1));

private final DateTimeFormatter formatter;
private final Function<DateTime, DateTime> next;

Rollover(String format) {
Rollover(String format, Function<DateTime, DateTime> next) {
this.formatter = DateTimeFormat.forPattern(format);
this.next = next;
}

DateTimeFormatter formatter() {
return formatter;
}

Function<DateTime, DateTime> getNext() {
return next;
}
}

private IndexNameResolver() {}
Expand All @@ -34,6 +42,10 @@ public static String resolve(DateTime timestamp, Rollover rollover) {
return rollover.formatter().print(timestamp);
}

public static String resolveNext(String indexNamePrefix, DateTime timestamp, Rollover rollover) {
return resolve(indexNamePrefix, rollover.getNext().apply(timestamp), rollover);
}

public static String resolve(String indexNamePrefix, DateTime timestamp, Rollover rollover) {
return indexNamePrefix + resolve(timestamp, rollover);
}
Expand Down
2 changes: 2 additions & 0 deletions x-pack/qa/rolling-upgrade/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ subprojects {
setting 'xpack.security.audit.outputs', 'index'
setting 'xpack.ssl.keystore.path', 'testnode.jks'
setting 'xpack.ssl.keystore.password', 'testnode'
setting 'logger.org.elasticsearch.xpack.security.audit.index', 'DEBUG'
if (version.onOrAfter('6.0.0') == false) {
// this is needed since in 5.6 we don't bootstrap the token service if there is no explicit initial password
keystoreSetting 'xpack.security.authc.token.passphrase', 'xpack_token_passphrase'
Expand Down Expand Up @@ -211,6 +212,7 @@ subprojects {
setting 'xpack.security.transport.ssl.enabled', 'true'
setting 'xpack.ssl.keystore.path', 'testnode.jks'
keystoreSetting 'xpack.ssl.keystore.secure_password', 'testnode'
setting 'logger.org.elasticsearch.xpack.security.audit.index', 'DEBUG'
if (version.onOrAfter('6.0.0') == false) {
// this is needed since in 5.6 we don't bootstrap the token service if there is no explicit initial password
keystoreSetting 'xpack.security.authc.token.passphrase', 'xpack_token_passphrase'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.hasSize;

Expand Down Expand Up @@ -62,12 +63,11 @@ public void findMinVersionInCluster() throws IOException {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33867")
public void testAuditLogs() throws Exception {
assertBusy(() -> {
assertAuditDocsExist();
assertNumUniqueNodeNameBuckets(expectedNumUniqueNodeNameBuckets());
});
}, 30, TimeUnit.SECONDS);
}

private int expectedNumUniqueNodeNameBuckets() throws IOException {
Expand Down

0 comments on commit f1aac27

Please sign in to comment.