From 228d1aebd2bde745c2de2064a799ec8aa36c64ec Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Sat, 4 Feb 2017 01:13:52 +0530 Subject: [PATCH 1/3] cache_key support in rest client --- .../index/query/BoolFilterBuilder.java | 37 ++++++++++++++++++- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/query/BoolFilterBuilder.java b/src/main/java/org/elasticsearch/index/query/BoolFilterBuilder.java index f949ad00ed261..d32eca99f3ccd 100644 --- a/src/main/java/org/elasticsearch/index/query/BoolFilterBuilder.java +++ b/src/main/java/org/elasticsearch/index/query/BoolFilterBuilder.java @@ -19,8 +19,14 @@ package org.elasticsearch.index.query; +import org.apache.commons.codec.digest.DigestUtils; +import org.elasticsearch.Version; +import org.elasticsearch.common.xcontent.ToXContentUtils; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentType; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -130,6 +136,12 @@ public BoolFilterBuilder cacheKey(String cacheKey) { @Override protected void doXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("bool"); + doXContentInternal(builder, params); + addCacheToQuery(cacheKey, cache, builder, params); + builder.endObject(); + } + + private void doXContentInternal(XContentBuilder builder, Params params) throws IOException { doXArrayContent("must", mustClauses, builder, params); doXArrayContent("must_not", mustNotClauses, builder, params); doXArrayContent("should", shouldClauses, builder, params); @@ -137,8 +149,6 @@ protected void doXContent(XContentBuilder builder, Params params) throws IOExcep if (filterName != null) { builder.field("_name", filterName); } - addCacheToQuery(cacheKey, cache, builder, params); - builder.endObject(); } private void doXArrayContent(String field, List clauses, XContentBuilder builder, Params params) throws IOException { @@ -156,4 +166,27 @@ private void doXArrayContent(String field, List clauses, XContent builder.endArray(); } } + + private String generateCacheKey() throws IOException { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE); + builder.startObject("bool"); + doXContentInternal(builder, EMPTY_PARAMS); + builder.endObject(); + return DigestUtils.sha512Hex(builder.bytes().streamInput()); + } + + @Override + protected void addCacheToQuery(String cacheKey, Boolean cache, XContentBuilder builder, Params params) throws IOException { + if (ToXContentUtils.getVersionFromParams(params).onOrAfter(Version.V_5_0_0)) { + builder.field("cache_key", generateCacheKey()); + return; + } + + if (cache != null) { + builder.field("_cache", cache); + } + if (cacheKey != null) { + builder.field("_cache_key", cacheKey); + } + } } \ No newline at end of file From 8659e6964f705fe0f05d1725fe968f71e45474a7 Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Mon, 6 Feb 2017 15:10:26 +0530 Subject: [PATCH 2/3] Cache key from application side --- pom.xml | 2 +- .../elasticsearch/index/query/BoolFilterBuilder.java | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 3a473d0c0ae6c..777a209785c78 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 org.elasticsearch elasticsearch - 1.4.1-rest-1.0.37 + 1.4.1-rest-1.0.40 jar Elasticsearch - Open Source, Distributed, RESTful Search Engine 2009 diff --git a/src/main/java/org/elasticsearch/index/query/BoolFilterBuilder.java b/src/main/java/org/elasticsearch/index/query/BoolFilterBuilder.java index d32eca99f3ccd..35e4f1a5e5c39 100644 --- a/src/main/java/org/elasticsearch/index/query/BoolFilterBuilder.java +++ b/src/main/java/org/elasticsearch/index/query/BoolFilterBuilder.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.query; import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang3.BooleanUtils; import org.elasticsearch.Version; import org.elasticsearch.common.xcontent.ToXContentUtils; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -178,7 +179,16 @@ private String generateCacheKey() throws IOException { @Override protected void addCacheToQuery(String cacheKey, Boolean cache, XContentBuilder builder, Params params) throws IOException { if (ToXContentUtils.getVersionFromParams(params).onOrAfter(Version.V_5_0_0)) { - builder.field("cache_key", generateCacheKey()); + if (BooleanUtils.isTrue(cache)) { + if (cacheKey != null) { + builder.field("_cache_key", cacheKey); + builder.field("_cache_any", cacheKey); + } else { + builder.field("_cache_any", generateCacheKey()); + } + } + + builder.field("_cache_sha", generateCacheKey()); return; } From cff611040f0f7f3266fc6ab94972133a0fff7862 Mon Sep 17 00:00:00 2001 From: Utkarsh Date: Tue, 28 Feb 2017 19:02:05 +0530 Subject: [PATCH 3/3] Update bulk request for ES5 --- pom.xml | 2 +- .../action/bulk/BulkRequest.java | 46 +++++++++ .../action/update/UpdateRequest.java | 97 +++++++++++++++++-- 3 files changed, 136 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index 777a209785c78..7707f91696e08 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 org.elasticsearch elasticsearch - 1.4.1-rest-1.0.40 + 1.4.1-rest-1.0.41 jar Elasticsearch - Open Source, Distributed, RESTful Search Engine 2009 diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index cb53178b41afa..adbe364bdf5c3 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -24,6 +24,7 @@ import org.apache.http.HttpEntity; import org.apache.http.nio.entity.NStringEntity; import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.Version; import org.elasticsearch.action.*; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; @@ -538,6 +539,16 @@ public RestRequest.Method getMethod() { return RestRequest.Method.POST; } + @Override + public ActionRestRequest getActionRestRequest(Version version) { + ActionRestRequest actionRestRequest = super.getActionRestRequest(version); + if (version.id >= Version.V_5_0_0_ID) { + return new BulkRequestV5(); + } else { + return actionRestRequest; + } + } + @Override public HttpEntity getEntity() throws IOException { //todo add support for streaming version of getRestEntity() @@ -549,4 +560,39 @@ public HttpEntity getEntity() throws IOException { return new NStringEntity(builder.toString(), "UTF-8"); } + private class BulkRequestV5 implements ActionRestRequest { + + @Override + public RestRequest.Method getMethod() { + return BulkRequest.this.getMethod(); + } + + @Override + public String getEndPoint() { + return BulkRequest.this.getEndPoint(); + } + + @Override + public HttpEntity getEntity() throws IOException { + //todo add support for streaming version of getRestEntity() + StringBuilder builder = new StringBuilder(); + for (ActionRequest request : requests) { + ActionRestRequest restRequest = request.getActionRestRequest(Version.V_5_0_0); + String payload = HttpUtils.readUtf8(restRequest.getBulkEntity()); + builder.append(payload); + } + return new NStringEntity(builder.toString(), "UTF-8"); + } + + @Override + public Map getParams() { + return BulkRequest.this.getParams(); + } + + @Override + public HttpEntity getBulkEntity() throws IOException { + return BulkRequest.this.getBulkEntity(); + } + } + } diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 379aad7f4725f..e98a1bfeb007b 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -25,9 +25,11 @@ import org.apache.http.nio.entity.NStringEntity; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionRestRequest; import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest; import org.elasticsearch.common.Nullable; @@ -41,12 +43,14 @@ import org.elasticsearch.common.util.UriBuilder; import org.elasticsearch.common.xcontent.*; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.query.ScriptFilterParser; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.script.ScriptService; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Locale; import java.util.Map; @@ -206,7 +210,9 @@ public String script() { return this.script; } - public ScriptService.ScriptType scriptType() { return this.scriptType; } + public ScriptService.ScriptType scriptType() { + return this.scriptType; + } public Map scriptParams() { return this.scriptParams; @@ -573,6 +579,7 @@ public UpdateRequest source(byte[] source, int offset, int length) throws Except /** * Should this update attempt to detect if it is a noop? + * * @return this for chaining */ public UpdateRequest detectNoop(boolean detectNoop) { @@ -632,15 +639,15 @@ public boolean docAsUpsert() { public void docAsUpsert(boolean shouldUpsertDoc) { this.docAsUpsert = shouldUpsertDoc; } - - public boolean scriptedUpsert(){ + + public boolean scriptedUpsert() { return this.scriptedUpsert; } - + public void scriptedUpsert(boolean scriptedUpsert) { this.scriptedUpsert = scriptedUpsert; } - + @Override public void readFrom(StreamInput in) throws IOException { @@ -651,7 +658,7 @@ public void readFrom(StreamInput in) throws IOException { id = in.readString(); routing = in.readOptionalString(); script = in.readOptionalString(); - if(Strings.hasLength(script)) { + if (Strings.hasLength(script)) { if (in.getVersion().onOrAfter(Version.V_1_3_0)) { scriptType = ScriptService.ScriptType.readFrom(in); } else { @@ -745,6 +752,16 @@ public String getEndPoint() { return UriBuilder.newBuilder().slash(index(), type(), id(), "_update").build(); } + @Override + public ActionRestRequest getActionRestRequest(Version version) { + ActionRestRequest actionRestRequest = super.getActionRestRequest(version); + if (version.id >= Version.V_5_0_0_ID) { + return new UpdateRequestV5(); + } else { + return actionRestRequest; + } + } + @Override public RestRequest.Method getMethod() { return RestRequest.Method.POST; @@ -783,8 +800,7 @@ private Map getPayload() { if (this.detectNoop) { payload.put("detect_noop", Boolean.TRUE); } - } - else if (Strings.hasLength(script)) { + } else if (Strings.hasLength(script)) { payload.putIfNotNull("lang", this.scriptLang); payload.putIf("scripted_upsert", Boolean.TRUE, this.scriptedUpsert); payload.put("script", this.script); @@ -799,4 +815,69 @@ else if (Strings.hasLength(script)) { } return payload.map(); } + + private class UpdateRequestV5 implements ActionRestRequest { + + public RestRequest.Method getMethod() { + return UpdateRequest.this.getMethod(); + } + + public String getEndPoint() { + return UpdateRequest.this.getEndPoint(); + } + + @Override + public Map getParams() { + return UpdateRequest.this.getParams(); + } + + @Override + public HttpEntity getEntity() throws IOException { + Map payload = getPayload(); + String json = XContentHelper.convertToJson(payload, false); + return new NStringEntity(json, StandardCharsets.UTF_8); + + } + + private Map getPayload() { + MapBuilder payload = MapBuilder.newMapBuilder(); + if (UpdateRequest.this.doc != null) { + payload.put("doc", UpdateRequest.this.doc.sourceAsMap()); + if (UpdateRequest.this.docAsUpsert) { + payload.put("doc_as_upsert", Boolean.TRUE); + } + if (UpdateRequest.this.detectNoop) { + payload.put("detect_noop", Boolean.TRUE); + } + } else if (Strings.hasLength(script)) { + Map scriptObj = new LinkedHashMap<>(); + scriptObj.put(UpdateRequest.this.scriptType.name().toLowerCase(Locale.ROOT), script); + scriptObj.put("lang", scriptLang); + scriptObj.put("params", scriptParams); + payload.put("script", scriptObj); + } + if (UpdateRequest.this.upsertRequest != null) { + payload.put("upsert", UpdateRequest.this.upsertRequest.sourceAsMap()); + } + + if (payload.isEmpty()) { + throw new IllegalStateException("Nothing to update. No doc, script or upsert provided"); + } + return payload.map(); + } + + public HttpEntity getBulkEntity() throws IOException { + Map payload = Maps.newLinkedHashMap(); + Map actionMetadata = Maps.newLinkedHashMap(); + actionMetadata.put("_index", index); + actionMetadata.put("_type", type); + actionMetadata.put("_id", id); + payload.put(BULK_TYPE, actionMetadata); + String json = XContentHelper.convertToJson(payload, false); + + String payloadJson = XContentHelper.convertToJson(getPayload(), false); + String fullPayload = Strings.join(json, "\n", payloadJson, "\n"); + return new NStringEntity(fullPayload, StandardCharsets.UTF_8); + } + } }