diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 861f386a90966..1b4af63bc789a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -20,7 +20,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -293,7 +292,7 @@ public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) { return this; } - public Builder putDatafeed(DatafeedConfig datafeedConfig, ThreadContext threadContext) { + public Builder putDatafeed(DatafeedConfig datafeedConfig, Map headers) { if (datafeeds.containsKey(datafeedConfig.getId())) { throw new ResourceAlreadyExistsException("A datafeed with id [" + datafeedConfig.getId() + "] already exists"); } @@ -302,13 +301,13 @@ public Builder putDatafeed(DatafeedConfig datafeedConfig, ThreadContext threadCo Job job = jobs.get(jobId); DatafeedJobValidator.validate(datafeedConfig, job); - if (threadContext != null) { + if (headers.isEmpty() == false) { // Adjust the request, adding security headers from the current thread context DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedConfig); - Map headers = threadContext.getHeaders().entrySet().stream() + Map securityHeaders = headers.entrySet().stream() .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - builder.setHeaders(headers); + builder.setHeaders(securityHeaders); datafeedConfig = builder.build(); } @@ -328,7 +327,7 @@ private void checkJobIsAvailableForDatafeed(String jobId) { } } - public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksCustomMetaData persistentTasks, ThreadContext threadContext) { + public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksCustomMetaData persistentTasks, Map headers) { String datafeedId = update.getId(); DatafeedConfig oldDatafeedConfig = datafeeds.get(datafeedId); if (oldDatafeedConfig == null) { @@ -336,7 +335,7 @@ public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksCustomMetaDa } checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE, datafeedId, DatafeedState.STARTED), datafeedId, persistentTasks); - DatafeedConfig newDatafeedConfig = update.apply(oldDatafeedConfig, threadContext); + DatafeedConfig newDatafeedConfig = update.apply(oldDatafeedConfig, headers); if (newDatafeedConfig.getJobId().equals(oldDatafeedConfig.getJobId()) == false) { checkJobIsAvailableForDatafeed(newDatafeedConfig.getJobId()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index 444532a7e3f15..f04994792e92a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -264,7 +264,7 @@ ChunkingConfig getChunkingConfig() { * Applies the update to the given {@link DatafeedConfig} * @return a new {@link DatafeedConfig} that contains the update */ - public DatafeedConfig apply(DatafeedConfig datafeedConfig, ThreadContext threadContext) { + public DatafeedConfig apply(DatafeedConfig datafeedConfig, Map headers) { if (id.equals(datafeedConfig.getId()) == false) { throw new IllegalArgumentException("Cannot apply update to datafeedConfig with different id"); } @@ -301,12 +301,12 @@ public DatafeedConfig apply(DatafeedConfig datafeedConfig, ThreadContext threadC builder.setChunkingConfig(chunkingConfig); } - if (threadContext != null) { + if (headers.isEmpty() == false) { // Adjust the request, adding security headers from the current thread context - Map headers = threadContext.getHeaders().entrySet().stream() + Map securityHeaders = headers.entrySet().stream() .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - builder.setHeaders(headers); + builder.setHeaders(securityHeaders); } return builder.build(); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java index d059e567d1588..358f9d1c97bd7 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java @@ -114,7 +114,7 @@ public void testApply_failBecauseTargetDatafeedHasDifferentId() { public void testApply_givenEmptyUpdate() { DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo"); - DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()).build().apply(datafeed, null); + DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()).build().apply(datafeed, Collections.emptyMap()); assertThat(datafeed, equalTo(updatedDatafeed)); } @@ -125,7 +125,7 @@ public void testApply_givenPartialUpdate() { DatafeedUpdate.Builder updated = new DatafeedUpdate.Builder(datafeed.getId()); updated.setScrollSize(datafeed.getScrollSize() + 1); - DatafeedConfig updatedDatafeed = update.build().apply(datafeed, null); + DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap()); DatafeedConfig.Builder expectedDatafeed = new DatafeedConfig.Builder(datafeed); expectedDatafeed.setScrollSize(datafeed.getScrollSize() + 1); @@ -149,7 +149,7 @@ public void testApply_givenFullUpdateNoAggregations() { update.setScrollSize(8000); update.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1))); - DatafeedConfig updatedDatafeed = update.build().apply(datafeed, null); + DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap()); assertThat(updatedDatafeed.getJobId(), equalTo("bar")); assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_2"))); @@ -175,7 +175,7 @@ public void testApply_givenAggregations() { update.setAggregations(new AggregatorFactories.Builder().addAggregator( AggregationBuilders.histogram("a").interval(300000).field("time").subAggregation(maxTime))); - DatafeedConfig updatedDatafeed = update.build().apply(datafeed, null); + DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap()); assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_1"))); assertThat(updatedDatafeed.getTypes(), equalTo(Collections.singletonList("t_1"))); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java index 08a9dfb09c1d9..88c72578023f9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java @@ -41,6 +41,7 @@ import org.elasticsearch.xpack.core.security.support.Exceptions; import java.io.IOException; +import java.util.Map; public class TransportPutDatafeedAction extends TransportMasterNodeAction { @@ -95,7 +96,7 @@ protected void masterOperation(PutDatafeedAction.Request request, ClusterState s client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); } else { - putDatafeed(request, listener); + putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener); } } @@ -103,7 +104,7 @@ private void handlePrivsResponse(String username, PutDatafeedAction.Request requ HasPrivilegesResponse response, ActionListener listener) throws IOException { if (response.isCompleteMatch()) { - putDatafeed(request, listener); + putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener); } else { XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -120,7 +121,8 @@ private void handlePrivsResponse(String username, PutDatafeedAction.Request requ } } - private void putDatafeed(PutDatafeedAction.Request request, ActionListener listener) { + private void putDatafeed(PutDatafeedAction.Request request, Map headers, + ActionListener listener) { clusterService.submitStateUpdateTask( "put-datafeed-" + request.getDatafeed().getId(), @@ -136,16 +138,16 @@ protected PutDatafeedAction.Response newResponse(boolean acknowledged) { @Override public ClusterState execute(ClusterState currentState) { - return putDatafeed(request, currentState); + return putDatafeed(request, headers, currentState); } }); } - private ClusterState putDatafeed(PutDatafeedAction.Request request, ClusterState clusterState) { + private ClusterState putDatafeed(PutDatafeedAction.Request request, Map headers, ClusterState clusterState) { XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); MlMetadata currentMetadata = MlMetadata.getMlMetadata(clusterState); MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata) - .putDatafeed(request.getDatafeed(), threadPool.getThreadContext()).build(); + .putDatafeed(request.getDatafeed(), headers).build(); return ClusterState.builder(clusterState).metaData( MetaData.builder(clusterState.getMetaData()).putCustom(MLMetadataField.TYPE, newMetadata).build()) .build(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java index 4d752fe294081..4e43cbb185330 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java @@ -27,6 +27,8 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import java.util.Map; + public class TransportUpdateDatafeedAction extends TransportMasterNodeAction { @Inject @@ -50,6 +52,8 @@ protected PutDatafeedAction.Response newResponse() { @Override protected void masterOperation(UpdateDatafeedAction.Request request, ClusterState state, ActionListener listener) { + final Map headers = threadPool.getThreadContext().getHeaders(); + clusterService.submitStateUpdateTask("update-datafeed-" + request.getUpdate().getId(), new AckedClusterStateUpdateTask(request, listener) { private volatile DatafeedConfig updatedDatafeed; @@ -69,7 +73,7 @@ public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData persistentTasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata) - .updateDatafeed(update, persistentTasks, threadPool.getThreadContext()).build(); + .updateDatafeed(update, persistentTasks, headers).build(); updatedDatafeed = newMetadata.getDatafeed(update.getId()); return ClusterState.builder(currentState).metaData( MetaData.builder(currentState.getMetaData()).putCustom(MLMetadataField.TYPE, newMetadata).build()).build(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 8049b5655d63b..e03ca45db7a59 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -30,9 +30,11 @@ import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.Map; import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; @@ -42,6 +44,7 @@ import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @@ -63,7 +66,7 @@ protected MlMetadata createTestInstance() { } job = new Job.Builder(job).setAnalysisConfig(analysisConfig).build(); builder.putJob(job, false); - builder.putDatafeed(datafeedConfig, null); + builder.putDatafeed(datafeedConfig, Collections.emptyMap()); } else { builder.putJob(job, false); } @@ -164,7 +167,7 @@ public void testRemoveJob_failDatafeedRefersToJob() { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder.deleteJob(job1.getId(), new PersistentTasksCustomMetaData(0L, Collections.emptyMap()))); @@ -184,7 +187,7 @@ public void testCrudDatafeed() { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); MlMetadata result = builder.build(); assertThat(result.getJobs().get("job_id"), sameInstance(job1)); @@ -201,7 +204,7 @@ public void testPutDatafeed_failBecauseJobDoesNotExist() { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", "missing-job").build(); MlMetadata.Builder builder = new MlMetadata.Builder(); - expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1, null)); + expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap())); } public void testPutDatafeed_failBecauseJobIsBeingDeleted() { @@ -210,7 +213,7 @@ public void testPutDatafeed_failBecauseJobIsBeingDeleted() { MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1, null)); + expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap())); } public void testPutDatafeed_failBecauseDatafeedIdIsAlreadyTaken() { @@ -218,9 +221,9 @@ public void testPutDatafeed_failBecauseDatafeedIdIsAlreadyTaken() { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); - expectThrows(ResourceAlreadyExistsException.class, () -> builder.putDatafeed(datafeedConfig1, null)); + expectThrows(ResourceAlreadyExistsException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap())); } public void testPutDatafeed_failBecauseJobAlreadyHasDatafeed() { @@ -229,10 +232,10 @@ public void testPutDatafeed_failBecauseJobAlreadyHasDatafeed() { DatafeedConfig datafeedConfig2 = createDatafeedConfig("datafeed2", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> builder.putDatafeed(datafeedConfig2, null)); + () -> builder.putDatafeed(datafeedConfig2, Collections.emptyMap())); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); } @@ -246,7 +249,23 @@ public void testPutDatafeed_failBecauseJobIsNotCompatibleForDatafeed() { MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1.build(now), false); - expectThrows(ElasticsearchStatusException.class, () -> builder.putDatafeed(datafeedConfig1, null)); + expectThrows(ElasticsearchStatusException.class, () -> builder.putDatafeed(datafeedConfig1, Collections.emptyMap())); + } + + public void testPutDatafeed_setsSecurityHeaders() { + Job datafeedJob = createDatafeedJob().build(new Date()); + DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", datafeedJob.getId()).build(); + MlMetadata.Builder builder = new MlMetadata.Builder(); + builder.putJob(datafeedJob, false); + + Map headers = new HashMap<>(); + headers.put("unrelated_header", "unrelated_header_value"); + headers.put(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user"); + builder.putDatafeed(datafeedConfig, headers); + MlMetadata metadata = builder.build(); + assertThat(metadata.getDatafeed("datafeed1").getHeaders().size(), equalTo(1)); + assertThat(metadata.getDatafeed("datafeed1").getHeaders(), + hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user")); } public void testUpdateDatafeed() { @@ -254,12 +273,13 @@ public void testUpdateDatafeed() { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); MlMetadata beforeMetadata = builder.build(); DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); update.setScrollSize(5000); - MlMetadata updatedMetadata = new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, null).build(); + MlMetadata updatedMetadata = + new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, Collections.emptyMap()).build(); DatafeedConfig updatedDatafeed = updatedMetadata.getDatafeed(datafeedConfig1.getId()); assertThat(updatedDatafeed.getJobId(), equalTo(datafeedConfig1.getJobId())); @@ -271,7 +291,8 @@ public void testUpdateDatafeed() { public void testUpdateDatafeed_failBecauseDatafeedDoesNotExist() { DatafeedUpdate.Builder update = new DatafeedUpdate.Builder("job_id"); update.setScrollSize(5000); - expectThrows(ResourceNotFoundException.class, () -> new MlMetadata.Builder().updateDatafeed(update.build(), null, null).build()); + expectThrows(ResourceNotFoundException.class, + () -> new MlMetadata.Builder().updateDatafeed(update.build(), null, Collections.emptyMap()).build()); } public void testUpdateDatafeed_failBecauseDatafeedIsNotStopped() { @@ -279,7 +300,7 @@ public void testUpdateDatafeed_failBecauseDatafeedIsNotStopped() { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); MlMetadata beforeMetadata = builder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -300,14 +321,14 @@ public void testUpdateDatafeed_failBecauseNewJobIdDoesNotExist() { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); MlMetadata beforeMetadata = builder.build(); DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); update.setJobId(job1.getId() + "_2"); expectThrows(ResourceNotFoundException.class, - () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, null)); + () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, Collections.emptyMap())); } public void testUpdateDatafeed_failBecauseNewJobHasAnotherDatafeedAttached() { @@ -319,25 +340,46 @@ public void testUpdateDatafeed_failBecauseNewJobHasAnotherDatafeedAttached() { MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); builder.putJob(job2.build(), false); - builder.putDatafeed(datafeedConfig1, null); - builder.putDatafeed(datafeedConfig2, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); + builder.putDatafeed(datafeedConfig2, Collections.emptyMap()); MlMetadata beforeMetadata = builder.build(); DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); update.setJobId(job2.getId()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, null)); + () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, Collections.emptyMap())); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); assertThat(e.getMessage(), equalTo("A datafeed [datafeed2] already exists for job [job_id_2]")); } + public void testUpdateDatafeed_setsSecurityHeaders() { + Job datafeedJob = createDatafeedJob().build(new Date()); + DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", datafeedJob.getId()).build(); + MlMetadata.Builder builder = new MlMetadata.Builder(); + builder.putJob(datafeedJob, false); + builder.putDatafeed(datafeedConfig, Collections.emptyMap()); + MlMetadata beforeMetadata = builder.build(); + assertTrue(beforeMetadata.getDatafeed("datafeed1").getHeaders().isEmpty()); + + DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig.getId()); + update.setQueryDelay(TimeValue.timeValueMinutes(5)); + + Map headers = new HashMap<>(); + headers.put("unrelated_header", "unrelated_header_value"); + headers.put(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user"); + MlMetadata afterMetadata = new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, headers).build(); + Map updatedHeaders = afterMetadata.getDatafeed("datafeed1").getHeaders(); + assertThat(updatedHeaders.size(), equalTo(1)); + assertThat(updatedHeaders, hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "permitted_run_as_user")); + } + public void testRemoveDatafeed_failBecauseDatafeedStarted() { Job job1 = createDatafeedJob().build(new Date()); DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig1, Collections.emptyMap()); MlMetadata result = builder.build(); assertThat(result.getJobs().get("job_id"), sameInstance(job1)); @@ -378,9 +420,9 @@ public void testExpandJobIds() { public void testExpandDatafeedIds() { MlMetadata.Builder mlMetadataBuilder = newMlMetadataWithJobs("bar-1", "foo-1", "foo-2"); - mlMetadataBuilder.putDatafeed(createDatafeedConfig("bar-1-feed", "bar-1").build(), null); - mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-1-feed", "foo-1").build(), null); - mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-2-feed", "foo-2").build(), null); + mlMetadataBuilder.putDatafeed(createDatafeedConfig("bar-1-feed", "bar-1").build(), Collections.emptyMap()); + mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-1-feed", "foo-1").build(), Collections.emptyMap()); + mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-2-feed", "foo-2").build(), Collections.emptyMap()); MlMetadata mlMetadata = mlMetadataBuilder.build(); @@ -409,7 +451,7 @@ protected MlMetadata mutateInstance(MlMetadata instance) { metadataBuilder.putJob(entry.getValue(), true); } for (Map.Entry entry : datafeeds.entrySet()) { - metadataBuilder.putDatafeed(entry.getValue(), null); + metadataBuilder.putDatafeed(entry.getValue(), Collections.emptyMap()); } switch (between(0, 1)) { @@ -430,7 +472,7 @@ protected MlMetadata mutateInstance(MlMetadata instance) { } randomJob = new Job.Builder(randomJob).setAnalysisConfig(analysisConfig).build(); metadataBuilder.putJob(randomJob, false); - metadataBuilder.putDatafeed(datafeedConfig, null); + metadataBuilder.putDatafeed(datafeedConfig, Collections.emptyMap()); break; default: throw new AssertionError("Illegal randomisation branch"); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java index f1679b8b0b9d1..6885cbcca73fe 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java @@ -51,7 +51,7 @@ public void testValidate_datafeedIsStarted() { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(new Date()), false); mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id", - Collections.singletonList("*")), null); + Collections.singletonList("*")), Collections.emptyMap()); final PersistentTasksCustomMetaData.Builder startDataFeedTaskBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", null, JobState.OPENED, startDataFeedTaskBuilder); addTask("datafeed_id", 0L, null, DatafeedState.STARTED, startDataFeedTaskBuilder); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java index af9446ed972cb..72c8d361dd882 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java @@ -45,7 +45,7 @@ public void testValidate_jobClosed() { PersistentTasksCustomMetaData tasks = PersistentTasksCustomMetaData.builder().build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1, null) + .putDatafeed(datafeedConfig1, Collections.emptyMap()) .build(); Exception e = expectThrows(ElasticsearchStatusException.class, () -> TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks)); @@ -62,7 +62,7 @@ public void testValidate_jobOpening() { PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1, null) + .putDatafeed(datafeedConfig1, Collections.emptyMap()) .build(); TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks); @@ -78,7 +78,7 @@ public void testValidate_jobOpened() { PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1, null) + .putDatafeed(datafeedConfig1, Collections.emptyMap()) .build(); TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java index a61709be424e8..1d93ae3d16f2a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java @@ -42,7 +42,7 @@ public void testValidate() { DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder().putJob(job, false) - .putDatafeed(datafeedConfig, null) + .putDatafeed(datafeedConfig, Collections.emptyMap()) .build(); TransportStopDatafeedAction.validateDatafeedTask("foo", mlMetadata2); } @@ -54,12 +54,12 @@ public void testResolveDataFeedIds_GivenDatafeedId() { addTask("datafeed_1", 0L, "node-1", DatafeedState.STARTED, tasksBuilder); Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); addTask("datafeed_2", 0L, "node-1", DatafeedState.STOPPED, tasksBuilder); job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); MlMetadata mlMetadata = mlMetadataBuilder.build(); @@ -86,17 +86,17 @@ public void testResolveDataFeedIds_GivenAll() { addTask("datafeed_1", 0L, "node-1", DatafeedState.STARTED, tasksBuilder); Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); addTask("datafeed_2", 0L, "node-1", DatafeedState.STOPPED, tasksBuilder); job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); addTask("datafeed_3", 0L, "node-1", DatafeedState.STOPPING, tasksBuilder); job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()); datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, Collections.emptyMap()); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); MlMetadata mlMetadata = mlMetadataBuilder.build(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index bd722ebf8ef9a..60496464ded8a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -84,7 +84,7 @@ public void setUpTests() { Job job = createDatafeedJob().build(new Date()); mlMetadata.putJob(job, false); DatafeedConfig datafeed = createDatafeedConfig("datafeed_id", job.getId()).build(); - mlMetadata.putDatafeed(datafeed, null); + mlMetadata.putDatafeed(datafeed, Collections.emptyMap()); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index 0fee78611a7bf..5e27ff79438fb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -68,7 +68,7 @@ public void testSelectNode_GivenJobIsOpened() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -86,7 +86,7 @@ public void testSelectNode_GivenJobIsOpening() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -106,7 +106,7 @@ public void testNoJobTask() { mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); tasks = PersistentTasksCustomMetaData.builder().build(); @@ -128,7 +128,7 @@ public void testSelectNode_GivenJobFailedOrClosed() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -156,7 +156,7 @@ public void testShardUnassigned() { mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -182,7 +182,7 @@ public void testShardNotAllActive() { mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -207,7 +207,8 @@ public void testIndexDoesntExist() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")), + Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -231,7 +232,8 @@ public void testRemoteIndex() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("remote:foo")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("remote:foo")), + Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -248,7 +250,7 @@ public void testSelectNode_jobTaskStale() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); String nodeId = randomBoolean() ? "node_id2" : null; @@ -286,7 +288,8 @@ public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")), null); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")), + Collections.emptyMap()); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();