-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add changes to Point in time segments API service layer #4105
Merged
Merged
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
42f5f10
pit segments service layer changes
bharath-techie 1c702de
Addressing comment
bharath-techie babab92
Addressing comment
bharath-techie adf80d6
Addressing comment
bharath-techie 176153d
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie 1eb68d7
addressing review comments
bharath-techie b9e9134
Merge branch 'main' of github.com:bharath-techie/OpenSearch into pits…
bharath-techie 14071be
addressing comment
bharath-techie 24d90c2
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie 6ad2126
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie 5e06479
Addressing comments
bharath-techie c8c6593
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie 51ef240
addressing comments
bharath-techie 903588e
Addressing comments
bharath-techie aa04c05
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie 611ed25
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie 78c645b
Adding '_all' as option to get all segments
bharath-techie 5d814ac
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
24 changes: 24 additions & 0 deletions
24
server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsAction.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.admin.indices.segments; | ||
|
||
import org.opensearch.action.ActionType; | ||
|
||
/** | ||
* Action for retrieving segment information for PITs | ||
*/ | ||
public class PitSegmentsAction extends ActionType<IndicesSegmentResponse> { | ||
|
||
public static final PitSegmentsAction INSTANCE = new PitSegmentsAction(); | ||
public static final String NAME = "indices:monitor/point_in_time/segments"; | ||
|
||
private PitSegmentsAction() { | ||
super(NAME, IndicesSegmentResponse::new); | ||
} | ||
} |
87 changes: 87 additions & 0 deletions
87
server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.admin.indices.segments; | ||
|
||
import org.opensearch.action.ActionRequestValidationException; | ||
import org.opensearch.action.support.broadcast.BroadcastRequest; | ||
import org.opensearch.common.Strings; | ||
import org.opensearch.common.io.stream.StreamInput; | ||
import org.opensearch.common.io.stream.StreamOutput; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.List; | ||
|
||
import static org.opensearch.action.ValidateActions.addValidationError; | ||
|
||
/** | ||
* Transport request for retrieving PITs segment information | ||
*/ | ||
public class PitSegmentsRequest extends BroadcastRequest<PitSegmentsRequest> { | ||
private boolean verbose = false; | ||
private final List<String> pitIds = new ArrayList<>(); | ||
|
||
public PitSegmentsRequest() { | ||
this(Strings.EMPTY_ARRAY); | ||
} | ||
|
||
public PitSegmentsRequest(StreamInput in) throws IOException { | ||
super(in); | ||
pitIds.addAll(Arrays.asList(in.readStringArray())); | ||
verbose = in.readBoolean(); | ||
} | ||
|
||
public PitSegmentsRequest(String... pitIds) { | ||
super(pitIds); | ||
this.pitIds.addAll(Arrays.asList(pitIds)); | ||
} | ||
|
||
/** | ||
* <code>true</code> if detailed information about each segment should be returned, | ||
* <code>false</code> otherwise. | ||
*/ | ||
public boolean isVerbose() { | ||
return verbose; | ||
} | ||
|
||
/** | ||
* Sets the <code>verbose</code> option. | ||
* @see #isVerbose() | ||
*/ | ||
public void setVerbose(boolean v) { | ||
verbose = v; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
out.writeStringArrayNullable((pitIds == null) ? null : pitIds.toArray(new String[pitIds.size()])); | ||
out.writeBoolean(verbose); | ||
} | ||
|
||
public List<String> getPitIds() { | ||
return Collections.unmodifiableList(pitIds); | ||
} | ||
|
||
public void clearAndSetPitIds(List<String> pitIds) { | ||
this.pitIds.clear(); | ||
this.pitIds.addAll(pitIds); | ||
} | ||
|
||
@Override | ||
public ActionRequestValidationException validate() { | ||
ActionRequestValidationException validationException = null; | ||
if (pitIds == null || pitIds.isEmpty()) { | ||
validationException = addValidationError("no pit ids specified", validationException); | ||
} | ||
return validationException; | ||
} | ||
} |
261 changes: 261 additions & 0 deletions
261
...rc/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,261 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package org.opensearch.action.admin.indices.segments; | ||
|
||
import org.opensearch.action.ActionListener; | ||
import org.opensearch.action.search.ListPitInfo; | ||
import org.opensearch.action.search.PitService; | ||
import org.opensearch.action.search.SearchContextId; | ||
import org.opensearch.action.search.SearchContextIdForNode; | ||
import org.opensearch.action.support.ActionFilters; | ||
import org.opensearch.action.support.DefaultShardOperationFailedException; | ||
import org.opensearch.action.support.broadcast.node.TransportBroadcastByNodeAction; | ||
import org.opensearch.cluster.ClusterState; | ||
import org.opensearch.cluster.block.ClusterBlockException; | ||
import org.opensearch.cluster.block.ClusterBlockLevel; | ||
import org.opensearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.opensearch.cluster.routing.AllocationId; | ||
import org.opensearch.cluster.routing.PlainShardsIterator; | ||
import org.opensearch.cluster.routing.RecoverySource; | ||
import org.opensearch.cluster.routing.ShardRouting; | ||
import org.opensearch.cluster.routing.ShardRoutingState; | ||
import org.opensearch.cluster.routing.ShardsIterator; | ||
import org.opensearch.cluster.routing.UnassignedInfo; | ||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.common.Strings; | ||
import org.opensearch.common.inject.Inject; | ||
import org.opensearch.common.io.stream.NamedWriteableRegistry; | ||
import org.opensearch.common.io.stream.StreamInput; | ||
import org.opensearch.common.io.stream.StreamOutput; | ||
import org.opensearch.common.xcontent.XContentBuilder; | ||
import org.opensearch.index.shard.ShardId; | ||
import org.opensearch.indices.IndicesService; | ||
import org.opensearch.search.SearchService; | ||
import org.opensearch.search.internal.PitReaderContext; | ||
import org.opensearch.tasks.Task; | ||
import org.opensearch.threadpool.ThreadPool; | ||
import org.opensearch.transport.TransportService; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.opensearch.action.search.SearchContextId.decode; | ||
|
||
/** | ||
* Transport action for retrieving segment information of PITs | ||
*/ | ||
public class TransportPitSegmentsAction extends TransportBroadcastByNodeAction<PitSegmentsRequest, IndicesSegmentResponse, ShardSegments> { | ||
private final ClusterService clusterService; | ||
private final IndicesService indicesService; | ||
private final SearchService searchService; | ||
private final NamedWriteableRegistry namedWriteableRegistry; | ||
private final TransportService transportService; | ||
private final PitService pitService; | ||
|
||
@Inject | ||
public TransportPitSegmentsAction( | ||
ClusterService clusterService, | ||
TransportService transportService, | ||
IndicesService indicesService, | ||
ActionFilters actionFilters, | ||
IndexNameExpressionResolver indexNameExpressionResolver, | ||
SearchService searchService, | ||
NamedWriteableRegistry namedWriteableRegistry, | ||
PitService pitService | ||
) { | ||
super( | ||
PitSegmentsAction.NAME, | ||
clusterService, | ||
transportService, | ||
actionFilters, | ||
indexNameExpressionResolver, | ||
PitSegmentsRequest::new, | ||
ThreadPool.Names.MANAGEMENT | ||
); | ||
this.clusterService = clusterService; | ||
this.indicesService = indicesService; | ||
this.searchService = searchService; | ||
this.namedWriteableRegistry = namedWriteableRegistry; | ||
this.transportService = transportService; | ||
this.pitService = pitService; | ||
} | ||
|
||
/** | ||
* Execute PIT segments flow for all PITs or request PIT IDs | ||
*/ | ||
@Override | ||
protected void doExecute(Task task, PitSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) { | ||
List<String> pitIds = request.getPitIds(); | ||
if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { | ||
pitService.getAllPits(ActionListener.wrap(response -> { | ||
request.clearAndSetPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList())); | ||
super.doExecute(task, request, listener); | ||
}, listener::onFailure)); | ||
} else { | ||
super.doExecute(task, request, listener); | ||
} | ||
} | ||
|
||
/** | ||
* This adds list of shards on which we need to retrieve pit segments details | ||
* @param clusterState the cluster state | ||
* @param request the underlying request | ||
* @param concreteIndices the concrete indices on which to execute the operation | ||
*/ | ||
@Override | ||
protected ShardsIterator shards(ClusterState clusterState, PitSegmentsRequest request, String[] concreteIndices) { | ||
final ArrayList<ShardRouting> iterators = new ArrayList<>(); | ||
for (String pitId : request.getPitIds()) { | ||
SearchContextId searchContext = decode(namedWriteableRegistry, pitId); | ||
for (Map.Entry<ShardId, SearchContextIdForNode> entry : searchContext.shards().entrySet()) { | ||
final SearchContextIdForNode perNode = entry.getValue(); | ||
// check if node is part of local cluster | ||
if (Strings.isEmpty(perNode.getClusterAlias())) { | ||
bharath-techie marked this conversation as resolved.
Show resolved
Hide resolved
|
||
final ShardId shardId = entry.getKey(); | ||
iterators.add( | ||
new PitAwareShardRouting( | ||
pitId, | ||
shardId, | ||
perNode.getNode(), | ||
null, | ||
true, | ||
ShardRoutingState.STARTED, | ||
null, | ||
null, | ||
null, | ||
-1L | ||
) | ||
); | ||
} | ||
} | ||
} | ||
return new PlainShardsIterator(iterators); | ||
} | ||
|
||
@Override | ||
protected ClusterBlockException checkGlobalBlock(ClusterState state, PitSegmentsRequest request) { | ||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); | ||
} | ||
Comment on lines
+144
to
+147
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
@Override | ||
protected ClusterBlockException checkRequestBlock(ClusterState state, PitSegmentsRequest countRequest, String[] concreteIndices) { | ||
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices); | ||
} | ||
|
||
@Override | ||
protected ShardSegments readShardResult(StreamInput in) throws IOException { | ||
return new ShardSegments(in); | ||
} | ||
|
||
@Override | ||
protected IndicesSegmentResponse newResponse( | ||
PitSegmentsRequest request, | ||
int totalShards, | ||
int successfulShards, | ||
int failedShards, | ||
List<ShardSegments> results, | ||
List<DefaultShardOperationFailedException> shardFailures, | ||
ClusterState clusterState | ||
) { | ||
return new IndicesSegmentResponse( | ||
results.toArray(new ShardSegments[results.size()]), | ||
totalShards, | ||
successfulShards, | ||
failedShards, | ||
shardFailures | ||
); | ||
} | ||
|
||
@Override | ||
protected PitSegmentsRequest readRequestFrom(StreamInput in) throws IOException { | ||
return new PitSegmentsRequest(in); | ||
} | ||
|
||
@Override | ||
public List<ShardRouting> getShardRoutingsFromInputStream(StreamInput in) throws IOException { | ||
return in.readList(PitAwareShardRouting::new); | ||
} | ||
|
||
/** | ||
* This retrieves segment details of PIT context | ||
* @param request the node-level request | ||
* @param shardRouting the shard on which to execute the operation | ||
*/ | ||
@Override | ||
protected ShardSegments shardOperation(PitSegmentsRequest request, ShardRouting shardRouting) { | ||
assert shardRouting instanceof PitAwareShardRouting; | ||
PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting; | ||
bharath-techie marked this conversation as resolved.
Show resolved
Hide resolved
|
||
SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, pitAwareShardRouting.getPitId()).shards() | ||
.get(shardRouting.shardId()); | ||
PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId()); | ||
if (pitReaderContext == null) { | ||
return new ShardSegments(shardRouting, Collections.emptyList()); | ||
} | ||
return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments()); | ||
} | ||
|
||
/** | ||
* This holds PIT id which is used to perform broadcast operation in PIT shards to retrieve segments information | ||
*/ | ||
public class PitAwareShardRouting extends ShardRouting { | ||
|
||
private final String pitId; | ||
|
||
public PitAwareShardRouting(StreamInput in) throws IOException { | ||
super(in); | ||
this.pitId = in.readString(); | ||
} | ||
|
||
public PitAwareShardRouting( | ||
String pitId, | ||
ShardId shardId, | ||
String currentNodeId, | ||
String relocatingNodeId, | ||
boolean primary, | ||
ShardRoutingState state, | ||
RecoverySource recoverySource, | ||
UnassignedInfo unassignedInfo, | ||
AllocationId allocationId, | ||
long expectedShardSize | ||
) { | ||
super( | ||
shardId, | ||
currentNodeId, | ||
relocatingNodeId, | ||
primary, | ||
state, | ||
recoverySource, | ||
unassignedInfo, | ||
allocationId, | ||
expectedShardSize | ||
); | ||
this.pitId = pitId; | ||
} | ||
|
||
public String getPitId() { | ||
return pitId; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
out.writeString(pitId); | ||
} | ||
|
||
@Override | ||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
super.toXContent(builder, params); | ||
builder.field("pit_id", pitId); | ||
return builder.endObject(); | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Action is not clear from the name. Why not
GetPitSegmentsAction
? It will make it consistent with previous action names.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming is consistent with 'IndicesSegmentsAction' - which is similar to this API. Should we still consider renaming ?