Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix possible audit logging disappearance after rolling upgrade #49731

Merged
merged 2 commits into from
Dec 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public final class TransformInternalIndexConstants {
public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*";

// audit index
public static final String AUDIT_TEMPLATE_VERSION = "000001";
// gh #49730: upped version of audit index to 000002
public static final String AUDIT_TEMPLATE_VERSION = "000002";
public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-";
public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*";
public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,22 +315,39 @@ private static XContentBuilder addMetaInformation(XContentBuilder builder) throw
.endObject();
}

public static boolean haveLatestVersionedIndexTemplate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME);
}

/**
* This method should be called before any document is indexed that relies on the
* existence of the latest index template to create the internal index. The
* reason is that the standard template upgrader only runs when the master node
* existence of the latest index templates to create the internal and audit index.
* The reason is that the standard template upgrader only runs when the master node
* is upgraded to the newer version. If data nodes are upgraded before master
* nodes and transforms get assigned to those data nodes then without this check
* the data nodes will index documents into the internal index before the necessary
* index template is present and this will result in an index with completely
* dynamic mappings being created (which is very bad).
*/
public static void installLatestVersionedIndexTemplateIfRequired(ClusterService clusterService, Client client,
ActionListener<Void> listener) {
public static void installLatestIndexTemplatesIfRequired(ClusterService clusterService, Client client, ActionListener<Void> listener) {

installLatestVersionedIndexTemplateIfRequired(
clusterService,
client,
ActionListener.wrap(r -> { installLatestAuditIndexTemplateIfRequired(clusterService, client, listener); }, listener::onFailure)
);

}

protected static boolean haveLatestVersionedIndexTemplate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME);
}

protected static boolean haveLatestAuditIndexTemplate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.AUDIT_INDEX);
}

protected static void installLatestVersionedIndexTemplateIfRequired(
ClusterService clusterService,
Client client,
ActionListener<Void> listener
) {

// The check for existence of the template is against local cluster state, so very cheap
if (haveLatestVersionedIndexTemplate(clusterService.state())) {
Expand All @@ -348,13 +365,52 @@ public static void installLatestVersionedIndexTemplateIfRequired(ClusterService
.settings(indexTemplateMetaData.settings())
.mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2());
ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), TRANSFORM_ORIGIN, request,
innerListener, client.admin().indices()::putTemplate);
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
TRANSFORM_ORIGIN,
request,
innerListener,
client.admin().indices()::putTemplate
);
} catch (IOException e) {
listener.onFailure(e);
}
}

private TransformInternalIndex() {
protected static void installLatestAuditIndexTemplateIfRequired(
ClusterService clusterService,
Client client,
ActionListener<Void> listener
) {

// The check for existence of the template is against local cluster state, so very cheap
if (haveLatestAuditIndexTemplate(clusterService.state())) {
listener.onResponse(null);
return;
}

// Installing the template involves communication with the master node, so it's more expensive but much rarer
try {
IndexTemplateMetaData indexTemplateMetaData = getAuditIndexTemplateMetaData();
BytesReference jsonMappings = new BytesArray(indexTemplateMetaData.mappings().get(SINGLE_MAPPING_NAME).uncompressed());
PutIndexTemplateRequest request = new PutIndexTemplateRequest(TransformInternalIndexConstants.AUDIT_INDEX).patterns(
indexTemplateMetaData.patterns()
)
.version(indexTemplateMetaData.version())
.settings(indexTemplateMetaData.settings())
.mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2());
ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure);
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
TRANSFORM_ORIGIN,
request,
innerListener,
client.admin().indices()::putTemplate
);
} catch (IOException e) {
listener.onFailure(e);
}
}

private TransformInternalIndex() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa
}
);

// <1> Check the internal index template is installed
TransformInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, templateCheckListener);
// <1> Check the index templates are installed
TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, templateCheckListener);
}

private static IndexerState currentIndexerState(TransformState previousState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
public class TransformInternalIndexTests extends ESTestCase {

public static ClusterState STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE;
public static ClusterState STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE;

static {
ImmutableOpenMap.Builder<String, IndexTemplateMetaData> mapBuilder = ImmutableOpenMap.builder();
Expand All @@ -51,6 +52,18 @@ public class TransformInternalIndexTests extends ESTestCase {
ClusterState.Builder csBuilder = ClusterState.builder(ClusterName.DEFAULT);
csBuilder.metaData(metaBuilder.build());
STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE = csBuilder.build();

mapBuilder = ImmutableOpenMap.builder();
try {
mapBuilder.put(TransformInternalIndexConstants.AUDIT_INDEX, TransformInternalIndex.getAuditIndexTemplateMetaData());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
metaBuilder = MetaData.builder();
metaBuilder.templates(mapBuilder.build());
csBuilder = ClusterState.builder(ClusterName.DEFAULT);
csBuilder.metaData(metaBuilder.build());
STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE = csBuilder.build();
}

public void testHaveLatestVersionedIndexTemplate() {
Expand Down Expand Up @@ -81,8 +94,7 @@ public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() {
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);

IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
doAnswer(
invocationOnMock -> {
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new AcknowledgedResponse(true));
Expand Down Expand Up @@ -112,4 +124,100 @@ public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() {
verify(indicesClient, times(1)).putTemplate(any(), any());
verifyNoMoreInteractions(indicesClient);
}

public void testHaveLatestAuditIndexTemplate() {

assertTrue(TransformInternalIndex.haveLatestAuditIndexTemplate(STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE));
assertFalse(TransformInternalIndex.haveLatestAuditIndexTemplate(ClusterState.EMPTY_STATE));
}

public void testInstallLatestAuditIndexTemplateIfRequired_GivenNotRequired() {

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE);

Client client = mock(Client.class);

AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));

TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener);

assertTrue(gotResponse.get());
verifyNoMoreInteractions(client);
}

public void testInstallLatestAuditIndexTemplateIfRequired_GivenRequired() {

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);

IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new AcknowledgedResponse(true));
return null;
}).when(indicesClient).putTemplate(any(), any());

AdminClient adminClient = mock(AdminClient.class);
when(adminClient.indices()).thenReturn(indicesClient);
Client client = mock(Client.class);
when(client.admin()).thenReturn(adminClient);

ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(client.threadPool()).thenReturn(threadPool);

AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));

TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener);

assertTrue(gotResponse.get());
verify(client, times(1)).threadPool();
verify(client, times(1)).admin();
verifyNoMoreInteractions(client);
verify(adminClient, times(1)).indices();
verifyNoMoreInteractions(adminClient);
verify(indicesClient, times(1)).putTemplate(any(), any());
verifyNoMoreInteractions(indicesClient);
}

public void testInstallLatestIndexTemplateIfRequired_GivenRequired() {

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);

IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new AcknowledgedResponse(true));
return null;
}).when(indicesClient).putTemplate(any(), any());

AdminClient adminClient = mock(AdminClient.class);
when(adminClient.indices()).thenReturn(indicesClient);
Client client = mock(Client.class);
when(client.admin()).thenReturn(adminClient);

ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(client.threadPool()).thenReturn(threadPool);

AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));

TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, testListener);

assertTrue(gotResponse.get());
verify(client, times(2)).threadPool();
verify(client, times(2)).admin();
verifyNoMoreInteractions(client);
verify(adminClient, times(2)).indices();
verifyNoMoreInteractions(adminClient);
verify(indicesClient, times(2)).putTemplate(any(), any());
verifyNoMoreInteractions(indicesClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ setup:
transform_id: "mixed-simple-continuous-transform"

---
"Test index mappings for latest internal index":
"Test index mappings for latest internal index and audit index":
- do:
transform.put_transform:
transform_id: "upgraded-simple-transform"
Expand All @@ -282,3 +282,9 @@ setup:
index: .transform-internal-004
- match: { \.transform-internal-004.mappings.dynamic: "false" }
- match: { \.transform-internal-004.mappings.properties.id.type: "keyword" }
- do:
indices.get_mapping:
index: .transform-notifications-000002
- match: { \.transform-notifications-000002.mappings.dynamic: "false" }
- match: { \.transform-notifications-000002.mappings.properties.transform_id.type: "keyword" }
- match: { \.transform-notifications-000002.mappings.properties.timestamp.type: "date" }