Skip to content

Commit

Permalink
ensure audit index template is available during a rolling upgrade bef…
Browse files Browse the repository at this point in the history
…ore a

transform task can write to it.

fixes #49730
  • Loading branch information
Hendrik Muhs committed Nov 30, 2019
1 parent 4f1ebfa commit 10025e8
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public final class TransformInternalIndexConstants {
public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*";

// audit index
public static final String AUDIT_TEMPLATE_VERSION = "000001";
// gh #49730: upped version of audit index to 000002
public static final String AUDIT_TEMPLATE_VERSION = "000002";
public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-";
public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*";
public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,22 +315,39 @@ private static XContentBuilder addMetaInformation(XContentBuilder builder) throw
.endObject();
}

public static boolean haveLatestVersionedIndexTemplate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME);
}

/**
* This method should be called before any document is indexed that relies on the
* existence of the latest index template to create the internal index. The
* reason is that the standard template upgrader only runs when the master node
* existence of the latest index templates to create the internal and audit index.
* The reason is that the standard template upgrader only runs when the master node
* is upgraded to the newer version. If data nodes are upgraded before master
* nodes and transforms get assigned to those data nodes then without this check
* the data nodes will index documents into the internal index before the necessary
* index template is present and this will result in an index with completely
* dynamic mappings being created (which is very bad).
*/
public static void installLatestVersionedIndexTemplateIfRequired(ClusterService clusterService, Client client,
ActionListener<Void> listener) {
public static void installLatestIndexTemplatesIfRequired(ClusterService clusterService, Client client, ActionListener<Void> listener) {

installLatestVersionedIndexTemplateIfRequired(
clusterService,
client,
ActionListener.wrap(r -> { installLatestAuditIndexTemplateIfRequired(clusterService, client, listener); }, listener::onFailure)
);

}

protected static boolean haveLatestVersionedIndexTemplate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME);
}

protected static boolean haveLatestAuditIndexTemplate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.AUDIT_INDEX);
}

protected static void installLatestVersionedIndexTemplateIfRequired(
ClusterService clusterService,
Client client,
ActionListener<Void> listener
) {

// The check for existence of the template is against local cluster state, so very cheap
if (haveLatestVersionedIndexTemplate(clusterService.state())) {
Expand All @@ -348,13 +365,52 @@ public static void installLatestVersionedIndexTemplateIfRequired(ClusterService
.settings(indexTemplateMetaData.settings())
.mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2());
ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), TRANSFORM_ORIGIN, request,
innerListener, client.admin().indices()::putTemplate);
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
TRANSFORM_ORIGIN,
request,
innerListener,
client.admin().indices()::putTemplate
);
} catch (IOException e) {
listener.onFailure(e);
}
}

private TransformInternalIndex() {
protected static void installLatestAuditIndexTemplateIfRequired(
ClusterService clusterService,
Client client,
ActionListener<Void> listener
) {

// The check for existence of the template is against local cluster state, so very cheap
if (haveLatestAuditIndexTemplate(clusterService.state())) {
listener.onResponse(null);
return;
}

// Installing the template involves communication with the master node, so it's more expensive but much rarer
try {
IndexTemplateMetaData indexTemplateMetaData = getAuditIndexTemplateMetaData();
BytesReference jsonMappings = new BytesArray(indexTemplateMetaData.mappings().get(SINGLE_MAPPING_NAME).uncompressed());
PutIndexTemplateRequest request = new PutIndexTemplateRequest(TransformInternalIndexConstants.AUDIT_INDEX).patterns(
indexTemplateMetaData.patterns()
)
.version(indexTemplateMetaData.version())
.settings(indexTemplateMetaData.settings())
.mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2());
ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure);
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
TRANSFORM_ORIGIN,
request,
innerListener,
client.admin().indices()::putTemplate
);
} catch (IOException e) {
listener.onFailure(e);
}
}

private TransformInternalIndex() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa
}
);

// <1> Check the internal index template is installed
TransformInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, templateCheckListener);
// <1> Check the index templates are installed
TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, templateCheckListener);
}

private static IndexerState currentIndexerState(TransformState previousState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
public class TransformInternalIndexTests extends ESTestCase {

public static ClusterState STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE;
public static ClusterState STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE;

static {
ImmutableOpenMap.Builder<String, IndexTemplateMetaData> mapBuilder = ImmutableOpenMap.builder();
Expand All @@ -51,6 +52,18 @@ public class TransformInternalIndexTests extends ESTestCase {
ClusterState.Builder csBuilder = ClusterState.builder(ClusterName.DEFAULT);
csBuilder.metaData(metaBuilder.build());
STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE = csBuilder.build();

mapBuilder = ImmutableOpenMap.builder();
try {
mapBuilder.put(TransformInternalIndexConstants.AUDIT_INDEX, TransformInternalIndex.getAuditIndexTemplateMetaData());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
metaBuilder = MetaData.builder();
metaBuilder.templates(mapBuilder.build());
csBuilder = ClusterState.builder(ClusterName.DEFAULT);
csBuilder.metaData(metaBuilder.build());
STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE = csBuilder.build();
}

public void testHaveLatestVersionedIndexTemplate() {
Expand Down Expand Up @@ -81,8 +94,7 @@ public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() {
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);

IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
doAnswer(
invocationOnMock -> {
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new AcknowledgedResponse(true));
Expand Down Expand Up @@ -112,4 +124,100 @@ public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() {
verify(indicesClient, times(1)).putTemplate(any(), any());
verifyNoMoreInteractions(indicesClient);
}

public void testHaveLatestAuditIndexTemplate() {

assertTrue(TransformInternalIndex.haveLatestAuditIndexTemplate(STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE));
assertFalse(TransformInternalIndex.haveLatestAuditIndexTemplate(ClusterState.EMPTY_STATE));
}

public void testInstallLatestAuditIndexTemplateIfRequired_GivenNotRequired() {

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE);

Client client = mock(Client.class);

AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));

TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener);

assertTrue(gotResponse.get());
verifyNoMoreInteractions(client);
}

public void testInstallLatestAuditIndexTemplateIfRequired_GivenRequired() {

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);

IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new AcknowledgedResponse(true));
return null;
}).when(indicesClient).putTemplate(any(), any());

AdminClient adminClient = mock(AdminClient.class);
when(adminClient.indices()).thenReturn(indicesClient);
Client client = mock(Client.class);
when(client.admin()).thenReturn(adminClient);

ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(client.threadPool()).thenReturn(threadPool);

AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));

TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener);

assertTrue(gotResponse.get());
verify(client, times(1)).threadPool();
verify(client, times(1)).admin();
verifyNoMoreInteractions(client);
verify(adminClient, times(1)).indices();
verifyNoMoreInteractions(adminClient);
verify(indicesClient, times(1)).putTemplate(any(), any());
verifyNoMoreInteractions(indicesClient);
}

public void testInstallLatestIndexTemplateIfRequired_GivenRequired() {

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);

IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new AcknowledgedResponse(true));
return null;
}).when(indicesClient).putTemplate(any(), any());

AdminClient adminClient = mock(AdminClient.class);
when(adminClient.indices()).thenReturn(indicesClient);
Client client = mock(Client.class);
when(client.admin()).thenReturn(adminClient);

ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(client.threadPool()).thenReturn(threadPool);

AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));

TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, testListener);

assertTrue(gotResponse.get());
verify(client, times(2)).threadPool();
verify(client, times(2)).admin();
verifyNoMoreInteractions(client);
verify(adminClient, times(2)).indices();
verifyNoMoreInteractions(adminClient);
verify(indicesClient, times(2)).putTemplate(any(), any());
verifyNoMoreInteractions(indicesClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ setup:
transform_id: "mixed-simple-continuous-transform"

---
"Test index mappings for latest internal index":
"Test index mappings for latest internal index and audit index":
- do:
transform.put_transform:
transform_id: "upgraded-simple-transform"
Expand All @@ -282,3 +282,9 @@ setup:
index: .transform-internal-004
- match: { \.transform-internal-004.mappings.dynamic: "false" }
- match: { \.transform-internal-004.mappings.properties.id.type: "keyword" }
- do:
indices.get_mapping:
index: .transform-notifications-000002
- match: { \.transform-notifications-000002.mappings.dynamic: "false" }
- match: { \.transform-notifications-000002.mappings.properties.transform_id.type: "keyword" }
- match: { \.transform-notifications-000002.mappings.properties.timestamp.type: "date" }

0 comments on commit 10025e8

Please sign in to comment.