Followup on comments from #91612 (#91700)
idegtiarenko authored Nov 21, 2022
1 parent 69bf513 commit 642a77f
Showing 4 changed files with 112 additions and 127 deletions.
@@ -15,6 +15,7 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -31,33 +32,36 @@
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
import org.elasticsearch.test.gateway.TestGatewayAllocator;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.Set;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.summingDouble;
import static java.util.stream.Collectors.summingLong;
import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.getIndexDiskUsageInBytes;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@@ -93,96 +97,97 @@ public void testDecideShardAllocation() {
assertNotNull(allocateDecision.getTargetNode().getId(), assignedShards.get(0).currentNodeId());
}

public void testBalanceByShardLoad() {

var smallIndices = IntStream.range(1, 5)
.mapToObj(i -> IndexMetadata.builder("small-index-" + i))
.map(builder -> builder.settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
.map(builder -> builder.indexWriteLoadForecast(randomIngestLoad(1.5)));

var heavyIndex = IndexMetadata.builder("heavy-index")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.indexWriteLoadForecast(8.0);

var clusterState = stateWithStartedIndices(Stream.concat(smallIndices, Stream.of(heavyIndex)).toList());

var testWriteLoadForecaster = new WriteLoadForecaster() {
@Override
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
throw new UnsupportedOperationException("Not required for test");
}

@Override
@SuppressForbidden(reason = "This is required to test balancing by ingest load")
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
return indexMetadata.getForecastedWriteLoad();
}
};
public void testBalanceByWriteLoad() {

var allocationService = new MockAllocationService(
yesAllocationDeciders(),
new TestGatewayAllocator(),
new BalancedShardsAllocator(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
TEST_WRITE_LOAD_FORECASTER
),
EmptyClusterInfoService.INSTANCE,
SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES
);

var settings = Settings.EMPTY;
var allocator = new BalancedShardsAllocator(
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
testWriteLoadForecaster
var clusterState = applyStartedShardsUntilNoChange(
stateWithStartedIndices(
IndexMetadata.builder("heavy-index").indexWriteLoadForecast(8.0),
IndexMetadata.builder("light-index-1").indexWriteLoadForecast(1.0),
IndexMetadata.builder("light-index-2").indexWriteLoadForecast(2.0),
IndexMetadata.builder("light-index-3").indexWriteLoadForecast(3.0),
IndexMetadata.builder("zero-write-load-index").indexWriteLoadForecast(0.0),
IndexMetadata.builder("no-write-load-index")
),
allocationService
);
var allocation = createRoutingAllocation(clusterState);
allocator.allocate(allocation);

assertThat(allocation.metadata().getTotalNumberOfShards(), allOf(greaterThanOrEqualTo(3), lessThanOrEqualTo(5)));
for (RoutingNode routingNode : allocation.routingNodes()) {
var nodeIngestLoad = 0.0;
for (ShardRouting shardRouting : routingNode) {
if (shardRouting.started() || shardRouting.initializing()) { // count load from the target node when relocating
var indexMetadata = clusterState.metadata().index(shardRouting.index());
nodeIngestLoad += testWriteLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
}
}
assertThat(nodeIngestLoad, lessThanOrEqualTo(8.0 + 1.5));
}
}
assertThat(
getShardsPerNode(clusterState).values(),
contains(
Set.of("heavy-index"),
Set.of("light-index-1", "light-index-2", "light-index-3", "zero-write-load-index", "no-write-load-index")
)
);

private Double randomIngestLoad(double max) {
return switch (randomInt(3)) {
case 0 -> null;
case 1 -> 0.0;
default -> randomDoubleBetween(1.0, max, true);
};
assertThat(
getPerNode(
clusterState,
summingDouble(
it -> TEST_WRITE_LOAD_FORECASTER.getForecastedWriteLoad(clusterState.metadata().index(it.index())).orElse(0.0)
)
).values(),
everyItem(lessThanOrEqualTo(8.0))
);
}

public void testBalanceByDiskUsage() {

var smallIndices = IntStream.range(1, 5)
.mapToObj(i -> IndexMetadata.builder("small-index-" + i))
.map(builder -> builder.settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
.map(builder -> builder.shardSizeInBytesForecast(ByteSizeValue.ofGb(1).getBytes()));
var allocationService = createAllocationService(
Settings.builder()
// enable disk based balancing
.put(BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING.getKey(), "1e-9")
.build()
);

var heavyIndex = IndexMetadata.builder("heavy-index")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.shardSizeInBytesForecast(ByteSizeValue.ofGb(8).getBytes());
var clusterState = applyStartedShardsUntilNoChange(
stateWithStartedIndices(
IndexMetadata.builder("heavy-index").shardSizeInBytesForecast(ByteSizeValue.ofGb(8).getBytes()),
IndexMetadata.builder("light-index-1").shardSizeInBytesForecast(ByteSizeValue.ofGb(1).getBytes()),
IndexMetadata.builder("light-index-2").shardSizeInBytesForecast(ByteSizeValue.ofGb(2).getBytes()),
IndexMetadata.builder("light-index-3").shardSizeInBytesForecast(ByteSizeValue.ofGb(3).getBytes()),
IndexMetadata.builder("zero-disk-usage-index").shardSizeInBytesForecast(0L),
IndexMetadata.builder("no-disk-usage-index")
),
allocationService
);

var clusterState = stateWithStartedIndices(Stream.concat(smallIndices, Stream.of(heavyIndex)).toList());
assertThat(
getShardsPerNode(clusterState).values(),
contains(
Set.of("heavy-index"),
Set.of("light-index-1", "light-index-2", "light-index-3", "zero-disk-usage-index", "no-disk-usage-index")
)
);

var allocator = new BalancedShardsAllocator(
Settings.builder().put("cluster.routing.allocation.balance.disk_usage", "1e-9").build()
assertThat(
getPerNode(
clusterState,
summingLong(it -> clusterState.metadata().index(it.index()).getForecastedShardSizeInBytes().orElse(0L))
).values(),
everyItem(lessThanOrEqualTo(ByteSizeValue.ofGb(8).getBytes()))
);
var allocation = createRoutingAllocation(clusterState);
allocator.allocate(allocation);
}

assertThat(allocation.metadata().getTotalNumberOfShards(), allOf(greaterThanOrEqualTo(3), lessThanOrEqualTo(5)));
for (RoutingNode routingNode : allocation.routingNodes()) {
var nodeDiskUsage = 0L;
for (ShardRouting shardRouting : routingNode) {
if (shardRouting.started() || shardRouting.initializing()) { // count load from the target node when relocating
var indexMetadata = clusterState.metadata().index(shardRouting.index());
nodeDiskUsage += indexMetadata.getForecastedShardSizeInBytes().orElse(0L);
}
}
assertThat(nodeDiskUsage, lessThanOrEqualTo(ByteSizeValue.ofGb(8 + 1).getBytes()));
}
private static Map<String, Set<String>> getShardsPerNode(ClusterState clusterState) {
return getPerNode(clusterState, mapping(ShardRouting::getIndexName, toSet()));
}

private static <T> Map<String, T> getPerNode(ClusterState clusterState, Collector<ShardRouting, ?, T> collector) {
return clusterState.getRoutingNodes()
.stream()
.collect(Collectors.toMap(RoutingNode::nodeId, it -> StreamSupport.stream(it.spliterator(), false).collect(collector)));
}

/**
@@ -333,26 +338,13 @@ private RoutingAllocation createRoutingAllocation(ClusterState clusterState) {
);
}

private static ClusterState stateWithStartedIndices(List<IndexMetadata.Builder> indices) {
private static ClusterState stateWithStartedIndices(IndexMetadata.Builder... indices) {
var metadataBuilder = Metadata.builder();
var routingTableBuilder = RoutingTable.builder();
for (var index : indices) {
var inSyncId = UUIDs.randomBase64UUID(random());
var build = index.putInSyncAllocationIds(0, Set.of(inSyncId)).build();
var build = index.settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0).build();
metadataBuilder.put(build, false);
routingTableBuilder.add(
IndexRoutingTable.builder(build.getIndex())
.addShard(
TestShardRouting.newShardRouting(
new ShardId(build.getIndex(), 0),
randomFrom("node-1", "node-2"),
null,
true,
ShardRoutingState.STARTED,
AllocationId.newInitializing(inSyncId)
)
)
);
routingTableBuilder.addAsNew(build);
}

return ClusterState.builder(ClusterName.DEFAULT)
@@ -383,13 +375,9 @@ private void addIndex(
var numberOfShards = assignments.entrySet().stream().mapToInt(Map.Entry::getValue).sum();
var inSyncIds = randomList(numberOfShards, numberOfShards, () -> UUIDs.randomBase64UUID(random()));
var indexMetadataBuilder = IndexMetadata.builder(name)
.settings(
Settings.builder()
.put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.build()
);
.settings(settings(Version.CURRENT))
.numberOfShards(numberOfShards)
.numberOfReplicas(0);

for (int shardId = 0; shardId < numberOfShards; shardId++) {
indexMetadataBuilder.putInSyncAllocationIds(shardId, Set.of(inSyncIds.get(shardId)));
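For context on the helpers above: getShardsPerNode and getPerNode pair each node id with the result of a downstream java.util.stream.Collector applied to that node's shard routings. A minimal, self-contained sketch of the same downstream-collector idea, written against a hypothetical Assignment record rather than the cluster-state types used in the commit:

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.summingLong;
import static java.util.stream.Collectors.toSet;

public class PerNodeCollectorSketch {

    // Hypothetical stand-in for a shard assignment: which index lives on which node and how big it is.
    record Assignment(String nodeId, String indexName, long sizeInBytes) {}

    // Same shape as the test helper: group by node, then let the caller choose the downstream collector.
    static <T> Map<String, T> getPerNode(List<Assignment> assignments, Collector<Assignment, ?, T> downstream) {
        return assignments.stream().collect(Collectors.groupingBy(Assignment::nodeId, downstream));
    }

    public static void main(String[] args) {
        var assignments = List.of(
            new Assignment("node-1", "heavy-index", 8L),
            new Assignment("node-2", "light-index-1", 1L),
            new Assignment("node-2", "light-index-2", 2L)
        );
        // Index names per node, analogous to getShardsPerNode in the test.
        Map<String, Set<String>> indicesPerNode = getPerNode(assignments, mapping(Assignment::indexName, toSet()));
        // Summed size per node, analogous to the summingLong usage in testBalanceByDiskUsage.
        Map<String, Long> bytesPerNode = getPerNode(assignments, summingLong(Assignment::sizeInBytes));
        System.out.println(indicesPerNode); // e.g. {node-1=[heavy-index], node-2=[light-index-1, light-index-2]}
        System.out.println(bytesPerNode);   // e.g. {node-1=8, node-2=3}
    }
}

The real helper in the test iterates RoutingNode instances via StreamSupport and Collectors.toMap rather than groupingBy, but the downstream-collector plumbing is the same.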
@@ -380,9 +380,8 @@ void addNode(long sizeBytes, int shardCount, double writeLoad) {
for (ShardRouting shardRouting : routingNode) {
shards += 1;
totalBytes += shardSizesByIndex.get(shardRouting.index().getName());
totalWriteLoad += SIMULATION_WRITE_LOAD_FORECASTER.getForecastedWriteLoad(
clusterState.metadata().index(shardRouting.index())
).orElseThrow(() -> new AssertionError("missing write load"));
totalWriteLoad += TEST_WRITE_LOAD_FORECASTER.getForecastedWriteLoad(clusterState.metadata().index(shardRouting.index()))
.orElseThrow(() -> new AssertionError("missing write load"));
}

results.startObject();
@@ -476,7 +475,7 @@ private Map.Entry<MockAllocationService, ShardsAllocator> createNewAllocationSer
new BalancedShardsAllocator(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
SIMULATION_WRITE_LOAD_FORECASTER
TEST_WRITE_LOAD_FORECASTER
),
threadPool,
clusterService,
@@ -1024,7 +1024,6 @@ public void testDoNotRebalanceToTheNodeThatNoLongerExists() {
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT)
)
.system(randomBoolean())
.build();
final var index = indexMetadata.getIndex();
final var shardId = new ShardId(index, 0);
@@ -71,6 +71,19 @@ public Long getShardSize(ShardRouting shardRouting) {
}
};

public static final WriteLoadForecaster TEST_WRITE_LOAD_FORECASTER = new WriteLoadForecaster() {
@Override
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
throw new AssertionError("Not required for testing");
}

@Override
@SuppressForbidden(reason = "tests do not need a license to access the write load")
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
return indexMetadata.getForecastedWriteLoad();
}
};

public static MockAllocationService createAllocationService() {
return createAllocationService(Settings.EMPTY);
}
@@ -397,18 +410,4 @@ public void allocateUnassigned(
}
}
}

protected static final WriteLoadForecaster SIMULATION_WRITE_LOAD_FORECASTER = new WriteLoadForecaster() {
@Override
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
throw new AssertionError("only called during rollover");
}

@Override
@SuppressForbidden(reason = "tests do not need a license to access the write load")
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
return indexMetadata.getForecastedWriteLoad();
}
};

}

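The shared TEST_WRITE_LOAD_FORECASTER introduced above follows a common test-double pattern: implement only the method the tests exercise, and make every other method fail loudly so an unexpected call is caught immediately. A minimal sketch of that pattern, using a hypothetical Forecaster interface rather than the real WriteLoadForecaster API:

import java.util.OptionalDouble;

public class TestDoubleSketch {

    // Hypothetical two-method interface standing in for the production forecaster.
    interface Forecaster {
        void refreshForecasts();                  // not needed by these tests
        OptionalDouble forecastFor(String index); // the only method the tests rely on
    }

    // Shared constant: the unused method throws, so any unexpected call fails the test immediately.
    static final Forecaster TEST_FORECASTER = new Forecaster() {
        @Override
        public void refreshForecasts() {
            throw new AssertionError("not required for testing");
        }

        @Override
        public OptionalDouble forecastFor(String index) {
            // Fixed values for illustration; the commit's forecaster reads the value from IndexMetadata instead.
            return index.startsWith("heavy") ? OptionalDouble.of(8.0) : OptionalDouble.of(1.0);
        }
    };

    public static void main(String[] args) {
        System.out.println(TEST_FORECASTER.forecastFor("heavy-index"));   // OptionalDouble[8.0]
        System.out.println(TEST_FORECASTER.forecastFor("light-index-1")); // OptionalDouble[1.0]
    }
}

Hoisting the constant into the shared test base class lets the allocator test and the simulation test reuse one double instead of each defining its own, which is what this commit does by removing SIMULATION_WRITE_LOAD_FORECASTER in favor of TEST_WRITE_LOAD_FORECASTER.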