Skip to content

Commit

Permalink
Fix up ClusterServiceIT (#90397)
Browse files Browse the repository at this point in the history
ClusterServiceIT#testPendingUpdateTask has some unbounded waits, it
relies on the clock advancing by at least 1ms which might not happen,
and it leaves the cluster service thread blocked on failure which causes
knock-on effects. This commit addresses these problems.
  • Loading branch information
DaveCTurner authored Sep 27, 2022
1 parent 24cf871 commit be149bd
Showing 1 changed file with 39 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
public ClusterState execute(ClusterState currentState) {
invoked3.countDown();
try {
block2.await();
assertTrue(block2.await(10, TimeUnit.SECONDS));
} catch (InterruptedException e) {
fail();
}
Expand All @@ -397,40 +397,48 @@ public void onFailure(Exception e) {
fail();
}
});
invoked3.await();

for (int i = 2; i <= 5; i++) {
clusterService.submitUnbatchedStateUpdateTask(Integer.toString(i), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}
assertTrue(invoked3.await(10, TimeUnit.SECONDS));

try {
for (int i = 2; i <= 5; i++) {
clusterService.submitUnbatchedStateUpdateTask(Integer.toString(i), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}

@Override
public void onFailure(Exception e) {
fail();
}
});
}

@Override
public void onFailure(Exception e) {
fail();
}
});
}
Thread.sleep(100);
final var startNanoTime = System.nanoTime();
while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= 0) {
// noinspection BusyWait
Thread.sleep(100);
}

pendingClusterTasks = clusterService.getMasterService().pendingTasks();
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : pendingClusterTasks) {
controlSources.remove(task.getSource().string());
}
assertTrue(controlSources.isEmpty());
pendingClusterTasks = clusterService.getMasterService().pendingTasks();
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : pendingClusterTasks) {
controlSources.remove(task.getSource().string());
}
assertTrue(controlSources.isEmpty());

response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get();
assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : response) {
if (controlSources.remove(task.getSource().string())) {
assertThat(task.getTimeInQueueInMillis(), greaterThan(0L));
response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get();
assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : response) {
if (controlSources.remove(task.getSource().string())) {
assertThat(task.getTimeInQueueInMillis(), greaterThan(0L));
}
}
assertTrue(controlSources.isEmpty());
} finally {
block2.countDown();
}
assertTrue(controlSources.isEmpty());
block2.countDown();
}
}

0 comments on commit be149bd

Please sign in to comment.