Skip to content

Commit

Permalink
Update Shallow Snapshot flows to support remote path type & hash algo (
Browse files Browse the repository at this point in the history
…opensearch-project#12988)

Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 authored Apr 5, 2024
1 parent 19d6c5c commit e713175
Show file tree
Hide file tree
Showing 12 changed files with 508 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
Expand Down Expand Up @@ -47,6 +48,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -284,7 +286,7 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {

indexDocuments(client, indexName1, randomIntBetween(5, 10));
ensureGreen(indexName1);
validatePathType(indexName1, PathType.FIXED, PathHashAlgorithm.FNV_1A);
validatePathType(indexName1, PathType.FIXED);

logger.info("--> snapshot");
SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(Arrays.asList(indexName1)));
Expand All @@ -301,7 +303,7 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
.get();
assertEquals(RestStatus.ACCEPTED, restoreSnapshotResponse.status());
ensureGreen(restoredIndexName1version1);
validatePathType(restoredIndexName1version1, PathType.FIXED, PathHashAlgorithm.FNV_1A);
validatePathType(restoredIndexName1version1, PathType.FIXED);

client(clusterManagerNode).admin()
.cluster()
Expand All @@ -327,16 +329,50 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
validatePathType(indexName2, PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A);

// Validating that custom data has not changed for indexes which were created before the cluster setting got updated
validatePathType(indexName1, PathType.FIXED, PathHashAlgorithm.FNV_1A);
validatePathType(indexName1, PathType.FIXED);

// Create Snapshot of index 2
String snapshotName2 = "test-restore-snapshot2";
snapshotInfo = createSnapshot(snapshotRepoName, snapshotName2, new ArrayList<>(List.of(indexName2)));
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
assertTrue(snapshotInfo.successfulShards() > 0);
assertEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards());

// Update cluster settings to FIXED
client(clusterManagerNode).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING.getKey(), PathType.FIXED))
.get();

// Close index 2
assertAcked(client().admin().indices().prepareClose(indexName2));
restoreSnapshotResponse = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName2)
.setWaitForCompletion(false)
.setIndices(indexName2)
.get();
assertEquals(RestStatus.ACCEPTED, restoreSnapshotResponse.status());
ensureGreen(indexName2);

// Validating that custom data has not changed for testindex2 which was created before the cluster setting got updated
validatePathType(indexName2, PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A);
}

private void validatePathType(String index, PathType pathType, PathHashAlgorithm pathHashAlgorithm) {
private void validatePathType(String index, PathType pathType) {
validatePathType(index, pathType, null);
}

private void validatePathType(String index, PathType pathType, @Nullable PathHashAlgorithm pathHashAlgorithm) {
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
// Validate that the remote_store custom data is present in index metadata for the created index.
Map<String, String> remoteCustomData = state.metadata().index(index).getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
assertNotNull(remoteCustomData);
assertEquals(pathType.name(), remoteCustomData.get(PathType.NAME));
assertEquals(pathHashAlgorithm.name(), remoteCustomData.get(PathHashAlgorithm.NAME));
if (Objects.nonNull(pathHashAlgorithm)) {
assertEquals(pathHashAlgorithm.name(), remoteCustomData.get(PathHashAlgorithm.NAME));
}
}

public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@ public MetadataCreateIndexService(

// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true);
Supplier<Version> minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion();
remoteStorePathStrategyResolver = isRemoteDataAttributePresent(settings)
? new RemoteStorePathStrategyResolver(clusterService.getClusterSettings())
? new RemoteStorePathStrategyResolver(clusterService.getClusterSettings(), minNodeVersionSupplier)
: null;
}

Expand Down Expand Up @@ -572,28 +573,23 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata(
* @param assertNullOldType flag to verify that the old remote store path type is null
*/
public void addRemoteStorePathStrategyInCustomData(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) {
if (remoteStorePathStrategyResolver != null) {
// It is possible that remote custom data exists already. In such cases, we need to only update the path type
// in the remote store custom data map.
Map<String, String> existingRemoteCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
Map<String, String> remoteCustomData = existingRemoteCustomData == null
? new HashMap<>()
: new HashMap<>(existingRemoteCustomData);
// Determine the path type for use using the remoteStorePathResolver.
RemoteStorePathStrategy newPathStrategy = remoteStorePathStrategyResolver.get();
String oldPathType = remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name());
String oldHashAlgorithm = remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name());
assert !assertNullOldType || (Objects.isNull(oldPathType) && Objects.isNull(oldHashAlgorithm));
logger.trace(
() -> new ParameterizedMessage(
"Added newPathStrategy={}, replaced oldPathType={} oldHashAlgorithm={}",
newPathStrategy,
oldPathType,
oldHashAlgorithm
)
);
tmpImdBuilder.putCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY, remoteCustomData);
if (remoteStorePathStrategyResolver == null) {
return;
}
// It is possible that remote custom data exists already. In such cases, we need to only update the path type
// in the remote store custom data map.
Map<String, String> existingCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
assert assertNullOldType == false || Objects.isNull(existingCustomData);

// Determine the path type for use using the remoteStorePathResolver.
RemoteStorePathStrategy newPathStrategy = remoteStorePathStrategyResolver.get();
Map<String, String> remoteCustomData = new HashMap<>();
remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name());
if (Objects.nonNull(newPathStrategy.getHashAlgorithm())) {
remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name());
}
logger.trace(() -> new ParameterizedMessage("Added newStrategy={}, replaced oldStrategy={}", remoteCustomData, existingCustomData));
tmpImdBuilder.putCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY, remoteCustomData);
}

private ClusterState applyCreateIndexRequestWithV1Templates(
Expand Down
19 changes: 13 additions & 6 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -764,6 +765,7 @@ public static IndexMergePolicy fromString(String text) {
private volatile String defaultSearchPipeline;
private final boolean widenIndexSortType;
private final boolean assignedOnRemoteNode;
private final RemoteStorePathStrategy remoteStorePathStrategy;

/**
* The maximum age of a retention lease before it is considered expired.
Expand Down Expand Up @@ -988,6 +990,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
*/
widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0);
assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings());
remoteStorePathStrategy = determineRemoteStorePathStrategy();

setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING));
setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING));
Expand Down Expand Up @@ -1908,15 +1911,19 @@ public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePo
this.docIdFuzzySetFalsePositiveProbability = docIdFuzzySetFalsePositiveProbability;
}

public RemoteStorePathStrategy getRemoteStorePathStrategy() {
private RemoteStorePathStrategy determineRemoteStorePathStrategy() {
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
if (remoteCustomData != null
&& remoteCustomData.containsKey(PathType.NAME)
&& remoteCustomData.containsKey(PathHashAlgorithm.NAME)) {
assert remoteCustomData == null || remoteCustomData.containsKey(PathType.NAME);
if (remoteCustomData != null && remoteCustomData.containsKey(PathType.NAME)) {
PathType pathType = PathType.parseString(remoteCustomData.get(PathType.NAME));
PathHashAlgorithm pathHashAlgorithm = PathHashAlgorithm.parseString(remoteCustomData.get(PathHashAlgorithm.NAME));
return new RemoteStorePathStrategy(pathType, pathHashAlgorithm);
String hashAlgoStr = remoteCustomData.get(PathHashAlgorithm.NAME);
PathHashAlgorithm hashAlgorithm = Objects.nonNull(hashAlgoStr) ? PathHashAlgorithm.parseString(hashAlgoStr) : null;
return new RemoteStorePathStrategy(pathType, hashAlgorithm);
}
return new RemoteStorePathStrategy(PathType.FIXED);
}

public RemoteStorePathStrategy getRemoteStorePathStrategy() {
return remoteStorePathStrategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@

package org.opensearch.index.remote;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.hash.FNV1a;
import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static java.util.Collections.unmodifiableMap;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;

Expand Down Expand Up @@ -78,9 +83,10 @@ public String getName() {
*/
@PublicApi(since = "2.14.0")
public enum PathType {
FIXED {
FIXED(0) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.isNull(hashAlgorithm) : "hashAlgorithm is expected to be null with fixed remote store path type";
// Hash algorithm is not used in FIXED path type
return pathInput.basePath()
.add(pathInput.indexUUID())
Expand All @@ -94,7 +100,7 @@ boolean requiresHashAlgorithm() {
return false;
}
},
HASHED_PREFIX {
HASHED_PREFIX(1) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
// TODO - We need to implement this, keeping the same path as Fixed for sake of multiple tests that can fail otherwise.
Expand All @@ -112,6 +118,40 @@ boolean requiresHashAlgorithm() {
}
};

private final int code;

PathType(int code) {
this.code = code;
}

public int getCode() {
return code;
}

private static final Map<Integer, PathType> CODE_TO_ENUM;

static {
PathType[] values = values();
Map<Integer, PathType> codeToStatus = new HashMap<>(values.length);
for (PathType value : values) {
int code = value.code;
if (codeToStatus.containsKey(code)) {
throw new IllegalStateException(
new ParameterizedMessage("{} has same code as {}", codeToStatus.get(code), value).getFormattedMessage()
);
}
codeToStatus.put(code, value);
}
CODE_TO_ENUM = unmodifiableMap(codeToStatus);
}

/**
* Turn a status code into a {@link PathType}.
*/
public static PathType fromCode(int code) {
return CODE_TO_ENUM.get(code);
}

/**
* This method generates the path for the given path input which constitutes multiple fields and characteristics
* of the data.
Expand All @@ -131,7 +171,7 @@ public BlobPath path(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
return generatePath(pathInput, hashAlgorithm);
}

abstract BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm);
protected abstract BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm);

abstract boolean requiresHashAlgorithm();

Expand All @@ -158,7 +198,7 @@ public static PathType parseString(String pathType) {
@PublicApi(since = "2.14.0")
public enum PathHashAlgorithm {

FNV_1A {
FNV_1A(0) {
@Override
long hash(PathInput pathInput) {
String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType()
Expand All @@ -167,6 +207,39 @@ long hash(PathInput pathInput) {
}
};

private final int code;

PathHashAlgorithm(int code) {
this.code = code;
}

public int getCode() {
return code;
}

private static final Map<Integer, PathHashAlgorithm> CODE_TO_ENUM;
static {
PathHashAlgorithm[] values = values();
Map<Integer, PathHashAlgorithm> codeToStatus = new HashMap<>(values.length);
for (PathHashAlgorithm value : values) {
int code = value.code;
if (codeToStatus.containsKey(code)) {
throw new IllegalStateException(
new ParameterizedMessage("{} has same code as {}", codeToStatus.get(code), value).getFormattedMessage()
);
}
codeToStatus.put(code, value);
}
CODE_TO_ENUM = unmodifiableMap(codeToStatus);
}

/**
* Turn a status code into a {@link PathHashAlgorithm}.
*/
public static PathHashAlgorithm fromCode(int code) {
return CODE_TO_ENUM.get(code);
}

abstract long hash(PathInput pathInput);

public static PathHashAlgorithm parseString(String pathHashAlgorithm) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.remote;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -36,11 +37,21 @@ public RemoteStorePathStrategy(PathType type) {
}

public RemoteStorePathStrategy(PathType type, PathHashAlgorithm hashAlgorithm) {
assert type.requiresHashAlgorithm() == false || Objects.nonNull(hashAlgorithm);
this.type = Objects.requireNonNull(type);
Objects.requireNonNull(type, "pathType can not be null");
if (isCompatible(type, hashAlgorithm) == false) {
throw new IllegalArgumentException(
new ParameterizedMessage("pathType={} pathHashAlgorithm={} are incompatible", type, hashAlgorithm).getFormattedMessage()
);
}
this.type = type;
this.hashAlgorithm = hashAlgorithm;
}

public static boolean isCompatible(PathType type, PathHashAlgorithm hashAlgorithm) {
return (type.requiresHashAlgorithm() == false && Objects.isNull(hashAlgorithm))
|| (type.requiresHashAlgorithm() && Objects.nonNull(hashAlgorithm));
}

public PathType getType() {
return type;
}
Expand All @@ -55,7 +66,7 @@ public String toString() {
}

public BlobPath generatePath(PathInput pathInput) {
return type.generatePath(pathInput, hashAlgorithm);
return type.path(pathInput, hashAlgorithm);
}

/**
Expand Down
Loading

0 comments on commit e713175

Please sign in to comment.