From a64d40e638387a81c5c84878a9583a8abb371865 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Thu, 7 Sep 2023 19:57:45 +0200 Subject: [PATCH 1/2] GH-4776 remove old dependencies and start using the latest version of the new java client --- compliance/elasticsearch/pom.xml | 26 ++------------------------ core/sail/elasticsearch-store/pom.xml | 27 ++------------------------- core/sail/elasticsearch/pom.xml | 15 ++------------- pom.xml | 2 +- 4 files changed, 7 insertions(+), 63 deletions(-) diff --git a/compliance/elasticsearch/pom.xml b/compliance/elasticsearch/pom.xml index e95dad466a9..d0b9294ae3a 100644 --- a/compliance/elasticsearch/pom.xml +++ b/compliance/elasticsearch/pom.xml @@ -49,22 +49,6 @@ - - org.elasticsearch.test - framework - ${elasticsearch.version} - test - - - commons-logging - commons-logging - - - org.apache.httpcomponents - httpcore - - - org.apache.httpcomponents httpcore @@ -100,16 +84,10 @@ test - org.elasticsearch.client - transport + co.elastic.clients + elasticsearch-java ${elasticsearch.version} test - - - com.vividsolutions - jts - - org.apache.logging.log4j diff --git a/core/sail/elasticsearch-store/pom.xml b/core/sail/elasticsearch-store/pom.xml index 57aeb65c505..1f467f35a1b 100644 --- a/core/sail/elasticsearch-store/pom.xml +++ b/core/sail/elasticsearch-store/pom.xml @@ -11,38 +11,15 @@ Store for utilizing Elasticsearch as a triplestore. 
- org.elasticsearch.client - elasticsearch-rest-high-level-client + co.elastic.clients + elasticsearch-java ${elasticsearch.version} - true - - - org.apache.httpcomponents - httpcore - - - commons-logging - commons-logging - - org.apache.httpcomponents httpcore ${httpcore.version} - - org.elasticsearch.client - transport - ${elasticsearch.version} - true - - - commons-logging - commons-logging - - - org.slf4j diff --git a/core/sail/elasticsearch/pom.xml b/core/sail/elasticsearch/pom.xml index b8c924bcf9f..14aa359e908 100644 --- a/core/sail/elasticsearch/pom.xml +++ b/core/sail/elasticsearch/pom.xml @@ -16,20 +16,9 @@ ${project.version} - org.elasticsearch.client - transport + co.elastic.clients + elasticsearch-java ${elasticsearch.version} - true - - - commons-logging - commons-logging - - - org.apache.httpcomponents - httpcore - - diff --git a/pom.xml b/pom.xml index c4f0b8ed7db..244261731b4 100644 --- a/pom.xml +++ b/pom.xml @@ -373,7 +373,7 @@ 3.3.1 8.9.0 8.9.0 - 7.15.2 + 8.9.1 5.3.23 30.1.1-jre 1.35 From 5d1b5ce9faeeb75a3403d03eb3e9e9e88ebaa13f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Thu, 7 Sep 2023 19:58:27 +0200 Subject: [PATCH 2/2] GH-4776 start migration of the code in the ElasticsearchStore. 
--- .../elasticsearchstore/ClientProvider.java | 4 +- .../ElasticsearchDataStructure.java | 115 ++++++++---------- .../ElasticsearchStore.java | 26 ++-- .../UserProvidedClientProvider.java | 8 +- 4 files changed, 67 insertions(+), 86 deletions(-) diff --git a/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ClientProvider.java b/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ClientProvider.java index a214e2e99c4..999c37ac498 100644 --- a/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ClientProvider.java +++ b/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ClientProvider.java @@ -10,14 +10,14 @@ *******************************************************************************/ package org.eclipse.rdf4j.sail.elasticsearchstore; -import org.elasticsearch.client.Client; +import co.elastic.clients.elasticsearch.ElasticsearchClient; /** * @author Håvard Mikkelsen Ottestad */ interface ClientProvider extends AutoCloseable { - Client getClient(); + ElasticsearchClient getClient(); boolean isClosed(); } diff --git a/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ElasticsearchDataStructure.java b/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ElasticsearchDataStructure.java index 583d4091f0b..68add170077 100644 --- a/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ElasticsearchDataStructure.java +++ b/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ElasticsearchDataStructure.java @@ -27,6 +27,8 @@ import java.util.stream.Stream; import org.apache.commons.io.IOUtils; +import org.apache.http.HttpStatus; +import org.apache.http.util.EntityUtils; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.LookAheadIteration; import 
org.eclipse.rdf4j.model.BNode; @@ -37,27 +39,19 @@ import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.extensiblestore.DataStructureInterface; import org.eclipse.rdf4j.sail.extensiblestore.valuefactory.ExtensibleStatement; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.IndicesAdminClient; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.reindex.BulkByScrollResponse; -import org.elasticsearch.index.reindex.DeleteByQueryAction; -import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.client.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import co.elastic.clients.elasticsearch._types.Conflicts; +import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery; +import co.elastic.clients.elasticsearch._types.query_dsl.Query; +import co.elastic.clients.elasticsearch._types.query_dsl.TermQuery; +import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.elasticsearch.core.BulkResponse; +import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest; +import co.elastic.clients.elasticsearch.core.IndexRequest; + /** * @author Håvard Mikkelsen Ottestad */ @@ -146,14 +140,18 @@ public 
void addStatement(Collection statements) { @Override synchronized public void clear(boolean inferred, Resource[] contexts) { - BulkByScrollResponse response = new DeleteByQueryRequestBuilder(clientProvider.getClient(), - DeleteByQueryAction.INSTANCE) - .filter(getQueryBuilder(null, null, null, inferred, contexts)) - .abortOnVersionConflict(false) - .source(index) - .get(); + DeleteByQueryRequest.Builder builder = new DeleteByQueryRequest.Builder(); + DeleteByQueryRequest build = builder.index(index) + .query(getQuery(null, null, null, inferred, contexts)) + .conflicts(Conflicts.Proceed) + .build(); + + try { + Long deleted = clientProvider.getClient().deleteByQuery(build).deleted(); + } catch (IOException e) { + throw new SailException(e); + } - long deleted = response.getDeleted(); } @Override @@ -166,7 +164,7 @@ public CloseableIteration getStatements(Resource IRI predicate, Value object, boolean inferred, Resource... context) { - QueryBuilder queryBuilder = getQueryBuilder(subject, predicate, object, inferred, context); + Query queryBuilder = getQuery(subject, predicate, object, inferred, context); return new LookAheadIteration<>() { @@ -221,73 +219,64 @@ protected void handleClose() throws SailException { } - private QueryBuilder getQueryBuilder(Resource subject, IRI predicate, Value object, boolean inferred, - Resource[] contexts) { + private Query getQuery(Resource subject, IRI predicate, Value object, boolean inferred, Resource[] contexts) { - BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + BoolQuery.Builder mainQuery = new BoolQuery.Builder(); if (subject != null) { - boolQueryBuilder.must(QueryBuilders.termQuery("subject", subject.stringValue())); + mainQuery.must(b -> b.term(t -> t.field("subject").value(subject.stringValue()))); + if (subject instanceof IRI) { - boolQueryBuilder.must(QueryBuilders.termQuery("subject_IRI", true)); + mainQuery.must(b -> b.term(t -> t.field("subject_IRI").value(true))); } else { - 
boolQueryBuilder.must(QueryBuilders.termQuery("subject_BNode", true)); + mainQuery.must(b -> b.term(t -> t.field("subject_BNode").value(true))); } } if (predicate != null) { - boolQueryBuilder.must(QueryBuilders.termQuery("predicate", predicate.stringValue())); + mainQuery.must(b -> b.term(t -> t.field("predicate").value(predicate.stringValue()))); } if (object != null) { - boolQueryBuilder.must(QueryBuilders.termQuery("object_Hash", object.stringValue().hashCode())); + mainQuery.must(b -> b.term(t -> t.field("object_Hash").value(object.stringValue().hashCode()))); + if (object instanceof IRI) { - boolQueryBuilder.must(QueryBuilders.termQuery("object_IRI", true)); + mainQuery.must(b -> b.term(t -> t.field("object_IRI").value(true))); } else if (object instanceof BNode) { - boolQueryBuilder.must(QueryBuilders.termQuery("object_BNode", true)); + mainQuery.must(b -> b.term(t -> t.field("object_BNode").value(true))); } else { - boolQueryBuilder.must( - QueryBuilders.termQuery("object_Datatype", ((Literal) object).getDatatype().stringValue())); + mainQuery.must(b -> b + .term(t -> t.field("object_Datatype").value(((Literal) object).getDatatype().stringValue()))); + if (((Literal) object).getLanguage().isPresent()) { - boolQueryBuilder - .must(QueryBuilders.termQuery("object_Lang", ((Literal) object).getLanguage().get())); + mainQuery.must( + b -> b.term(t -> t.field("object_Lang").value(((Literal) object).getLanguage().get()))); } } } if (contexts != null && contexts.length > 0) { - BoolQueryBuilder contextQueryBuilder = new BoolQueryBuilder(); - for (Resource context : contexts) { - if (context == null) { - - contextQueryBuilder.should(new BoolQueryBuilder().mustNot(QueryBuilders.existsQuery("context"))); - + mainQuery.should(b -> b.bool(bb -> bb.mustNot(mb -> mb.exists(a -> a.field("context"))))); } else if (context instanceof IRI) { - - contextQueryBuilder.should( - new BoolQueryBuilder() - .must(QueryBuilders.termQuery("context", context.stringValue())) - 
.must(QueryBuilders.termQuery("context_IRI", true))); - + mainQuery.should(b -> b.bool(bb -> { + bb.must(mb -> mb.term(t -> t.field("context").value(context.stringValue()))); + bb.must(mb -> mb.term(t -> t.field("context_IRI").value(true))); + })); } else { // BNode - contextQueryBuilder.should( - new BoolQueryBuilder() - .must(QueryBuilders.termQuery("context", context.stringValue())) - .must(QueryBuilders.termQuery("context_BNode", true))); + mainQuery.should(b -> b.bool(bb -> { + bb.must(mb -> mb.term(t -> t.field("context").value(context.stringValue()))); + bb.must(mb -> mb.term(t -> t.field("context_BNode").value(true))); + })); } - } - - boolQueryBuilder.must(contextQueryBuilder); - } - boolQueryBuilder.must(QueryBuilders.termQuery("inferred", inferred)); + mainQuery.must(b -> b.term(t -> t.field("inferred").value(inferred))); - return QueryBuilders.constantScoreQuery(boolQueryBuilder); + return mainQuery.build()._toQuery(); } @Override @@ -604,7 +593,7 @@ public synchronized boolean removeStatementsByQuery(Resource subj, IRI pred, Val BulkByScrollResponse response = new DeleteByQueryRequestBuilder(clientProvider.getClient(), DeleteByQueryAction.INSTANCE) - .filter(getQueryBuilder(subj, pred, obj, inferred, contexts)) + .filter(getQuery(subj, pred, obj, inferred, contexts)) .source(index) .abortOnVersionConflict(false) .get(); diff --git a/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ElasticsearchStore.java b/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ElasticsearchStore.java index b21b5c29ff8..a53897a1211 100644 --- a/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ElasticsearchStore.java +++ b/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ElasticsearchStore.java @@ -25,13 +25,12 @@ import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.extensiblestore.ExtensibleStore; import 
org.eclipse.rdf4j.sail.extensiblestore.valuefactory.ExtensibleStatementHelper; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.HealthStatus; + /** *

* An RDF4J SailStore persisted to Elasticsearch. @@ -49,9 +48,8 @@ * There is no write-ahead logging, so a failure during a transaction may result in partially persisted changes. *

* - * @see Elastic License FAQ - * * @author Håvard Mikkelsen Ottestad + * @see Elastic License FAQ */ @Experimental public class ElasticsearchStore extends ExtensibleStore { @@ -101,11 +99,11 @@ public ElasticsearchStore(ClientProvider clientPool, String index, Cache cache) } - public ElasticsearchStore(Client client, String index) { + public ElasticsearchStore(ElasticsearchClient client, String index) { this(client, index, Cache.EAGER); } - public ElasticsearchStore(Client client, String index, Cache cache) { + public ElasticsearchStore(ElasticsearchClient client, String index, Cache cache) { this(new UnclosableClientProvider(new UserProvidedClientProvider(client)), index, cache); } @@ -152,16 +150,10 @@ public void waitForElasticsearch(int time, TemporalUnit timeUnit) { } try { - Client client = clientProvider.getClient(); - - ClusterHealthResponse clusterHealthResponse = client.admin() - .cluster() - .health(new ClusterHealthRequest()) - .actionGet(); - ClusterHealthStatus status = clusterHealthResponse.getStatus(); - logger.info("Cluster status: {}", status.name()); + ElasticsearchClient client = clientProvider.getClient(); - if (status.equals(ClusterHealthStatus.GREEN) || status.equals(ClusterHealthStatus.YELLOW)) { + HealthStatus status = client.cluster().health().status(); + if (status.equals(HealthStatus.Green) || status.equals(HealthStatus.Yellow)) { logger.info("Elasticsearch started!"); return; diff --git a/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/UserProvidedClientProvider.java b/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/UserProvidedClientProvider.java index a41e48e8ad8..44019c40acf 100644 --- a/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/UserProvidedClientProvider.java +++ b/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/UserProvidedClientProvider.java @@ -10,7 +10,7 @@ 
*******************************************************************************/ package org.eclipse.rdf4j.sail.elasticsearchstore; -import org.elasticsearch.client.Client; +import co.elastic.clients.elasticsearch.ElasticsearchClient; /** * Used by the user to provide an Elasticsearch Client to the ElasticsearchStore instead of providing host, port, @@ -20,16 +20,16 @@ */ public class UserProvidedClientProvider implements ClientProvider { - final private Client client; + final private ElasticsearchClient client; transient boolean closed; - public UserProvidedClientProvider(Client client) { + public UserProvidedClientProvider(ElasticsearchClient client) { this.client = client; } @Override - public Client getClient() { + public ElasticsearchClient getClient() { return client; }