Skip to content

Commit

Permalink
Add factory class to create new response objects
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Apr 4, 2024
1 parent ba6cbb4 commit 4246256
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
Expand All @@ -45,10 +44,9 @@ public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V> extend
AsyncShardFetch.Lister<? extends BaseNodesResponse<T>, T> action,
String batchId,
Class<V> clazz,
BiFunction<DiscoveryNode, Map<ShardId, V>, T> responseGetter,
Function<T, Map<ShardId, V>> shardsBatchDataGetter,
V emptyResponse,
Function<V, Boolean> isEmptyResponse
Function<V, Boolean> isEmptyResponse,
ShardBatchResponseFactory<T, V> responseFactory
) {
super(
logger,
Expand All @@ -62,10 +60,9 @@ public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V> extend
shardAttributesMap,
"BatchID=[" + batchId + "]",
clazz,
responseGetter,
shardsBatchDataGetter,
emptyResponse,
isEmptyResponse
isEmptyResponse,
responseFactory
)
);
}
Expand Down Expand Up @@ -95,8 +92,7 @@ static class ShardBatchCache<T extends BaseNodeResponse, V> extends AsyncShardFe
private final Map<ShardId, Integer> shardIdToArray;
private final int batchSize;
private final Class<V> shardResponseClass;
private final BiFunction<DiscoveryNode, Map<ShardId, V>, T> responseConstructor;
private final Function<T, Map<ShardId, V>> shardsBatchDataGetter;
private final ShardBatchResponseFactory<T, V> responseFactory;
private final V emptyResponse;
private final Function<V, Boolean> isEmpty;
private final Logger logger;
Expand All @@ -107,10 +103,9 @@ public ShardBatchCache(
Map<ShardId, ShardAttributes> shardAttributesMap,
String logKey,
Class<V> clazz,
BiFunction<DiscoveryNode, Map<ShardId, V>, T> responseGetter,
Function<T, Map<ShardId, V>> shardsBatchDataGetter,
V emptyResponse,
Function<V, Boolean> isEmptyResponse
Function<V, Boolean> isEmptyResponse,
ShardBatchResponseFactory<T, V> responseFactory
) {
super(Loggers.getLogger(logger, "_" + logKey), type);
this.batchSize = shardAttributesMap.size();
Expand All @@ -119,10 +114,9 @@ public ShardBatchCache(
shardIdToArray = new HashMap<>();
fillShardIdKeys(shardAttributesMap.keySet());
this.shardResponseClass = clazz;
this.responseConstructor = responseGetter;
this.shardsBatchDataGetter = shardsBatchDataGetter;
this.emptyResponse = emptyResponse;
this.logger = logger;
this.responseFactory = responseFactory;
}

@Override
Expand Down Expand Up @@ -156,13 +150,13 @@ public void initData(DiscoveryNode node) {
@Override
public void putData(DiscoveryNode node, T response) {
NodeEntry<V> nodeEntry = cache.get(node.getId());
Map<ShardId, V> batchResponse = shardsBatchDataGetter.apply(response);
Map<ShardId, V> batchResponse = responseFactory.getShardBatchData(response);
nodeEntry.doneFetching(batchResponse, shardIdToArray);
}

@Override
public T getData(DiscoveryNode node) {
return this.responseConstructor.apply(node, getBatchData(cache.get(node.getId())));
return this.responseFactory.getNewResponse(node, getBatchData(cache.get(node.getId())));
}

private HashMap<ShardId, V> getBatchData(NodeEntry<V> nodeEntry) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;

import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch;

import java.util.Map;

/**
* A factory class to create new responses of batch transport actions like
* {@link TransportNodesListGatewayStartedShardsBatch} or {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch}
*
* @param <T> Node level response returned by batch transport actions.
* @param <V> Shard level metadata returned by batch transport actions.
*/
public class ShardBatchResponseFactory<T extends BaseNodeResponse, V> {
private final boolean primary;

public ShardBatchResponseFactory(boolean primary) {
this.primary = primary;
}

public T getNewResponse(DiscoveryNode node, Map<ShardId, V> shardData) {
if (primary) {
return (T) new NodeGatewayStartedShardsBatch(node, (Map<ShardId, GatewayStartedShard>) shardData);
} else {
return (T) new NodeStoreFilesMetadataBatch(node, (Map<ShardId, NodeStoreFilesMetadata>) shardData);
}
}

public Map<ShardId, V> getShardBatchData(T response) {
if (primary) {
return (Map<ShardId, V>) ((NodeGatewayStartedShardsBatch) response).getNodeGatewayStartedShardsBatch();
} else {
return (Map<ShardId, V>) ((NodeStoreFilesMetadataBatch) response).getNodeStoreFilesMetadataBatch();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ public void setupShardBatchCache(String batchId, int numberOfShards) {
shardAttributesMap,
"BatchID=[" + batchId + "]",
GatewayStartedShard.class,
NodeGatewayStartedShardsBatch::new,
NodeGatewayStartedShardsBatch::getNodeGatewayStartedShardsBatch,
new GatewayStartedShard(null, false, null, null),
GatewayStartedShard::isEmpty
GatewayStartedShard::isEmpty,
new ShardBatchResponseFactory<>(true)
);
}

Expand Down

0 comments on commit 4246256

Please sign in to comment.