Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SpringBoot - Add registered workflow and activity impl info to workers template #1986

Merged
merged 4 commits into from
Feb 20, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class WorkersTemplate implements BeanFactoryAware, EnvironmentAware {

private WorkerFactory workerFactory;
private Collection<Worker> workers;
private final Map<String, RegisteredInfo> registeredInfo = new HashMap<>();

public WorkersTemplate(
@Nonnull TemporalProperties properties,
Expand Down Expand Up @@ -111,6 +112,14 @@ public Collection<Worker> getWorkers() {
return workers;
}

/** Return information on registered workflow and activity types per task queue */
public Map<String, RegisteredInfo> getRegisteredInfo() {
if (workers == null) {
this.workers = createWorkers(getWorkerFactory());
}
return registeredInfo;
}

WorkerFactory createWorkerFactory(WorkflowClient workflowClient) {
if (testWorkflowEnvironment != null) {
return testWorkflowEnvironment.getWorkerFactory();
Expand Down Expand Up @@ -170,7 +179,7 @@ private void configureWorkflowImplementationsByTaskQueue(
worker = createNewWorker(taskQueue, null, workers);
}

configureWorkflowImplementationAutoDiscovery(worker, clazz, null);
configureWorkflowImplementationAutoDiscovery(worker, clazz, null, workers);
}
}
}
Expand All @@ -197,7 +206,7 @@ private void configureActivityBeansByTaskQueue(
}

configureActivityImplementationAutoDiscovery(
worker, bean, beanName, targetClass, null);
worker, bean, beanName, targetClass, null, workers);
}
}
});
Expand All @@ -218,7 +227,7 @@ private void configureWorkflowImplementationsByWorkerName(
+ clazz);
}

configureWorkflowImplementationAutoDiscovery(worker, clazz, workerName);
configureWorkflowImplementationAutoDiscovery(worker, clazz, workerName, workers);
}
}
}
Expand All @@ -241,7 +250,7 @@ private void configureActivityBeansByWorkerName(
}

configureActivityImplementationAutoDiscovery(
worker, bean, beanName, targetClass, workerName);
worker, bean, beanName, targetClass, workerName, workers);
}
}
});
Expand Down Expand Up @@ -303,14 +312,21 @@ private void createWorkerFromAnExplicitConfig(
AopUtils.getTargetClass(bean),
taskQueue);
worker.registerActivitiesImplementations(bean);
addRegisteredActivityImpl(worker, beanName, bean.getClass().getName());
});
}
}

private void configureActivityImplementationAutoDiscovery(
Worker worker, Object bean, String beanName, Class<?> targetClass, String byWorkerName) {
Worker worker,
Object bean,
String beanName,
Class<?> targetClass,
String byWorkerName,
Workers workers) {
try {
worker.registerActivitiesImplementations(bean);
addRegisteredActivityImpl(worker, beanName, bean.getClass().getName());
if (log.isInfoEnabled()) {
log.info(
"Registering auto-discovered activity bean '{}' of class {} on a worker {}with a task queue '{}'",
Expand All @@ -334,7 +350,7 @@ private void configureActivityImplementationAutoDiscovery(
}

private void configureWorkflowImplementationAutoDiscovery(
Worker worker, Class<?> clazz, String byWorkerName) {
Worker worker, Class<?> clazz, String byWorkerName, Workers workers) {
try {
configureWorkflowImplementation(worker, clazz);
if (log.isInfoEnabled()) {
Expand Down Expand Up @@ -378,6 +394,7 @@ private <T> void configureWorkflowImplementation(Worker worker, Class<?> clazz)
(Class<T>) workflowMethod.getWorkflowInterface(),
() -> (T) beanFactory.createBean(clazz),
workflowImplementationOptions);
addRegisteredWorkflowImpl(worker, workflowMethod.getWorkflowInterface().getName());
}
}

Expand Down Expand Up @@ -410,6 +427,94 @@ private Worker createNewWorker(
return worker;
}

private void addRegisteredWorkflowImpl(Worker worker, String workflowClass) {
if (!registeredInfo.containsKey(worker.getTaskQueue())) {
registeredInfo.put(
worker.getTaskQueue(),
new RegisteredInfo()
.addWorkflowInfo(new RegisteredWorkflowInfo().addClassName(workflowClass)));
} else {
registeredInfo
.get(worker.getTaskQueue())
.getRegisteredWorkflowInfo()
.add(new RegisteredWorkflowInfo().addClassName(workflowClass));
}
}

private void addRegisteredActivityImpl(Worker worker, String beanName, String beanClass) {
if (!registeredInfo.containsKey(worker.getTaskQueue())) {
registeredInfo.put(
worker.getTaskQueue(),
new RegisteredInfo()
.addActivityInfo(
new RegisteredActivityInfo().addBeanName(beanName).addClassName(beanClass)));
} else {
registeredInfo
.get(worker.getTaskQueue())
.getRegisteredActivityInfo()
.add(new RegisteredActivityInfo().addBeanName(beanName).addClassName(beanClass));
}
}

public static class RegisteredInfo {
private final List<RegisteredActivityInfo> registeredActivityInfo = new ArrayList<>();
private final List<RegisteredWorkflowInfo> registeredWorkflowInfo = new ArrayList<>();

public RegisteredInfo addActivityInfo(RegisteredActivityInfo activityInfo) {
registeredActivityInfo.add(activityInfo);
return this;
}

public RegisteredInfo addWorkflowInfo(RegisteredWorkflowInfo workflowInfo) {
registeredWorkflowInfo.add(workflowInfo);
return this;
}

public List<RegisteredActivityInfo> getRegisteredActivityInfo() {
return registeredActivityInfo;
}

public List<RegisteredWorkflowInfo> getRegisteredWorkflowInfo() {
return registeredWorkflowInfo;
}
}

public static class RegisteredActivityInfo {
private String beanName;
private String className;

public RegisteredActivityInfo addClassName(String className) {
this.className = className;
return this;
}

public RegisteredActivityInfo addBeanName(String beanName) {
this.beanName = beanName;
return this;
}

public String getClassName() {
return className;
}

public String getBeanName() {
return beanName;
}
}

public static class RegisteredWorkflowInfo {
private String className;

public RegisteredWorkflowInfo addClassName(String className) {
this.className = className;
return this;
}

public String getClassName() {
return className;
}
}

private static class Workers {
private final Map<String, Worker> workersByName = new HashMap<>();
private final Map<String, Worker> workersByTaskQueue = new HashMap<>();
Expand Down
Loading