From ad3e72529cbcc35f15110709f26412ca930c20f5 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Tue, 2 May 2023 10:50:56 -0700 Subject: [PATCH] Cross cluster search in PPL (#1512) * feat: PPL parser for ccs Signed-off-by: Sean Kao * feat: disable describe remote cluster index in PPL Allowing the syntax will lead to misunderstanding in the query result, because we will do a local cluster query for index mapping, even for remote indices. This is due to the restriction that OpenSearch doesn't support remote cluster index mapping query at the moment. Signed-off-by: Sean Kao * feat: Query system index without cluster name We require system index query to happen at the local cluster. Currently, OpenSearch does not support cross cluster system index query. Thus, mapping of a remote index is unavailable. Therefore, we require the local cluster to have the system index of the remote cluster index. The full "cluster:index" name is still used to query OpenSearch for datarows, as CCS is natively supported. Signed-off-by: Sean Kao * fix: index name parsing for datasources To identify datasources in the index qualified names, they need to be parsed into parts (separated only by dots). clusterQualifiedName can't contain custom datasources, hence the distinction. 
Signed-off-by: Sean Kao * multi clusters setup for integration test Signed-off-by: Sean Kao * Add IT test case Signed-off-by: Sean Kao * Document ccs for ppl Signed-off-by: Sean Kao * documentation update Signed-off-by: Sean Kao * feat: allow describe remote cluster index in PPL Signed-off-by: Sean Kao * feat: allow "*:index" to match all remote clusters Signed-off-by: Sean Kao * use local index names for field mappings request Signed-off-by: Sean Kao * allow ':' in index identifier Signed-off-by: Sean Kao * docs update Signed-off-by: Sean Kao * limit cluster prefix to table names only Signed-off-by: Sean Kao * move multicluster capability to sql rest test case Signed-off-by: Sean Kao * add IT for failure case Signed-off-by: Sean Kao * remove logger info for connection in IT test case Signed-off-by: Sean Kao --------- Signed-off-by: Sean Kao (cherry picked from commit 1bda5fe73312017b3bc6029e2eaf378c2e72306c) --- docs/user/ppl/admin/cross_cluster_search.rst | 87 +++++++++++ docs/user/ppl/admin/security.rst | 5 +- docs/user/ppl/cmd/search.rst | 9 +- docs/user/ppl/general/identifiers.rst | 19 +++ docs/user/ppl/index.rst | 2 + integ-test/build.gradle | 25 +++ .../sql/legacy/OpenSearchSQLRestTestCase.java | 125 ++++++++++++++- .../sql/legacy/SQLIntegTestCase.java | 33 +++- .../sql/ppl/CrossClusterSearchIT.java | 144 ++++++++++++++++++ .../request/OpenSearchScrollRequest.java | 1 - .../OpenSearchDescribeIndexRequest.java | 19 ++- .../request/OpenSearchQueryRequestTest.java | 18 +++ .../OpenSearchDescribeIndexRequestTest.java | 16 ++ ppl/src/main/antlr/OpenSearchPPLLexer.g4 | 6 +- ppl/src/main/antlr/OpenSearchPPLParser.g4 | 10 +- .../sql/ppl/parser/AstExpressionBuilder.java | 14 +- .../sql/ppl/antlr/PPLSyntaxParserTest.java | 67 ++++++++ .../sql/ppl/parser/AstBuilderTest.java | 20 +++ 18 files changed, 598 insertions(+), 22 deletions(-) create mode 100644 docs/user/ppl/admin/cross_cluster_search.rst create mode 100644 
integ-test/src/test/java/org/opensearch/sql/ppl/CrossClusterSearchIT.java diff --git a/docs/user/ppl/admin/cross_cluster_search.rst b/docs/user/ppl/admin/cross_cluster_search.rst new file mode 100644 index 0000000000..5a0370ebe0 --- /dev/null +++ b/docs/user/ppl/admin/cross_cluster_search.rst @@ -0,0 +1,87 @@ +.. highlight:: sh + +==================== +Cross-Cluster Search +==================== + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 1 + +Introduction +============ +Cross-cluster search lets any node in a cluster execute search requests against other clusters. +It makes searching easy across all connected clusters, allowing users to use multiple smaller clusters instead of a single large one. + + +Configuration +============= +On the local cluster, add the remote cluster name and the IP address with port 9300 for each seed node. :: + + PUT _cluster/settings + { + "persistent": { + "cluster.remote": { + "<remote-cluster-name>": { + "seeds": ["<remote-cluster-ip>:9300"] + } + } + } + } + + +Using Cross-Cluster Search in PPL +================================= +Perform cross-cluster search by using "<cluster-name>:<index-name>" as the index identifier. + +Example search command :: + + >> search source = my_remote_cluster:my_index + + +Limitation +========== +Since OpenSearch does not support cross cluster index metadata retrieval, field mapping of a remote cluster index is not available to the local cluster. +(`[Feature] Cross cluster field mappings query #6573 `_) +Therefore, the query engine requires that for any remote cluster index that the users need to search, +the local cluster keep a field mapping system index with the same index name. +This can be done by creating an index on the local cluster with the same name and schema as the remote cluster index. + + +Authentication and Permission +============================= + +1. The security plugin authenticates the user on the local cluster. +2. The security plugin fetches the user’s backend roles on the local cluster. +3.
The call, including the authenticated user, is forwarded to the remote cluster. +4. The user’s permissions are evaluated on the remote cluster. + +Check `Cross-cluster search access control `_ for more details. + +Example: Create the ppl_role for test_user on local cluster and the ccs_role for test_user on remote cluster. Then test_user could use PPL to query ``ppl-security-demo`` index on remote cluster. + +1. On the local cluster, refer to `Security Settings `_ to create role and user for PPL plugin and index access permission. + +2. On the remote cluster, create a new role and grant permission to access index. Create a user with the same name and credentials as the local cluster, and map the user to this role:: + + PUT _plugins/_security/api/roles/ccs_role + { + "index_permissions":[ + { + "index_patterns":["ppl-security-demo"], + "allowed_actions":[ + "indices:admin/shards/search_shards", + "indices:data/read/search" + ] + } + ] + } + + PUT _plugins/_security/api/rolesmapping/ccs_role + { + "backend_roles" : [], + "hosts" : [], + "users" : ["test_user"] + } diff --git a/docs/user/ppl/admin/security.rst b/docs/user/ppl/admin/security.rst index 529704574b..e512cc259c 100644 --- a/docs/user/ppl/admin/security.rst +++ b/docs/user/ppl/admin/security.rst @@ -13,7 +13,7 @@ Security Settings Introduction ============ -User needs ``cluster:admin/opensearch/ppl`` permission to use PPL plugin. User also needs indices level permission ``indices:admin/mappings/get`` to get field mappings and ``indices:data/read/search*`` to search index. +User needs ``cluster:admin/opensearch/ppl`` permission to use PPL plugin. User also needs indices level permission ``indices:admin/mappings/get`` to get field mappings, ``indices:monitor/settings/get`` to get cluster settings, and ``indices:data/read/search*`` to search index. Using Rest API ============== @@ -34,7 +34,8 @@ Example: Create the ppl_role for test_user. 
then test_user could use PPL to quer ], "allowed_actions": [ "indices:data/read/search*", - "indices:admin/mappings/get" + "indices:admin/mappings/get", + "indices:monitor/settings/get" ] }] } diff --git a/docs/user/ppl/cmd/search.rst b/docs/user/ppl/cmd/search.rst index 120ce34d40..5299f9f78a 100644 --- a/docs/user/ppl/cmd/search.rst +++ b/docs/user/ppl/cmd/search.rst @@ -16,13 +16,18 @@ Description Syntax ============ -search source=<index> [boolean-expression] +search source=[<remote-cluster>:]<index> [boolean-expression] * search: search keywords, which could be ignore. -* index: mandatory. search command must specify which index to query from. +* index: mandatory. search command must specify which index to query from. The index name can be prefixed by "<cluster name>:" for cross-cluster search. * bool-expression: optional. any expression which could be evaluated to boolean value. +Cross-Cluster Search +==================== +Cross-cluster search lets any node in a cluster execute search requests against other clusters. Refer to `Cross-Cluster Search `_ for configuration. + + Example 1: Fetch all the data ============================= diff --git a/docs/user/ppl/general/identifiers.rst b/docs/user/ppl/general/identifiers.rst index b15f621af8..51fc36c40f 100644 --- a/docs/user/ppl/general/identifiers.rst +++ b/docs/user/ppl/general/identifiers.rst @@ -83,6 +83,25 @@ Here are examples for quoting an index name by back ticks:: +------------------+ +Cross-Cluster Index Identifiers +=============================== + +Description +----------- + +A cross-cluster index identifier is an index identifier with a prefix ``<cluster name>:``. The cluster identifier could contain star ``*``. This is mostly a cluster pattern for wildcard match. + +Use Cases +--------- + +It is used to identify an index on a remote cluster for cross-cluster search.
+ +Examples +-------- + +For example, if you set up a connection between the local cluster and a remote cluster ``my_cluster``, then you can run ``source=my_cluster:accounts`` to query the ``accounts`` index at ``my_cluster``. + + Case Sensitivity ================ diff --git a/docs/user/ppl/index.rst b/docs/user/ppl/index.rst index a69136bb19..3fc094bddf 100644 --- a/docs/user/ppl/index.rst +++ b/docs/user/ppl/index.rst @@ -38,6 +38,8 @@ The query start with search command and then flowing a set of command delimited - `Prometheus Connector `_ + - `Cross-Cluster Search `_ + * **Commands** - `Syntax `_ diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 80197071db..a42d39a365 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -26,6 +26,7 @@ import org.opensearch.gradle.test.RestIntegTestTask import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask import java.util.concurrent.Callable +import java.util.stream.Collectors plugins { id "de.undercouch.download" version "5.3.0" @@ -121,6 +122,12 @@ testClusters.integTest { plugin ":opensearch-sql-plugin" } +testClusters { + remoteCluster { + plugin ":opensearch-sql-plugin" + } +} + task startPrometheus(type: SpawnProcessTask) { mustRunAfter ':doctest:doctest' @@ -199,9 +206,27 @@ task integJdbcTest(type: RestIntegTestTask) { // Run PPL ITs and new, legacy and comparison SQL ITs with new SQL engine enabled integTest { + useCluster testClusters.remoteCluster + + // Set properties for connection to clusters and between clusters + doFirst { + getClusters().forEach { cluster -> + String allTransportSocketURI = cluster.nodes.stream().flatMap { node -> + node.getAllTransportPortURI().stream() + }.collect(Collectors.joining(",")) + String allHttpSocketURI = cluster.nodes.stream().flatMap { node -> + node.getAllHttpSocketURI().stream() + }.collect(Collectors.joining(",")) + + systemProperty "tests.rest.${cluster.name}.http_hosts", "${-> allHttpSocketURI}" + systemProperty
"tests.rest.${cluster.name}.transport_hosts", "${-> allTransportSocketURI}" + } + } + testLogging { events "passed", "skipped", "failed" } + dependsOn ':opensearch-sql-plugin:bundlePlugin' if(getOSFamilyType() != "windows") { dependsOn startPrometheus diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/OpenSearchSQLRestTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/OpenSearchSQLRestTestCase.java index 483f027506..8916347dcb 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/OpenSearchSQLRestTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/OpenSearchSQLRestTestCase.java @@ -7,6 +7,8 @@ package org.opensearch.sql.legacy; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.http.Header; @@ -23,6 +25,7 @@ import org.apache.logging.log4j.Logger; import org.json.JSONArray; import org.json.JSONObject; +import org.junit.AfterClass; import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.RestClient; @@ -30,14 +33,27 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.internal.io.IOUtils; import org.opensearch.test.rest.OpenSearchRestTestCase; +import static java.util.Collections.unmodifiableList; + /** * OpenSearch SQL integration test base class to support both security disabled and enabled OpenSearch cluster. + * Allows interaction with multiple external test clusters using OpenSearch's {@link RestClient}. 
*/ public abstract class OpenSearchSQLRestTestCase extends OpenSearchRestTestCase { private static final Logger LOG = LogManager.getLogger(); + public static final String REMOTE_CLUSTER = "remoteCluster"; + public static final String MATCH_ALL_REMOTE_CLUSTER = "*"; + + private static RestClient remoteClient; + /** + * A client for the running remote OpenSearch cluster configured to take test administrative actions + * like remove all indexes after the test completes + */ + private static RestClient remoteAdminClient; protected boolean isHttps() { boolean isHttps = Optional.ofNullable(System.getProperty("https")) @@ -57,6 +73,21 @@ protected String getProtocol() { return isHttps() ? "https" : "http"; } + /** + * Get the client to remote cluster used for ordinary api calls while writing a test. + */ + protected static RestClient remoteClient() { + return remoteClient; + } + + /** + * Get the client to remote cluster used for test administrative actions. + * Do not use this while writing a test. Only use it for cleaning up after tests. 
+ */ + protected static RestClient remoteAdminClient() { + return remoteAdminClient; + } + protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException { RestClientBuilder builder = RestClient.builder(hosts); if (isHttps()) { @@ -69,10 +100,83 @@ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOE return builder.build(); } + // Modified from initClient in OpenSearchRestTestCase + public void initRemoteClient() throws IOException { + if (remoteClient == null) { + assert remoteAdminClient == null; + String cluster = getTestRestCluster(REMOTE_CLUSTER); + String[] stringUrls = cluster.split(","); + List hosts = new ArrayList<>(stringUrls.length); + for (String stringUrl : stringUrls) { + int portSeparator = stringUrl.lastIndexOf(':'); + if (portSeparator < 0) { + throw new IllegalArgumentException("Illegal cluster url [" + stringUrl + "]"); + } + String host = stringUrl.substring(0, portSeparator); + int port = Integer.valueOf(stringUrl.substring(portSeparator + 1)); + hosts.add(buildHttpHost(host, port)); + } + final List clusterHosts = unmodifiableList(hosts); + remoteClient = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0])); + remoteAdminClient = buildClient(restAdminSettings(), clusterHosts.toArray(new HttpHost[0])); + } + assert remoteClient != null; + assert remoteAdminClient != null; + } + + /** + * Get a comma delimited list of [host:port] to which to send REST requests. + */ + protected String getTestRestCluster(String clusterName) { + String cluster = System.getProperty("tests.rest." + clusterName + ".http_hosts"); + if (cluster == null) { + throw new RuntimeException( + "Must specify [tests.rest." + + clusterName + + ".http_hosts] system property with a comma delimited list of [host:port] " + + "to which to send REST requests" + ); + } + return cluster; + } + + /** + * Get a comma delimited list of [host:port] for connections between clusters. 
+ */ + protected String getTestTransportCluster(String clusterName) { + String cluster = System.getProperty("tests.rest." + clusterName + ".transport_hosts"); + if (cluster == null) { + throw new RuntimeException( + "Must specify [tests.rest." + + clusterName + + ".transport_hosts] system property with a comma delimited list of [host:port] " + + "for connections between clusters" + ); + } + return cluster; + } + + @AfterClass + public static void closeRemoteClients() throws IOException { + try { + IOUtils.close(remoteClient, remoteAdminClient); + } finally { + remoteClient = null; + remoteAdminClient = null; + } + } + protected static void wipeAllOpenSearchIndices() throws IOException { + wipeAllOpenSearchIndices(client()); + if (remoteClient() != null) { + wipeAllOpenSearchIndices(remoteClient()); + } + } + + protected static void wipeAllOpenSearchIndices(RestClient client) throws IOException { // include all the indices, included hidden indices. // https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html#cat-indices-api-query-params - Response response = client().performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all")); + Response response = client.performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all")); JSONArray jsonArray = new JSONArray(EntityUtils.toString(response.getEntity(), "UTF-8")); for (Object object : jsonArray) { JSONObject jsonObject = (JSONObject) object; @@ -80,7 +184,7 @@ protected static void wipeAllOpenSearchIndices() throws IOException { try { // System index, mostly named .opensearch-xxx or .opendistro-xxx, are not allowed to delete if (!indexName.startsWith(".opensearch") && !indexName.startsWith(".opendistro")) { - client().performRequest(new Request("DELETE", "/" + indexName)); + client.performRequest(new Request("DELETE", "/" + indexName)); } } catch (Exception e) { // TODO: Ignore index delete error for now. 
Remove this if strict check on system index added above. @@ -128,4 +232,21 @@ protected static void configureHttpsClient(RestClientBuilder builder, Settings s builder.setPathPrefix(settings.get(CLIENT_PATH_PREFIX)); } } + + /** + * Initialize rest client to remote cluster, + * and create a connection to it from the coordinating cluster. + */ + public void configureMultiClusters() throws IOException { + initRemoteClient(); + + Request connectionRequest = new Request("PUT", "_cluster/settings"); + String connectionSetting = "{\"persistent\": {\"cluster\": {\"remote\": {\"" + + REMOTE_CLUSTER + + "\": {\"seeds\": [\"" + + getTestTransportCluster(REMOTE_CLUSTER).split(",")[0] + + "\"]}}}}}"; + connectionRequest.setJsonEntity(connectionSetting); + adminClient().performRequest(connectionRequest); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 35ae5d3675..f6e4b23708 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -17,6 +17,7 @@ import org.opensearch.client.Request; import org.opensearch.client.RequestOptions; import org.opensearch.client.Response; +import org.opensearch.client.RestClient; import org.opensearch.sql.common.setting.Settings; import javax.management.MBeanServerInvocationHandler; @@ -167,6 +168,10 @@ protected void resetQuerySizeLimit() throws IOException { protected static void wipeAllClusterSettings() throws IOException { updateClusterSettings(new ClusterSetting("persistent", "*", null)); updateClusterSettings(new ClusterSetting("transient", "*", null)); + if (remoteClient() != null) { + updateClusterSettings(new ClusterSetting("persistent", "*", null), remoteClient()); + updateClusterSettings(new ClusterSetting("transient", "*", null), remoteClient()); + } } protected void setMaxResultWindow(String indexName, 
Integer window) throws IOException { @@ -188,17 +193,21 @@ protected void init() throws Exception { * Make it thread-safe in case tests are running in parallel but does not guarantee * if test like DeleteIT that mutates cluster running in parallel. */ - protected synchronized void loadIndex(Index index) throws IOException { + protected synchronized void loadIndex(Index index, RestClient client) throws IOException { String indexName = index.getName(); String mapping = index.getMapping(); String dataSet = index.getDataSet(); - if (!isIndexExist(client(), indexName)) { - createIndexByRestClient(client(), indexName, mapping); - loadDataByRestClient(client(), indexName, dataSet); + if (!isIndexExist(client, indexName)) { + createIndexByRestClient(client, indexName, mapping); + loadDataByRestClient(client, indexName, dataSet); } } + protected synchronized void loadIndex(Index index) throws IOException { + loadIndex(index, client()); + } + protected Request getSqlRequest(String request, boolean explain) { return getSqlRequest(request, explain, "json"); } @@ -325,12 +334,16 @@ private String executeRequest(final String requestBody, final boolean isExplainQ return executeRequest(sqlRequest); } - protected static String executeRequest(final Request request) throws IOException { - Response response = client().performRequest(request); + protected static String executeRequest(final Request request, RestClient client) throws IOException { + Response response = client.performRequest(request); Assert.assertEquals(200, response.getStatusLine().getStatusCode()); return getResponseBody(response); } + protected static String executeRequest(final Request request) throws IOException { + return executeRequest(request, client()); + } + protected JSONObject executeQueryWithGetRequest(final String sqlQuery) throws IOException { final Request request = buildGetEndpointRequest(sqlQuery); @@ -350,7 +363,7 @@ protected JSONObject executeCursorCloseQuery(final String cursor) throws IOExcep 
return new JSONObject(executeRequest(sqlRequest)); } - protected static JSONObject updateClusterSettings(ClusterSetting setting) throws IOException { + protected static JSONObject updateClusterSettings(ClusterSetting setting, RestClient client) throws IOException { Request request = new Request("PUT", "/_cluster/settings"); String persistentSetting = String.format(Locale.ROOT, "{\"%s\": {\"%s\": %s}}", setting.type, setting.name, setting.value); @@ -358,7 +371,11 @@ protected static JSONObject updateClusterSettings(ClusterSetting setting) throws RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); restOptionsBuilder.addHeader("Content-Type", "application/json"); request.setOptions(restOptionsBuilder); - return new JSONObject(executeRequest(request)); + return new JSONObject(executeRequest(request, client)); + } + + protected static JSONObject updateClusterSettings(ClusterSetting setting) throws IOException { + return updateClusterSettings(setting, client()); } protected static JSONObject getAllClusterSettings() throws IOException { diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/CrossClusterSearchIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/CrossClusterSearchIT.java new file mode 100644 index 0000000000..a8e686a893 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/CrossClusterSearchIT.java @@ -0,0 +1,144 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.sql.ppl; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DOG; +import static org.opensearch.sql.util.MatcherUtils.columnName; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.verifyColumn; +import static 
org.opensearch.sql.util.MatcherUtils.verifyDataRows; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.Rule; +import org.junit.jupiter.api.Test; +import org.junit.rules.ExpectedException; +import org.opensearch.client.ResponseException; + +public class CrossClusterSearchIT extends PPLIntegTestCase { + + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); + + private final static String TEST_INDEX_BANK_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_BANK; + private final static String TEST_INDEX_DOG_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_DOG; + private final static String TEST_INDEX_DOG_MATCH_ALL_REMOTE = MATCH_ALL_REMOTE_CLUSTER + ":" + TEST_INDEX_DOG; + private final static String TEST_INDEX_ACCOUNT_REMOTE = REMOTE_CLUSTER + ":" + TEST_INDEX_ACCOUNT; + + @Override + public void init() throws IOException { + configureMultiClusters(); + loadIndex(Index.BANK); + loadIndex(Index.BANK, remoteClient()); + loadIndex(Index.DOG); + loadIndex(Index.DOG, remoteClient()); + loadIndex(Index.ACCOUNT, remoteClient()); + } + + @Test + public void testCrossClusterSearchAllFields() throws IOException { + JSONObject result = executeQuery(String.format("search source=%s", TEST_INDEX_DOG_REMOTE)); + verifyColumn(result, columnName("dog_name"), columnName("holdersName"), columnName("age")); + } + + @Test + public void testMatchAllCrossClusterSearchAllFields() throws IOException { + JSONObject result = executeQuery(String.format("search source=%s", TEST_INDEX_DOG_MATCH_ALL_REMOTE)); + verifyColumn(result, columnName("dog_name"), columnName("holdersName"), columnName("age")); + } + + @Test + public void testCrossClusterSearchWithoutLocalFieldMappingShouldFail() throws IOException { + exceptionRule.expect(ResponseException.class); + exceptionRule.expectMessage("400 Bad Request"); + exceptionRule.expectMessage("IndexNotFoundException"); + + executeQuery(String.format("search source=%s", TEST_INDEX_ACCOUNT_REMOTE)); + } + + @Test + public 
void testCrossClusterSearchCommandWithLogicalExpression() throws IOException { + JSONObject result = executeQuery(String.format( + "search source=%s firstname='Hattie' | fields firstname", TEST_INDEX_BANK_REMOTE)); + verifyDataRows(result, rows("Hattie")); + } + + @Test + public void testCrossClusterSearchMultiClusters() throws IOException { + JSONObject result = executeQuery(String.format( + "search source=%s,%s firstname='Hattie' | fields firstname", TEST_INDEX_BANK_REMOTE, TEST_INDEX_BANK)); + verifyDataRows(result, + rows("Hattie"), + rows("Hattie")); + } + + @Test + public void testCrossClusterDescribeAllFields() throws IOException { + JSONObject result = executeQuery(String.format("describe %s", TEST_INDEX_DOG_REMOTE)); + verifyColumn( + result, + columnName("TABLE_CAT"), + columnName("TABLE_SCHEM"), + columnName("TABLE_NAME"), + columnName("COLUMN_NAME"), + columnName("DATA_TYPE"), + columnName("TYPE_NAME"), + columnName("COLUMN_SIZE"), + columnName("BUFFER_LENGTH"), + columnName("DECIMAL_DIGITS"), + columnName("NUM_PREC_RADIX"), + columnName("NULLABLE"), + columnName("REMARKS"), + columnName("COLUMN_DEF"), + columnName("SQL_DATA_TYPE"), + columnName("SQL_DATETIME_SUB"), + columnName("CHAR_OCTET_LENGTH"), + columnName("ORDINAL_POSITION"), + columnName("IS_NULLABLE"), + columnName("SCOPE_CATALOG"), + columnName("SCOPE_SCHEMA"), + columnName("SCOPE_TABLE"), + columnName("SOURCE_DATA_TYPE"), + columnName("IS_AUTOINCREMENT"), + columnName("IS_GENERATEDCOLUMN") + ); + } + + @Test + public void testMatchAllCrossClusterDescribeAllFields() throws IOException { + JSONObject result = executeQuery(String.format("describe %s", TEST_INDEX_DOG_MATCH_ALL_REMOTE)); + verifyColumn( + result, + columnName("TABLE_CAT"), + columnName("TABLE_SCHEM"), + columnName("TABLE_NAME"), + columnName("COLUMN_NAME"), + columnName("DATA_TYPE"), + columnName("TYPE_NAME"), + columnName("COLUMN_SIZE"), + columnName("BUFFER_LENGTH"), + columnName("DECIMAL_DIGITS"), + 
columnName("NUM_PREC_RADIX"), + columnName("NULLABLE"), + columnName("REMARKS"), + columnName("COLUMN_DEF"), + columnName("SQL_DATA_TYPE"), + columnName("SQL_DATETIME_SUB"), + columnName("CHAR_OCTET_LENGTH"), + columnName("ORDINAL_POSITION"), + columnName("IS_NULLABLE"), + columnName("SCOPE_CATALOG"), + columnName("SCOPE_SCHEMA"), + columnName("SCOPE_TABLE"), + columnName("SOURCE_DATA_TYPE"), + columnName("IS_AUTOINCREMENT"), + columnName("IS_GENERATEDCOLUMN") + ); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java index dacbecc7b9..9b0d6ca074 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java @@ -13,7 +13,6 @@ import java.util.function.Function; import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.ToString; import org.opensearch.action.search.SearchRequest; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java index 22ed8c2ffe..f4fd7b98d3 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java @@ -11,6 +11,7 @@ import static org.opensearch.sql.opensearch.client.OpenSearchClient.META_CLUSTER_NAME; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -82,7 +83,8 @@ public List search() { // TODO possible collision if two indices have fields with the same name and 
different mappings public Map getFieldTypes() { Map fieldTypes = new HashMap<>(); - Map indexMappings = client.getIndexMappings(indexName.getIndexNames()); + Map indexMappings = + client.getIndexMappings(getLocalIndexNames(indexName.getIndexNames())); for (IndexMapping indexMapping : indexMappings.values()) { fieldTypes.putAll(indexMapping.getFieldMappings()); } @@ -95,7 +97,7 @@ public Map getFieldTypes() { * @return max result window */ public Integer getMaxResultWindow() { - return client.getIndexMaxResultWindows(indexName.getIndexNames()) + return client.getIndexMaxResultWindows(getLocalIndexNames(indexName.getIndexNames())) .values().stream().min(Integer::compare).get(); } @@ -119,6 +121,19 @@ private ExprTupleValue row(String fieldName, String fieldType, int position, Str return new ExprTupleValue(valueMap); } + /** + * Return index names without "{cluster}:" prefix. + * Without the prefix, they refer to the indices at the local cluster. + * + * @param indexNames a string array of index names + * @return local cluster index names + */ + private String[] getLocalIndexNames(String[] indexNames) { + return Arrays.stream(indexNames) + .map(name -> name.substring(name.indexOf(":") + 1)) + .toArray(String[]::new); + } + private String clusterName(Map meta) { return meta.getOrDefault(META_CLUSTER_NAME, DEFAULT_TABLE_CAT); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java index be83622578..2e1ded6322 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequestTest.java @@ -67,6 +67,9 @@ public class OpenSearchQueryRequestTest { private final OpenSearchQueryRequest request = new OpenSearchQueryRequest("test", 200, factory); + private final OpenSearchQueryRequest remoteRequest = 
+ new OpenSearchQueryRequest("ccs:test", 200, factory); + @Test void search() { OpenSearchQueryRequest request = new OpenSearchQueryRequest( @@ -152,4 +155,19 @@ void searchRequest() { .query(QueryBuilders.termQuery("name", "John"))), request.searchRequest()); } + + @Test + void searchCrossClusterRequest() { + remoteRequest.getSourceBuilder().query(QueryBuilders.termQuery("name", "John")); + + assertEquals( + new SearchRequest() + .indices("ccs:test") + .source(new SearchSourceBuilder() + .timeout(OpenSearchQueryRequest.DEFAULT_QUERY_TIMEOUT) + .from(0) + .size(200) + .query(QueryBuilders.termQuery("name", "John"))), + remoteRequest.searchRequest()); + } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequestTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequestTest.java index 111316a7ed..c19b3a3ccd 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequestTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequestTest.java @@ -49,6 +49,22 @@ void testSearch() { )); } + @Test + void testCrossClusterShouldSearchLocal() { + when(mapping.getFieldMappings()).thenReturn( + Map.of("name", OpenSearchDataType.of(OpenSearchDataType.MappingType.Keyword))); + when(client.getIndexMappings("index")).thenReturn(ImmutableMap.of("test", mapping)); + + final List results = + new OpenSearchDescribeIndexRequest(client, "ccs:index").search(); + assertEquals(1, results.size()); + assertThat(results.get(0).tupleValue(), anyOf( + hasEntry("TABLE_NAME", stringValue("index")), + hasEntry("COLUMN_NAME", stringValue("name")), + hasEntry("TYPE_NAME", stringValue("STRING")) + )); + } + @Test void testToString() { assertEquals("OpenSearchDescribeIndexRequest{indexName='index'}", diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 
b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 12c24bd531..f412f29280 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -363,12 +363,14 @@ Y: 'Y'; // LITERALS AND VALUES //STRING_LITERAL: DQUOTA_STRING | SQUOTA_STRING | BQUOTA_STRING; ID: ID_LITERAL; +CLUSTER: CLUSTER_PREFIX_LITERAL; INTEGER_LITERAL: DEC_DIGIT+; DECIMAL_LITERAL: (DEC_DIGIT+)? '.' DEC_DIGIT+; -fragment DATE_SUFFIX: ([\-.][*0-9]+)*; +fragment DATE_SUFFIX: ([\-.][*0-9]+)+; fragment ID_LITERAL: [@*A-Z]+?[*A-Z_\-0-9]*; -ID_DATE_SUFFIX: ID_LITERAL DATE_SUFFIX; +fragment CLUSTER_PREFIX_LITERAL: [*A-Z]+?[*A-Z_\-0-9]* COLON; +ID_DATE_SUFFIX: CLUSTER_PREFIX_LITERAL? ID_LITERAL DATE_SUFFIX; DQUOTA_STRING: '"' ( '\\'. | '""' | ~('"'| '\\') )* '"'; SQUOTA_STRING: '\'' ('\\'. | '\'\'' | ~('\'' | '\\'))* '\''; BQUOTA_STRING: '`' ( '\\'. | '``' | ~('`'|'\\'))* '`'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index cca99407bb..3a28210271 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -322,7 +322,7 @@ multiFieldRelevanceFunction /** tables */ tableSource - : qualifiedName + : tableQualifiedName | ID_DATE_SUFFIX ; @@ -723,6 +723,10 @@ qualifiedName : ident (DOT ident)* #identsAsQualifiedName ; +tableQualifiedName + : tableIdent (DOT ident)* #identsAsTableQualifiedName + ; + wcQualifiedName : wildcard (DOT wildcard)* #identsAsWildcardQualifiedName ; @@ -734,6 +738,10 @@ ident | keywordsCanBeId ; +tableIdent + : (CLUSTER)? ident + ; + wildcard : ident (MODULE ident)* (MODULE)? 
| SINGLE_QUOTE wildcard SINGLE_QUOTE diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index c9823b67f9..e56eae83a6 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -24,6 +24,7 @@ import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.EvalFunctionCallContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FieldExpressionContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.IdentsAsQualifiedNameContext; +import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.IdentsAsTableQualifiedNameContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.IdentsAsWildcardQualifiedNameContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.InExprContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.IntegerLiteralContext; @@ -50,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.antlr.v4.runtime.ParserRuleContext; import org.antlr.v4.runtime.RuleContext; import org.opensearch.sql.ast.dsl.AstDSL; @@ -280,8 +282,8 @@ public UnresolvedExpression visitMultiFieldRelevanceFunction( @Override public UnresolvedExpression visitTableSource(TableSourceContext ctx) { - if (ctx.getChild(0) instanceof IdentsAsQualifiedNameContext) { - return visitIdentifiers(((IdentsAsQualifiedNameContext) ctx.getChild(0)).ident()); + if (ctx.getChild(0) instanceof IdentsAsTableQualifiedNameContext) { + return visitIdentsAsTableQualifiedName((IdentsAsTableQualifiedNameContext) ctx.getChild(0)); } else { return visitIdentifiers(Arrays.asList(ctx)); } @@ -304,6 +306,14 @@ public UnresolvedExpression visitIdentsAsQualifiedName(IdentsAsQualifiedNameCont return 
visitIdentifiers(ctx.ident()); } + @Override + public UnresolvedExpression visitIdentsAsTableQualifiedName( + IdentsAsTableQualifiedNameContext ctx) { + return visitIdentifiers( + Stream.concat(Stream.of(ctx.tableIdent()), ctx.ident().stream()) + .collect(Collectors.toList())); + } + @Override public UnresolvedExpression visitIdentsAsWildcardQualifiedName( IdentsAsWildcardQualifiedNameContext ctx) { diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/antlr/PPLSyntaxParserTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/antlr/PPLSyntaxParserTest.java index dcf961dc24..bbc566e2ba 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/antlr/PPLSyntaxParserTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/antlr/PPLSyntaxParserTest.java @@ -30,12 +30,66 @@ public void testSearchCommandIgnoreSearchKeywordShouldPass() { assertNotEquals(null, tree); } + @Test + public void testSearchCommandWithMultipleIndicesShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=t,u a=1 b=2"); + assertNotEquals(null, tree); + } + + @Test + public void testSearchCommandCrossClusterShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=c:t a=1 b=2"); + assertNotEquals(null, tree); + } + + @Test + public void testSearchCommandCrossClusterHiddenShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=c:.t a=1 b=2"); + assertNotEquals(null, tree); + } + + @Test + public void testSearchCommandCrossClusterQualifiedShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=c:t.u a=1 b=2"); + assertNotEquals(null, tree); + } + + @Test + public void testSearchCommandCrossClusterHiddenQualifiedShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=c:.t.u a=1 b=2"); + assertNotEquals(null, tree); + } + + @Test + public void testSearchCommandMatchAllCrossClusterShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=*:t a=1 b=2"); + assertNotEquals(null, tree); + } + 
+ @Test + public void testSearchCommandCrossClusterWithMultipleIndicesShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=c:t,d:u,v a=1 b=2"); + assertNotEquals(null, tree); + } + + @Test + public void testSearchCommandCrossClusterIgnoreSearchKeywordShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("source=c:t a=1 b=2"); + assertNotEquals(null, tree); + } + @Test public void testSearchFieldsCommandShouldPass() { ParseTree tree = new PPLSyntaxParser().parse("search source=t a=1 b=2 | fields a,b"); assertNotEquals(null, tree); } + @Test + public void testSearchFieldsCommandCrossClusterShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("search source=c:t a=1 b=2 | fields a,b"); + assertNotEquals(null, tree); + } + @Test public void testSearchCommandWithoutSourceShouldFail() { exceptionRule.expect(RuntimeException.class); @@ -177,6 +231,7 @@ public void can_parse_query_string_relevance_function() { + "analyzer=keyword, quote_field_suffix=\".exact\", fuzzy_prefix_length = 4)")); } + @Test public void testDescribeCommandShouldPass() { ParseTree tree = new PPLSyntaxParser().parse("describe t"); assertNotEquals(null, tree); @@ -188,6 +243,18 @@ public void testDescribeCommandWithMultipleIndicesShouldPass() { assertNotEquals(null, tree); } + @Test + public void testDescribeCommandCrossClusterShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("describe c:t"); + assertNotEquals(null, tree); + } + + @Test + public void testDescribeCommandMatchAllCrossClusterShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("describe *:t"); + assertNotEquals(null, tree); + } + @Test public void testDescribeFieldsCommandShouldPass() { ParseTree tree = new PPLSyntaxParser().parse("describe t | fields a,b"); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index 533254a599..8249619b37 100644 --- 
a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -77,6 +77,20 @@ public void testSearchCommand() { ); } + @Test + public void testSearchCrossClusterCommand() { + assertEqual("search source=c:t", + relation(qualifiedName("c:t")) + ); + } + + @Test + public void testSearchMatchAllCrossClusterCommand() { + assertEqual("search source=*:t", + relation(qualifiedName("*:t")) + ); + } + @Test public void testPrometheusSearchCommand() { assertEqual("search source = prometheus.http_requests_total", @@ -736,6 +750,12 @@ public void testDescribeCommand() { relation(mappingTable("t"))); } + @Test + public void testDescribeMatchAllCrossClusterSearchCommand() { + assertEqual("describe *:t", + relation(mappingTable("*:t"))); + } + @Test public void testDescribeCommandWithMultipleIndices() { assertEqual("describe t,u",