Skip to content

Commit

Permalink
Upgraded ElasticSearch to get rid of CVEs (and switched client to Ope…
Browse files Browse the repository at this point in the history
…nSearch one) (apache#13867)

* Upgraded ElasticSearch to get rid of CVEs. (apache#13747)

* Upgraded ElasticSearch to get rid of CVEs.

CVE-2020-7020
CVE-2020-7021
CVE-2021-22132
CVE-2021-22134
CVE-2021-22144
CVE-2021-22147

* Elastic search client version >= 7.11 no longer works with OSS Elastic images (and elastic.co no longer releases OSS images)

* Fixed tests for Elasticsearch

* pom cleanup

* Switched to OpenSearch client for Elastic (Apache 2 licensed)

(cherry picked from commit bef3071)
(cherry picked from commit 6deb24c)
  • Loading branch information
dlg99 authored and nicoloboschi committed Feb 28, 2022
1 parent 5939f89 commit 79f99a0
Show file tree
Hide file tree
Showing 12 changed files with 290 additions and 143 deletions.
10 changes: 6 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ flexible messaging model and an intuitive client API.</description>
<mariadb-jdbc.version>2.6.0</mariadb-jdbc.version>
<hdfs-offload-version3>3.3.1</hdfs-offload-version3>
<json-smart.version>2.4.7</json-smart.version>
<elasticsearch.version>7.9.1</elasticsearch.version>
<opensearch.version>1.2.4</opensearch.version>
<presto.version>334</presto.version>
<scala.binary.version>2.13</scala.binary.version>
<scala-library.version>2.13.6</scala-library.version>
Expand Down Expand Up @@ -1119,9 +1119,9 @@ flexible messaging model and an intuitive client API.</description>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-rest-high-level-client</artifactId>
<version>${opensearch.version}</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -1328,6 +1328,7 @@ flexible messaging model and an intuitive client API.</description>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<encoding>UTF-8</encoding>
Expand Down Expand Up @@ -1833,6 +1834,7 @@ flexible messaging model and an intuitive client API.</description>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<!-- for some reason, setting maven.compiler.release property alone doesn't work -->
Expand Down
28 changes: 23 additions & 5 deletions pulsar-io/elastic-search/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
<artifactId>pulsar-io-elastic-search</artifactId>
<name>Pulsar IO :: ElasticSearch</name>

<properties>
<!--
Work-around for "Container exited with code 137" (OOM)
-->
<testReuseFork>false</testReuseFork>
<testForkCount>1</testForkCount>
</properties>

<dependencies>

<dependency>
Expand Down Expand Up @@ -68,15 +76,25 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-rest-high-level-client</artifactId>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,31 +46,38 @@
import org.apache.http.ssl.SSLContexts;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkProcessor;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Node;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Requests;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
Expand All @@ -89,7 +96,7 @@
import java.util.concurrent.atomic.AtomicReference;

@Slf4j
public class ElasticSearchClient {
public class ElasticSearchClient implements AutoCloseable {

static final String[] malformedErrors = {
"mapper_parsing_exception",
Expand Down Expand Up @@ -344,6 +351,7 @@ public void flush() {
bulkProcessor.flush();
}

@Override
public void close() {
try {
if (bulkProcessor != null) {
Expand Down Expand Up @@ -458,7 +466,7 @@ protected long totalHits(String indexName) throws IOException {
}

@VisibleForTesting
protected org.elasticsearch.action.search.SearchResponse search(String indexName) throws IOException {
protected SearchResponse search(String indexName) throws IOException {
client.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
return client.search(
new SearchRequest()
Expand All @@ -467,6 +475,11 @@ protected org.elasticsearch.action.search.SearchResponse search(String indexName
RequestOptions.DEFAULT);
}

@VisibleForTesting
protected AcknowledgedResponse delete(String indexName) throws IOException {
return client.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
}

private <T> T retry(Callable<T> callable, String source) {
try {
return backoffRetry.retry(callable, config.getMaxRetries(), config.getRetryBackoffInMs(), source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
*/
package org.apache.pulsar.io.elasticsearch;

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Iterator;
import java.util.NoSuchElementException;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.common.unit.TimeValue;

public class RandomExponentialBackoffPolicy extends BackoffPolicy {
private final RandomExponentialRetry randomExponentialRetry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public class ElasticSearchClientSslTests {

public static final String ELASTICSEARCH_IMAGE = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.10.2-amd64");
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");

final static String INDEX = "myindex";

Expand All @@ -45,9 +45,8 @@ public class ElasticSearchClientSslTests {
@Test
public void testSslBasic() throws IOException {
try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
.withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
.withFileSystemBind(sslResourceDir, configDir + "/ssl")
.withEnv("ELASTIC_PASSWORD","elastic") // boostrap password
.withPassword("elastic")
.withEnv("xpack.license.self_generated.type", "trial")
.withEnv("xpack.security.enabled", "true")
.withEnv("xpack.security.http.ssl.enabled", "true")
Expand Down Expand Up @@ -81,9 +80,8 @@ public void testSslBasic() throws IOException {
@Test
public void testSslWithHostnameVerification() throws IOException {
try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
.withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
.withFileSystemBind(sslResourceDir, configDir + "/ssl")
.withEnv("ELASTIC_PASSWORD","elastic") // boostrap password
.withPassword("elastic")
.withEnv("xpack.license.self_generated.type", "trial")
.withEnv("xpack.security.enabled", "true")
.withEnv("xpack.security.http.ssl.enabled", "true")
Expand Down Expand Up @@ -120,9 +118,8 @@ public void testSslWithHostnameVerification() throws IOException {
@Test
public void testSslWithClientAuth() throws IOException {
try(ElasticsearchContainer container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
.withCreateContainerCmdModifier(c -> c.withName("elasticsearch"))
.withFileSystemBind(sslResourceDir, configDir + "/ssl")
.withEnv("ELASTIC_PASSWORD","elastic") // boostrap password
.withPassword("elastic")
.withEnv("xpack.license.self_generated.type", "trial")
.withEnv("xpack.security.enabled", "true")
.withEnv("xpack.security.http.ssl.enabled", "true")
Expand Down
Loading

0 comments on commit 79f99a0

Please sign in to comment.