Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor fix in dropping covering index #2226

Merged
merged 1 commit into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
import org.opensearch.sql.spark.asyncquery.AsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.asyncquery.OpensearchAsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.EmrServerlessClientImplEMR;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
Expand Down Expand Up @@ -325,7 +325,7 @@ private EMRServerlessClient createEMRServerlessClient() {
.withRegion(sparkExecutionEngineConfig.getRegion())
.withCredentials(new DefaultAWSCredentialsProviderChain())
.build();
return new EmrServerlessClientImplEMR(awsemrServerless);
return new EmrServerlessClientImpl(awsemrServerless);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
/**
* Client Interface for spark Job Submissions. Can have multiple implementations based on the
* underlying spark infrastructure. Currently, we have one for EMRServerless {@link
* EmrServerlessClientImplEMR}
* EmrServerlessClientImpl}
*/
public interface EMRServerlessClient {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class EmrServerlessClientImplEMR implements EMRServerlessClient {
public class EmrServerlessClientImpl implements EMRServerlessClient {

private final AWSEMRServerless emrServerless;
private static final Logger logger = LogManager.getLogger(EmrServerlessClientImplEMR.class);
private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class);

public EmrServerlessClientImplEMR(AWSEMRServerless emrServerless) {
public EmrServerlessClientImpl(AWSEMRServerless emrServerless) {
this.emrServerless = emrServerless;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader {

@Override
public String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) {
String indexName = getIndexName(indexDetails).toLowerCase();
String indexName = getIndexName(indexDetails);
GetMappingsResponse mappingsResponse =
client.admin().indices().prepareGetMappings(indexName).get();
try {
Expand All @@ -34,27 +34,31 @@ public String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) {
private String getIndexName(IndexDetails indexDetails) {
FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
if (FlintIndexType.SKIPPING.equals(indexDetails.getIndexType())) {
return "flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexType().getName();
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexType().getSuffix();
return indexName.toLowerCase();
} else if (FlintIndexType.COVERING.equals(indexDetails.getIndexType())) {
return "flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexName()
+ "_"
+ indexDetails.getIndexType().getName();
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexName()
+ "_"
+ indexDetails.getIndexType().getSuffix();
return indexName.toLowerCase();
} else {
throw new UnsupportedOperationException(
String.format("Unsupported Index Type : %s", indexDetails.getIndexType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,19 @@

package org.opensearch.sql.spark.flint;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Enum for FlintIndex Type. */
public enum FlintIndexType {
SKIPPING("skipping_index"),
COVERING("covering_index"),
COVERING("index"),
MATERIALIZED("materialized_view");

private final String name;
private static final Map<String, FlintIndexType> ENUM_MAP;

FlintIndexType(String name) {
this.name = name;
}

public String getName() {
return this.name;
}
private final String suffix;

static {
Map<String, FlintIndexType> map = new HashMap<>();
for (FlintIndexType instance : FlintIndexType.values()) {
map.put(instance.getName().toLowerCase(), instance);
}
ENUM_MAP = Collections.unmodifiableMap(map);
FlintIndexType(String suffix) {
this.suffix = suffix;
}

public static FlintIndexType get(String name) {
return ENUM_MAP.get(name.toLowerCase());
public String getSuffix() {
return this.suffix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void testStartJobRun() {
StartJobRunResult response = new StartJobRunResult();
when(emrServerless.startJobRun(any())).thenReturn(response);

EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
emrServerlessClient.startJobRun(
new StartJobRequest(
QUERY,
Expand All @@ -54,7 +54,7 @@ void testStartJobRunResultIndex() {
StartJobRunResult response = new StartJobRunResult();
when(emrServerless.startJobRun(any())).thenReturn(response);

EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
emrServerlessClient.startJobRun(
new StartJobRequest(
QUERY,
Expand All @@ -74,15 +74,15 @@ void testGetJobRunState() {
GetJobRunResult response = new GetJobRunResult();
response.setJobRun(jobRun);
when(emrServerless.getJobRun(any())).thenReturn(response);
EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, "123");
}

@Test
void testCancelJobRun() {
when(emrServerless.cancelJobRun(any()))
.thenReturn(new CancelJobRunResult().withJobRunId(EMR_JOB_ID));
EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
CancelJobRunResult cancelJobRunResult =
emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID);
Assertions.assertEquals(EMR_JOB_ID, cancelJobRunResult.getJobRunId());
Expand All @@ -91,7 +91,7 @@ void testCancelJobRun() {
@Test
void testCancelJobRunWithValidationException() {
doThrow(new ValidationException("Error")).when(emrServerless).cancelJobRun(any());
EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class,
Expand Down