From 273663cd126606954f9bb5c828d8d157699f493d Mon Sep 17 00:00:00 2001 From: zane-neo Date: Fri, 5 Jan 2024 10:17:50 +0800 Subject: [PATCH 1/2] Add publishToMavenLocal (#2461) Signed-off-by: zane-neo --- scripts/build.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/build.sh b/scripts/build.sh index 4b2893f304..4c4aaf128e 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -77,6 +77,7 @@ echo "COPY ${distributions}/*.zip" mkdir -p $OUTPUT/plugins cp ${distributions}/*.zip ./$OUTPUT/plugins +./gradlew publishToMavenLocal -Dopensearch.version=$VERSION -Dbuild.snapshot=$SNAPSHOT -Dbuild.version_qualifier=$QUALIFIER ./gradlew publishPluginZipPublicationToZipStagingRepository -Dopensearch.version=$VERSION -Dbuild.snapshot=$SNAPSHOT -Dbuild.version_qualifier=$QUALIFIER mkdir -p $OUTPUT/maven/org/opensearch cp -r ./build/local-staging-repo/org/opensearch/. $OUTPUT/maven/org/opensearch From efb159a8cb0560fbde996cdf7c72ce82deb15681 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Tue, 9 Jan 2024 02:17:13 +0800 Subject: [PATCH 2/2] Add cluster name in spark submit params (#2467) * Add cluster name in spark submit params Signed-off-by: Louis Chu * Include cluster name to spark env Signed-off-by: Louis Chu --------- Signed-off-by: Louis Chu --- .../sql/spark/asyncquery/model/SparkSubmitParameters.java | 8 ++++++++ .../sql/spark/data/constants/SparkConstants.java | 5 +++++ .../sql/spark/dispatcher/BatchQueryHandler.java | 4 +++- .../sql/spark/dispatcher/InteractiveQueryHandler.java | 4 +++- .../sql/spark/dispatcher/StreamingQueryHandler.java | 4 +++- .../sql/spark/dispatcher/SparkQueryDispatcherTest.java | 3 ++- 6 files changed, 24 insertions(+), 4 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java index 9a73b0f364..7ddb92900d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -59,6 +59,8 @@ private Builder() { config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY); config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); + config.put(SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY, FLINT_DEFAULT_CLUSTER_NAME); + config.put(SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY, FLINT_DEFAULT_CLUSTER_NAME); config.put(FLINT_INDEX_STORE_HOST_KEY, FLINT_DEFAULT_HOST); config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT); config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME); @@ -77,6 +79,12 @@ public Builder className(String className) { return this; } + public Builder clusterName(String clusterName) { + config.put(SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY, clusterName); + config.put(SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY, clusterName); + return this; + } + public Builder dataSource(DataSourceMetadata metadata) { if (DataSourceType.S3GLUE.equals(metadata.getConnector())) { String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN); diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index 3a243cb5b3..95b3c25b99 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -25,6 +25,7 @@ public class SparkConstants { public static final String FLINT_INTEGRATION_JAR = "s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar"; // TODO should be replaced with mvn jar. + public static final String FLINT_DEFAULT_CLUSTER_NAME = "opensearch-cluster"; public static final String FLINT_DEFAULT_HOST = "localhost"; public static final String FLINT_DEFAULT_PORT = "9200"; public static final String FLINT_DEFAULT_SCHEME = "http"; @@ -45,6 +46,10 @@ public class SparkConstants { public static final String SPARK_DRIVER_ENV_JAVA_HOME_KEY = "spark.emr-serverless.driverEnv.JAVA_HOME"; public static final String SPARK_EXECUTOR_ENV_JAVA_HOME_KEY = "spark.executorEnv.JAVA_HOME"; + public static final String SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY = + "spark.emr-serverless.driverEnv.FLINT_CLUSTER_NAME"; + public static final String SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY = + "spark.executorEnv.FLINT_CLUSTER_NAME"; public static final String FLINT_INDEX_STORE_HOST_KEY = "spark.datasource.flint.host"; public static final String FLINT_INDEX_STORE_PORT_KEY = "spark.datasource.flint.port"; public static final String FLINT_INDEX_STORE_SCHEME_KEY = "spark.datasource.flint.scheme"; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index de25f1188c..46dec38038 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -67,7 +67,8 @@ public DispatchQueryResponse submit( DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource())); - String jobName = dispatchQueryRequest.getClusterName() + ":" + "non-index-query"; + String clusterName = dispatchQueryRequest.getClusterName(); + String jobName = clusterName + ":" + "non-index-query"; Map tags = context.getTags(); DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata(); @@ -79,6 +80,7 @@ public DispatchQueryResponse submit( dispatchQueryRequest.getApplicationId(), dispatchQueryRequest.getExecutionRoleARN(), SparkSubmitParameters.Builder.builder() + .clusterName(clusterName) .dataSource(context.getDataSourceMetadata()) .extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams()) .build() diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java index 1da38f03a7..1afba22db7 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java @@ -70,7 +70,8 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { public DispatchQueryResponse submit( DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { Session session = null; - String jobName = dispatchQueryRequest.getClusterName() + ":" + "non-index-query"; + String clusterName = dispatchQueryRequest.getClusterName(); + String jobName = clusterName + ":" + "non-index-query"; Map tags = context.getTags(); DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata(); @@ -98,6 +99,7 @@ public DispatchQueryResponse submit( dispatchQueryRequest.getExecutionRoleARN(), SparkSubmitParameters.Builder.builder() .className(FLINT_SESSION_CLASS_NAME) + .clusterName(clusterName) .dataSource(dataSourceMetadata) .extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams()), tags, diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index 6a4045b85a..75337a3dad 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -43,7 +43,8 @@ public DispatchQueryResponse submit( leaseManager.borrow(new LeaseRequest(JobType.STREAMING, dispatchQueryRequest.getDatasource())); - String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query"; + String clusterName = dispatchQueryRequest.getClusterName(); + String jobName = clusterName + ":" + "index-query"; IndexQueryDetails indexQueryDetails = context.getIndexQueryDetails(); Map tags = context.getTags(); tags.put(INDEX_TAG_KEY, indexQueryDetails.openSearchIndexName()); @@ -56,6 +57,7 @@ public DispatchQueryResponse submit( dispatchQueryRequest.getApplicationId(), dispatchQueryRequest.getExecutionRoleARN(), SparkSubmitParameters.Builder.builder() + .clusterName(clusterName) .dataSource(dataSourceMetadata) .structuredStreaming(true) .extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams()) diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index dbc087cbae..4205102cb1 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -1052,7 +1052,8 @@ private String constructExpectedSparkSubmitParameterString( + " --conf" + " spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/" + " --conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/" - + " --conf" + + " --conf spark.emr-serverless.driverEnv.FLINT_CLUSTER_NAME=TEST_CLUSTER --conf" + + " spark.executorEnv.FLINT_CLUSTER_NAME=TEST_CLUSTER --conf" + " spark.datasource.flint.host=search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com" + " --conf spark.datasource.flint.port=-1 --conf" + " spark.datasource.flint.scheme=https --conf spark.datasource.flint.auth="