Skip to content

Commit

Permalink
[MOSIP-19429] added thread pool with config prop and logic to monitor (
Browse files Browse the repository at this point in the history
…#771)

* [MOSIP-19429] added thread pool with config prop and logic to monitor

* [MOSIP-19429] updated thread group count
  • Loading branch information
manojsp12 authored Jan 12, 2022
1 parent bab3d16 commit 2b529fa
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class IdAuthFraudAnalysisEventManager {
@Autowired
private ObjectMapper mapper;

@Async
@Async("fraudAnalysisExecutor")
public void analyseDigitalSignatureFailure(String uri, Map<String, Object> request, String errorMessage) {
if (EnvUtil.getIsFraudAnalysisEnabled()) {
List<String> pathSegments = Arrays.asList(uri.split("/"));
Expand All @@ -73,7 +73,7 @@ public void analyseDigitalSignatureFailure(String uri, Map<String, Object> reque
}
}

@Async
@Async("fraudAnalysisExecutor")
public void analyseEvent(AutnTxn txn) {
if (EnvUtil.getIsFraudAnalysisEnabled()) {
IdAuthFraudAnalysisEventDTO eventData = createEventData(txn.getRefId(), txn.getRequestTrnId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@

import java.util.Arrays;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.MessageSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.context.support.ResourceBundleMessageSource;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.servlet.LocaleResolver;
import org.springframework.web.servlet.i18n.SessionLocaleResolver;
Expand All @@ -20,7 +25,10 @@
import io.mosip.authentication.common.service.util.EnvUtil;
import io.mosip.authentication.core.constant.RestServicesConstants;
import io.mosip.authentication.core.indauth.dto.IdType;
import io.mosip.authentication.core.logger.IdaLogger;
import io.mosip.idrepository.core.builder.RestRequestBuilder;
import io.mosip.kernel.core.logger.spi.Logger;
import io.mosip.kernel.core.util.StringUtils;
import io.mosip.kernel.dataaccess.hibernate.config.HibernateDaoConfig;

/**
Expand All @@ -30,6 +38,8 @@
*
*/
public abstract class IdAuthConfig extends HibernateDaoConfig {

private static final Logger logger = IdaLogger.getLogger(IdAuthConfig.class);

/** The environment. */
@Autowired
Expand Down Expand Up @@ -113,5 +123,62 @@ public RestRequestBuilder getRestRequestBuilder() {
return new RestRequestBuilder(Arrays.stream(RestServicesConstants.values())
.map(RestServicesConstants::getServiceName).collect(Collectors.toList()));
}

@Bean
@Primary
public Executor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Math.floorDiv(EnvUtil.getActiveAsyncThreadCount(), 3));
executor.setMaxPoolSize(EnvUtil.getActiveAsyncThreadCount());
executor.setThreadNamePrefix("idauth-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}

@Bean
@Qualifier("webSubHelperExecutor")
public Executor webSubHelperExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Math.floorDiv(EnvUtil.getActiveAsyncThreadCount(), 3));
executor.setMaxPoolSize(EnvUtil.getActiveAsyncThreadCount());
executor.setThreadNamePrefix("idauth-websub-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}

@Bean
@Qualifier("fraudAnalysisExecutor")
public Executor fraudAnalysisExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Math.floorDiv(EnvUtil.getActiveAsyncThreadCount(), 3));
executor.setMaxPoolSize(EnvUtil.getActiveAsyncThreadCount());
executor.setThreadNamePrefix("idauth-fraud-analysis-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}

@Scheduled(fixedRateString = "${" + "mosip.ida.monitor-thread-queue-in-ms" + ":10000}")
public void monitorThreadQueueLimit() {
if (StringUtils.isNotBlank(EnvUtil.getMonitorAsyncThreadQueue())) {
ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) executor();
ThreadPoolTaskExecutor webSubHelperExecutor = (ThreadPoolTaskExecutor) webSubHelperExecutor();
ThreadPoolTaskExecutor fraudAnalysisExecutor = (ThreadPoolTaskExecutor) fraudAnalysisExecutor();
String monitoringLog = "Thread Name : {} Thread Active Count: {} Thread Task count: {} Thread queue count: {}";
logThreadQueueDetails(threadPoolTaskExecutor, threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size(), monitoringLog);
logThreadQueueDetails(webSubHelperExecutor, webSubHelperExecutor.getThreadPoolExecutor().getQueue().size(), monitoringLog);
logThreadQueueDetails(fraudAnalysisExecutor, fraudAnalysisExecutor.getThreadPoolExecutor().getQueue().size(), monitoringLog);
}
}

private void logThreadQueueDetails(ThreadPoolTaskExecutor threadPoolTaskExecutor, int threadPoolQueueSize,
String monitoringLog) {
if (threadPoolQueueSize > EnvUtil.getAsyncThreadQueueThreshold())
logger.info(monitoringLog, threadPoolTaskExecutor.getThreadNamePrefix(),
threadPoolTaskExecutor.getActiveCount(),
threadPoolTaskExecutor.getThreadPoolExecutor().getTaskCount(), threadPoolQueueSize);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void initRegistrar(WebSubEventTopicRegistrar registrar, Supplier<Boolean>
* @param eventModel the event model
*/
@WithRetry
@Async
@Async("webSubHelperExecutor")
public <U> void publishEvent(String eventTopic, U eventModel) {
publisher.publishUpdate(eventTopic, eventModel, MediaType.APPLICATION_JSON_VALUE, null, publisherUrl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ public class EnvUtil {

@Getter @Setter private static String internalAuthInternalBioRefId;

@Getter @Setter private static Integer activeAsyncThreadCount;

@Getter @Setter private static String monitorAsyncThreadQueue;

@Getter @Setter private static Integer asyncThreadQueueThreshold;

@Autowired
private Environment env;

Expand Down Expand Up @@ -258,6 +264,9 @@ public void init() {
setInternalAuthTrustValidationRequired(this.getProperty("mosip.ida.internal.trust-validation-required", Boolean.class, false));
setInternalAuthInternalRefId(this.getProperty(INTERNAL_REFERENCE_ID));
setInternalAuthInternalBioRefId(this.getProperty(INTERNAL_BIO_REFERENCE_ID));
setActiveAsyncThreadCount(this.getProperty("mosip.ida.active-async-thread-count", Integer.class));
setMonitorAsyncThreadQueue(this.getProperty("mosip.ida.monitor-thread-queue-in-ms"));
setAsyncThreadQueueThreshold(this.getProperty("mosip.ida.max-thread-queue-threshold", Integer.class, 0));
}

public String getProperty(String key) {
Expand Down

0 comments on commit 2b529fa

Please sign in to comment.