Skip to content

Commit

Permalink
Add integration JDBC tests for cursor/fetch_size feature (#1315) (#1550)
Browse files Browse the repository at this point in the history
* Add integration JDBC tests for cursor/fetch_size feature.

---------

Signed-off-by: Yury-Fridlyand <[email protected]>
(cherry picked from commit 57ccb6c)

Co-authored-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
1 parent 796e48a commit 1b79069
Show file tree
Hide file tree
Showing 2 changed files with 295 additions and 1 deletion.
70 changes: 69 additions & 1 deletion integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ apply plugin: 'com.wiredforcode.spawn'
repositories {
mavenCentral()
maven { url 'https://jitpack.io' }

// Add extra repository for the JDBC driver if given by user
if (System.getProperty("jdbcRepo") != null && new File(System.getProperty("jdbcRepo")).isDirectory()) {
maven { url new File(System.getProperty("jdbcRepo")) }
}
}

ext {
Expand Down Expand Up @@ -81,7 +86,7 @@ dependencies {
testImplementation group: 'org.opensearch.test', name: 'framework', version: "${opensearch_version}"
testImplementation group: 'org.opensearch.client', name: 'opensearch-rest-high-level-client', version: "${opensearch_version}"
testImplementation group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}"
testImplementation group: 'org.opensearch.driver', name: 'opensearch-sql-jdbc', version: '1.2.0.0'
testImplementation group: 'org.opensearch.driver', name: 'opensearch-sql-jdbc', version: System.getProperty("jdbcDriverVersion", '1.2.0.0')
testImplementation group: 'org.hamcrest', name: 'hamcrest', version: '2.1'
implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.17.1'
testImplementation project(':opensearch-sql-plugin')
Expand Down Expand Up @@ -148,8 +153,55 @@ task stopPrometheus(type: KillProcessTask) {

stopPrometheus.mustRunAfter startPrometheus

task integJdbcTest(type: RestIntegTestTask) {
testClusters.findAll {c -> c.clusterName == "integJdbcTest"}.first().
plugin ":opensearch-sql-plugin"

useJUnitPlatform()
dependsOn ':opensearch-sql-plugin:bundlePlugin'
testLogging {
events "passed", "skipped", "failed"
}
afterTest { desc, result ->
logger.quiet "${desc.className}.${desc.name}: ${result.resultType} ${(result.getEndTime() - result.getStartTime())/1000}s"
}

if (System.getProperty("jdbcDriverVersion") != null) {
systemProperty "jdbcDriverVersion", System.getProperty("jdbcDriverVersion")
}

if (System.getProperty("jdbcFile") != null) {
systemProperty "jdbcFile", System.getProperty("jdbcFile")
}

systemProperty 'tests.security.manager', 'false'
systemProperty('project.root', project.projectDir.absolutePath)

systemProperty "https", System.getProperty("https")
systemProperty "user", System.getProperty("user")
systemProperty "password", System.getProperty("password")

// Set default query size limit
systemProperty 'defaultQuerySizeLimit', '10000'

// Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for
// requests. The 'doFirst' delays reading the debug setting on the cluster till execution time.
doFirst { systemProperty 'cluster.debug', getDebug() }

if (System.getProperty("test.debug") != null) {
jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005'
}

filter {
includeTestsMatching 'org.opensearch.sql.jdbc.*'
}
}

// Run PPL ITs and new, legacy and comparison SQL ITs with new SQL engine enabled
integTest {
testLogging {
events "passed", "skipped", "failed"
}
dependsOn ':opensearch-sql-plugin:bundlePlugin'
if(getOSFamilyType() != "windows") {
dependsOn startPrometheus
Expand Down Expand Up @@ -199,10 +251,16 @@ integTest {

// Skip this IT because all assertions are against explain output
exclude 'org/opensearch/sql/legacy/OrderIT.class'

// Exclude JDBC related tests
exclude 'org/opensearch/sql/jdbc/**'
}


task comparisonTest(type: RestIntegTestTask) {
testLogging {
events "passed", "skipped", "failed"
}
dependsOn ':opensearch-sql-plugin:bundlePlugin'

systemProperty 'tests.security.manager', 'false'
Expand All @@ -221,6 +279,9 @@ task comparisonTest(type: RestIntegTestTask) {
exclude 'org/opensearch/sql/ppl/**/*IT.class'
exclude 'org/opensearch/sql/legacy/**/*IT.class'

// Exclude JDBC related tests
exclude 'org/opensearch/sql/jdbc/**'

// Enable logging output to console
testLogging.showStandardStreams true

Expand Down Expand Up @@ -431,6 +492,9 @@ task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) {

// A bwc test suite which runs all the bwc tasks combined
task bwcTestSuite(type: StandaloneRestIntegTestTask) {
testLogging {
events "passed", "skipped", "failed"
}
exclude '**/*Test*'
exclude '**/*IT*'
dependsOn tasks.named("${baseName}#mixedClusterTask")
Expand All @@ -442,6 +506,9 @@ def opensearch_tmp_dir = rootProject.file('build/private/es_tmp').absoluteFile
opensearch_tmp_dir.mkdirs()

task integTestRemote(type: RestIntegTestTask) {
testLogging {
events "passed", "skipped", "failed"
}
testClassesDirs = sourceSets.test.output.classesDirs
classpath = sourceSets.test.runtimeClasspath
systemProperty 'tests.security.manager', 'false'
Expand Down Expand Up @@ -469,4 +536,5 @@ task integTestRemote(type: RestIntegTestTask) {
exclude 'org/opensearch/sql/legacy/TermQueryExplainIT.class'
exclude 'org/opensearch/sql/legacy/QueryAnalysisIT.class'
exclude 'org/opensearch/sql/legacy/OrderIT.class'
exclude 'org/opensearch/sql/jdbc/**'
}
226 changes: 226 additions & 0 deletions integ-test/src/test/java/org/opensearch/sql/jdbc/CursorIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.jdbc;

import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_CALCS;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ONLINE;
import static org.opensearch.sql.legacy.plugin.RestSqlAction.QUERY_API_ENDPOINT;
import static org.opensearch.sql.util.TestUtils.getResponseBody;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;
import javax.annotation.Nullable;
import lombok.SneakyThrows;
import org.json.JSONObject;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayNameGeneration;
import org.junit.jupiter.api.DisplayNameGenerator;
import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Response;
import org.opensearch.sql.legacy.SQLIntegTestCase;

@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
public class CursorIT extends SQLIntegTestCase {

private static Connection connection;
private boolean initialized = false;

@BeforeEach
@SneakyThrows
public void init() {
if (!initialized) {
initClient();
resetQuerySizeLimit();
loadIndex(Index.BANK);
loadIndex(Index.CALCS);
loadIndex(Index.ONLINE);
loadIndex(Index.ACCOUNT);
initialized = true;
}
}

@BeforeAll
@BeforeClass
@SneakyThrows
public static void initConnection() {
var driverFile = System.getProperty("jdbcFile");
if (driverFile != null) {
URLClassLoader loader = new URLClassLoader(
new URL[]{new File(driverFile).toURI().toURL()},
ClassLoader.getSystemClassLoader()
);
Driver driver = (Driver) Class.forName("org.opensearch.jdbc.Driver", true, loader)
.getDeclaredConstructor().newInstance();
connection = driver.connect(getConnectionString(), null);
} else {
connection = DriverManager.getConnection(getConnectionString());
}
}

@AfterAll
@AfterClass
@SneakyThrows
public static void closeConnection() {
// TODO should we close Statement and ResultSet?
if (connection != null) {
connection.close();
connection = null;
}
}

@Test
@SneakyThrows
public void check_driver_version() {
var version = System.getProperty("jdbcDriverVersion");
Assume.assumeTrue("Parameter `jdbcDriverVersion` is not given, test platform uses default driver version",
version != null);
assertEquals(version, connection.getMetaData().getDriverVersion());
}

@Test
@SneakyThrows
public void select_all_no_cursor() {
Statement stmt = connection.createStatement();

for (var table : List.of(TEST_INDEX_CALCS, TEST_INDEX_ONLINE, TEST_INDEX_BANK, TEST_INDEX_ACCOUNT)) {
var query = String.format("SELECT * FROM %s", table);
ResultSet rs = stmt.executeQuery(query);
int rows = 0;
for (; rs.next(); rows++) ;

var restResponse = executeRestQuery(query, null);
assertEquals(rows, restResponse.getInt("total"));
}
}

@Test
@SneakyThrows
public void select_count_all_no_cursor() {
Statement stmt = connection.createStatement();

for (var table : List.of(TEST_INDEX_CALCS, TEST_INDEX_ONLINE, TEST_INDEX_BANK, TEST_INDEX_ACCOUNT)) {
var query = String.format("SELECT COUNT(*) FROM %s", table);
ResultSet rs = stmt.executeQuery(query);
int rows = 0;
for (; rs.next(); rows++) ;

var restResponse = executeRestQuery(query, null);
assertEquals(rows, restResponse.getInt("total"));
}
}

@Test
@SneakyThrows
public void select_all_small_table_big_cursor() {
Statement stmt = connection.createStatement();

for (var table : List.of(TEST_INDEX_CALCS, TEST_INDEX_BANK)) {
var query = String.format("SELECT COUNT(*) FROM %s", table);
stmt.setFetchSize(200);
ResultSet rs = stmt.executeQuery(query);
int rows = 0;
for (; rs.next(); rows++) ;

var restResponse = executeRestQuery(query, null);
assertEquals(rows, restResponse.getInt("total"));
}
}

@Test
@SneakyThrows
public void select_all_small_table_small_cursor() {
Statement stmt = connection.createStatement();

for (var table : List.of(TEST_INDEX_CALCS, TEST_INDEX_BANK)) {
var query = String.format("SELECT * FROM %s", table);
stmt.setFetchSize(3);
ResultSet rs = stmt.executeQuery(query);
int rows = 0;
for (; rs.next(); rows++) ;

var restResponse = executeRestQuery(query, null);
assertEquals(rows, restResponse.getInt("total"));
}
}

@Test
@SneakyThrows
public void select_all_big_table_small_cursor() {
Statement stmt = connection.createStatement();

for (var table : List.of(TEST_INDEX_ONLINE, TEST_INDEX_ACCOUNT)) {
var query = String.format("SELECT * FROM %s", table);
stmt.setFetchSize(10);
ResultSet rs = stmt.executeQuery(query);
int rows = 0;
for (; rs.next(); rows++) ;

var restResponse = executeRestQuery(query, null);
assertEquals(rows, restResponse.getInt("total"));
}
}

@Test
@SneakyThrows
public void select_all_big_table_big_cursor() {
Statement stmt = connection.createStatement();

for (var table : List.of(TEST_INDEX_ONLINE, TEST_INDEX_ACCOUNT)) {
var query = String.format("SELECT * FROM %s", table);
stmt.setFetchSize(500);
ResultSet rs = stmt.executeQuery(query);
int rows = 0;
for (; rs.next(); rows++) ;

var restResponse = executeRestQuery(query, null);
assertEquals(rows, restResponse.getInt("total"));
}
}

/**
* Use OpenSearch cluster initialized by OpenSearch Gradle task.
*/
private static String getConnectionString() {
// string like "[::1]:46751,127.0.0.1:34403"
var clusterUrls = System.getProperty("tests.rest.cluster").split(",");
return String.format("jdbc:opensearch://%s", clusterUrls[clusterUrls.length - 1]);
}

@SneakyThrows
protected JSONObject executeRestQuery(String query, @Nullable Integer fetch_size) {
Request request = new Request("POST", QUERY_API_ENDPOINT);
if (fetch_size != null) {
request.setJsonEntity(String.format("{ \"query\": \"%s\", \"fetch_size\": %d }", query, fetch_size));
} else {
request.setJsonEntity(String.format("{ \"query\": \"%s\" }", query));
}

RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder();
restOptionsBuilder.addHeader("Content-Type", "application/json");
request.setOptions(restOptionsBuilder);

Response response = client().performRequest(request);
return new JSONObject(getResponseBody(response));
}
}

0 comments on commit 1b79069

Please sign in to comment.