Skip to content

Commit

Permalink
HLRC: Adding Update datafeed API (#34882)
Browse files Browse the repository at this point in the history
* HLRC: Adding Update datafeed API

* Addressing unused import

* Adjusting docs and fixing minor comments

* fixing comment
  • Loading branch information
benwtrent authored Oct 26, 2018
1 parent 11fa8d3 commit 052dfa5
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.StartDatafeedRequest;
import org.elasticsearch.client.ml.StopDatafeedRequest;
import org.elasticsearch.client.ml.UpdateDatafeedRequest;
import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -209,6 +210,19 @@ static Request putDatafeed(PutDatafeedRequest putDatafeedRequest) throws IOExcep
return request;
}

static Request updateDatafeed(UpdateDatafeedRequest updateDatafeedRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("ml")
.addPathPartAsIs("datafeeds")
.addPathPart(updateDatafeedRequest.getDatafeedUpdate().getId())
.addPathPartAsIs("_update")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
request.setEntity(createEntity(updateDatafeedRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request getDatafeed(GetDatafeedRequest getDatafeedRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.elasticsearch.client.ml.StartDatafeedResponse;
import org.elasticsearch.client.ml.StopDatafeedRequest;
import org.elasticsearch.client.ml.StopDatafeedResponse;
import org.elasticsearch.client.ml.UpdateDatafeedRequest;
import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.client.ml.job.stats.JobStats;

Expand Down Expand Up @@ -494,6 +495,46 @@ public void putDatafeedAsync(PutDatafeedRequest request, RequestOptions options,
Collections.emptySet());
}

/**
* Updates a Machine Learning Datafeed
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-update-datafeed.html">
* ML Update datafeed documentation</a>
*
* @param request The UpdateDatafeedRequest containing the {@link org.elasticsearch.client.ml.datafeed.DatafeedUpdate} settings
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return PutDatafeedResponse with enclosed, updated {@link org.elasticsearch.client.ml.datafeed.DatafeedConfig} object
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public PutDatafeedResponse updateDatafeed(UpdateDatafeedRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::updateDatafeed,
options,
PutDatafeedResponse::fromXContent,
Collections.emptySet());
}

/**
* Updates a Machine Learning Datafeed asynchronously and notifies listener on completion
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-update-datafeed.html">
* ML Update datafeed documentation</a>
*
* @param request The request containing the {@link org.elasticsearch.client.ml.datafeed.DatafeedUpdate} settings
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void updateDatafeedAsync(UpdateDatafeedRequest request, RequestOptions options, ActionListener<PutDatafeedResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::updateDatafeed,
options,
PutDatafeedResponse::fromXContent,
listener,
Collections.emptySet());
}

/**
* Gets one or more Machine Learning datafeed configuration info.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.client.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Requests an update to a {@link org.elasticsearch.client.ml.datafeed.DatafeedConfig} with the passed {@link DatafeedUpdate}
* settings
*/
public class UpdateDatafeedRequest extends ActionRequest implements ToXContentObject {

private final DatafeedUpdate update;

public UpdateDatafeedRequest(DatafeedUpdate update) {
this.update = update;
}

public DatafeedUpdate getDatafeedUpdate() {
return update;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return update.toXContent(builder, params);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

UpdateDatafeedRequest that = (UpdateDatafeedRequest) o;
return Objects.equals(update, that.update);
}

@Override
public int hashCode() {
return Objects.hash(update);
}

@Override
public final String toString() {
return Strings.toString(this);
}

@Override
public ActionRequestValidationException validate() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -292,6 +293,10 @@ public Builder setIndices(List<String> indices) {
return this;
}

public Builder setIndices(String... indices) {
return setIndices(Arrays.asList(indices));
}

public Builder setTypes(List<String> types) {
this.types = types;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@
import org.elasticsearch.client.ml.StartDatafeedResponse;
import org.elasticsearch.client.ml.StopDatafeedRequest;
import org.elasticsearch.client.ml.StopDatafeedResponse;
import org.elasticsearch.client.ml.UpdateDatafeedRequest;
import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.client.ml.calendars.Calendar;
import org.elasticsearch.client.ml.calendars.CalendarTests;
import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
import org.elasticsearch.client.ml.datafeed.DatafeedState;
import org.elasticsearch.client.ml.datafeed.DatafeedStats;
import org.elasticsearch.client.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.client.ml.job.config.AnalysisConfig;
import org.elasticsearch.client.ml.job.config.DataDescription;
import org.elasticsearch.client.ml.job.config.Detector;
Expand Down Expand Up @@ -357,6 +359,33 @@ public void testPutDatafeed() throws Exception {
assertThat(createdDatafeed.getIndices(), equalTo(datafeedConfig.getIndices()));
}

public void testUpdateDatafeed() throws Exception {
String jobId = randomValidJobId();
Job job = buildJob(jobId);
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
execute(new PutJobRequest(job), machineLearningClient::putJob, machineLearningClient::putJobAsync);

String datafeedId = "datafeed-" + jobId;
DatafeedConfig datafeedConfig = DatafeedConfig.builder(datafeedId, jobId).setIndices("some_data_index").build();

PutDatafeedResponse response = machineLearningClient.putDatafeed(new PutDatafeedRequest(datafeedConfig), RequestOptions.DEFAULT);

DatafeedConfig createdDatafeed = response.getResponse();
assertThat(createdDatafeed.getId(), equalTo(datafeedId));
assertThat(createdDatafeed.getIndices(), equalTo(datafeedConfig.getIndices()));

DatafeedUpdate datafeedUpdate = DatafeedUpdate.builder(datafeedId).setIndices("some_other_data_index").setScrollSize(10).build();

response = execute(new UpdateDatafeedRequest(datafeedUpdate),
machineLearningClient::updateDatafeed,
machineLearningClient::updateDatafeedAsync);

DatafeedConfig updatedDatafeed = response.getResponse();
assertThat(datafeedUpdate.getId(), equalTo(updatedDatafeed.getId()));
assertThat(datafeedUpdate.getIndices(), equalTo(updatedDatafeed.getIndices()));
assertThat(datafeedUpdate.getScrollSize(), equalTo(updatedDatafeed.getScrollSize()));
}

public void testGetDatafeed() throws Exception {
String jobId1 = "test-get-datafeed-job-1";
String jobId2 = "test-get-datafeed-job-2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,13 @@
import org.elasticsearch.client.ml.StartDatafeedResponse;
import org.elasticsearch.client.ml.StopDatafeedRequest;
import org.elasticsearch.client.ml.StopDatafeedResponse;
import org.elasticsearch.client.ml.UpdateDatafeedRequest;
import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.client.ml.calendars.Calendar;
import org.elasticsearch.client.ml.datafeed.ChunkingConfig;
import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
import org.elasticsearch.client.ml.datafeed.DatafeedStats;
import org.elasticsearch.client.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.client.ml.job.config.AnalysisConfig;
import org.elasticsearch.client.ml.job.config.AnalysisLimits;
import org.elasticsearch.client.ml.job.config.DataDescription;
Expand Down Expand Up @@ -630,6 +632,77 @@ public void onFailure(Exception e) {
}
}

public void testUpdateDatafeed() throws Exception {
RestHighLevelClient client = highLevelClient();

Job job = MachineLearningIT.buildJob("update-datafeed-job");
client.machineLearning().putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
String datafeedId = job.getId() + "-feed";
DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, job.getId()).setIndices("foo").build();
client.machineLearning().putDatafeed(new PutDatafeedRequest(datafeed), RequestOptions.DEFAULT);

{
AggregatorFactories.Builder aggs = AggregatorFactories.builder();
List<SearchSourceBuilder.ScriptField> scriptFields = Collections.emptyList();
// tag::update-datafeed-config
DatafeedUpdate.Builder datafeedUpdateBuilder = new DatafeedUpdate.Builder(datafeedId) // <1>
.setAggregations(aggs) // <2>
.setIndices("index_1", "index_2") // <3>
.setChunkingConfig(ChunkingConfig.newAuto()) // <4>
.setFrequency(TimeValue.timeValueSeconds(30)) // <5>
.setQuery(QueryBuilders.matchAllQuery()) // <6>
.setQueryDelay(TimeValue.timeValueMinutes(1)) // <7>
.setScriptFields(scriptFields) // <8>
.setScrollSize(1000) // <9>
.setJobId("update-datafeed-job"); // <10>
// end::update-datafeed-config

// Clearing aggregation to avoid complex validation rules
datafeedUpdateBuilder.setAggregations((String) null);

// tag::update-datafeed-request
UpdateDatafeedRequest request = new UpdateDatafeedRequest(datafeedUpdateBuilder.build()); // <1>
// end::update-datafeed-request

// tag::update-datafeed-execute
PutDatafeedResponse response = client.machineLearning().updateDatafeed(request, RequestOptions.DEFAULT);
// end::update-datafeed-execute

// tag::update-datafeed-response
DatafeedConfig updatedDatafeed = response.getResponse(); // <1>
// end::update-datafeed-response
assertThat(updatedDatafeed.getId(), equalTo(datafeedId));
}
{
DatafeedUpdate datafeedUpdate = new DatafeedUpdate.Builder(datafeedId).setIndices("index_1", "index_2").build();

UpdateDatafeedRequest request = new UpdateDatafeedRequest(datafeedUpdate);
// tag::update-datafeed-execute-listener
ActionListener<PutDatafeedResponse> listener = new ActionListener<PutDatafeedResponse>() {
@Override
public void onResponse(PutDatafeedResponse response) {
// <1>
}

@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::update-datafeed-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);

// tag::update-datafeed-execute-async
client.machineLearning().updateDatafeedAsync(request, RequestOptions.DEFAULT, listener); // <1>
// end::update-datafeed-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

public void testGetDatafeed() throws Exception {
RestHighLevelClient client = highLevelClient();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;

import org.elasticsearch.client.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.client.ml.datafeed.DatafeedUpdateTests;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;


public class UpdateDatafeedRequestTests extends AbstractXContentTestCase<UpdateDatafeedRequest> {

@Override
protected UpdateDatafeedRequest createTestInstance() {
return new UpdateDatafeedRequest(DatafeedUpdateTests.createRandom());
}

@Override
protected UpdateDatafeedRequest doParseInstance(XContentParser parser) {
return new UpdateDatafeedRequest(DatafeedUpdate.PARSER.apply(parser, null).build());
}

@Override
protected boolean supportsUnknownFields() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@

public class DatafeedUpdateTests extends AbstractXContentTestCase<DatafeedUpdate> {

@Override
protected DatafeedUpdate createTestInstance() {
public static DatafeedUpdate createRandom() {
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(DatafeedConfigTests.randomValidDatafeedId());
if (randomBoolean()) {
builder.setJobId(randomAlphaOfLength(10));
Expand Down Expand Up @@ -87,6 +86,11 @@ protected DatafeedUpdate createTestInstance() {
return builder.build();
}

@Override
protected DatafeedUpdate createTestInstance() {
return createRandom();
}

@Override
protected DatafeedUpdate doParseInstance(XContentParser parser) {
return DatafeedUpdate.PARSER.apply(parser, null).build();
Expand Down
Loading

0 comments on commit 052dfa5

Please sign in to comment.