Skip to content

Commit

Permalink
multi clusters setup for integration test
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Apr 18, 2023
1 parent 4f4d47d commit 93d441c
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 6 deletions.
24 changes: 24 additions & 0 deletions integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.opensearch.gradle.test.RestIntegTestTask
import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask

import java.util.concurrent.Callable
import java.util.stream.Collectors

plugins {
id "de.undercouch.download" version "5.3.0"
Expand Down Expand Up @@ -126,6 +127,12 @@ testClusters.integTest {
plugin ":opensearch-sql-plugin"
}

testClusters {
remoteCluster {
plugin ":opensearch-sql-plugin"
}
}

task startPrometheus(type: SpawnProcessTask) {
mustRunAfter ':doctest:doctest'

Expand Down Expand Up @@ -160,6 +167,23 @@ stopPrometheus.mustRunAfter startPrometheus

// Run PPL ITs and new, legacy and comparison SQL ITs with new SQL engine enabled
integTest {
useCluster testClusters.remoteCluster

// Set properties for connection to clusters and between clusters
doFirst {
getClusters().forEach { cluster ->
String allTransportSocketURI = cluster.nodes.stream().flatMap { node ->
node.getAllTransportPortURI().stream()
}.collect(Collectors.joining(","))
String allHttpSocketURI = cluster.nodes.stream().flatMap { node ->
node.getAllHttpSocketURI().stream()
}.collect(Collectors.joining(","))

systemProperty "tests.rest.${cluster.name}.http_hosts", "${-> allHttpSocketURI}"
systemProperty "tests.rest.${cluster.name}.transport_hosts", "${-> allTransportSocketURI}"
}
}

dependsOn ':opensearch-sql-plugin:bundlePlugin'
if(getOSFamilyType() != "windows") {
dependsOn startPrometheus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.test.rest.OpenSearchRestTestCase;
import org.opensearch.sql.multicluster.OpenSearchMultiClustersRestTestCase;

/**
* OpenSearch SQL integration test base class to support both security disabled and enabled OpenSearch cluster.
*/
public abstract class OpenSearchSQLRestTestCase extends OpenSearchRestTestCase {
public abstract class OpenSearchSQLRestTestCase extends OpenSearchMultiClustersRestTestCase {

private static final Logger LOG = LogManager.getLogger();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;
import org.opensearch.sql.common.setting.Settings;

import javax.management.MBeanServerInvocationHandler;
Expand Down Expand Up @@ -188,17 +189,21 @@ protected void init() throws Exception {
* Make it thread-safe in case tests are running in parallel but does not guarantee
* if test like DeleteIT that mutates cluster running in parallel.
*/
protected synchronized void loadIndex(Index index) throws IOException {
protected synchronized void loadIndex(Index index, RestClient client) throws IOException {
String indexName = index.getName();
String mapping = index.getMapping();
String dataSet = index.getDataSet();

if (!isIndexExist(client(), indexName)) {
createIndexByRestClient(client(), indexName, mapping);
loadDataByRestClient(client(), indexName, dataSet);
if (!isIndexExist(client, indexName)) {
createIndexByRestClient(client, indexName, mapping);
loadDataByRestClient(client, indexName, dataSet);
}
}

protected synchronized void loadIndex(Index index) throws IOException {
loadIndex(index, client());
}

protected Request getSqlRequest(String request, boolean explain) {
return getSqlRequest(request, explain, "json");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/


package org.opensearch.sql.multicluster;

import org.apache.hc.core5.http.HttpHost;
import org.opensearch.client.Request;
import org.opensearch.client.RestClient;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.test.rest.OpenSearchRestTestCase;
import org.junit.AfterClass;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static java.util.Collections.unmodifiableList;

/**
* Superclass for tests that interact with multiple external test clusters using OpenSearch's {@link RestClient}.
*/
public abstract class OpenSearchMultiClustersRestTestCase extends OpenSearchRestTestCase {

public static final String REMOTE_CLUSTER = "remoteCluster";

private static RestClient remoteClient;
/**
* A client for the running remote OpenSearch cluster configured to take test administrative actions
* like remove all indexes after the test completes
*/
private static RestClient remoteAdminClient;

// modified from initClient in OpenSearchRestTestCase
public void initRemoteClient() throws IOException {
if (remoteClient == null) {
assert remoteAdminClient == null;
String cluster = getTestRestCluster(REMOTE_CLUSTER);
String[] stringUrls = cluster.split(",");
List<HttpHost> hosts = new ArrayList<>(stringUrls.length);
for (String stringUrl : stringUrls) {
int portSeparator = stringUrl.lastIndexOf(':');
if (portSeparator < 0) {
throw new IllegalArgumentException("Illegal cluster url [" + stringUrl + "]");
}
String host = stringUrl.substring(0, portSeparator);
int port = Integer.valueOf(stringUrl.substring(portSeparator + 1));
hosts.add(buildHttpHost(host, port));
}
final List<HttpHost> clusterHosts = unmodifiableList(hosts);
logger.info("initializing REST clients against {}", clusterHosts);
remoteClient = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
remoteAdminClient = buildClient(restAdminSettings(), clusterHosts.toArray(new HttpHost[0]));
}
assert remoteClient != null;
assert remoteAdminClient != null;
}

protected String getTestRestCluster(String clusterName) {
String cluster = System.getProperty("tests.rest." + clusterName + ".http_hosts");
if (cluster == null) {
throw new RuntimeException(
"Must specify [tests.rest."
+ clusterName
+ ".http_hosts] system property with a comma delimited list of [host:port] "
+ "to which to send REST requests"
);
}
return cluster;
}

protected String getTestTransportCluster(String clusterName) {
String cluster = System.getProperty("tests.rest." + clusterName + ".transport_hosts");
if (cluster == null) {
throw new RuntimeException(
"Must specify [tests.rest."
+ clusterName
+ ".transport_hosts] system property with a comma delimited list of [host:port] "
+ "for connections between clusters"
);
}
return cluster;
}

public void configureMultiClusters() throws IOException {
initRemoteClient();

Request connectionRequest = new Request("PUT", "_cluster/settings");
String connectionSetting = "{\"persistent\": {\"cluster\": {\"remote\": {\""
+ REMOTE_CLUSTER
+ "\": {\"seeds\": [\""
+ getTestTransportCluster(REMOTE_CLUSTER).split(",")[0]
+ "\"]}}}}}";
connectionRequest.setJsonEntity(connectionSetting);
logger.info("Creating connection from coordinating cluster to {}", REMOTE_CLUSTER);
adminClient().performRequest(connectionRequest);
}

@AfterClass
public static void closeRemoteClients() throws IOException {
try {
IOUtils.close(remoteClient, remoteAdminClient);
} finally {
remoteClient = null;
remoteAdminClient = null;
}
}

/**
* Get the client to remote cluster used for ordinary api calls while writing a test
*/
protected static RestClient remoteClient() {
return remoteClient;
}

/**
* Get the client to remote cluster used for test administrative actions.
* Do not use this while writing a test. Only use it for cleaning up after tests.
*/
protected static RestClient remoteAdminClient() {
return remoteAdminClient;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/


package org.opensearch.sql.ppl;

import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DOG;
import static org.opensearch.sql.util.MatcherUtils.columnName;
import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.verifyColumn;
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;

import java.io.IOException;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;
import org.opensearch.client.Request;
import org.opensearch.client.ResponseException;

public class CrossClusterSearchIT extends PPLIntegTestCase {

private final static String TEST_INDEX_BANK_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_BANK;
private final static String TEST_INDEX_DOG_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_DOG;

@Override
public void init() throws IOException {
configureMultiClusters();
loadIndex(Index.BANK);
loadIndex(Index.BANK, remoteClient());
loadIndex(Index.DOG);
loadIndex(Index.DOG, remoteClient());
}

@Test
public void testCrossClusterSearchAllFields() throws IOException {
JSONObject result = executeQuery(String.format("search source=%s", TEST_INDEX_DOG_REMOTE));
verifyColumn(result, columnName("dog_name"), columnName("holdersName"), columnName("age"));
}

@Test
public void testCrossClusterSearchCommandWithLogicalExpression() throws IOException {
JSONObject result =
executeQuery(
String.format(
"search source=%s firstname='Hattie' | fields firstname", TEST_INDEX_BANK_REMOTE));
verifyDataRows(result, rows("Hattie"));
}
}

0 comments on commit 93d441c

Please sign in to comment.