Skip to content

Commit

Permalink
Respect TaskDecorator configuration on DefaultManagedTaskExecutor
Browse files Browse the repository at this point in the history
Closes gh-30442
  • Loading branch information
jhoeller committed May 8, 2023
1 parent c09055b commit 2c7e866
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, Sche

private TaskExecutorAdapter adaptedExecutor;

@Nullable
private TaskDecorator taskDecorator;


/**
* Create a new ConcurrentTaskExecutor, using a single thread executor as default.
Expand Down Expand Up @@ -139,6 +142,7 @@ public final Executor getConcurrentExecutor() {
* @since 4.3
*/
public final void setTaskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
this.adaptedExecutor.setTaskDecorator(taskDecorator);
}

Expand Down Expand Up @@ -175,11 +179,15 @@ public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
}


private static TaskExecutorAdapter getAdaptedExecutor(Executor concurrentExecutor) {
private TaskExecutorAdapter getAdaptedExecutor(Executor concurrentExecutor) {
if (managedExecutorServiceClass != null && managedExecutorServiceClass.isInstance(concurrentExecutor)) {
return new ManagedTaskExecutorAdapter(concurrentExecutor);
}
return new TaskExecutorAdapter(concurrentExecutor);
TaskExecutorAdapter adapter = new TaskExecutorAdapter(concurrentExecutor);
if (this.taskDecorator != null) {
adapter.setTaskDecorator(this.taskDecorator);
}
return adapter;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
*/
public ConcurrentTaskScheduler() {
super();
this.scheduledExecutor = initScheduledExecutor(null);
initScheduledExecutor(null);
}

/**
Expand All @@ -118,7 +118,7 @@ public ConcurrentTaskScheduler() {
*/
public ConcurrentTaskScheduler(ScheduledExecutorService scheduledExecutor) {
super(scheduledExecutor);
this.scheduledExecutor = initScheduledExecutor(scheduledExecutor);
initScheduledExecutor(scheduledExecutor);
}

/**
Expand All @@ -134,11 +134,11 @@ public ConcurrentTaskScheduler(ScheduledExecutorService scheduledExecutor) {
*/
public ConcurrentTaskScheduler(Executor concurrentExecutor, ScheduledExecutorService scheduledExecutor) {
super(concurrentExecutor);
this.scheduledExecutor = initScheduledExecutor(scheduledExecutor);
initScheduledExecutor(scheduledExecutor);
}


private ScheduledExecutorService initScheduledExecutor(@Nullable ScheduledExecutorService scheduledExecutor) {
private void initScheduledExecutor(@Nullable ScheduledExecutorService scheduledExecutor) {
if (scheduledExecutor != null) {
this.scheduledExecutor = scheduledExecutor;
this.enterpriseConcurrentScheduler = (managedScheduledExecutorServiceClass != null &&
Expand All @@ -148,7 +148,6 @@ private ScheduledExecutorService initScheduledExecutor(@Nullable ScheduledExecut
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
this.enterpriseConcurrentScheduler = false;
}
return this.scheduledExecutor;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.scheduling.concurrent;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -25,6 +26,8 @@
import org.junit.jupiter.api.Test;

import org.springframework.core.task.NoOpRunnable;
import org.springframework.core.task.TaskDecorator;
import org.springframework.util.Assert;

import static org.assertj.core.api.Assertions.assertThatCode;

Expand All @@ -38,8 +41,8 @@ class ConcurrentTaskExecutorTests extends AbstractSchedulingTaskExecutorTests {
new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());


@Override
@SuppressWarnings("deprecation")
@Override
protected org.springframework.core.task.AsyncListenableTaskExecutor buildExecutor() {
concurrentExecutor.setThreadFactory(new CustomizableThreadFactory(this.threadNamePrefix));
return new ConcurrentTaskExecutor(concurrentExecutor);
Expand Down Expand Up @@ -69,10 +72,40 @@ void passingNullExecutorToCtorResultsInDefaultTaskExecutorBeingUsed() {
}

@Test
void passingNullExecutorToSetterResultsInDefaultTaskExecutorBeingUsed() {
void earlySetConcurrentExecutorCallRespectsConfiguredTaskDecorator() {
ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor();
executor.setConcurrentExecutor(null);
executor.setConcurrentExecutor(new DecoratedExecutor());
executor.setTaskDecorator(new RunnableDecorator());
assertThatCode(() -> executor.execute(new NoOpRunnable())).doesNotThrowAnyException();
}

@Test
void lateSetConcurrentExecutorCallRespectsConfiguredTaskDecorator() {
ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor();
executor.setTaskDecorator(new RunnableDecorator());
executor.setConcurrentExecutor(new DecoratedExecutor());
assertThatCode(() -> executor.execute(new NoOpRunnable())).doesNotThrowAnyException();
}


private static class DecoratedRunnable implements Runnable {
@Override
public void run() {
}
}

private static class RunnableDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
return new DecoratedRunnable();
}
}

private static class DecoratedExecutor implements Executor {
@Override
public void execute(Runnable command) {
Assert.state(command instanceof DecoratedRunnable, "TaskDecorator not applied");
}
}

}

0 comments on commit 2c7e866

Please sign in to comment.