Skip to content

Commit

Permalink
Remove PSBA and test files, modify exception handling
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Mar 19, 2024
1 parent 4c75c49 commit a893cb8
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 518 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.GatewayStartedShard;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyProvider;
Expand Down Expand Up @@ -818,9 +818,9 @@ public void testShardFetchCorruptedShardsUsingBatchAction() throws Exception {
.get(discoveryNodes[0].getId())
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNotNull(gatewayStartedShard.storeException());
assertNotNull(gatewayStartedShard.allocationId());
assertTrue(gatewayStartedShard.primary());
assertNotNull(gatewayStartedShard.get().storeException());
assertNotNull(gatewayStartedShard.get().allocationId());
assertTrue(gatewayStartedShard.get().primary());
}

public void testSingleShardStoreFetchUsingBatchAction() throws ExecutionException, InterruptedException {
Expand Down Expand Up @@ -949,9 +949,9 @@ private void assertNodeStoreFilesMetadataSuccessCase(
}

private void assertNodeGatewayStartedShardsHappyCase(GatewayStartedShard gatewayStartedShard) {
assertNull(gatewayStartedShard.storeException());
assertNotNull(gatewayStartedShard.allocationId());
assertTrue(gatewayStartedShard.primary());
assertNull(gatewayStartedShard.get().storeException());
assertNotNull(gatewayStartedShard.get().allocationId());
assertTrue(gatewayStartedShard.get().primary());
}

private void prepareIndex(String indexName, int numberOfPrimaryShards) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ public Map<DiscoveryNode, T> getCacheData(DiscoveryNodes nodes, Set<String> fail
*/
private void fillReverseIdMap() {
arrayToShardId.clear();
for (ShardId shardId : shardIdToArray.keySet()) {
arrayToShardId.putIfAbsent(shardIdToArray.get(shardId), shardId);
for (Map.Entry<ShardId, Integer> indexMapping : shardIdToArray.entrySet()) {
arrayToShardId.putIfAbsent(indexMapping.getValue(), indexMapping.getKey());
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ public static Boolean isEmpty(GatewayStartedShard response) {
&& response.storeException() == null
&& response.replicationCheckpoint() == null;
}

public Boolean isEmpty() {
return allocationId == null && primary == false && storeException == null && replicationCheckpoint == null;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.store.ShardAttributes;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -141,15 +140,18 @@ protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) {
try {
shardsOnNode.put(
shardId,
getShardInfoOnLocalNode(
logger,
shardId,
namedXContentRegistry,
nodeEnv,
indicesService,
shardAttr.getValue().getCustomDataPath(),
settings,
clusterService
new GatewayStartedShard(
getShardInfoOnLocalNode(
logger,
shardId,
namedXContentRegistry,
nodeEnv,
indicesService,
shardAttr.getValue().getCustomDataPath(),
settings,
clusterService
),
null
)
);
} catch (Exception e) {
Expand All @@ -158,7 +160,10 @@ protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) {
shardsOnNode.put(shardId, null);
} else {
// return actual exception as it is for unknown exceptions
shardsOnNode.put(shardId, new GatewayStartedShard(null, false, null, e));
shardsOnNode.put(
shardId,
new GatewayStartedShard(new TransportNodesGatewayStartedShardHelper.GatewayStartedShard(null, false, null, null), e)
);
}
}
}
Expand Down Expand Up @@ -251,6 +256,50 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public static class GatewayStartedShard {
private final TransportNodesGatewayStartedShardHelper.GatewayStartedShard gatewayStartedShard;
private final Exception transportError;

public GatewayStartedShard(
TransportNodesGatewayStartedShardHelper.GatewayStartedShard gatewayStartedShard,
Exception transportError
) {
this.gatewayStartedShard = gatewayStartedShard;
this.transportError = transportError;
}

public GatewayStartedShard(StreamInput in) throws IOException {
this.gatewayStartedShard = new TransportNodesGatewayStartedShardHelper.GatewayStartedShard(in);
if (in.readBoolean()) {
this.transportError = in.readException();
} else {
this.transportError = null;
}
}

public void writeTo(StreamOutput out) throws IOException {
gatewayStartedShard.writeTo(out);
if (transportError != null) {
out.writeBoolean(true);
out.writeException(transportError);
} else {
out.writeBoolean(false);
}
}

public static boolean isEmpty(GatewayStartedShard gatewayStartedShard) {
return gatewayStartedShard.get().isEmpty() && gatewayStartedShard.getTransportError() == null;
}

public Exception getTransportError() {
return transportError;
}

public TransportNodesGatewayStartedShardHelper.GatewayStartedShard get() {
return gatewayStartedShard;
}
}

/**
* This is the response from a single node, this is used in {@link NodesGatewayStartedShardsBatch} for creating
* node to its response mapping for this transport request.
Expand Down
Loading

0 comments on commit a893cb8

Please sign in to comment.