diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java index cb1d41d5ea21c..6790de854818c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java @@ -32,7 +32,8 @@ public final class TransformInternalIndexConstants { public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*"; // audit index - public static final String AUDIT_TEMPLATE_VERSION = "000001"; + // gh #49730: upped version of audit index to 000002 + public static final String AUDIT_TEMPLATE_VERSION = "000002"; public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-"; public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*"; public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1"; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java index 5e6f65e8b823b..0df3be9f52aab 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.transform.action; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; @@ -35,6 +37,8 @@ public class TransportDeleteTransformAction extends TransportMasterNodeAction { + private static final Logger logger = LogManager.getLogger(TransportDeleteTransformAction.class); + private final TransformConfigManager transformConfigManager; private final TransformAuditor auditor; private final Client client; @@ -80,6 +84,7 @@ protected void masterOperation(Request request, ClusterState state, stopResponse -> transformConfigManager.deleteTransform(request.getId(), ActionListener.wrap( r -> { + logger.debug("[{}] deleted transform", request.getId()); auditor.info(request.getId(), "Deleted transform."); listener.onResponse(new AcknowledgedResponse(r)); }, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java index 378ae0add5150..7041bc16a83d4 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.transform.action; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; @@ -59,6 +61,8 @@ public class TransportPutTransformAction extends TransportMasterNodeAction { + private static final Logger logger = LogManager.getLogger(TransportPutTransformAction.class); + private final XPackLicenseState licenseState; private final Client client; private final TransformConfigManager transformConfigManager; @@ -222,6 +226,7 @@ private void putTransform(Request request, ActionListener // <3> Return to the listener ActionListener putTransformConfigurationListener = ActionListener.wrap( putTransformConfigurationResult -> { + logger.debug("[{}] created transform", config.getId()); auditor.info(config.getId(), "Created transform."); listener.onResponse(new AcknowledgedResponse(true)); }, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java index 96870ab41ce74..97ddfbee0a2e6 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java @@ -313,23 +313,39 @@ private static XContentBuilder addMetaInformation(XContentBuilder builder) throw .endObject(); } - public static boolean haveLatestVersionedIndexTemplate(ClusterState state) { - return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME); - } - /** * This method should be called before any document is indexed that relies on the - * existence of the latest index template to create the internal index. The - * reason is that the standard template upgrader only runs when the master node + * existence of the latest index templates to create the internal and audit index. + * The reason is that the standard template upgrader only runs when the master node * is upgraded to the newer version. If data nodes are upgraded before master * nodes and transforms get assigned to those data nodes then without this check * the data nodes will index documents into the internal index before the necessary * index template is present and this will result in an index with completely * dynamic mappings being created (which is very bad). */ - public static void installLatestVersionedIndexTemplateIfRequired(ClusterService clusterService, Client client, - ActionListener listener) { + public static void installLatestIndexTemplatesIfRequired(ClusterService clusterService, Client client, ActionListener listener) { + + installLatestVersionedIndexTemplateIfRequired( + clusterService, + client, + ActionListener.wrap(r -> { installLatestAuditIndexTemplateIfRequired(clusterService, client, listener); }, listener::onFailure) + ); + } + + protected static boolean haveLatestVersionedIndexTemplate(ClusterState state) { + return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME); + } + + protected static boolean haveLatestAuditIndexTemplate(ClusterState state) { + return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.AUDIT_INDEX); + } + + protected static void installLatestVersionedIndexTemplateIfRequired( + ClusterService clusterService, + Client client, + ActionListener listener + ) { // The check for existence of the template is against local cluster state, so very cheap if (haveLatestVersionedIndexTemplate(clusterService.state())) { listener.onResponse(null); @@ -348,6 +364,41 @@ public static void installLatestVersionedIndexTemplateIfRequired(ClusterService .alias(new Alias(".data-frame-internal-3")) .mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2()); ActionListener innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure); + executeAsyncWithOrigin( + client.threadPool().getThreadContext(), + TRANSFORM_ORIGIN, + request, + innerListener, + client.admin().indices()::putTemplate + ); + } catch (IOException e) { + listener.onFailure(e); + } + } + + protected static void installLatestAuditIndexTemplateIfRequired( + ClusterService clusterService, + Client client, + ActionListener listener + ) { + + // The check for existence of the template is against local cluster state, so very cheap + if (haveLatestAuditIndexTemplate(clusterService.state())) { + listener.onResponse(null); + return; + } + + // Installing the template involves communication with the master node, so it's more expensive but much rarer + try { + IndexTemplateMetaData indexTemplateMetaData = getAuditIndexTemplateMetaData(); + BytesReference jsonMappings = new BytesArray(indexTemplateMetaData.mappings().get(SINGLE_MAPPING_NAME).uncompressed()); + PutIndexTemplateRequest request = new PutIndexTemplateRequest(TransformInternalIndexConstants.AUDIT_INDEX).patterns( + indexTemplateMetaData.patterns() + ) + .version(indexTemplateMetaData.version()) + .settings(indexTemplateMetaData.settings()) + .mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2()); + ActionListener innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure); executeAsyncWithOrigin(client.threadPool().getThreadContext(), TRANSFORM_ORIGIN, request, innerListener, client.admin().indices()::putTemplate); } catch (IOException e) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index da8235b6c219b..f6cd240c87a86 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -291,8 +291,8 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa } ); - // <1> Check the internal index template is installed - TransformInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, templateCheckListener); + // <1> Check the index templates are installed + TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, templateCheckListener); } private static IndexerState currentIndexerState(TransformState previousState) { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndexTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndexTests.java index 83f9b36c496ab..b49546d8f11cc 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndexTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndexTests.java @@ -38,6 +38,7 @@ public class TransformInternalIndexTests extends ESTestCase { public static ClusterState STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE; + public static ClusterState STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE; static { ImmutableOpenMap.Builder mapBuilder = ImmutableOpenMap.builder(); @@ -51,6 +52,18 @@ public class TransformInternalIndexTests extends ESTestCase { ClusterState.Builder csBuilder = ClusterState.builder(ClusterName.DEFAULT); csBuilder.metaData(metaBuilder.build()); STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE = csBuilder.build(); + + mapBuilder = ImmutableOpenMap.builder(); + try { + mapBuilder.put(TransformInternalIndexConstants.AUDIT_INDEX, TransformInternalIndex.getAuditIndexTemplateMetaData()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + metaBuilder = MetaData.builder(); + metaBuilder.templates(mapBuilder.build()); + csBuilder = ClusterState.builder(ClusterName.DEFAULT); + csBuilder.metaData(metaBuilder.build()); + STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE = csBuilder.build(); } public void testHaveLatestVersionedIndexTemplate() { @@ -81,8 +94,7 @@ public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() { when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); IndicesAdminClient indicesClient = mock(IndicesAdminClient.class); - doAnswer( - invocationOnMock -> { + doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; listener.onResponse(new AcknowledgedResponse(true)); @@ -112,4 +124,100 @@ public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() { verify(indicesClient, times(1)).putTemplate(any(), any()); verifyNoMoreInteractions(indicesClient); } + + public void testHaveLatestAuditIndexTemplate() { + + assertTrue(TransformInternalIndex.haveLatestAuditIndexTemplate(STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE)); + assertFalse(TransformInternalIndex.haveLatestAuditIndexTemplate(ClusterState.EMPTY_STATE)); + } + + public void testInstallLatestAuditIndexTemplateIfRequired_GivenNotRequired() { + + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE); + + Client client = mock(Client.class); + + AtomicBoolean gotResponse = new AtomicBoolean(false); + ActionListener testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage())); + + TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener); + + assertTrue(gotResponse.get()); + verifyNoMoreInteractions(client); + } + + public void testInstallLatestAuditIndexTemplateIfRequired_GivenRequired() { + + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); + + IndicesAdminClient indicesClient = mock(IndicesAdminClient.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(new AcknowledgedResponse(true)); + return null; + }).when(indicesClient).putTemplate(any(), any()); + + AdminClient adminClient = mock(AdminClient.class); + when(adminClient.indices()).thenReturn(indicesClient); + Client client = mock(Client.class); + when(client.admin()).thenReturn(adminClient); + + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(client.threadPool()).thenReturn(threadPool); + + AtomicBoolean gotResponse = new AtomicBoolean(false); + ActionListener testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage())); + + TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener); + + assertTrue(gotResponse.get()); + verify(client, times(1)).threadPool(); + verify(client, times(1)).admin(); + verifyNoMoreInteractions(client); + verify(adminClient, times(1)).indices(); + verifyNoMoreInteractions(adminClient); + verify(indicesClient, times(1)).putTemplate(any(), any()); + verifyNoMoreInteractions(indicesClient); + } + + public void testInstallLatestIndexTemplateIfRequired_GivenRequired() { + + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); + + IndicesAdminClient indicesClient = mock(IndicesAdminClient.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(new AcknowledgedResponse(true)); + return null; + }).when(indicesClient).putTemplate(any(), any()); + + AdminClient adminClient = mock(AdminClient.class); + when(adminClient.indices()).thenReturn(indicesClient); + Client client = mock(Client.class); + when(client.admin()).thenReturn(adminClient); + + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(client.threadPool()).thenReturn(threadPool); + + AtomicBoolean gotResponse = new AtomicBoolean(false); + ActionListener testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage())); + + TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, testListener); + + assertTrue(gotResponse.get()); + verify(client, times(2)).threadPool(); + verify(client, times(2)).admin(); + verifyNoMoreInteractions(client); + verify(adminClient, times(2)).indices(); + verifyNoMoreInteractions(adminClient); + verify(indicesClient, times(2)).putTemplate(any(), any()); + verifyNoMoreInteractions(indicesClient); + } } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml index b567f8bdab700..6f8c0457486b9 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml @@ -258,7 +258,7 @@ setup: transform.delete_transform: transform_id: "mixed-simple-continuous-transform" --- -"Test index mappings for latest internal index": +"Test index mappings for latest internal index and audit index": - do: transform.put_transform: transform_id: "upgraded-simple-transform" @@ -273,9 +273,18 @@ setup: } } - match: { acknowledged: true } - + - do: + transform.delete_transform: + transform_id: "upgraded-simple-transform" + - match: { acknowledged: true } - do: indices.get_mapping: index: .transform-internal-003 - match: { \.transform-internal-003.mappings.dynamic: "false" } - match: { \.transform-internal-003.mappings.properties.id.type: "keyword" } + - do: + indices.get_mapping: + index: .transform-notifications-000002 + - match: { \.transform-notifications-000002.mappings.dynamic: "false" } + - match: { \.transform-notifications-000002.mappings.properties.transform_id.type: "keyword" } + - match: { \.transform-notifications-000002.mappings.properties.timestamp.type: "date" }