Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add validation in Decommission Request for minimum awareness attributes #4767

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fix weighted routing metadata deserialization error on process restart ([#4691](https://github.com/opensearch-project/OpenSearch/pull/4691))
- Refactor Base Action class javadocs to OpenSearch.API ([#4732](https://github.com/opensearch-project/OpenSearch/pull/4732))
- Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459))
- Add validation in Decommission Request for minimum awareness attributes ([#4767](https://github.com/opensearch-project/OpenSearch/pull/4767))
- Refactored BalancedAllocator.Balancer to LocalShardsBalancer ([#4761](https://github.com/opensearch-project/OpenSearch/pull/4761))
- Fail weight update when decommission ongoing and fail decommission when attribute not weighed away ([#4839](https://github.com/opensearch-project/OpenSearch/pull/4839))
### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ public DecommissionRequest(StreamInput in) throws IOException {
super(in);
decommissionAttribute = new DecommissionAttribute(in);
this.delayTimeout = in.readTimeValue();
this.noDelay = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
decommissionAttribute.writeTo(out);
out.writeTimeValue(delayTimeout);
out.writeBoolean(noDelay);
}

/**
Expand All @@ -80,7 +82,9 @@ public TimeValue getDelayTimeout() {
}

public void setNoDelay(boolean noDelay) {
this.delayTimeout = TimeValue.ZERO;
if (noDelay) {
this.delayTimeout = TimeValue.ZERO;
}
this.noDelay = noDelay;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,22 +458,42 @@ private static void validateAwarenessAttribute(
List<String> awarenessAttributes,
Map<String, List<String>> forcedAwarenessAttributes
) {
String msg = null;
if (awarenessAttributes == null) {
msg = "awareness attribute not set to the cluster.";
} else if (forcedAwarenessAttributes == null) {
msg = "forced awareness attribute not set to the cluster.";
} else if (awarenessAttributes.contains(decommissionAttribute.attributeName()) == false) {
msg = "invalid awareness attribute requested for decommissioning";
} else if (forcedAwarenessAttributes.containsKey(decommissionAttribute.attributeName()) == false) {
msg = "forced awareness attribute [" + forcedAwarenessAttributes.toString() + "] doesn't have the decommissioning attribute";
} else if (forcedAwarenessAttributes.get(decommissionAttribute.attributeName())
if (awarenessAttributes == null
|| forcedAwarenessAttributes == null
|| awarenessAttributes.isEmpty()
|| forcedAwarenessAttributes.isEmpty()) {
throw new DecommissioningFailedException(
decommissionAttribute,
"awareness attribute ["
+ awarenessAttributes
+ "] and forced awareness attribute ["
+ forcedAwarenessAttributes
+ "] must be set to execute decommissioning"
);
}
if (awarenessAttributes.contains(decommissionAttribute.attributeName()) == false
|| forcedAwarenessAttributes.containsKey(decommissionAttribute.attributeName()) == false) {
throw new DecommissioningFailedException(
decommissionAttribute,
"invalid awareness attribute requested for decommissioning, eligible attributes are [" + forcedAwarenessAttributes + "]"
);
}
if (forcedAwarenessAttributes.get(decommissionAttribute.attributeName())
.contains(decommissionAttribute.attributeValue()) == false) {
msg = "invalid awareness attribute value requested for decommissioning. Set forced awareness values before to decommission";
}

if (msg != null) {
throw new DecommissioningFailedException(decommissionAttribute, msg);
throw new DecommissioningFailedException(
decommissionAttribute,
"invalid awareness attribute value requested for decommissioning. Eligible forced awareness attributes ["
+ forcedAwarenessAttributes
+ "]"
);
}
if (forcedAwarenessAttributes.get(decommissionAttribute.attributeName()).size() < 3) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be a configuration?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't get you. Configuration as in?

throw new DecommissioningFailedException(
decommissionAttribute,
"total awareness attribute value set to cluster is ["
+ forcedAwarenessAttributes.get(decommissionAttribute.attributeName()).size()
+ "] which is less than minimum attribute value count required [3]"
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ public DecommissioningFailedException(DecommissionAttribute decommissionAttribut
}

public DecommissioningFailedException(DecommissionAttribute decommissionAttribute, String msg, Throwable cause) {
super("[" + (decommissionAttribute == null ? "_na" : decommissionAttribute.toString()) + "] " + msg, cause);
super(
"Decommission request for ["
+ (decommissionAttribute == null ? "_na" : decommissionAttribute.toString())
+ "] failed because "
+ msg,
cause
);
this.decommissionAttribute = decommissionAttribute;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void onResponse(DecommissionResponse decommissionResponse) {
@Override
public void onFailure(Exception e) {
assertTrue(e instanceof DecommissioningFailedException);
assertThat(e.getMessage(), Matchers.endsWith("invalid awareness attribute requested for decommissioning"));
assertThat(e.getMessage(), Matchers.containsString("invalid awareness attribute requested for decommissioning"));
countDownLatch.countDown();
}
};
Expand All @@ -162,9 +162,8 @@ public void onFailure(Exception e) {
assertTrue(e instanceof DecommissioningFailedException);
assertThat(
e.getMessage(),
Matchers.endsWith(
"invalid awareness attribute value requested for decommissioning. "
+ "Set forced awareness values before to decommission"
Matchers.containsString(
"failed because invalid awareness attribute value requested for decommissioning. Eligible forced awareness attributes"
)
);
countDownLatch.countDown();
Expand Down Expand Up @@ -262,6 +261,43 @@ public void onFailure(Exception e) {
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

@SuppressWarnings("unchecked")
public void testDecommissioningFailedForInsufficientAttributeValues() throws InterruptedException {
final Settings.Builder tempSettingsBuilder = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "zone_1,zone_2");
DecommissionService decommissionService = new DecommissionService(
tempSettingsBuilder.build(),
clusterSettings,
clusterService,
transportService,
threadPool,
allocationService
);

final CountDownLatch countDownLatch = new CountDownLatch(1);
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_1");

ActionListener<DecommissionResponse> listener = new ActionListener<DecommissionResponse>() {
@Override
public void onResponse(DecommissionResponse decommissionResponse) {
fail("on response shouldn't have been called");
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof DecommissioningFailedException);
assertEquals(
e.getMessage(),
"Decommission request for [DecommissionAttribute{attributeName='zone', attributeValue='zone_1'}] failed because total awareness attribute value set to cluster is [2] which is less than minimum attribute value count required [3]"
);
countDownLatch.countDown();
}
};
decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute), listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

public void testScheduleNodesDecommissionOnTimeout() {
TransportService mockTransportService = Mockito.mock(TransportService.class);
ThreadPool mockThreadPool = Mockito.mock(ThreadPool.class);
Expand Down Expand Up @@ -310,7 +346,6 @@ public void testDrainNodesWithDecommissionedAttributeWithNoDelay() {

setState(clusterService, state);
decommissionService.drainNodesWithDecommissionedAttribute(request);

}

public void testClearClusterDecommissionState() throws InterruptedException {
Expand Down