From a05bc15db783b5bc6a7da0fc51b8b674d0eed14d Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 2 Dec 2019 15:17:18 +0100 Subject: [PATCH] [Transform] Fix possible audit logging disappearance after rolling upgrade (#49731) ensure audit index template is available during a rolling upgrade before a transform task can write to it. fixes #49730 --- .../TransformInternalIndexConstants.java | 3 +- .../persistence/TransformInternalIndex.java | 78 ++++++++++-- .../TransformPersistentTasksExecutor.java | 4 +- .../TransformInternalIndexTests.java | 112 +++++++++++++++++- .../80_transform_jobs_crud.yml | 8 +- 5 files changed, 188 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java index 574499397e9fd..474943f0e8c26 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java @@ -32,7 +32,8 @@ public final class TransformInternalIndexConstants { public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*"; // audit index - public static final String AUDIT_TEMPLATE_VERSION = "000001"; + // gh #49730: upped version of audit index to 000002 + public static final String AUDIT_TEMPLATE_VERSION = "000002"; public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-"; public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*"; public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1"; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java index bd436bde9aa2d..d0b94bf6e3e52 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java @@ -315,22 +315,39 @@ private static XContentBuilder addMetaInformation(XContentBuilder builder) throw .endObject(); } - public static boolean haveLatestVersionedIndexTemplate(ClusterState state) { - return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME); - } - /** * This method should be called before any document is indexed that relies on the - * existence of the latest index template to create the internal index. The - * reason is that the standard template upgrader only runs when the master node + * existence of the latest index templates to create the internal and audit index. + * The reason is that the standard template upgrader only runs when the master node * is upgraded to the newer version. If data nodes are upgraded before master * nodes and transforms get assigned to those data nodes then without this check * the data nodes will index documents into the internal index before the necessary * index template is present and this will result in an index with completely * dynamic mappings being created (which is very bad). */ - public static void installLatestVersionedIndexTemplateIfRequired(ClusterService clusterService, Client client, - ActionListener listener) { + public static void installLatestIndexTemplatesIfRequired(ClusterService clusterService, Client client, ActionListener listener) { + + installLatestVersionedIndexTemplateIfRequired( + clusterService, + client, + ActionListener.wrap(r -> { installLatestAuditIndexTemplateIfRequired(clusterService, client, listener); }, listener::onFailure) + ); + + } + + protected static boolean haveLatestVersionedIndexTemplate(ClusterState state) { + return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME); + } + + protected static boolean haveLatestAuditIndexTemplate(ClusterState state) { + return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.AUDIT_INDEX); + } + + protected static void installLatestVersionedIndexTemplateIfRequired( + ClusterService clusterService, + Client client, + ActionListener listener + ) { // The check for existence of the template is against local cluster state, so very cheap if (haveLatestVersionedIndexTemplate(clusterService.state())) { @@ -348,13 +365,52 @@ public static void installLatestVersionedIndexTemplateIfRequired(ClusterService .settings(indexTemplateMetaData.settings()) .mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2()); ActionListener innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), TRANSFORM_ORIGIN, request, - innerListener, client.admin().indices()::putTemplate); + executeAsyncWithOrigin( + client.threadPool().getThreadContext(), + TRANSFORM_ORIGIN, + request, + innerListener, + client.admin().indices()::putTemplate + ); } catch (IOException e) { listener.onFailure(e); } } - private TransformInternalIndex() { + protected static void installLatestAuditIndexTemplateIfRequired( + ClusterService clusterService, + Client client, + ActionListener listener + ) { + + // The check for existence of the template is against local cluster state, so very cheap + if (haveLatestAuditIndexTemplate(clusterService.state())) { + listener.onResponse(null); + return; + } + + // Installing the template involves communication with the master node, so it's more expensive but much rarer + try { + IndexTemplateMetaData indexTemplateMetaData = getAuditIndexTemplateMetaData(); + BytesReference jsonMappings = new BytesArray(indexTemplateMetaData.mappings().get(SINGLE_MAPPING_NAME).uncompressed()); + PutIndexTemplateRequest request = new PutIndexTemplateRequest(TransformInternalIndexConstants.AUDIT_INDEX).patterns( + indexTemplateMetaData.patterns() + ) + .version(indexTemplateMetaData.version()) + .settings(indexTemplateMetaData.settings()) + .mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2()); + ActionListener innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure); + executeAsyncWithOrigin( + client.threadPool().getThreadContext(), + TRANSFORM_ORIGIN, + request, + innerListener, + client.admin().indices()::putTemplate + ); + } catch (IOException e) { + listener.onFailure(e); + } } + + private TransformInternalIndex() {} } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index 3c3c7c7e1a3f0..2ed273128c6bd 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -271,8 +271,8 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa } ); - // <1> Check the internal index template is installed - TransformInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, templateCheckListener); + // <1> Check the index templates are installed + TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, templateCheckListener); } private static IndexerState currentIndexerState(TransformState previousState) { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndexTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndexTests.java index 83f9b36c496ab..b49546d8f11cc 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndexTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndexTests.java @@ -38,6 +38,7 @@ public class TransformInternalIndexTests extends ESTestCase { public static ClusterState STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE; + public static ClusterState STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE; static { ImmutableOpenMap.Builder mapBuilder = ImmutableOpenMap.builder(); @@ -51,6 +52,18 @@ public class TransformInternalIndexTests extends ESTestCase { ClusterState.Builder csBuilder = ClusterState.builder(ClusterName.DEFAULT); csBuilder.metaData(metaBuilder.build()); STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE = csBuilder.build(); + + mapBuilder = ImmutableOpenMap.builder(); + try { + mapBuilder.put(TransformInternalIndexConstants.AUDIT_INDEX, TransformInternalIndex.getAuditIndexTemplateMetaData()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + metaBuilder = MetaData.builder(); + metaBuilder.templates(mapBuilder.build()); + csBuilder = ClusterState.builder(ClusterName.DEFAULT); + csBuilder.metaData(metaBuilder.build()); + STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE = csBuilder.build(); } public void testHaveLatestVersionedIndexTemplate() { @@ -81,8 +94,7 @@ public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() { when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); IndicesAdminClient indicesClient = mock(IndicesAdminClient.class); - doAnswer( - invocationOnMock -> { + doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; listener.onResponse(new AcknowledgedResponse(true)); @@ -112,4 +124,100 @@ public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() { verify(indicesClient, times(1)).putTemplate(any(), any()); verifyNoMoreInteractions(indicesClient); } + + public void testHaveLatestAuditIndexTemplate() { + + assertTrue(TransformInternalIndex.haveLatestAuditIndexTemplate(STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE)); + assertFalse(TransformInternalIndex.haveLatestAuditIndexTemplate(ClusterState.EMPTY_STATE)); + } + + public void testInstallLatestAuditIndexTemplateIfRequired_GivenNotRequired() { + + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE); + + Client client = mock(Client.class); + + AtomicBoolean gotResponse = new AtomicBoolean(false); + ActionListener testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage())); + + TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener); + + assertTrue(gotResponse.get()); + verifyNoMoreInteractions(client); + } + + public void testInstallLatestAuditIndexTemplateIfRequired_GivenRequired() { + + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); + + IndicesAdminClient indicesClient = mock(IndicesAdminClient.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(new AcknowledgedResponse(true)); + return null; + }).when(indicesClient).putTemplate(any(), any()); + + AdminClient adminClient = mock(AdminClient.class); + when(adminClient.indices()).thenReturn(indicesClient); + Client client = mock(Client.class); + when(client.admin()).thenReturn(adminClient); + + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(client.threadPool()).thenReturn(threadPool); + + AtomicBoolean gotResponse = new AtomicBoolean(false); + ActionListener testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage())); + + TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener); + + assertTrue(gotResponse.get()); + verify(client, times(1)).threadPool(); + verify(client, times(1)).admin(); + verifyNoMoreInteractions(client); + verify(adminClient, times(1)).indices(); + verifyNoMoreInteractions(adminClient); + verify(indicesClient, times(1)).putTemplate(any(), any()); + verifyNoMoreInteractions(indicesClient); + } + + public void testInstallLatestIndexTemplateIfRequired_GivenRequired() { + + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); + + IndicesAdminClient indicesClient = mock(IndicesAdminClient.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(new AcknowledgedResponse(true)); + return null; + }).when(indicesClient).putTemplate(any(), any()); + + AdminClient adminClient = mock(AdminClient.class); + when(adminClient.indices()).thenReturn(indicesClient); + Client client = mock(Client.class); + when(client.admin()).thenReturn(adminClient); + + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(client.threadPool()).thenReturn(threadPool); + + AtomicBoolean gotResponse = new AtomicBoolean(false); + ActionListener testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage())); + + TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, testListener); + + assertTrue(gotResponse.get()); + verify(client, times(2)).threadPool(); + verify(client, times(2)).admin(); + verifyNoMoreInteractions(client); + verify(adminClient, times(2)).indices(); + verifyNoMoreInteractions(adminClient); + verify(indicesClient, times(2)).putTemplate(any(), any()); + verifyNoMoreInteractions(indicesClient); + } } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_transform_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_transform_jobs_crud.yml index 9aeaf6cdf82ca..d220c0b1a51ab 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_transform_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_transform_jobs_crud.yml @@ -261,7 +261,7 @@ setup: transform_id: "mixed-simple-continuous-transform" --- -"Test index mappings for latest internal index": +"Test index mappings for latest internal index and audit index": - do: transform.put_transform: transform_id: "upgraded-simple-transform" @@ -282,3 +282,9 @@ setup: index: .transform-internal-004 - match: { \.transform-internal-004.mappings.dynamic: "false" } - match: { \.transform-internal-004.mappings.properties.id.type: "keyword" } + - do: + indices.get_mapping: + index: .transform-notifications-000002 + - match: { \.transform-notifications-000002.mappings.dynamic: "false" } + - match: { \.transform-notifications-000002.mappings.properties.transform_id.type: "keyword" } + - match: { \.transform-notifications-000002.mappings.properties.timestamp.type: "date" }