Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Oct 26, 2023
1 parent 289400e commit af9217a
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,25 @@ public DataSourceMetadata(
List<String> allowedRoles,
Map<String, String> properties,
String resultIndex) {
String errorMessage = validateCustomResultIndex(resultIndex);

this.name = name;
this.connector = connector;
this.description = description;
this.properties = properties;
this.allowedRoles = allowedRoles;

String errorMessage = validateCustomResultIndex(resultIndex);
if (errorMessage != null) {
throw new IllegalArgumentException(errorMessage);
}
if (resultIndex == null) {
this.resultIndex = DATASOURCE_TO_RESULT_INDEX.apply(name);
// since we are using datasource name to create result index, we need to make sure that the
// final
// name is valid
this.resultIndex =
convertToValidResultIndex(DATASOURCE_TO_RESULT_INDEX.apply(name.toLowerCase()));
} else {
this.resultIndex = resultIndex;
}

this.connector = connector;
this.description = description;
this.properties = properties;
this.allowedRoles = allowedRoles;
}

public DataSourceMetadata() {
Expand Down Expand Up @@ -120,4 +123,20 @@ public String validateCustomResultIndex(String resultIndex) {
}
return null;
}

public String convertToValidResultIndex(String resultIndex) {
// Limit Length
if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) {
resultIndex = resultIndex.substring(0, MAX_RESULT_INDEX_NAME_SIZE);
}

// Pattern Matching: Remove characters that don't match the pattern
StringBuilder validChars = new StringBuilder();
for (char c : resultIndex.toCharArray()) {
if (String.valueOf(c).matches(RESULT_INDEX_NAME_PATTERN)) {
validChars.append(c);
}
}
return validChars.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,8 @@

package org.opensearch.sql.spark.cluster;

import java.util.Arrays;
import java.util.Objects;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -21,7 +15,6 @@
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.index.store.StoreStats;

/** Clean up the old docs for indices. */
public class IndexCleanup {
Expand All @@ -35,60 +28,6 @@ public IndexCleanup(Client client, ClusterService clusterService) {
this.clusterService = clusterService;
}

/**
* delete docs when shard size is bigger than max limitation.
*
* @param indexName index name
* @param maxShardSize max shard size
* @param queryForDeleteByQueryRequest query request
* @param listener action listener
*/
public void deleteDocsBasedOnShardSize(
String indexName,
long maxShardSize,
QueryBuilder queryForDeleteByQueryRequest,
ActionListener<Boolean> listener) {

if (!clusterService.state().getRoutingTable().hasIndex(indexName)) {
LOG.debug("skip as the index:{} doesn't exist", indexName);
return;
}

ActionListener<IndicesStatsResponse> indicesStatsResponseListener =
ActionListener.wrap(
indicesStatsResponse -> {
// Check if any shard size is bigger than maxShardSize
boolean cleanupNeeded =
Arrays.stream(indicesStatsResponse.getShards())
.map(ShardStats::getStats)
.filter(Objects::nonNull)
.map(CommonStats::getStore)
.filter(Objects::nonNull)
.map(StoreStats::getSizeInBytes)
.anyMatch(size -> size > maxShardSize);

if (cleanupNeeded) {
deleteDocsByQuery(
indexName,
queryForDeleteByQueryRequest,
ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
} else {
listener.onResponse(false);
}
},
listener::onFailure);

getCheckpointShardStoreStats(indexName, indicesStatsResponseListener);
}

private void getCheckpointShardStoreStats(
String indexName, ActionListener<IndicesStatsResponse> listener) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.store();
indicesStatsRequest.indices(indexName);
client.admin().indices().stats(indicesStatsRequest, listener);
}

/**
* Delete docs based on query request
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import joptsimple.internal.Strings;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -112,7 +112,7 @@ public void setup() {
dataSourceService.createDataSource(
new DataSourceMetadata(
DATASOURCE,
Strings.EMPTY,
StringUtils.EMPTY,
DataSourceType.S3GLUE,
ImmutableList.of(),
ImmutableMap.of(
Expand Down Expand Up @@ -331,7 +331,7 @@ public void datasourceWithBasicAuth() {
dataSourceService.createDataSource(
new DataSourceMetadata(
"mybasicauth",
Strings.EMPTY,
StringUtils.EMPTY,
DataSourceType.S3GLUE,
ImmutableList.of(),
properties,
Expand Down

0 comments on commit af9217a

Please sign in to comment.