Commit 8b8e3e3

Store headers for put/update datafeed

davidkyle committed Jun 14, 2018
1 parent d70d73a
Showing 11 changed files with 118 additions and 68 deletions.
MlMetadata.java
@@ -20,7 +20,6 @@
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -293,7 +292,7 @@ public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) {
             return this;
         }

-        public Builder putDatafeed(DatafeedConfig datafeedConfig, ThreadContext threadContext) {
+        public Builder putDatafeed(DatafeedConfig datafeedConfig, Map<String, String> headers) {
             if (datafeeds.containsKey(datafeedConfig.getId())) {
                 throw new ResourceAlreadyExistsException("A datafeed with id [" + datafeedConfig.getId() + "] already exists");
             }
@@ -302,13 +301,13 @@ public Builder putDatafeed(DatafeedConfig datafeedConfig, ThreadContext threadContext) {
             Job job = jobs.get(jobId);
             DatafeedJobValidator.validate(datafeedConfig, job);

-            if (threadContext != null) {
+            if (headers.isEmpty() == false) {
                 // Adjust the request, adding security headers from the current thread context
                 DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedConfig);
-                Map<String, String> headers = threadContext.getHeaders().entrySet().stream()
+                Map<String, String> securityHeaders = headers.entrySet().stream()
                         .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
                         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-                builder.setHeaders(headers);
+                builder.setHeaders(securityHeaders);
                 datafeedConfig = builder.build();
             }

@@ -328,15 +327,15 @@ private void checkJobIsAvailableForDatafeed(String jobId) {
             }
         }

-        public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksCustomMetaData persistentTasks, ThreadContext threadContext) {
+        public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksCustomMetaData persistentTasks, Map<String, String> headers) {
             String datafeedId = update.getId();
             DatafeedConfig oldDatafeedConfig = datafeeds.get(datafeedId);
             if (oldDatafeedConfig == null) {
                 throw ExceptionsHelper.missingDatafeedException(datafeedId);
             }
             checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE, datafeedId,
                     DatafeedState.STARTED), datafeedId, persistentTasks);
-            DatafeedConfig newDatafeedConfig = update.apply(oldDatafeedConfig, threadContext);
+            DatafeedConfig newDatafeedConfig = update.apply(oldDatafeedConfig, headers);
             if (newDatafeedConfig.getJobId().equals(oldDatafeedConfig.getJobId()) == false) {
                 checkJobIsAvailableForDatafeed(newDatafeedConfig.getJobId());
             }
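Reviewer note: the core of the change in both builder methods is the same. Instead of reading the thread context inside MlMetadata, the caller now passes a plain header map and the builder keeps only the security-relevant entries. A minimal standalone sketch of that filtering step; the header names and the set contents are illustrative stand-ins for the real allow-list in ClientHelper.SECURITY_HEADER_FILTERS:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class SecurityHeaderFilterSketch {
    // Illustrative stand-in for ClientHelper.SECURITY_HEADER_FILTERS;
    // the real set is defined by the security plugin.
    private static final Set<String> SECURITY_HEADER_FILTERS =
            new HashSet<>(Arrays.asList("_xpack_security_authentication", "es-security-runas-user"));

    static Map<String, String> filterSecurityHeaders(Map<String, String> headers) {
        return headers.entrySet().stream()
                .filter(e -> SECURITY_HEADER_FILTERS.contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("_xpack_security_authentication", "token");
        headers.put("Content-Type", "application/json");
        // Prints only the security header; the unrelated one is dropped.
        System.out.println(filterSecurityHeaders(headers).keySet());
    }
}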
DatafeedUpdate.java
@@ -264,7 +264,7 @@ ChunkingConfig getChunkingConfig() {
      * Applies the update to the given {@link DatafeedConfig}
      * @return a new {@link DatafeedConfig} that contains the update
      */
-    public DatafeedConfig apply(DatafeedConfig datafeedConfig, ThreadContext threadContext) {
+    public DatafeedConfig apply(DatafeedConfig datafeedConfig, Map<String, String> headers) {
         if (id.equals(datafeedConfig.getId()) == false) {
             throw new IllegalArgumentException("Cannot apply update to datafeedConfig with different id");
         }
@@ -301,12 +301,12 @@ public DatafeedConfig apply(DatafeedConfig datafeedConfig, ThreadContext threadContext) {
             builder.setChunkingConfig(chunkingConfig);
         }

-        if (threadContext != null) {
+        if (headers.isEmpty() == false) {
             // Adjust the request, adding security headers from the current thread context
-            Map<String, String> headers = threadContext.getHeaders().entrySet().stream()
+            Map<String, String> securityHeaders = headers.entrySet().stream()
                     .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
                     .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-            builder.setHeaders(headers);
+            builder.setHeaders(securityHeaders);
         }

         return builder.build();
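A consequence of the new signature worth noting: callers with nothing to store (internal callers, the tests below) pass Collections.emptyMap(), and the isEmpty() guard means the update leaves any headers already on the config alone. A hedged sketch of the two call shapes, where `update` is a built DatafeedUpdate and `config` an existing DatafeedConfig, both assumed to be in scope:

// Sketch, not production code; Collections is java.util.Collections.
// Empty map: the guard above is skipped, existing headers are preserved.
DatafeedConfig unchanged = update.apply(config, Collections.emptyMap());

// Non-empty map: only entries named in ClientHelper.SECURITY_HEADER_FILTERS
// survive the filter (the header name below is illustrative).
DatafeedConfig secured = update.apply(config,
        Collections.singletonMap("_xpack_security_authentication", "token"));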
DatafeedUpdateTests.java
@@ -114,7 +114,7 @@ public void testApply_failBecauseTargetDatafeedHasDifferentId() {

     public void testApply_givenEmptyUpdate() {
         DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo");
-        DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()).build().apply(datafeed, null);
+        DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()).build().apply(datafeed, Collections.emptyMap());
         assertThat(datafeed, equalTo(updatedDatafeed));
     }

@@ -125,7 +125,7 @@ public void testApply_givenPartialUpdate() {

         DatafeedUpdate.Builder updated = new DatafeedUpdate.Builder(datafeed.getId());
         updated.setScrollSize(datafeed.getScrollSize() + 1);
-        DatafeedConfig updatedDatafeed = update.build().apply(datafeed, null);
+        DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap());

         DatafeedConfig.Builder expectedDatafeed = new DatafeedConfig.Builder(datafeed);
         expectedDatafeed.setScrollSize(datafeed.getScrollSize() + 1);
@@ -149,7 +149,7 @@ public void testApply_givenFullUpdateNoAggregations() {
         update.setScrollSize(8000);
         update.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1)));

-        DatafeedConfig updatedDatafeed = update.build().apply(datafeed, null);
+        DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap());

         assertThat(updatedDatafeed.getJobId(), equalTo("bar"));
         assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_2")));
@@ -175,7 +175,7 @@ public void testApply_givenAggregations() {
         update.setAggregations(new AggregatorFactories.Builder().addAggregator(
                 AggregationBuilders.histogram("a").interval(300000).field("time").subAggregation(maxTime)));

-        DatafeedConfig updatedDatafeed = update.build().apply(datafeed, null);
+        DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap());

         assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_1")));
         assertThat(updatedDatafeed.getTypes(), equalTo(Collections.singletonList("t_1")));
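The visible test changes only swap null for Collections.emptyMap(). A hypothetical companion test in the same style, not part of this commit's visible hunks, would pin down the filtering behaviour; it assumes DatafeedConfig exposes getHeaders() to mirror the builder's setHeaders(), and the header name is illustrative:

    public void testApply_givenSecurityHeaders() {
        DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo");

        Map<String, String> headers = new HashMap<>();
        headers.put("_xpack_security_authentication", "token"); // assumed to be on the filter allow-list
        headers.put("unrelated-header", "should be dropped");

        DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()).build().apply(datafeed, headers);

        // Only the security header should have been copied onto the config.
        assertThat(updatedDatafeed.getHeaders(),
                equalTo(Collections.singletonMap("_xpack_security_authentication", "token")));
    }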
TransportPutDatafeedAction.java
@@ -41,6 +41,7 @@
 import org.elasticsearch.xpack.core.security.support.Exceptions;

 import java.io.IOException;
+import java.util.Map;

 public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDatafeedAction.Request, PutDatafeedAction.Response> {

@@ -95,15 +96,15 @@ protected void masterOperation(PutDatafeedAction.Request request, ClusterState state,

             client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
         } else {
-            putDatafeed(request, listener);
+            putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener);
         }
     }

     private void handlePrivsResponse(String username, PutDatafeedAction.Request request,
                                      HasPrivilegesResponse response,
                                      ActionListener<PutDatafeedAction.Response> listener) throws IOException {
         if (response.isCompleteMatch()) {
-            putDatafeed(request, listener);
+            putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener);
         } else {
             XContentBuilder builder = JsonXContent.contentBuilder();
             builder.startObject();
@@ -120,7 +121,8 @@ private void handlePrivsResponse(String username, PutDatafeedAction.Request request,
         }
     }

-    private void putDatafeed(PutDatafeedAction.Request request, ActionListener<PutDatafeedAction.Response> listener) {
+    private void putDatafeed(PutDatafeedAction.Request request, Map<String, String> headers,
+                             ActionListener<PutDatafeedAction.Response> listener) {

         clusterService.submitStateUpdateTask(
                 "put-datafeed-" + request.getDatafeed().getId(),
@@ -136,16 +138,16 @@ protected PutDatafeedAction.Response newResponse(boolean acknowledged) {

                     @Override
                     public ClusterState execute(ClusterState currentState) {
-                        return putDatafeed(request, currentState);
+                        return putDatafeed(request, headers, currentState);
                     }
                 });
     }

-    private ClusterState putDatafeed(PutDatafeedAction.Request request, ClusterState clusterState) {
+    private ClusterState putDatafeed(PutDatafeedAction.Request request, Map<String, String> headers, ClusterState clusterState) {
         XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
         MlMetadata currentMetadata = MlMetadata.getMlMetadata(clusterState);
         MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
-                .putDatafeed(request.getDatafeed(), threadPool.getThreadContext()).build();
+                .putDatafeed(request.getDatafeed(), headers).build();
         return ClusterState.builder(clusterState).metaData(
                 MetaData.builder(clusterState.getMetaData()).putCustom(MLMetadataField.TYPE, newMetadata).build())
                 .build();
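Note the shape of the fix in this file: the headers are read from the thread context while still on the request thread and passed down as a plain argument, because execute() runs later on a cluster state update thread where the request's thread context is no longer in scope. A self-contained illustration of that constraint, with a plain ThreadLocal standing in for Elasticsearch's ThreadContext:

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CaptureBeforeAsyncSketch {
    // Stand-in for ThreadContext: request headers are thread-scoped.
    private static final ThreadLocal<Map<String, String>> HEADERS = new ThreadLocal<>();

    public static void main(String[] args) throws Exception {
        HEADERS.set(Collections.singletonMap("_xpack_security_authentication", "token"));

        // Capture on the request thread, as masterOperation now does...
        final Map<String, String> captured = HEADERS.get();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            // ...because on the task's thread the thread-local is empty.
            System.out.println("thread-local here: " + HEADERS.get()); // null
            System.out.println("captured copy: " + captured);          // headers survive
        }).get();
        executor.shutdown();
    }
}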
TransportUpdateDatafeedAction.java
@@ -27,6 +27,8 @@
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;

+import java.util.Map;
+
 public class TransportUpdateDatafeedAction extends TransportMasterNodeAction<UpdateDatafeedAction.Request, PutDatafeedAction.Response> {

     @Inject
@@ -50,6 +52,8 @@ protected PutDatafeedAction.Response newResponse() {
     @Override
     protected void masterOperation(UpdateDatafeedAction.Request request, ClusterState state,
                                    ActionListener<PutDatafeedAction.Response> listener) {
+        final Map<String, String> headers = threadPool.getThreadContext().getHeaders();
+
         clusterService.submitStateUpdateTask("update-datafeed-" + request.getUpdate().getId(),
                 new AckedClusterStateUpdateTask<PutDatafeedAction.Response>(request, listener) {
                     private volatile DatafeedConfig updatedDatafeed;
@@ -69,7 +73,7 @@ public ClusterState execute(ClusterState currentState) {
                         PersistentTasksCustomMetaData persistentTasks =
                                 currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
                         MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
-                                .updateDatafeed(update, persistentTasks, threadPool.getThreadContext()).build();
+                                .updateDatafeed(update, persistentTasks, headers).build();
                         updatedDatafeed = newMetadata.getDatafeed(update.getId());
                         return ClusterState.builder(currentState).metaData(
                                 MetaData.builder(currentState.getMetaData()).putCustom(MLMetadataField.TYPE, newMetadata).build()).build();
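Same pattern here, with the headers captured into an effectively final local before the anonymous AckedClusterStateUpdateTask is constructed. As for why the filtered headers are persisted at all: the datafeed later issues searches long after the creating request has completed, and the stored headers let those searches run with the creating user's credentials. A hypothetical helper in that spirit; the production logic lives in ClientHelper and its actual signature is not shown in this diff, though ThreadContext.stashContext() and putHeader() are real methods:

import java.util.Map;
import java.util.function.Supplier;
import org.elasticsearch.common.util.concurrent.ThreadContext;

final class StoredHeadersSketch {
    // Hypothetical helper: run an action with the datafeed's stored security
    // headers restored onto the current thread context.
    static <T> T withStoredHeaders(ThreadContext threadContext,
                                   Map<String, String> storedHeaders,
                                   Supplier<T> action) {
        // stashContext() saves and clears the current context until close.
        try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
            storedHeaders.forEach(threadContext::putHeader);
            return action.get(); // executes as the user who created the datafeed
        }
    }
}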
(Diffs for the remaining 6 changed files were not loaded.)
