Skip to content

Commit

Permalink
[Backport 2.x] Add wait_for_completion parameter to resize, open, and…
Browse files Browse the repository at this point in the history
… forcemerge APIs (#6855)

* Add wait_for_completion parameter to resize, open, and forcemerge APIs (#6434)

* Add wait_for_completion parameter to resize&open&forcemerge APIs (#6228)

Signed-off-by: Gao Binlong <[email protected]>

* Modify changelog

Signed-off-by: Gao Binlong <[email protected]>

* fix test failure

Signed-off-by: Gao Binlong <[email protected]>

* Fix test failure

Signed-off-by: Gao Binlong <[email protected]>

* change header of new file

Signed-off-by: Gao Binlong <[email protected]>

* modify changelog

Signed-off-by: Gao Binlong <[email protected]>

---------

Signed-off-by: Gao Binlong <[email protected]>
(cherry picked from commit 3fec567)

Modify the yaml test file

Signed-off-by: Gao Binlong <[email protected]>

* Modify package name

Signed-off-by: Gao Binlong <[email protected]>

---------

Signed-off-by: Gao Binlong <[email protected]>
Co-authored-by: Andrew Ross <[email protected]>
  • Loading branch information
gaobinlong and andrross authored Mar 29, 2023
1 parent ac79a63 commit 47a95e4
Show file tree
Hide file tree
Showing 31 changed files with 778 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add 'base_path' setting to File System Repository ([#6558](https://github.com/opensearch-project/OpenSearch/pull/6558))
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))
- Add wait_for_completion parameter to resize, open, and forcemerge APIs ([#6434](https://github.com/opensearch-project/OpenSearch/pull/6434))
- [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563))
- [Remote Store] Integrate remote segment store in peer recovery flow ([#6664](https://github.com/opensearch-project/OpenSearch/pull/6664))
- [Segment Replication] Add new cluster setting to set replication strategy by default for all indices in cluster. ([#6791](https://github.com/opensearch-project/OpenSearch/pull/6791))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,9 @@
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestStatus;
import org.opensearch.tasks.LoggingTaskListener;
import org.opensearch.tasks.Task;

import java.io.IOException;
import java.util.HashMap;
Expand Down Expand Up @@ -124,17 +120,6 @@ protected Request setCommonOptions(RestRequest restRequest, Request request) {
return request;
}

private RestChannelConsumer sendTask(String localNodeId, Task task) {
return channel -> {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
builder.field("task", localNodeId + ":" + task.getId());
builder.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}
};
}

private static Integer parseSlices(RestRequest request) {
String slicesString = request.param("slices");
if (slicesString == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@
"wait_for_active_shards": {
"type" : "string",
"description" : "Set the number of active shards to wait for on the cloned index before the operation returns."
},
"wait_for_completion": {
"type" : "boolean",
"description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true."
},
"task_execution_timeout": {
"type" : "time",
"description" : "Explicit task execution timeout, only useful when wait_for_completion is false, defaults to 1h."
}
},
"body": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
"only_expunge_deletes":{
"type":"boolean",
"description":"Specify whether the operation should only expunge deleted documents"
},
"wait_for_completion": {
"type" : "boolean",
"description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@
},
"master_timeout":{
"type":"time",
"description":"Specify timeout for connection to master"
"description":"Specify timeout for connection to master",
"deprecated":{
"version":"2.0.0",
"description":"To promote inclusive language, use 'cluster_manager_timeout' instead."
}
},
"cluster_manager_timeout":{
"type":"time",
"description":"Specify timeout for connection to cluster-manager node"
},
"ignore_unavailable":{
"type":"boolean",
Expand All @@ -53,6 +61,14 @@
"wait_for_active_shards":{
"type":"string",
"description":"Sets the number of active shards to wait for before the operation returns."
},
"wait_for_completion": {
"type" : "boolean",
"description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true."
},
"task_execution_timeout": {
"type" : "time",
"description" : "Explicit task execution timeout, only useful when wait_for_completion is false, defaults to 1h."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@
"wait_for_active_shards": {
"type" : "string",
"description" : "Set the number of active shards to wait for on the shrunken index before the operation returns."
},
"wait_for_completion": {
"type" : "boolean",
"description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true."
},
"task_execution_timeout": {
"type" : "time",
"description" : "Explicit task execution timeout, only useful when wait_for_completion is false, defaults to 1h."
}
},
"body":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@
"wait_for_active_shards": {
"type" : "string",
"description" : "Set the number of active shards to wait for on the shrunken index before the operation returns."
},
"wait_for_completion": {
"type" : "boolean",
"description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true."
},
"task_execution_timeout": {
"type" : "time",
"description" : "Explicit task execution timeout, only useful when wait_for_completion is false, defaults to 1h."
}
},
"body":{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
---
"Clone index with wait_for_completion":
# clone index with wait_for_completion parameter, when the parameter is set to false, the API
# will return a task immediately and the clone operation will run in background.

- skip:
version: " - 2.6.99"
reason: "only available in 2.7+"
features: allowed_warnings

- do:
nodes.info:
node_id: data:true
- set:
nodes._arbitrary_key_: node_id

- do:
indices.create:
index: source
wait_for_active_shards: 1
body:
settings:
# ensure everything is allocated on the same data node
index.routing.allocation.include._id: $node_id
index.number_of_shards: 1
index.number_of_replicas: 0
- do:
index:
index: source
id: "1"
body: { "foo": "hello world" }

- do:
get:
index: source
id: "1"

- match: { _index: source }
- match: { _id: "1" }
- match: { _source: { foo: "hello world" } }

# make it read-only
- do:
indices.put_settings:
index: source
body:
index.blocks.write: true
index.number_of_replicas: 0

- do:
cluster.health:
wait_for_status: green
index: source

# clone with wait_for_completion
- do:
indices.clone:
index: "source"
target: "new_cloned_index"
wait_for_active_shards: 1
cluster_manager_timeout: 10s
wait_for_completion: false
task_execution_timeout: 30s
body:
settings:
index.number_of_shards: 1
"index.number_of_replicas": 0

- match: { task: /^.+$/ }
- set: { task: taskId }

- do:
tasks.get:
wait_for_completion: true
task_id: $taskId
- match: { task.action: "indices:admin/resize" }
- match: { task.description: "clone from [source] to [new_cloned_index]" }

# .tasks index is created when the clone index operation completes, so we should delete .tasks index finally,
# if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail.
# Delete the .tasks index directly will also introduce warning, but currently we don't have such APIs which can delete one
# specified task or clear all completed tasks, so we have to do so. Expect we can introduce more tasks related APIs in future
- do:
allowed_warnings:
- "this request accesses system indices: [.tasks], but in a future major version, direct access to system indices will be prevented by default"
indices.delete:
index: .tasks
ignore_unavailable: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
---
"Force merge index with wait_for_completion":
# force merge index with wait_for_completion parameter, when the parameter is set to false, the API
# will return a task immediately and the merge process will run in background.

- skip:
version: " - 2.6.99"
reason: "only available in 2.7+"
features: allowed_warnings

- do:
indices.create:
index: test_index

- do:
indices.forcemerge:
index: test_index
wait_for_completion: false
max_num_segments: 1
- match: { task: /^.+$/ }
- set: { task: taskId }

- do:
tasks.get:
wait_for_completion: true
task_id: $taskId
- match: { task.action: "indices:admin/forcemerge" }
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true]" }

# .tasks index is created when the force-merge operation completes, so we should delete .tasks index finally,
# if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail.
# Delete the .tasks index directly will also introduce warning, but currently we don't have such APIs which can delete one
# specified task or clear all completed tasks, so we have to do so. Expect we can introduce more tasks related APIs in future
- do:
allowed_warnings:
- "this request accesses system indices: [.tasks], but in a future major version, direct access to system indices will be prevented by default"
indices.delete:
index: .tasks
ignore_unavailable: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
"Open index with wait_for_completion":
# open index with wait_for_completion parameter, when the parameter is set to false, the API
# will return a task immediately and the open operation will run in background.

- skip:
version: " - 2.6.99"
reason: "only available in 2.7+"
features: allowed_warnings

- do:
indices.create:
index: test_index
body:
settings:
number_of_replicas: 0
number_of_shards: 1

- do:
indices.close:
index: test_index
- is_true: acknowledged

- do:
indices.open:
index: test_index
wait_for_active_shards: all
cluster_manager_timeout: 10s
wait_for_completion: false
task_execution_timeout: 30s
- match: { task: /^.+$/ }
- set: { task: taskId }

- do:
tasks.get:
wait_for_completion: true
task_id: $taskId
- match: { task.action: "indices:admin/open" }
- match: { task.description: "open indices [test_index]" }

# .tasks index is created when the open index operation completes, so we should delete .tasks index finally,
# if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail.
# Delete the .tasks index directly will also introduce warning, but currently we don't have such APIs which can delete one
# specified task or clear all completed tasks, so we have to do so. Expect we can introduce more tasks related APIs in future
- do:
allowed_warnings:
- "this request accesses system indices: [.tasks], but in a future major version, direct access to system indices will be prevented by default"
indices.delete:
index: .tasks
ignore_unavailable: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
---
"Shrink index with wait_for_completion":
# shrink index with wait_for_completion parameter, when the parameter is set to false, the API
# will return a task immediately and the shrink operation will run in background.

- skip:
version: " - 2.6.99"
reason: "only available in 2.7+"
features: allowed_warnings

- do:
nodes.info:
node_id: data:true
- set:
nodes._arbitrary_key_: node_id

- do:
indices.create:
index: source
wait_for_active_shards: 1
body:
settings:
# ensure everything is allocated on the same data node
index.routing.allocation.include._id: $node_id
index.number_of_shards: 3
index.number_of_replicas: 0
- do:
index:
index: source
id: "1"
body: { "foo": "hello world" }

- do:
get:
index: source
id: "1"

- match: { _index: source }
- match: { _id: "1" }
- match: { _source: { foo: "hello world" } }

# make it read-only
- do:
indices.put_settings:
index: source
body:
index.blocks.write: true
index.number_of_replicas: 0

- do:
cluster.health:
wait_for_status: green
index: source

# shrink with wait_for_completion
- do:
indices.shrink:
index: "source"
target: "new_shrunken_index"
wait_for_active_shards: 1
cluster_manager_timeout: 10s
wait_for_completion: false
task_execution_timeout: 30s
body:
settings:
index.number_of_shards: 1
"index.number_of_replicas": 0

- match: { task: /^.+$/ }
- set: { task: taskId }

- do:
tasks.get:
wait_for_completion: true
task_id: $taskId
- match: { task.action: "indices:admin/resize" }
- match: { task.description: "shrink from [source] to [new_shrunken_index]" }

# .tasks index is created when the shrink index operation completes, so we should delete .tasks index finally,
# if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail.
# Delete the .tasks index directly will also introduce warning, but currently we don't have such APIs which can delete one
# specified task or clear all completed tasks, so we have to do so. Expect we can introduce more tasks related APIs in future
- do:
allowed_warnings:
- "this request accesses system indices: [.tasks], but in a future major version, direct access to system indices will be prevented by default"
indices.delete:
index: .tasks
ignore_unavailable: true
Loading

0 comments on commit 47a95e4

Please sign in to comment.