Skip to content

Commit

Permalink
Add lower limit for primary and replica batch allocators
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN committed Jul 25, 2024
1 parent 157d277 commit c2b75cb
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
private final long maxBatchSize;
private static final short DEFAULT_SHARD_BATCH_SIZE = 2000;

private static final String PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
public static final String PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
"cluster.routing.allocation.shards_batch_gateway_allocator.primary_allocator_timeout";
private static final String REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
public static final String REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
"cluster.routing.allocation.shards_batch_gateway_allocator.replica_allocator_timeout";

private TimeValue primaryShardsBatchGatewayAllocatorTimeout;
Expand All @@ -92,16 +92,50 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
Setting.Property.NodeScope
);

/**
* Timeout for existing primary shards batch allocator.
* Values supported is > 20 seconds or -1 to effectively disable timeout
*/
public static final Setting<TimeValue> PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY,
TimeValue.MINUS_ONE,
TimeValue.MINUS_ONE,
new Setting.Validator<>() {
@Override
public void validate(TimeValue timeValue) {
if (timeValue.compareTo(TimeValue.timeValueSeconds(20)) < 0 && timeValue.compareTo(TimeValue.MINUS_ONE) != 0) {
throw new IllegalArgumentException(
"Setting ["
+ PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey()
+ "] should be more than 20s or -1ms to disable timeout"
);
}
}
},
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Timeout for existing replica shards batch allocator.
* Values supported is > 20 seconds or -1 to effectively disable timeout
*/
public static final Setting<TimeValue> REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY,
TimeValue.MINUS_ONE,
TimeValue.MINUS_ONE,
new Setting.Validator<>() {
@Override
public void validate(TimeValue timeValue) {
if (timeValue.compareTo(TimeValue.timeValueSeconds(20)) < 0 && timeValue.compareTo(TimeValue.MINUS_ONE) != 0) {
throw new IllegalArgumentException(
"Setting ["
+ REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey()
+ "] should be more than 20s or -1ms to disable timeout"
);
}
}
},
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING;
import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY;
import static org.opensearch.gateway.ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING;
import static org.opensearch.gateway.ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY;

public class GatewayAllocatorTests extends OpenSearchAllocationTestCase {

private final Logger logger = LogManager.getLogger(GatewayAllocatorTests.class);
Expand Down Expand Up @@ -368,6 +373,56 @@ public void testCreatePrimaryAndReplicaExecutorOfSizeTwo() {
assertEquals(executor.getTimeoutAwareRunnables().size(), 2);
}

public void testPrimaryAllocatorTimeout() {
// Valid setting with timeout = 20s
Settings build = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "20s").build();
assertEquals(20, PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds());

// Valid setting with timeout > 20s
build = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "30000ms").build();
assertEquals(30, PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds());

// Invalid setting with timeout < 20s
Settings lessThan20sSetting = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "10s").build();
IllegalArgumentException iae = expectThrows(
IllegalArgumentException.class,
() -> PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(lessThan20sSetting)
);
assertEquals(
"Setting [" + PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey() + "] should be more than 20s or -1ms to disable timeout",
iae.getMessage()
);

// Valid setting with timeout = -1
build = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "-1").build();
assertEquals(-1, PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis());
}

public void testReplicaAllocatorTimeout() {
// Valid setting with timeout = 20s
Settings build = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "20s").build();
assertEquals(20, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds());

// Valid setting with timeout > 20s
build = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "30000ms").build();
assertEquals(30, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds());

// Invalid setting with timeout < 20s
Settings lessThan20sSetting = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "10s").build();
IllegalArgumentException iae = expectThrows(
IllegalArgumentException.class,
() -> REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(lessThan20sSetting)
);
assertEquals(
"Setting [" + REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey() + "] should be more than 20s or -1ms to disable timeout",
iae.getMessage()
);

// Valid setting with timeout = -1
build = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "-1").build();
assertEquals(-1, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis());
}

private void createIndexAndUpdateClusterState(int count, int numberOfShards, int numberOfReplicas) {
if (count == 0) return;
Metadata.Builder metadata = Metadata.builder();
Expand Down

0 comments on commit c2b75cb

Please sign in to comment.