diff --git a/docs/user/ppl/admin/cross_cluster_search.rst b/docs/user/ppl/admin/cross_cluster_search.rst new file mode 100644 index 0000000000..5a0370ebe0 --- /dev/null +++ b/docs/user/ppl/admin/cross_cluster_search.rst @@ -0,0 +1,87 @@ +.. highlight:: sh + +==================== +Cross-Cluster Search +==================== + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 1 + +Introduction +============ +Cross-cluster search lets any node in a cluster execute search requests against other clusters. +It makes searching easy across all connected clusters, allowing users to use multiple smaller clusters instead of a single large one. + + +Configuration +============= +On the local cluster, add the remote cluster name and the IP address with port 9300 for each seed node. :: + + PUT _cluster/settings + { + "persistent": { + "cluster.remote": { + "<remote-cluster-name>": { + "seeds": ["<remote-cluster-ip>:9300"] + } + } + } + } + + +Using Cross-Cluster Search in PPL +================================= +Perform cross-cluster search by using "<cluster-name>:<index-name>" as the index identifier. + +Example search command :: + + >> search source = my_remote_cluster:my_index + + +Limitation +========== +Since OpenSearch does not support cross cluster index metadata retrieval, field mapping of a remote cluster index is not available to the local cluster. +(`[Feature] Cross cluster field mappings query #6573 <https://github.com/opensearch-project/OpenSearch/issues/6573>`_) +Therefore, the query engine requires that for any remote cluster index that the users need to search, +the local cluster keep a field mapping system index with the same index name. +This can be done by creating an index on the local cluster with the same name and schema as the remote cluster index. + + +Authentication and Permission +============================= + +1. The security plugin authenticates the user on the local cluster. +2. The security plugin fetches the user’s backend roles on the local cluster. +3. The call, including the authenticated user, is forwarded to the remote cluster. +4. 
The user’s permissions are evaluated on the remote cluster. + +Check `Cross-cluster search access control <https://opensearch.org/docs/latest/security/access-control/cross-cluster-search/>`_ for more details. + +Example: Create the ppl_role for test_user on local cluster and the ccs_role for test_user on remote cluster. Then test_user could use PPL to query ``ppl-security-demo`` index on remote cluster. + +1. On the local cluster, refer to `Security Settings <security.rst>`_ to create role and user for PPL plugin and index access permission. + +2. On the remote cluster, create a new role and grant permission to access index. Create a user with the same name and credentials as the local cluster, and map the user to this role:: + + PUT _plugins/_security/api/roles/ccs_role + { + "index_permissions":[ + { + "index_patterns":["ppl-security-demo"], + "allowed_actions":[ + "indices:admin/shards/search_shards", + "indices:data/read/search" + ] + } + ] + } + + PUT _plugins/_security/api/rolesmapping/ccs_role + { + "backend_roles" : [], + "hosts" : [], + "users" : ["test_user"] + } diff --git a/docs/user/ppl/admin/security.rst b/docs/user/ppl/admin/security.rst index 529704574b..e512cc259c 100644 --- a/docs/user/ppl/admin/security.rst +++ b/docs/user/ppl/admin/security.rst @@ -13,7 +13,7 @@ Security Settings Introduction ============ -User needs ``cluster:admin/opensearch/ppl`` permission to use PPL plugin. User also needs indices level permission ``indices:admin/mappings/get`` to get field mappings and ``indices:data/read/search*`` to search index. +User needs ``cluster:admin/opensearch/ppl`` permission to use PPL plugin. User also needs indices level permission ``indices:admin/mappings/get`` to get field mappings, ``indices:monitor/settings/get`` to get index settings, and ``indices:data/read/search*`` to search index. Using Rest API ============== @@ -34,7 +34,8 @@ Example: Create the ppl_role for test_user. 
then test_user could use PPL to quer ], "allowed_actions": [ "indices:data/read/search*", - "indices:admin/mappings/get" + "indices:admin/mappings/get", + "indices:monitor/settings/get" ] }] } diff --git a/docs/user/ppl/cmd/search.rst b/docs/user/ppl/cmd/search.rst index 120ce34d40..5299f9f78a 100644 --- a/docs/user/ppl/cmd/search.rst +++ b/docs/user/ppl/cmd/search.rst @@ -16,13 +16,18 @@ Description Syntax ============ -search source=<index> [boolean-expression] +search source=[<remote-cluster>:]<index> [boolean-expression] * search: search keywords, which could be ignore. -* index: mandatory. search command must specify which index to query from. +* index: mandatory. search command must specify which index to query from. The index name can be prefixed by "<cluster name>:" for cross-cluster search. * bool-expression: optional. any expression which could be evaluated to boolean value. +Cross-Cluster Search +==================== +Cross-cluster search lets any node in a cluster execute search requests against other clusters. Refer to `Cross-Cluster Search <../admin/cross_cluster_search.rst>`_ for configuration. + + Example 1: Fetch all the data ============================= diff --git a/docs/user/ppl/general/identifiers.rst b/docs/user/ppl/general/identifiers.rst index b15f621af8..51fc36c40f 100644 --- a/docs/user/ppl/general/identifiers.rst +++ b/docs/user/ppl/general/identifiers.rst @@ -83,6 +83,25 @@ Here are examples for quoting an index name by back ticks:: +------------------+ +Cross-Cluster Index Identifiers +=============================== + +Description +----------- + +A cross-cluster index identifier is an index identifier with a prefix ``<cluster identifier>:``. The cluster identifier could contain star ``*``. This is mostly a cluster pattern for wildcard match. + +Use Cases +--------- + +It is used to identify an index on a remote cluster for cross-cluster search. 
+ +Examples +-------- + +For example, if you setup a connection between the local cluster and a remote cluster ``my_cluster``, then you can run ``source=my_cluster:accounts`` to query the ``accounts`` index at ``my_cluster``. + + Case Sensitivity ================ diff --git a/docs/user/ppl/index.rst b/docs/user/ppl/index.rst index a69136bb19..3fc094bddf 100644 --- a/docs/user/ppl/index.rst +++ b/docs/user/ppl/index.rst @@ -38,6 +38,8 @@ The query start with search command and then flowing a set of command delimited - `Prometheus Connector `_ + - `Cross-Cluster Search `_ + * **Commands** - `Syntax `_ diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 0a30e057ad..bbda0ac255 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -26,6 +26,7 @@ import org.opensearch.gradle.test.RestIntegTestTask import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask import java.util.concurrent.Callable +import java.util.stream.Collectors plugins { id "de.undercouch.download" version "5.3.0" @@ -131,6 +132,12 @@ testClusters.integTest { plugin ":opensearch-sql-plugin" } +testClusters { + remoteCluster { + plugin ":opensearch-sql-plugin" + } +} + task startPrometheus(type: SpawnProcessTask) { mustRunAfter ':doctest:doctest' @@ -209,9 +216,27 @@ task integJdbcTest(type: RestIntegTestTask) { // Run PPL ITs and new, legacy and comparison SQL ITs with new SQL engine enabled integTest { + useCluster testClusters.remoteCluster + + // Set properties for connection to clusters and between clusters + doFirst { + getClusters().forEach { cluster -> + String allTransportSocketURI = cluster.nodes.stream().flatMap { node -> + node.getAllTransportPortURI().stream() + }.collect(Collectors.joining(",")) + String allHttpSocketURI = cluster.nodes.stream().flatMap { node -> + node.getAllHttpSocketURI().stream() + }.collect(Collectors.joining(",")) + + systemProperty "tests.rest.${cluster.name}.http_hosts", "${-> allHttpSocketURI}" + systemProperty 
"tests.rest.${cluster.name}.transport_hosts", "${-> allTransportSocketURI}" + } + } + testLogging { events "passed", "skipped", "failed" } + dependsOn ':opensearch-sql-plugin:bundlePlugin' if(getOSFamilyType() != "windows") { dependsOn startPrometheus diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/OpenSearchSQLRestTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/OpenSearchSQLRestTestCase.java index dc18e8510d..e057c58969 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/OpenSearchSQLRestTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/OpenSearchSQLRestTestCase.java @@ -7,6 +7,8 @@ package org.opensearch.sql.legacy; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.hc.client5.http.auth.AuthScope; @@ -27,6 +29,7 @@ import org.apache.logging.log4j.Logger; import org.json.JSONArray; import org.json.JSONObject; +import org.junit.AfterClass; import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.RestClient; @@ -34,14 +37,27 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.util.io.IOUtils; import org.opensearch.test.rest.OpenSearchRestTestCase; +import static java.util.Collections.unmodifiableList; + /** * OpenSearch SQL integration test base class to support both security disabled and enabled OpenSearch cluster. + * Allows interaction with multiple external test clusters using OpenSearch's {@link RestClient}. 
*/ public abstract class OpenSearchSQLRestTestCase extends OpenSearchRestTestCase { private static final Logger LOG = LogManager.getLogger(); + public static final String REMOTE_CLUSTER = "remoteCluster"; + public static final String MATCH_ALL_REMOTE_CLUSTER = "*"; + + private static RestClient remoteClient; + /** + * A client for the running remote OpenSearch cluster configured to take test administrative actions + * like remove all indexes after the test completes + */ + private static RestClient remoteAdminClient; protected boolean isHttps() { boolean isHttps = Optional.ofNullable(System.getProperty("https")) @@ -61,6 +77,21 @@ protected String getProtocol() { return isHttps() ? "https" : "http"; } + /** + * Get the client to remote cluster used for ordinary api calls while writing a test. + */ + protected static RestClient remoteClient() { + return remoteClient; + } + + /** + * Get the client to remote cluster used for test administrative actions. + * Do not use this while writing a test. Only use it for cleaning up after tests. 
+ */ + protected static RestClient remoteAdminClient() { + return remoteAdminClient; + } + protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException { RestClientBuilder builder = RestClient.builder(hosts); if (isHttps()) { @@ -73,11 +104,84 @@ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOE return builder.build(); } + // Modified from initClient in OpenSearchRestTestCase: lazily builds the shared remote-cluster clients from the comma delimited host:port list returned by getTestRestCluster(REMOTE_CLUSTER); later calls reuse them. + public void initRemoteClient() throws IOException { + if (remoteClient == null) { + assert remoteAdminClient == null; + String cluster = getTestRestCluster(REMOTE_CLUSTER); + String[] stringUrls = cluster.split(","); + List<HttpHost> hosts = new ArrayList<>(stringUrls.length); + for (String stringUrl : stringUrls) { + int portSeparator = stringUrl.lastIndexOf(':'); + if (portSeparator < 0) { + throw new IllegalArgumentException("Illegal cluster url [" + stringUrl + "]"); + } + String host = stringUrl.substring(0, portSeparator); + int port = Integer.parseInt(stringUrl.substring(portSeparator + 1)); + hosts.add(buildHttpHost(host, port)); + } + final List<HttpHost> clusterHosts = unmodifiableList(hosts); + remoteClient = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0])); + remoteAdminClient = buildClient(restAdminSettings(), clusterHosts.toArray(new HttpHost[0])); + } + assert remoteClient != null; + assert remoteAdminClient != null; + } + + /** + * Get a comma delimited list of [host:port] to which to send REST requests. + */ + protected String getTestRestCluster(String clusterName) { + String cluster = System.getProperty("tests.rest." + clusterName + ".http_hosts"); + if (cluster == null) { + throw new RuntimeException( + "Must specify [tests.rest." + + clusterName + + ".http_hosts] system property with a comma delimited list of [host:port] " + + "to which to send REST requests" + ); + } + return cluster; + } + + /** + * Get a comma delimited list of [host:port] for connections between clusters. 
+ */ + protected String getTestTransportCluster(String clusterName) { + String cluster = System.getProperty("tests.rest." + clusterName + ".transport_hosts"); + if (cluster == null) { + throw new RuntimeException( + "Must specify [tests.rest." + + clusterName + + ".transport_hosts] system property with a comma delimited list of [host:port] " + + "for connections between clusters" + ); + } + return cluster; + } + + @AfterClass + public static void closeRemoteClients() throws IOException { + try { + IOUtils.close(remoteClient, remoteAdminClient); + } finally { + remoteClient = null; + remoteAdminClient = null; + } + } + protected static void wipeAllOpenSearchIndices() throws IOException { + wipeAllOpenSearchIndices(client()); + if (remoteClient() != null) { + wipeAllOpenSearchIndices(remoteClient()); + } + } + + protected static void wipeAllOpenSearchIndices(RestClient client) throws IOException { // include all the indices, included hidden indices. // https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html#cat-indices-api-query-params try { - Response response = client().performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all")); + Response response = client.performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all")); JSONArray jsonArray = new JSONArray(EntityUtils.toString(response.getEntity(), "UTF-8")); for (Object object : jsonArray) { JSONObject jsonObject = (JSONObject) object; @@ -85,7 +189,7 @@ protected static void wipeAllOpenSearchIndices() throws IOException { try { // System index, mostly named .opensearch-xxx or .opendistro-xxx, are not allowed to delete if (!indexName.startsWith(".opensearch") && !indexName.startsWith(".opendistro")) { - client().performRequest(new Request("DELETE", "/" + indexName)); + client.performRequest(new Request("DELETE", "/" + indexName)); } } catch (Exception e) { // TODO: Ignore index delete error for now. 
Remove this if strict check on system index added above. @@ -143,4 +247,21 @@ protected static void configureHttpsClient(RestClientBuilder builder, Settings s builder.setPathPrefix(settings.get(CLIENT_PATH_PREFIX)); } } + + /** + * Initialize rest client to remote cluster, + * and create a connection to it from the coordinating cluster. + */ + public void configureMultiClusters() throws IOException { + initRemoteClient(); + + Request connectionRequest = new Request("PUT", "_cluster/settings"); + String connectionSetting = "{\"persistent\": {\"cluster\": {\"remote\": {\"" + + REMOTE_CLUSTER + + "\": {\"seeds\": [\"" + + getTestTransportCluster(REMOTE_CLUSTER).split(",")[0] + + "\"]}}}}}"; + connectionRequest.setJsonEntity(connectionSetting); + adminClient().performRequest(connectionRequest); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 35ae5d3675..f6e4b23708 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -17,6 +17,7 @@ import org.opensearch.client.Request; import org.opensearch.client.RequestOptions; import org.opensearch.client.Response; +import org.opensearch.client.RestClient; import org.opensearch.sql.common.setting.Settings; import javax.management.MBeanServerInvocationHandler; @@ -167,6 +168,10 @@ protected void resetQuerySizeLimit() throws IOException { protected static void wipeAllClusterSettings() throws IOException { updateClusterSettings(new ClusterSetting("persistent", "*", null)); updateClusterSettings(new ClusterSetting("transient", "*", null)); + if (remoteClient() != null) { + updateClusterSettings(new ClusterSetting("persistent", "*", null), remoteClient()); + updateClusterSettings(new ClusterSetting("transient", "*", null), remoteClient()); + } } protected void setMaxResultWindow(String indexName, 
Integer window) throws IOException { @@ -188,17 +193,21 @@ protected void init() throws Exception { * Make it thread-safe in case tests are running in parallel but does not guarantee * if test like DeleteIT that mutates cluster running in parallel. */ - protected synchronized void loadIndex(Index index) throws IOException { + protected synchronized void loadIndex(Index index, RestClient client) throws IOException { String indexName = index.getName(); String mapping = index.getMapping(); String dataSet = index.getDataSet(); - if (!isIndexExist(client(), indexName)) { - createIndexByRestClient(client(), indexName, mapping); - loadDataByRestClient(client(), indexName, dataSet); + if (!isIndexExist(client, indexName)) { + createIndexByRestClient(client, indexName, mapping); + loadDataByRestClient(client, indexName, dataSet); } } + protected synchronized void loadIndex(Index index) throws IOException { + loadIndex(index, client()); + } + protected Request getSqlRequest(String request, boolean explain) { return getSqlRequest(request, explain, "json"); } @@ -325,12 +334,16 @@ private String executeRequest(final String requestBody, final boolean isExplainQ return executeRequest(sqlRequest); } - protected static String executeRequest(final Request request) throws IOException { - Response response = client().performRequest(request); + protected static String executeRequest(final Request request, RestClient client) throws IOException { + Response response = client.performRequest(request); Assert.assertEquals(200, response.getStatusLine().getStatusCode()); return getResponseBody(response); } + protected static String executeRequest(final Request request) throws IOException { + return executeRequest(request, client()); + } + protected JSONObject executeQueryWithGetRequest(final String sqlQuery) throws IOException { final Request request = buildGetEndpointRequest(sqlQuery); @@ -350,7 +363,7 @@ protected JSONObject executeCursorCloseQuery(final String cursor) throws IOExcep 
return new JSONObject(executeRequest(sqlRequest)); } - protected static JSONObject updateClusterSettings(ClusterSetting setting) throws IOException { + protected static JSONObject updateClusterSettings(ClusterSetting setting, RestClient client) throws IOException { Request request = new Request("PUT", "/_cluster/settings"); String persistentSetting = String.format(Locale.ROOT, "{\"%s\": {\"%s\": %s}}", setting.type, setting.name, setting.value); @@ -358,7 +371,11 @@ protected static JSONObject updateClusterSettings(ClusterSetting setting) throws RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); restOptionsBuilder.addHeader("Content-Type", "application/json"); request.setOptions(restOptionsBuilder); - return new JSONObject(executeRequest(request)); + return new JSONObject(executeRequest(request, client)); + } + + protected static JSONObject updateClusterSettings(ClusterSetting setting) throws IOException { + return updateClusterSettings(setting, client()); } protected static JSONObject getAllClusterSettings() throws IOException { diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/CrossClusterSearchIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/CrossClusterSearchIT.java new file mode 100644 index 0000000000..a8e686a893 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/CrossClusterSearchIT.java @@ -0,0 +1,144 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.sql.ppl; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DOG; +import static org.opensearch.sql.util.MatcherUtils.columnName; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.verifyColumn; +import static 
org.opensearch.sql.util.MatcherUtils.verifyDataRows; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.Rule; +import org.junit.jupiter.api.Test; +import org.junit.rules.ExpectedException; +import org.opensearch.client.ResponseException; + +public class CrossClusterSearchIT extends PPLIntegTestCase { + + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); + + private final static String TEST_INDEX_BANK_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_BANK; + private final static String TEST_INDEX_DOG_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_DOG; + private final static String TEST_INDEX_DOG_MATCH_ALL_REMOTE = MATCH_ALL_REMOTE_CLUSTER + ":" + TEST_INDEX_DOG; + private final static String TEST_INDEX_ACCOUNT_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_ACCOUNT; + + @Override + public void init() throws IOException { + configureMultiClusters(); + loadIndex(Index.BANK); + loadIndex(Index.BANK, remoteClient()); + loadIndex(Index.DOG); + loadIndex(Index.DOG, remoteClient()); + loadIndex(Index.ACCOUNT, remoteClient()); + } + + @Test + public void testCrossClusterSearchAllFields() throws IOException { + JSONObject result = executeQuery(String.format("search source=%s", TEST_INDEX_DOG_REMOTE)); + verifyColumn(result, columnName("dog_name"), columnName("holdersName"), columnName("age")); + } + + @Test + public void testMatchAllCrossClusterSearchAllFields() throws IOException { + JSONObject result = executeQuery(String.format("search source=%s", TEST_INDEX_DOG_MATCH_ALL_REMOTE)); + verifyColumn(result, columnName("dog_name"), columnName("holdersName"), columnName("age")); + } + + @Test + public void testCrossClusterSearchWithoutLocalFieldMappingShouldFail() throws IOException { + exceptionRule.expect(ResponseException.class); + exceptionRule.expectMessage("400 Bad Request"); + exceptionRule.expectMessage("IndexNotFoundException"); + + executeQuery(String.format("search source=%s", TEST_INDEX_ACCOUNT_REMOTE)); + } + + @Test + public 
void testCrossClusterSearchCommandWithLogicalExpression() throws IOException { + JSONObject result = executeQuery(String.format( + "search source=%s firstname='Hattie' | fields firstname", TEST_INDEX_BANK_REMOTE)); + verifyDataRows(result, rows("Hattie")); + } + + @Test + public void testCrossClusterSearchMultiClusters() throws IOException { + JSONObject result = executeQuery(String.format( + "search source=%s,%s firstname='Hattie' | fields firstname", TEST_INDEX_BANK_REMOTE, TEST_INDEX_BANK)); + verifyDataRows(result, + rows("Hattie"), + rows("Hattie")); + } + + @Test + public void testCrossClusterDescribeAllFields() throws IOException { + JSONObject result = executeQuery(String.format("describe %s", TEST_INDEX_DOG_REMOTE)); + verifyColumn( + result, + columnName("TABLE_CAT"), + columnName("TABLE_SCHEM"), + columnName("TABLE_NAME"), + columnName("COLUMN_NAME"), + columnName("DATA_TYPE"), + columnName("TYPE_NAME"), + columnName("COLUMN_SIZE"), + columnName("BUFFER_LENGTH"), + columnName("DECIMAL_DIGITS"), + columnName("NUM_PREC_RADIX"), + columnName("NULLABLE"), + columnName("REMARKS"), + columnName("COLUMN_DEF"), + columnName("SQL_DATA_TYPE"), + columnName("SQL_DATETIME_SUB"), + columnName("CHAR_OCTET_LENGTH"), + columnName("ORDINAL_POSITION"), + columnName("IS_NULLABLE"), + columnName("SCOPE_CATALOG"), + columnName("SCOPE_SCHEMA"), + columnName("SCOPE_TABLE"), + columnName("SOURCE_DATA_TYPE"), + columnName("IS_AUTOINCREMENT"), + columnName("IS_GENERATEDCOLUMN") + ); + } + + @Test + public void testMatchAllCrossClusterDescribeAllFields() throws IOException { + JSONObject result = executeQuery(String.format("describe %s", TEST_INDEX_DOG_MATCH_ALL_REMOTE)); + verifyColumn( + result, + columnName("TABLE_CAT"), + columnName("TABLE_SCHEM"), + columnName("TABLE_NAME"), + columnName("COLUMN_NAME"), + columnName("DATA_TYPE"), + columnName("TYPE_NAME"), + columnName("COLUMN_SIZE"), + columnName("BUFFER_LENGTH"), + columnName("DECIMAL_DIGITS"), + 
columnName("NUM_PREC_RADIX"), + columnName("NULLABLE"), + columnName("REMARKS"), + columnName("COLUMN_DEF"), + columnName("SQL_DATA_TYPE"), + columnName("SQL_DATETIME_SUB"), + columnName("CHAR_OCTET_LENGTH"), + columnName("ORDINAL_POSITION"), + columnName("IS_NULLABLE"), + columnName("SCOPE_CATALOG"), + columnName("SCOPE_SCHEMA"), + columnName("SCOPE_TABLE"), + columnName("SOURCE_DATA_TYPE"), + columnName("IS_AUTOINCREMENT"), + columnName("IS_GENERATEDCOLUMN") + ); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java index dacbecc7b9..9b0d6ca074 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java @@ -13,7 +13,6 @@ import java.util.function.Function; import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.ToString; import org.opensearch.action.search.SearchRequest; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java index 22ed8c2ffe..f4fd7b98d3 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java @@ -11,6 +11,7 @@ import static org.opensearch.sql.opensearch.client.OpenSearchClient.META_CLUSTER_NAME; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -82,7 +83,8 @@ public List search() { // TODO possible collision if two indices have fields with the same name and 
different mappings public Map getFieldTypes() { Map fieldTypes = new HashMap<>(); - Map indexMappings = client.getIndexMappings(indexName.getIndexNames()); + Map indexMappings = + client.getIndexMappings(getLocalIndexNames(indexName.getIndexNames())); for (IndexMapping indexMapping : indexMappings.values()) { fieldTypes.putAll(indexMapping.getFieldMappings()); } @@ -95,7 +97,7 @@ public Map getFieldTypes() { * @return max result window */ public Integer getMaxResultWindow() { - return client.getIndexMaxResultWindows(indexName.getIndexNames()) + return client.getIndexMaxResultWindows(getLocalIndexNames(indexName.getIndexNames())) .values().stream().min(Integer::compare).get(); } @@ -119,6 +121,19 @@ private ExprTupleValue row(String fieldName, String fieldType, int position, Str return new ExprTupleValue(valueMap); } + /** + * Return index names without "{cluster}:" prefix. + * Without the prefix, they refer to the indices at the local cluster. + * + * @param indexNames a string array of index names + * @return local cluster index names + */ + private String[] getLocalIndexNames(String[] indexNames) { + return Arrays.stream(indexNames) + .map(name -> name.substring(name.indexOf(":") + 1)) + .toArray(String[]::new); + } + private String clusterName(Map meta) { return meta.getOrDefault(META_CLUSTER_NAME, DEFAULT_TABLE_CAT); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java index be83622578..2e1ded6322 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java @@ -67,6 +67,9 @@ public class OpenSearchQueryRequestTest { private final OpenSearchQueryRequest request = new OpenSearchQueryRequest("test", 200, factory); + private final OpenSearchQueryRequest remoteRequest = 
+ new OpenSearchQueryRequest("ccs:test", 200, factory); + @Test void search() { OpenSearchQueryRequest request = new OpenSearchQueryRequest( @@ -152,4 +155,19 @@ void searchRequest() { .query(QueryBuilders.termQuery("name", "John"))), request.searchRequest()); } + + @Test + void searchCrossClusterRequest() { + remoteRequest.getSourceBuilder().query(QueryBuilders.termQuery("name", "John")); + + assertEquals( + new SearchRequest() + .indices("ccs:test") + .source(new SearchSourceBuilder() + .timeout(OpenSearchQueryRequest.DEFAULT_QUERY_TIMEOUT) + .from(0) + .size(200) + .query(QueryBuilders.termQuery("name", "John"))), + remoteRequest.searchRequest()); + } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequestTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequestTest.java index 111316a7ed..c19b3a3ccd 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequestTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequestTest.java @@ -49,6 +49,22 @@ void testSearch() { )); } + @Test + void testCrossClusterShouldSearchLocal() { + when(mapping.getFieldMappings()).thenReturn( + Map.of("name", OpenSearchDataType.of(OpenSearchDataType.MappingType.Keyword))); + when(client.getIndexMappings("index")).thenReturn(ImmutableMap.of("test", mapping)); + + final List results = + new OpenSearchDescribeIndexRequest(client, "ccs:index").search(); + assertEquals(1, results.size()); + assertThat(results.get(0).tupleValue(), anyOf( + hasEntry("TABLE_NAME", stringValue("index")), + hasEntry("COLUMN_NAME", stringValue("name")), + hasEntry("TYPE_NAME", stringValue("STRING")) + )); + } + @Test void testToString() { assertEquals("OpenSearchDescribeIndexRequest{indexName='index'}", diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 
b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 12c24bd531..f412f29280 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -363,12 +363,14 @@ Y: 'Y'; // LITERALS AND VALUES //STRING_LITERAL: DQUOTA_STRING | SQUOTA_STRING | BQUOTA_STRING; ID: ID_LITERAL; +CLUSTER: CLUSTER_PREFIX_LITERAL; INTEGER_LITERAL: DEC_DIGIT+; DECIMAL_LITERAL: (DEC_DIGIT+)? '.' DEC_DIGIT+; -fragment DATE_SUFFIX: ([\-.][*0-9]+)*; +fragment DATE_SUFFIX: ([\-.][*0-9]+)+; fragment ID_LITERAL: [@*A-Z]+?[*A-Z_\-0-9]*; -ID_DATE_SUFFIX: ID_LITERAL DATE_SUFFIX; +fragment CLUSTER_PREFIX_LITERAL: [*A-Z]+?[*A-Z_\-0-9]* COLON; +ID_DATE_SUFFIX: CLUSTER_PREFIX_LITERAL? ID_LITERAL DATE_SUFFIX; DQUOTA_STRING: '"' ( '\\'. | '""' | ~('"'| '\\') )* '"'; SQUOTA_STRING: '\'' ('\\'. | '\'\'' | ~('\'' | '\\'))* '\''; BQUOTA_STRING: '`' ( '\\'. | '``' | ~('`'|'\\'))* '`'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index cca99407bb..3a28210271 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -322,7 +322,7 @@ multiFieldRelevanceFunction /** tables */ tableSource - : qualifiedName + : tableQualifiedName | ID_DATE_SUFFIX ; @@ -723,6 +723,10 @@ qualifiedName : ident (DOT ident)* #identsAsQualifiedName ; +tableQualifiedName + : tableIdent (DOT ident)* #identsAsTableQualifiedName + ; + wcQualifiedName : wildcard (DOT wildcard)* #identsAsWildcardQualifiedName ; @@ -734,6 +738,10 @@ ident | keywordsCanBeId ; +tableIdent + : (CLUSTER)? ident + ; + wildcard : ident (MODULE ident)* (MODULE)? 
| SINGLE_QUOTE wildcard SINGLE_QUOTE diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index c9823b67f9..e56eae83a6 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -24,6 +24,7 @@ import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.EvalFunctionCallContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FieldExpressionContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.IdentsAsQualifiedNameContext; +import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.IdentsAsTableQualifiedNameContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.IdentsAsWildcardQualifiedNameContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.InExprContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.IntegerLiteralContext; @@ -50,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.antlr.v4.runtime.ParserRuleContext; import org.antlr.v4.runtime.RuleContext; import org.opensearch.sql.ast.dsl.AstDSL; @@ -280,8 +282,8 @@ public UnresolvedExpression visitMultiFieldRelevanceFunction( @Override public UnresolvedExpression visitTableSource(TableSourceContext ctx) { - if (ctx.getChild(0) instanceof IdentsAsQualifiedNameContext) { - return visitIdentifiers(((IdentsAsQualifiedNameContext) ctx.getChild(0)).ident()); + if (ctx.getChild(0) instanceof IdentsAsTableQualifiedNameContext) { + return visitIdentsAsTableQualifiedName((IdentsAsTableQualifiedNameContext) ctx.getChild(0)); } else { return visitIdentifiers(Arrays.asList(ctx)); } @@ -304,6 +306,14 @@ public UnresolvedExpression visitIdentsAsQualifiedName(IdentsAsQualifiedNameCont return 
visitIdentifiers(ctx.ident()); } + @Override + public UnresolvedExpression visitIdentsAsTableQualifiedName( + IdentsAsTableQualifiedNameContext ctx) { + return visitIdentifiers( + Stream.concat(Stream.of(ctx.tableIdent()), ctx.ident().stream()) + .collect(Collectors.toList())); + } + @Override public UnresolvedExpression visitIdentsAsWildcardQualifiedName( IdentsAsWildcardQualifiedNameContext ctx) { diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/antlr/PPLSyntaxParserTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/antlr/PPLSyntaxParserTest.java index dcf961dc24..bbc566e2ba 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/antlr/PPLSyntaxParserTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/antlr/PPLSyntaxParserTest.java @@ -30,12 +30,66 @@ public void testSearchCommandIgnoreSearchKeywordShouldPass() { assertNotEquals(null, tree); } + @Test + public void testSearchCommandWithMultipleIndicesShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=t,u a=1 b=2"); + assertNotEquals(null, tree); + } + + @Test + public void testSearchCommandCrossClusterShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=c:t a=1 b=2"); + assertNotEquals(null, tree); + } + + @Test + public void testSearchCommandCrossClusterHiddenShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=c:.t a=1 b=2"); + assertNotEquals(null, tree); + } + + @Test + public void testSearchCommandCrossClusterQualifiedShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=c:t.u a=1 b=2"); + assertNotEquals(null, tree); + } + + @Test + public void testSearchCommandCrossClusterHiddenQualifiedShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=c:.t.u a=1 b=2"); + assertNotEquals(null, tree); + } + + @Test + public void testSearchCommandMatchAllCrossClusterShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=*:t a=1 b=2"); + assertNotEquals(null, tree); + } + 
+ @Test + public void testSearchCommandCrossClusterWithMultipleIndicesShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=c:t,d:u,v a=1 b=2"); + assertNotEquals(null, tree); + } + + @Test + public void testSearchCommandCrossClusterIgnoreSearchKeywordShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("source=c:t a=1 b=2"); + assertNotEquals(null, tree); + } + @Test public void testSearchFieldsCommandShouldPass() { ParseTree tree = new PPLSyntaxParser().parse("search source=t a=1 b=2 | fields a,b"); assertNotEquals(null, tree); } + @Test + public void testSearchFieldsCommandCrossClusterShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=c:t a=1 b=2 | fields a,b"); + assertNotEquals(null, tree); + } + @Test public void testSearchCommandWithoutSourceShouldFail() { exceptionRule.expect(RuntimeException.class); @@ -177,6 +231,7 @@ public void can_parse_query_string_relevance_function() { + "analyzer=keyword, quote_field_suffix=\".exact\", fuzzy_prefix_length = 4)")); } + @Test public void testDescribeCommandShouldPass() { ParseTree tree = new PPLSyntaxParser().parse("describe t"); assertNotEquals(null, tree); @@ -188,6 +243,18 @@ public void testDescribeCommandWithMultipleIndicesShouldPass() { assertNotEquals(null, tree); } + @Test + public void testDescribeCommandCrossClusterShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("describe c:t"); + assertNotEquals(null, tree); + } + + @Test + public void testDescribeCommandMatchAllCrossClusterShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("describe *:t"); + assertNotEquals(null, tree); + } + @Test public void testDescribeFieldsCommandShouldPass() { ParseTree tree = new PPLSyntaxParser().parse("describe t | fields a,b"); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index 533254a599..8249619b37 100644 --- 
a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -77,6 +77,20 @@ public void testSearchCommand() { ); } + @Test + public void testSearchCrossClusterCommand() { + assertEqual("search source=c:t", + relation(qualifiedName("c:t")) + ); + } + + @Test + public void testSearchMatchAllCrossClusterCommand() { + assertEqual("search source=*:t", + relation(qualifiedName("*:t")) + ); + } + @Test public void testPrometheusSearchCommand() { assertEqual("search source = prometheus.http_requests_total", @@ -736,6 +750,12 @@ public void testDescribeCommand() { relation(mappingTable("t"))); } + @Test + public void testDescribeMatchAllCrossClusterSearchCommand() { + assertEqual("describe *:t", + relation(mappingTable("*:t"))); + } + @Test public void testDescribeCommandWithMultipleIndices() { assertEqual("describe t,u",