Skip to content

Commit

Permalink
pit segments service layer changes
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Aug 3, 2022
1 parent 6f23300 commit 42f5f10
Show file tree
Hide file tree
Showing 12 changed files with 509 additions and 5 deletions.
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@
import org.opensearch.action.admin.indices.rollover.RolloverAction;
import org.opensearch.action.admin.indices.rollover.TransportRolloverAction;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.opensearch.action.admin.indices.segments.PitSegmentsAction;
import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
import org.opensearch.action.admin.indices.segments.TransportPitSegmentsAction;
import org.opensearch.action.admin.indices.settings.get.GetSettingsAction;
import org.opensearch.action.admin.indices.settings.get.TransportGetSettingsAction;
import org.opensearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
Expand Down Expand Up @@ -667,6 +669,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class);
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);
actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class);

return unmodifiableMap(actions.getRegistry());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.segments;

import org.opensearch.action.ActionType;

/**
* Action for retrieving segment information for PITs
*/
public class PitSegmentsAction extends ActionType<IndicesSegmentResponse> {

public static final PitSegmentsAction INSTANCE = new PitSegmentsAction();
public static final String NAME = "indices:monitor/point_in_time/segments";

private PitSegmentsAction() {
super(NAME, IndicesSegmentResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.segments;

import org.opensearch.action.support.broadcast.BroadcastRequest;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

/**
* Transport request for retrieving PITs segment information
*/
public class PitSegmentsRequest extends BroadcastRequest<PitSegmentsRequest> {

protected boolean verbose = false;
private Collection<String> pitIds;

public PitSegmentsRequest() {
this(Strings.EMPTY_ARRAY);
}

public PitSegmentsRequest(StreamInput in) throws IOException {
super(in);
pitIds = Arrays.asList(in.readStringArray());
verbose = in.readBoolean();
}

public PitSegmentsRequest(String... indices) {
super(indices);
pitIds = Collections.emptyList();
}

/**
* <code>true</code> if detailed information about each segment should be returned,
* <code>false</code> otherwise.
*/
public boolean verbose() {
return verbose;
}

/**
* Sets the <code>verbose</code> option.
* @see #verbose()
*/
public void verbose(boolean v) {
verbose = v;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (pitIds == null) {
out.writeVInt(0);
} else {
out.writeStringArray(pitIds.toArray(new String[pitIds.size()]));
}
out.writeBoolean(verbose);

}

public Collection<String> getPitIds() {
return pitIds;
}

public void setPitIds(Collection<String> pitIds) {
this.pitIds = pitIds;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.action.admin.indices.segments;

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.ListPitInfo;
import org.opensearch.action.search.PitService;
import org.opensearch.action.search.SearchContextId;
import org.opensearch.action.search.SearchContextIdForNode;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.DefaultShardOperationFailedException;
import org.opensearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.AllocationId;
import org.opensearch.cluster.routing.PlainShardsIterator;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.ShardsIterator;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.search.SearchService;
import org.opensearch.search.internal.PitReaderContext;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.action.search.SearchContextId.decode;

/**
* Transport action for retrieving segment information of PITs
*/
public class TransportPitSegmentsAction extends TransportBroadcastByNodeAction<PitSegmentsRequest, IndicesSegmentResponse, ShardSegments> {
private final ClusterService clusterService;
private final IndicesService indicesService;
private final SearchService searchService;
private final NamedWriteableRegistry namedWriteableRegistry;
private final TransportService transportService;
private final PitService pitService;

@Inject
public TransportPitSegmentsAction(
ClusterService clusterService,
TransportService transportService,
IndicesService indicesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
SearchService searchService,
NamedWriteableRegistry namedWriteableRegistry,
PitService pitService
) {
super(
PitSegmentsAction.NAME,
clusterService,
transportService,
actionFilters,
indexNameExpressionResolver,
PitSegmentsRequest::new,
ThreadPool.Names.MANAGEMENT
);
this.clusterService = clusterService;
this.indicesService = indicesService;
this.searchService = searchService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.transportService = transportService;
this.pitService = pitService;
}

/**
* Execute PIT segments flow for all PITs or request PIT IDs
*/
@Override
protected void doExecute(Task task, PitSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) {
if (request.getPitIds().isEmpty()) {
pitService.getAllPits(ActionListener.wrap(response -> {
request.setPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList()));
getDoExecute(task, request, listener);
}, listener::onFailure));
} else {
getDoExecute(task, request, listener);
}
}

private void getDoExecute(Task task, PitSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) {
super.doExecute(task, request, listener);
}

/**
* This adds list of shards on which we need to retrieve pit segments details
* @param clusterState the cluster state
* @param request the underlying request
* @param concreteIndices the concrete indices on which to execute the operation
*/
@Override
protected ShardsIterator shards(ClusterState clusterState, PitSegmentsRequest request, String[] concreteIndices) {
final ArrayList<ShardRouting> iterators = new ArrayList<>();
for (String pitId : request.getPitIds()) {
SearchContextId searchContext = decode(namedWriteableRegistry, pitId);
for (Map.Entry<ShardId, SearchContextIdForNode> entry : searchContext.shards().entrySet()) {
final SearchContextIdForNode perNode = entry.getValue();
if (Strings.isEmpty(perNode.getClusterAlias())) {
final ShardId shardId = entry.getKey();
iterators.add(
new PitAwareShardRouting(
pitId,
shardId,
perNode.getNode(),
null,
true,
ShardRoutingState.STARTED,
null,
null,
null,
-1L
)
);
}
}
}
return new PlainShardsIterator(iterators);
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, PitSegmentsRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, PitSegmentsRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}

@Override
protected ShardSegments readShardResult(StreamInput in) throws IOException {
return new ShardSegments(in);
}

@Override
protected IndicesSegmentResponse newResponse(
PitSegmentsRequest request,
int totalShards,
int successfulShards,
int failedShards,
List<ShardSegments> results,
List<DefaultShardOperationFailedException> shardFailures,
ClusterState clusterState
) {
return new IndicesSegmentResponse(
results.toArray(new ShardSegments[results.size()]),
totalShards,
successfulShards,
failedShards,
shardFailures
);
}

@Override
protected PitSegmentsRequest readRequestFrom(StreamInput in) throws IOException {
return new PitSegmentsRequest(in);
}

@Override
public List<ShardRouting> getShardsFromInputStream(StreamInput in) throws IOException {
return in.readList(PitAwareShardRouting::new);
}

/**
* This retrieves segment details of PIT context
* @param request the node-level request
* @param shardRouting the shard on which to execute the operation
*/
@Override
protected ShardSegments shardOperation(PitSegmentsRequest request, ShardRouting shardRouting) {
PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting;
SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, pitAwareShardRouting.getPitId()).shards()
.get(shardRouting.shardId());
PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId());
if (pitReaderContext == null) return new ShardSegments(shardRouting, new ArrayList<>());
return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments());
}

/**
* This holds PIT id which is used to perform broadcast operation in PIT shards to retrieve segments information
*/
public class PitAwareShardRouting extends ShardRouting {

private final String pitId;

public PitAwareShardRouting(StreamInput in) throws IOException {
super(in);
this.pitId = in.readString();
}

public PitAwareShardRouting(
String pitId,
ShardId shardId,
String currentNodeId,
String relocatingNodeId,
boolean primary,
ShardRoutingState state,
RecoverySource recoverySource,
UnassignedInfo unassignedInfo,
AllocationId allocationId,
long expectedShardSize
) {
super(
shardId,
currentNodeId,
relocatingNodeId,
primary,
state,
recoverySource,
unassignedInfo,
allocationId,
expectedShardSize
);
this.pitId = pitId;
}

public String getPitId() {
return pitId;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(pitId);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
super.toXContent(builder, params);
builder.field("pitId", pitId);
return builder.endObject();
}
}
}
Loading

0 comments on commit 42f5f10

Please sign in to comment.