Skip to content

Commit

Permalink
Cross cluster search in PPL (opensearch-project#1512)
Browse files Browse the repository at this point in the history
* feat: PPL parser for ccs

Signed-off-by: Sean Kao <[email protected]>

* feat: disable describe remote cluster index in PPL

Allowing the syntax will lead to misunderstanding in the query result,
because we will do a local cluster query for index mapping, even for
remote indices. This is due to the restriction that OpenSearch doesn't
support remote cluster index mapping query at the moment.

Signed-off-by: Sean Kao <[email protected]>

* feat: Query system index without cluster name

We require system index query to happen at the local cluster.
Currently, OpenSearch does not support cross cluster system index
query. Thus, mapping of a remote index is unavailable.

Therefore, we require the local cluster to have the system index
of the remote cluster index.

The full "cluster:index" name is still
used to query OpenSearch for datarows, as CCS is natively supported.

Signed-off-by: Sean Kao <[email protected]>

* fix: index name parsing for datasources

To identify datasources in the index qualified names, they need to
be parsed into parts (separated only by dots). clusterQualifiedName
can't contain custom datasources, hence the distinction.

Signed-off-by: Sean Kao <[email protected]>

* multi clusters setup for integration test

Signed-off-by: Sean Kao <[email protected]>

* Add IT test case

Signed-off-by: Sean Kao <[email protected]>

* Document ccs for ppl

Signed-off-by: Sean Kao <[email protected]>

* documentation update

Signed-off-by: Sean Kao <[email protected]>

* feat: allow describe remote cluster index in PPL

Signed-off-by: Sean Kao <[email protected]>

* feat: allow "*:index" to match all remote clusters

Signed-off-by: Sean Kao <[email protected]>

* use local index names for field mappings request

Signed-off-by: Sean Kao <[email protected]>

* allow ':' in index identifier

Signed-off-by: Sean Kao <[email protected]>

* docs update

Signed-off-by: Sean Kao <[email protected]>

* limit cluster prefix to table names only

Signed-off-by: Sean Kao <[email protected]>

* move multicluster capability to sql rest test case

Signed-off-by: Sean Kao <[email protected]>

* add IT for failure case

Signed-off-by: Sean Kao <[email protected]>

* remove logger info for connection in IT test case

Signed-off-by: Sean Kao <[email protected]>

---------

Signed-off-by: Sean Kao <[email protected]>
Signed-off-by: Mitchell Gale <[email protected]>
  • Loading branch information
seankao-az authored and MitchellGale committed Jun 12, 2023
1 parent 8732bc6 commit 0cc18c1
Show file tree
Hide file tree
Showing 18 changed files with 598 additions and 22 deletions.
87 changes: 87 additions & 0 deletions docs/user/ppl/admin/cross_cluster_search.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
.. highlight:: sh

====================
Cross-Cluster Search
====================

.. rubric:: Table of contents

.. contents::
:local:
:depth: 1

Introduction
============
Cross-cluster search lets any node in a cluster execute search requests against other clusters.
It makes searching easy across all connected clusters, allowing users to use multiple smaller clusters instead of a single large one.


Configuration
=============
On the local cluster, add the remote cluster name and the IP address with port 9300 for each seed node. ::

PUT _cluster/settings
{
"persistent": {
"cluster.remote": {
"<remote-cluster-name>": {
"seeds": ["<remote-cluster-IP-address>:9300"]
}
}
}
}


Using Cross-Cluster Search in PPL
=================================
Perform cross-cluster search by using "<cluster-name>:<index-name>" as the index identifier.

Example search command ::

>> search source = my_remote_cluster:my_index


Limitation
==========
Since OpenSearch does not support cross cluster index metadata retrieval, field mapping of a remote cluster index is not available to the local cluster.
(`[Feature] Cross cluster field mappings query #6573 <https://github.com/opensearch-project/OpenSearch/issues/6573>`_)
Therefore, the query engine requires that for any remote cluster index that the users need to search,
the local cluster keep a field mapping system index with the same index name.
This can be done by creating an index on the local cluster with the same name and schema as the remote cluster index.


Authentication and Permission
=============================

1. The security plugin authenticates the user on the local cluster.
2. The security plugin fetches the user’s backend roles on the local cluster.
3. The call, including the authenticated user, is forwarded to the remote cluster.
4. The user’s permissions are evaluated on the remote cluster.

Check `Cross-cluster search access control <https://opensearch.org/docs/latest/security/access-control/cross-cluster-search/>`_ for more details.

Example: Create the ppl_role for test_user on local cluster and the ccs_role for test_user on remote cluster. Then test_user could use PPL to query ``ppl-security-demo`` index on remote cluster.

1. On the local cluster, refer to `Security Settings <security.rst>`_ to create role and user for PPL plugin and index access permission.

2. On the remote cluster, create a new role and grant permission to access index. Create a user with the same name and credentials as the local cluster, and map the user to this role::

PUT _plugins/_security/api/roles/ccs_role
{
"index_permissions":[
{
"index_patterns":["ppl-security-demo"],
"allowed_actions":[
"indices:admin/shards/search_shards",
"indices:data/read/search"
]
}
]
}

PUT _plugins/_security/api/rolesmapping/ccs_role
{
"backend_roles" : [],
"hosts" : [],
"users" : ["test_user"]
}
5 changes: 3 additions & 2 deletions docs/user/ppl/admin/security.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Security Settings
Introduction
============

User needs ``cluster:admin/opensearch/ppl`` permission to use PPL plugin. User also needs indices level permission ``indices:admin/mappings/get`` to get field mappings and ``indices:data/read/search*`` to search index.
User needs ``cluster:admin/opensearch/ppl`` permission to use PPL plugin. User also needs indices level permission ``indices:admin/mappings/get`` to get field mappings, ``indices:monitor/settings/get`` to get cluster settings, and ``indices:data/read/search*`` to search index.

Using Rest API
==============
Expand All @@ -34,7 +34,8 @@ Example: Create the ppl_role for test_user. then test_user could use PPL to quer
],
"allowed_actions": [
"indices:data/read/search*",
"indices:admin/mappings/get"
"indices:admin/mappings/get",
"indices:monitor/settings/get"
]
}]
}
Expand Down
9 changes: 7 additions & 2 deletions docs/user/ppl/cmd/search.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ Description

Syntax
============
search source=<index> [boolean-expression]
search source=[<remote-cluster>:]<index> [boolean-expression]

* search: search keywords, which could be ignore.
* index: mandatory. search command must specify which index to query from.
* index: mandatory. search command must specify which index to query from. The index name can be prefixed by "<cluster name>:" for cross-cluster search.
* bool-expression: optional. any expression which could be evaluated to boolean value.


Cross-Cluster Search
====================
Cross-cluster search lets any node in a cluster execute search requests against other clusters. Refer to `Cross-Cluster Search <admin/cross_cluster_search.rst>`_ for configuration.


Example 1: Fetch all the data
=============================

Expand Down
19 changes: 19 additions & 0 deletions docs/user/ppl/general/identifiers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,25 @@ Here are examples for quoting an index name by back ticks::
+------------------+


Cross-Cluster Index Identifiers
===================

Description
-----------

A cross-cluster index identifier is an index identifier with a prefix ``<cluster identifier>:``. The cluster identifier could contain star ``*``. This is mostly an cluster pattern for wildcard match.

Use Cases
---------

It is used to identify an index on a remote cluster for cross-cluster search.

Examples
--------

For example, if you setup a connection between the local cluster and a remote cluster ``my_cluster``, then you can run ``source=my_cluster:accounts`` to query the ``accounts`` index at ``my_cluster``.


Case Sensitivity
================

Expand Down
2 changes: 2 additions & 0 deletions docs/user/ppl/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ The query start with search command and then flowing a set of command delimited

- `Prometheus Connector <admin/prometheus_connector.rst>`_

- `Cross-Cluster Search <admin/cross_cluster_search.rst>`_

* **Commands**

- `Syntax <cmd/syntax.rst>`_
Expand Down
25 changes: 25 additions & 0 deletions integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.opensearch.gradle.test.RestIntegTestTask
import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask

import java.util.concurrent.Callable
import java.util.stream.Collectors

plugins {
id "de.undercouch.download" version "5.3.0"
Expand Down Expand Up @@ -131,6 +132,12 @@ testClusters.integTest {
plugin ":opensearch-sql-plugin"
}

testClusters {
remoteCluster {
plugin ":opensearch-sql-plugin"
}
}

task startPrometheus(type: SpawnProcessTask) {
mustRunAfter ':doctest:doctest'

Expand Down Expand Up @@ -209,9 +216,27 @@ task integJdbcTest(type: RestIntegTestTask) {

// Run PPL ITs and new, legacy and comparison SQL ITs with new SQL engine enabled
integTest {
useCluster testClusters.remoteCluster

// Set properties for connection to clusters and between clusters
doFirst {
getClusters().forEach { cluster ->
String allTransportSocketURI = cluster.nodes.stream().flatMap { node ->
node.getAllTransportPortURI().stream()
}.collect(Collectors.joining(","))
String allHttpSocketURI = cluster.nodes.stream().flatMap { node ->
node.getAllHttpSocketURI().stream()
}.collect(Collectors.joining(","))

systemProperty "tests.rest.${cluster.name}.http_hosts", "${-> allHttpSocketURI}"
systemProperty "tests.rest.${cluster.name}.transport_hosts", "${-> allTransportSocketURI}"
}
}

testLogging {
events "passed", "skipped", "failed"
}

dependsOn ':opensearch-sql-plugin:bundlePlugin'
if(getOSFamilyType() != "windows") {
dependsOn startPrometheus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package org.opensearch.sql.legacy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hc.client5.http.auth.AuthScope;
Expand All @@ -27,21 +29,35 @@
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.AfterClass;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.test.rest.OpenSearchRestTestCase;

import static java.util.Collections.unmodifiableList;

/**
* OpenSearch SQL integration test base class to support both security disabled and enabled OpenSearch cluster.
* Allows interaction with multiple external test clusters using OpenSearch's {@link RestClient}.
*/
public abstract class OpenSearchSQLRestTestCase extends OpenSearchRestTestCase {

private static final Logger LOG = LogManager.getLogger();
public static final String REMOTE_CLUSTER = "remoteCluster";
public static final String MATCH_ALL_REMOTE_CLUSTER = "*";

private static RestClient remoteClient;
/**
* A client for the running remote OpenSearch cluster configured to take test administrative actions
* like remove all indexes after the test completes
*/
private static RestClient remoteAdminClient;

protected boolean isHttps() {
boolean isHttps = Optional.ofNullable(System.getProperty("https"))
Expand All @@ -61,6 +77,21 @@ protected String getProtocol() {
return isHttps() ? "https" : "http";
}

/**
* Get the client to remote cluster used for ordinary api calls while writing a test.
*/
protected static RestClient remoteClient() {
return remoteClient;
}

/**
* Get the client to remote cluster used for test administrative actions.
* Do not use this while writing a test. Only use it for cleaning up after tests.
*/
protected static RestClient remoteAdminClient() {
return remoteAdminClient;
}

protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException {
RestClientBuilder builder = RestClient.builder(hosts);
if (isHttps()) {
Expand All @@ -73,19 +104,92 @@ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOE
return builder.build();
}

// Modified from initClient in OpenSearchRestTestCase
public void initRemoteClient() throws IOException {
if (remoteClient == null) {
assert remoteAdminClient == null;
String cluster = getTestRestCluster(REMOTE_CLUSTER);
String[] stringUrls = cluster.split(",");
List<HttpHost> hosts = new ArrayList<>(stringUrls.length);
for (String stringUrl : stringUrls) {
int portSeparator = stringUrl.lastIndexOf(':');
if (portSeparator < 0) {
throw new IllegalArgumentException("Illegal cluster url [" + stringUrl + "]");
}
String host = stringUrl.substring(0, portSeparator);
int port = Integer.valueOf(stringUrl.substring(portSeparator + 1));
hosts.add(buildHttpHost(host, port));
}
final List<HttpHost> clusterHosts = unmodifiableList(hosts);
remoteClient = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
remoteAdminClient = buildClient(restAdminSettings(), clusterHosts.toArray(new HttpHost[0]));
}
assert remoteClient != null;
assert remoteAdminClient != null;
}

/**
* Get a comma delimited list of [host:port] to which to send REST requests.
*/
protected String getTestRestCluster(String clusterName) {
String cluster = System.getProperty("tests.rest." + clusterName + ".http_hosts");
if (cluster == null) {
throw new RuntimeException(
"Must specify [tests.rest."
+ clusterName
+ ".http_hosts] system property with a comma delimited list of [host:port] "
+ "to which to send REST requests"
);
}
return cluster;
}

/**
* Get a comma delimited list of [host:port] for connections between clusters.
*/
protected String getTestTransportCluster(String clusterName) {
String cluster = System.getProperty("tests.rest." + clusterName + ".transport_hosts");
if (cluster == null) {
throw new RuntimeException(
"Must specify [tests.rest."
+ clusterName
+ ".transport_hosts] system property with a comma delimited list of [host:port] "
+ "for connections between clusters"
);
}
return cluster;
}

@AfterClass
public static void closeRemoteClients() throws IOException {
try {
IOUtils.close(remoteClient, remoteAdminClient);
} finally {
remoteClient = null;
remoteAdminClient = null;
}
}

protected static void wipeAllOpenSearchIndices() throws IOException {
wipeAllOpenSearchIndices(client());
if (remoteClient() != null) {
wipeAllOpenSearchIndices(remoteClient());
}
}

protected static void wipeAllOpenSearchIndices(RestClient client) throws IOException {
// include all the indices, included hidden indices.
// https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html#cat-indices-api-query-params
try {
Response response = client().performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all"));
Response response = client.performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all"));
JSONArray jsonArray = new JSONArray(EntityUtils.toString(response.getEntity(), "UTF-8"));
for (Object object : jsonArray) {
JSONObject jsonObject = (JSONObject) object;
String indexName = jsonObject.getString("index");
try {
// System index, mostly named .opensearch-xxx or .opendistro-xxx, are not allowed to delete
if (!indexName.startsWith(".opensearch") && !indexName.startsWith(".opendistro")) {
client().performRequest(new Request("DELETE", "/" + indexName));
client.performRequest(new Request("DELETE", "/" + indexName));
}
} catch (Exception e) {
// TODO: Ignore index delete error for now. Remove this if strict check on system index added above.
Expand Down Expand Up @@ -143,4 +247,21 @@ protected static void configureHttpsClient(RestClientBuilder builder, Settings s
builder.setPathPrefix(settings.get(CLIENT_PATH_PREFIX));
}
}

/**
* Initialize rest client to remote cluster,
* and create a connection to it from the coordinating cluster.
*/
public void configureMultiClusters() throws IOException {
initRemoteClient();

Request connectionRequest = new Request("PUT", "_cluster/settings");
String connectionSetting = "{\"persistent\": {\"cluster\": {\"remote\": {\""
+ REMOTE_CLUSTER
+ "\": {\"seeds\": [\""
+ getTestTransportCluster(REMOTE_CLUSTER).split(",")[0]
+ "\"]}}}}}";
connectionRequest.setJsonEntity(connectionSetting);
adminClient().performRequest(connectionRequest);
}
}
Loading

0 comments on commit 0cc18c1

Please sign in to comment.