Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
adding test for remote-cluster-service
Browse files Browse the repository at this point in the history
rajiv-kv committed Aug 30, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent d5ec82d commit 6016fb9
Showing 16 changed files with 175 additions and 147 deletions.
Original file line number Diff line number Diff line change
@@ -51,7 +51,6 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.discovery.Discovery;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.threadpool.ThreadPool;
@@ -84,7 +83,8 @@ public TransportClusterStateAction(
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
@Nullable RemoteClusterStateService remoteClusterStateService) {
@Nullable RemoteClusterStateService remoteClusterStateService
) {
super(
ClusterStateAction.NAME,
false,
Original file line number Diff line number Diff line change
@@ -50,7 +50,6 @@
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
@@ -65,7 +64,6 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.discovery.Discovery;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteManifestManager;
@@ -101,10 +99,6 @@ public abstract class TransportClusterManagerNodeAction<Request extends ClusterM
protected final ClusterService clusterService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;

public void setRemoteClusterStateService(RemoteClusterStateService remoteClusterStateService) {
this.remoteClusterStateService = remoteClusterStateService;
}

protected RemoteClusterStateService remoteClusterStateService;

private final String executor;
@@ -427,7 +421,7 @@ public ClusterState getStateFromLocalNode(ClusterStateTermVersion termVersion) {
return appliedState;
}

ClusterState preCommitState = clusterService.commitState();
ClusterState preCommitState = clusterService.preCommitState();
if (preCommitState != null && termVersion.equals(new ClusterStateTermVersion(preCommitState))) {
logger.trace("Using the published state from local, ClusterStateTermVersion {}", termVersion);
return preCommitState;
@@ -439,16 +433,18 @@ public ClusterState getStateFromLocalNode(ClusterStateTermVersion termVersion) {
termVersion.getTerm(),
termVersion.getVersion()
);
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService
.getClusterMetadataManifestByFileName(appliedState.stateUUID(), manifestFile);
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
appliedState.metadata().clusterUUID(),
manifestFile
);
ClusterState clusterStateFromRemote = remoteClusterStateService.getClusterStateForManifest(
appliedState.getClusterName().value(),
clusterMetadataManifest,
appliedState.nodes().getLocalNode().getId(),
true
);

if(clusterStateFromRemote!=null) {
if (clusterStateFromRemote != null) {
logger.trace("Using the remote cluster-state fetched from local node, ClusterStateTermVersion {}", termVersion);
return clusterStateFromRemote;
}
Original file line number Diff line number Diff line change
@@ -15,13 +15,11 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.discovery.Discovery;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

@@ -42,7 +40,8 @@ public TransportGetTermVersionAction(
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
GetTermVersionAction.NAME,
false,
@@ -77,7 +76,7 @@ protected void clusterManagerOperation(
ClusterState state,
ActionListener<GetTermVersionResponse> listener
) throws Exception {
ActionListener.completeWith(listener, () -> buildResponse(request, clusterService.commitState()));
ActionListener.completeWith(listener, () -> buildResponse(request, clusterService.preCommitState()));
}

private GetTermVersionResponse buildResponse(GetTermVersionRequest request, ClusterState state) {
Original file line number Diff line number Diff line change
@@ -386,7 +386,7 @@ private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionList
coordinationState.get().handleCommit(applyCommitRequest);
final ClusterState committedState = hideStateIfNotRecovered(coordinationState.get().getLastAcceptedState());
applierState = mode == Mode.CANDIDATE ? clusterStateWithNoClusterManagerBlock(committedState) : committedState;
setStateOnApplier(applierState);
clusterApplier.setPreCommitState(applierState);

if (applyCommitRequest.getSourceNode().equals(getLocalNode())) {
// cluster-manager node applies the committed state at the end of the publication process, not here.
@@ -479,10 +479,6 @@ && getCurrentTerm() == ZEN1_BWC_TERM
}
}

private void setStateOnApplier(ClusterState clusterState) {
clusterApplier.setCommitState(clusterState);
}

private static Optional<Join> joinWithDestination(Optional<Join> lastJoin, DiscoveryNode leader, long term) {
if (lastJoin.isPresent() && lastJoin.get().targetMatches(leader) && lastJoin.get().getTerm() == term) {
return lastJoin;
Original file line number Diff line number Diff line change
@@ -527,16 +527,6 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
reposToSkip.add(joiningNodeRepoName);
}
}
//if non-or1
//mix of remote-state (enabled and disabled)
//all of them settings

// publishes to all nodes

//commits

//30s still commit []


if (STRICT.equals(remoteStoreCompatibilityMode)) {
DiscoveryNode existingNode = remoteRoutingTableNode.orElseGet(() -> existingNodes.get(0));
Original file line number Diff line number Diff line change
@@ -50,10 +50,10 @@ public interface ClusterApplier {
void setInitialState(ClusterState initialState);

/**
* Sets the committed state for the applier.
* @param clusterState state that has been committed by cluster-manager
* Sets the pre-commit state for the applier.
* @param clusterState state that has been committed by coordinator to store
*/
void setCommitState(ClusterState clusterState);
void setPreCommitState(ClusterState clusterState);

/**
* Method to invoke when a new cluster state is available to be applied
Original file line number Diff line number Diff line change
@@ -119,7 +119,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
private final Map<TimeoutClusterStateListener, NotifyTimeout> timeoutClusterStateListeners = new ConcurrentHashMap<>();
private final AtomicReference<ClusterState> preApplyState = new AtomicReference<>(); // last committed state which is yet to be applied
private final AtomicReference<ClusterState> preCommitState = new AtomicReference<>(); // last state which is yet to be applied
private final AtomicReference<ClusterState> state; // last applied state

private final String nodeName;
@@ -169,11 +169,6 @@ public void setInitialState(ClusterState initialState) {
state.set(initialState);
}

@Override
public void setCommitState(ClusterState clusterState) {
preApplyState.set(clusterState);
}

@Override
protected synchronized void doStart() {
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
@@ -238,10 +233,6 @@ public ClusterState state() {
return clusterState;
}

public ClusterState commitState() {
return preApplyState.get();
}

/**
* Returns true if the appliedClusterState is not null
*/
@@ -759,4 +750,18 @@ protected long currentTimeInMillis() {
protected boolean applicationMayFail() {
return false;
}

/**
* Pre-commit State of the cluster-applier
* @return ClusterState
*/
public ClusterState preCommitState() {
return preCommitState.get();
}

@Override
public void setPreCommitState(ClusterState clusterState) {
preCommitState.set(clusterState);
}

}
Original file line number Diff line number Diff line change
@@ -183,8 +183,12 @@ public ClusterState state() {
return clusterApplierService.state();
}

public ClusterState commitState() {
return clusterApplierService.commitState();
/**
* The state that is persisted to store but may not be applied to cluster.
* @return ClusterState
*/
public ClusterState preCommitState() {
return clusterApplierService.preCommitState();
}

/**
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ public ClusterState getState(String clusterName, ClusterMetadataManifest manifes
if (cache != null) {
ClusterStateTermVersion cacheStateTermVersion = new ClusterStateTermVersion(
new ClusterName(clusterName),
cache.stateUUID(),
cache.metadata().clusterUUID(),
cache.term(),
cache.version()
);
Original file line number Diff line number Diff line change
@@ -1378,6 +1378,7 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
throws IOException {
assert manifest.getDiffManifest() != null : "Diff manifest null which is required for downloading cluster state";
ClusterStateDiffManifest diff = manifest.getDiffManifest();
boolean includeEphemeral = true;

List<UploadedIndexMetadata> updatedIndices = diff.getIndicesUpdated().stream().map(idx -> {
Optional<UploadedIndexMetadata> uploadedIndexMetadataOptional = manifest.getIndices()
@@ -1425,7 +1426,7 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
manifest.getDiffManifest() != null
&& manifest.getDiffManifest().getIndicesRoutingDiffPath() != null
&& !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(),
true
includeEphemeral
);
ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState);
Metadata.Builder metadataBuilder = Metadata.builder(updatedClusterState.metadata());
@@ -1459,7 +1460,8 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables))
.build();

// always includes all the fields of cluster-state (includeEphemeral=true)
assert includeEphemeral == true;
// newState includes all the fields of cluster-state (includeEphemeral=true always)
remoteClusterStateCache.putState(newState);
return newState;
}
@@ -1664,4 +1666,8 @@ public RemotePersistenceStats getStats() {
return remoteStateStats;
}

RemoteClusterStateCache getRemoteClusterStateCache() {
return remoteClusterStateCache;
}

}
Original file line number Diff line number Diff line change
@@ -156,10 +156,6 @@ public void testGetTransportWithoutMatchingTerm() {
clusterService.state().version() - 1
)
);

{

}
capturingTransport.handleResponse(capturedRequest.requestId, termResp);

assertThat(capturingTransport.capturedRequests().length, equalTo(2));
Loading

0 comments on commit 6016fb9

Please sign in to comment.