Skip to content

Commit

Permalink
waitForPersistentTaskCondition
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed May 29, 2018
1 parent b4a50b7 commit ab05475
Show file tree
Hide file tree
Showing 10 changed files with 19 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public long getAllocationId() {
public void waitForPersistentTask(final Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> predicate,
final @Nullable TimeValue timeout,
final PersistentTasksService.WaitForPersistentTaskListener<?> listener) {
persistentTasksService.waitForPersistentTask(persistentTaskId, predicate, timeout, listener);
persistentTasksService.waitForPersistentTaskCondition(persistentTaskId, predicate, timeout, listener);
}

final boolean isCompleted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ void execute(final Req request, final Action<Req, Resp, Builder> action, final A
* @param timeout a timeout for waiting
* @param listener the callback listener
*/
public void waitForPersistentTask(final String taskId,
final Predicate<PersistentTask<?>> predicate,
final @Nullable TimeValue timeout,
final WaitForPersistentTaskListener<?> listener) {
public void waitForPersistentTaskCondition(final String taskId,
final Predicate<PersistentTask<?>> predicate,
final @Nullable TimeValue timeout,
final WaitForPersistentTaskListener<?> listener) {
final Predicate<ClusterState> clusterStatePredicate = clusterState ->
predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId));

Expand Down Expand Up @@ -199,9 +199,9 @@ public void onTimeout(TimeValue timeout) {
* @param timeout a timeout for waiting
* @param listener the callback listener
*/
public void waitForPersistentTasks(final Predicate<PersistentTasksCustomMetaData> predicate,
final @Nullable TimeValue timeout,
final ActionListener<Boolean> listener) {
public void waitForPersistentTasksCondition(final Predicate<PersistentTasksCustomMetaData> predicate,
final @Nullable TimeValue timeout,
final ActionListener<Boolean> listener) {
final Predicate<ClusterState> clusterStatePredicate = clusterState ->
predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,15 @@ public void testPersistentActionStatusUpdate() throws Exception {

int finalI = i;
WaitForPersistentTaskFuture<?> future1 = new WaitForPersistentTaskFuture<>();
persistentTasksService.waitForPersistentTask(taskId,
persistentTasksService.waitForPersistentTaskCondition(taskId,
task -> task != null && task.getStatus() != null && task.getStatus().toString() != null &&
task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"),
TimeValue.timeValueSeconds(10), future1);
assertThat(future1.get().getId(), equalTo(taskId));
}

WaitForPersistentTaskFuture<?> future1 = new WaitForPersistentTaskFuture<>();
persistentTasksService.waitForPersistentTask(taskId,
persistentTasksService.waitForPersistentTaskCondition(taskId,
task -> false, TimeValue.timeValueMillis(10), future1);

assertThrows(future1, IllegalStateException.class, "timed out after 10ms");
Expand All @@ -221,7 +221,7 @@ public void testPersistentActionStatusUpdate() throws Exception {

// Wait for the task to disappear
WaitForPersistentTaskFuture<?> future2 = new WaitForPersistentTaskFuture<>();
persistentTasksService.waitForPersistentTask(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2);
persistentTasksService.waitForPersistentTaskCondition(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2);

logger.info("Completing the running task");
// Complete the running task and make sure it finishes properly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ public boolean hasJobsToWaitFor() {
// so wait for that to happen here.
void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitForCloseRequest, CloseJobAction.Response response,
ActionListener<CloseJobAction.Response> listener) {
persistentTasksService.waitForPersistentTasks(persistentTasksCustomMetaData -> {
persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> {
for (String persistentTaskId : waitForCloseRequest.persistentTaskIds) {
if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ public void onFailure(Exception e) {

private void waitForJobStarted(String taskId, OpenJobAction.JobParams jobParams, ActionListener<OpenJobAction.Response> listener) {
JobPredicate predicate = new JobPredicate();
persistentTasksService.waitForPersistentTask(taskId, predicate, jobParams.getTimeout(),
persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, jobParams.getTimeout(),
new PersistentTasksService.WaitForPersistentTaskListener<OpenJobAction.JobParams>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ protected ClusterBlockException checkBlock(StartDatafeedAction.Request request,
private void waitForDatafeedStarted(String taskId, StartDatafeedAction.DatafeedParams params,
ActionListener<StartDatafeedAction.Response> listener) {
DatafeedPredicate predicate = new DatafeedPredicate();
persistentTasksService.waitForPersistentTask(taskId, predicate, params.getTimeout(),
persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, params.getTimeout(),
new PersistentTasksService.WaitForPersistentTaskListener<StartDatafeedAction.DatafeedParams>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ private void sendResponseOrFailure(String datafeedId, ActionListener<StopDatafee
void waitForDatafeedStopped(List<String> datafeedPersistentTaskIds, StopDatafeedAction.Request request,
StopDatafeedAction.Response response,
ActionListener<StopDatafeedAction.Response> listener) {
persistentTasksService.waitForPersistentTasks(persistentTasksCustomMetaData -> {
persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> {
for (String persistentTaskId: datafeedPersistentTaskIds) {
if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persisten
logger.debug("Request to cancel Task for Rollup job [" + jobId + "] successful.");

// Step 2. Wait for the task to finish cancellation internally
persistentTasksService.waitForPersistentTask(jobId, Objects::isNull, timeout,
persistentTasksService.waitForPersistentTaskCondition(jobId, Objects::isNull, timeout,
new PersistentTasksService.WaitForPersistentTaskListener<RollupJob>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<RollupJob> task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ static void startPersistentTask(RollupJob job, ActionListener<PutRollupJobAction

private static void waitForRollupStarted(RollupJob job, ActionListener<PutRollupJobAction.Response> listener,
PersistentTasksService persistentTasksService) {
persistentTasksService.waitForPersistentTask(job.getConfig().getId(), Objects::nonNull, job.getConfig().getTimeout(),
persistentTasksService.waitForPersistentTaskCondition(job.getConfig().getId(), Objects::nonNull, job.getConfig().getTimeout(),
new PersistentTasksService.WaitForPersistentTaskListener<RollupJob>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<RollupJob> task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,10 @@ public void testStartTask() {
// Bail here with an error, further testing will happen through tests of #startPersistentTask
requestCaptor2.getValue().onFailure(new RuntimeException("Ending"));
return null;
}).when(tasksService).waitForPersistentTask(eq(job.getConfig().getId()), any(), any(), requestCaptor2.capture());
}).when(tasksService).waitForPersistentTaskCondition(eq(job.getConfig().getId()), any(), any(), requestCaptor2.capture());

TransportPutRollupJobAction.startPersistentTask(job, testListener, tasksService);
verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), any());
verify(tasksService).waitForPersistentTask(eq(job.getConfig().getId()), any(), any(), any());
verify(tasksService).waitForPersistentTaskCondition(eq(job.getConfig().getId()), any(), any(), any());
}
}

0 comments on commit ab05475

Please sign in to comment.