Skip to content

Commit

Permalink
[7.5][Transform] Fix possible audit logging disappearance after rolli…
Browse files Browse the repository at this point in the history
…ng upgrade (#49770)

ensure audit index template is available during a rolling upgrade before a
transform task can write to it.

fixes #49730
  • Loading branch information
Hendrik Muhs authored Dec 3, 2019
1 parent f331c00 commit ff47d75
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public final class TransformInternalIndexConstants {
public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*";

// audit index
public static final String AUDIT_TEMPLATE_VERSION = "000001";
// gh #49730: upped version of audit index to 000002
public static final String AUDIT_TEMPLATE_VERSION = "000002";
public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-";
public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*";
public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.transform.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -35,6 +37,8 @@

public class TransportDeleteTransformAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {

private static final Logger logger = LogManager.getLogger(TransportDeleteTransformAction.class);

private final TransformConfigManager transformConfigManager;
private final TransformAuditor auditor;
private final Client client;
Expand Down Expand Up @@ -80,6 +84,7 @@ protected void masterOperation(Request request, ClusterState state,
stopResponse -> transformConfigManager.deleteTransform(request.getId(),
ActionListener.wrap(
r -> {
logger.debug("[{}] deleted transform", request.getId());
auditor.info(request.getId(), "Deleted transform.");
listener.onResponse(new AcknowledgedResponse(r));
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

package org.elasticsearch.xpack.transform.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
Expand Down Expand Up @@ -59,6 +61,8 @@

public class TransportPutTransformAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {

private static final Logger logger = LogManager.getLogger(TransportPutTransformAction.class);

private final XPackLicenseState licenseState;
private final Client client;
private final TransformConfigManager transformConfigManager;
Expand Down Expand Up @@ -222,6 +226,7 @@ private void putTransform(Request request, ActionListener<AcknowledgedResponse>
// <3> Return to the listener
ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(
putTransformConfigurationResult -> {
logger.debug("[{}] created transform", config.getId());
auditor.info(config.getId(), "Created transform.");
listener.onResponse(new AcknowledgedResponse(true));
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,23 +313,39 @@ private static XContentBuilder addMetaInformation(XContentBuilder builder) throw
.endObject();
}

public static boolean haveLatestVersionedIndexTemplate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME);
}

/**
* This method should be called before any document is indexed that relies on the
* existence of the latest index template to create the internal index. The
* reason is that the standard template upgrader only runs when the master node
* existence of the latest index templates to create the internal and audit index.
* The reason is that the standard template upgrader only runs when the master node
* is upgraded to the newer version. If data nodes are upgraded before master
* nodes and transforms get assigned to those data nodes then without this check
* the data nodes will index documents into the internal index before the necessary
* index template is present and this will result in an index with completely
* dynamic mappings being created (which is very bad).
*/
public static void installLatestVersionedIndexTemplateIfRequired(ClusterService clusterService, Client client,
ActionListener<Void> listener) {
public static void installLatestIndexTemplatesIfRequired(ClusterService clusterService, Client client, ActionListener<Void> listener) {

installLatestVersionedIndexTemplateIfRequired(
clusterService,
client,
ActionListener.wrap(r -> { installLatestAuditIndexTemplateIfRequired(clusterService, client, listener); }, listener::onFailure)
);

}

protected static boolean haveLatestVersionedIndexTemplate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME);
}

protected static boolean haveLatestAuditIndexTemplate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.AUDIT_INDEX);
}

protected static void installLatestVersionedIndexTemplateIfRequired(
ClusterService clusterService,
Client client,
ActionListener<Void> listener
) {
// The check for existence of the template is against local cluster state, so very cheap
if (haveLatestVersionedIndexTemplate(clusterService.state())) {
listener.onResponse(null);
Expand All @@ -348,6 +364,41 @@ public static void installLatestVersionedIndexTemplateIfRequired(ClusterService
.alias(new Alias(".data-frame-internal-3"))
.mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2());
ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure);
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
TRANSFORM_ORIGIN,
request,
innerListener,
client.admin().indices()::putTemplate
);
} catch (IOException e) {
listener.onFailure(e);
}
}

protected static void installLatestAuditIndexTemplateIfRequired(
ClusterService clusterService,
Client client,
ActionListener<Void> listener
) {

// The check for existence of the template is against local cluster state, so very cheap
if (haveLatestAuditIndexTemplate(clusterService.state())) {
listener.onResponse(null);
return;
}

// Installing the template involves communication with the master node, so it's more expensive but much rarer
try {
IndexTemplateMetaData indexTemplateMetaData = getAuditIndexTemplateMetaData();
BytesReference jsonMappings = new BytesArray(indexTemplateMetaData.mappings().get(SINGLE_MAPPING_NAME).uncompressed());
PutIndexTemplateRequest request = new PutIndexTemplateRequest(TransformInternalIndexConstants.AUDIT_INDEX).patterns(
indexTemplateMetaData.patterns()
)
.version(indexTemplateMetaData.version())
.settings(indexTemplateMetaData.settings())
.mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2());
ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), TRANSFORM_ORIGIN, request,
innerListener, client.admin().indices()::putTemplate);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa
}
);

// <1> Check the internal index template is installed
TransformInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, templateCheckListener);
// <1> Check the index templates are installed
TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, templateCheckListener);
}

private static IndexerState currentIndexerState(TransformState previousState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
public class TransformInternalIndexTests extends ESTestCase {

public static ClusterState STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE;
public static ClusterState STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE;

static {
ImmutableOpenMap.Builder<String, IndexTemplateMetaData> mapBuilder = ImmutableOpenMap.builder();
Expand All @@ -51,6 +52,18 @@ public class TransformInternalIndexTests extends ESTestCase {
ClusterState.Builder csBuilder = ClusterState.builder(ClusterName.DEFAULT);
csBuilder.metaData(metaBuilder.build());
STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE = csBuilder.build();

mapBuilder = ImmutableOpenMap.builder();
try {
mapBuilder.put(TransformInternalIndexConstants.AUDIT_INDEX, TransformInternalIndex.getAuditIndexTemplateMetaData());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
metaBuilder = MetaData.builder();
metaBuilder.templates(mapBuilder.build());
csBuilder = ClusterState.builder(ClusterName.DEFAULT);
csBuilder.metaData(metaBuilder.build());
STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE = csBuilder.build();
}

public void testHaveLatestVersionedIndexTemplate() {
Expand Down Expand Up @@ -81,8 +94,7 @@ public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() {
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);

IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
doAnswer(
invocationOnMock -> {
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new AcknowledgedResponse(true));
Expand Down Expand Up @@ -112,4 +124,100 @@ public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() {
verify(indicesClient, times(1)).putTemplate(any(), any());
verifyNoMoreInteractions(indicesClient);
}

public void testHaveLatestAuditIndexTemplate() {

assertTrue(TransformInternalIndex.haveLatestAuditIndexTemplate(STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE));
assertFalse(TransformInternalIndex.haveLatestAuditIndexTemplate(ClusterState.EMPTY_STATE));
}

public void testInstallLatestAuditIndexTemplateIfRequired_GivenNotRequired() {

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE);

Client client = mock(Client.class);

AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));

TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener);

assertTrue(gotResponse.get());
verifyNoMoreInteractions(client);
}

public void testInstallLatestAuditIndexTemplateIfRequired_GivenRequired() {

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);

IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new AcknowledgedResponse(true));
return null;
}).when(indicesClient).putTemplate(any(), any());

AdminClient adminClient = mock(AdminClient.class);
when(adminClient.indices()).thenReturn(indicesClient);
Client client = mock(Client.class);
when(client.admin()).thenReturn(adminClient);

ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(client.threadPool()).thenReturn(threadPool);

AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));

TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener);

assertTrue(gotResponse.get());
verify(client, times(1)).threadPool();
verify(client, times(1)).admin();
verifyNoMoreInteractions(client);
verify(adminClient, times(1)).indices();
verifyNoMoreInteractions(adminClient);
verify(indicesClient, times(1)).putTemplate(any(), any());
verifyNoMoreInteractions(indicesClient);
}

public void testInstallLatestIndexTemplateIfRequired_GivenRequired() {

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);

IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new AcknowledgedResponse(true));
return null;
}).when(indicesClient).putTemplate(any(), any());

AdminClient adminClient = mock(AdminClient.class);
when(adminClient.indices()).thenReturn(indicesClient);
Client client = mock(Client.class);
when(client.admin()).thenReturn(adminClient);

ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(client.threadPool()).thenReturn(threadPool);

AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));

TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, testListener);

assertTrue(gotResponse.get());
verify(client, times(2)).threadPool();
verify(client, times(2)).admin();
verifyNoMoreInteractions(client);
verify(adminClient, times(2)).indices();
verifyNoMoreInteractions(adminClient);
verify(indicesClient, times(2)).putTemplate(any(), any());
verifyNoMoreInteractions(indicesClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ setup:
transform.delete_transform:
transform_id: "mixed-simple-continuous-transform"
---
"Test index mappings for latest internal index":
"Test index mappings for latest internal index and audit index":
- do:
transform.put_transform:
transform_id: "upgraded-simple-transform"
Expand All @@ -273,9 +273,18 @@ setup:
}
}
- match: { acknowledged: true }

- do:
transform.delete_transform:
transform_id: "upgraded-simple-transform"
- match: { acknowledged: true }
- do:
indices.get_mapping:
index: .transform-internal-003
- match: { \.transform-internal-003.mappings.dynamic: "false" }
- match: { \.transform-internal-003.mappings.properties.id.type: "keyword" }
- do:
indices.get_mapping:
index: .transform-notifications-000002
- match: { \.transform-notifications-000002.mappings.dynamic: "false" }
- match: { \.transform-notifications-000002.mappings.properties.transform_id.type: "keyword" }
- match: { \.transform-notifications-000002.mappings.properties.timestamp.type: "date" }

0 comments on commit ff47d75

Please sign in to comment.