diff --git a/README.md b/README.md index 25fa65ad..f2eeeb9f 100644 --- a/README.md +++ b/README.md @@ -143,12 +143,15 @@ ES_PASSWORD="na" # Num of workers to start. 1 is the minimum. NUM_WORKERS=1 +# Max number of processes running at once on each worker. Default value is 16. 1 is the minimum. +WORKER_MAX_NUM_RUNNING_PROCS=16 + # Default value is 1. Specifies the number of days (int) until the # abandoned workers in the cws_workers database table are cleaned out. WORKER_ABANDONED_DAYS=1 # Run the dev script -./dev.sh `pwd` ${USER} ${DB_TYPE} ${DB_HOST} ${DB_PORT} ${DB_NAME} ${DB_USER} ${DB_PASS} ${ES_PROTOCOL} ${ES_HOST} ${ES_PORT} ${ES_USE_AUTH} ${ES_USERNAME} ${ES_PASSWORD} ${CLOUD} ${SECURITY} ${HOSTNAME} ${EMAIL_LIST} ${ADMIN_FIRST} ${ADMIN_LAST} ${ADMIN_EMAIL} ${NUM_WORKERS} ${WORKER_ABANDONED_DAYS} +./dev.sh `pwd` ${USER} ${DB_TYPE} ${DB_HOST} ${DB_PORT} ${DB_NAME} ${DB_USER} ${DB_PASS} ${ES_PROTOCOL} ${ES_HOST} ${ES_PORT} ${ES_USE_AUTH} ${ES_USERNAME} ${ES_PASSWORD} ${CLOUD} ${SECURITY} ${HOSTNAME} ${EMAIL_LIST} ${ADMIN_FIRST} ${ADMIN_LAST} ${ADMIN_EMAIL} ${NUM_WORKERS} ${WORKER_MAX_NUM_RUNNING_PROCS} ${WORKER_ABANDONED_DAYS} ``` ###### Run Personal Dev Script diff --git a/cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java b/cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java index cdbd8a4d..55094dee 100644 --- a/cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java +++ b/cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java @@ -47,8 +47,7 @@ public class SchedulerDbService extends DbService implements InitializingBean { public static final int DEFAULT_WORKER_PROC_DEF_MAX_INSTANCES = 1; public static final int PROCESSES_PAGE_SIZE = 100; - - // KEY FOR THIS IS: KEY `claimKey` (`status`,`proc_def_key`,`priority`,`created_time`) + public static final String FIND_CLAIMABLE_ROWS_SQL = "SELECT uuid FROM cws_sched_worker_proc_inst " + "WHERE " + @@ -243,17 +242,24 @@ public int updateProcInstIdAndStartedByWorker( return numUpdated; } - + + + /** * Attempt to claim a process start 
request in the database. * - * @param procDefKey -- only attempt to claim rows for this process definition + * @param workerProcsList -- attempts to claim rows for the active set of process definition(s) * @return mappings of claimUuids and claimedRowUuids * */ - public Map> claimHighestPriorityStartReq(String workerId, String procDefKey, int limit) { + + public Map> claimHighestPriorityStartReq(String workerId, Map workerProcsList, Map limitsPerProcs, int limit) { List claimUuids = new ArrayList(); List rowUuids = new ArrayList(); + List rowUuidsPerProcDefKey = new ArrayList(); + LinkedHashMap uuidAndProcDefKeyPair = new LinkedHashMap(); + List clearOutUnclaimedInst = new ArrayList(); + List unfilteredRowUuids = new ArrayList(); List claimedRowUuids = new ArrayList(); long t0 = System.currentTimeMillis(); int numClaimed = 0; @@ -266,11 +272,45 @@ public Map> claimHighestPriorityStartReq(String workerId, St try { // Find claimable rows // - rowUuids = jdbcTemplate.queryForList( - FIND_CLAIMABLE_ROWS_SQL, String.class, - new Object[] {procDefKey, - limit*2}); // over-find because some workers might compete with this set + for (Map.Entry procs : limitsPerProcs.entrySet()) { + rowUuidsPerProcDefKey = jdbcTemplate.queryForList(FIND_CLAIMABLE_ROWS_SQL, String.class, + new Object[] {procs.getKey(), procs.getValue()*2}); + // get list of uuids using array of procdefkeys IN (keys) + unfilteredRowUuids.addAll(rowUuidsPerProcDefKey); + } + + Collections.sort(unfilteredRowUuids); + for (String id : unfilteredRowUuids) { + String procDefKeyString = getProcDefKeyFromUuid(id); + uuidAndProcDefKeyPair.put(id, procDefKeyString); + } + + for (Map.Entry procLimit : limitsPerProcs.entrySet()) { + Set keys = uuidAndProcDefKeyPair.keySet(); + int applyPerProcsCap = 0; + for (String key : keys) { + + if (uuidAndProcDefKeyPair.get(key).equals(procLimit.getKey())) { + applyPerProcsCap = applyPerProcsCap + 1; + if (applyPerProcsCap > procLimit.getValue()) { + 
clearOutUnclaimedInst.add(key); + } + } + } + } + + for (String removeUuidFromList : clearOutUnclaimedInst) { + uuidAndProcDefKeyPair.remove(removeUuidFromList); + } + + Set uuidKeys = uuidAndProcDefKeyPair.keySet(); + // after its filtered add the uuids to rowUuids arraylist + for (String key : uuidKeys) { + rowUuids.add(key); + } + // make query that uses multi limit per ProcDefkey (JOIN) + // iterate to grab 30 if (!rowUuids.isEmpty()) { // Found some claimable rows, so now try to claim them.. // @@ -283,7 +323,7 @@ public Map> claimHighestPriorityStartReq(String workerId, St numClaimed++; claimUuids.add(claimUuid); claimedRowUuids.add(uuid); - log.debug("CLAIMED " + claimUuid + " (uuid=" +uuid+") for procDefKey '" + procDefKey + "'"); + //log.debug("CLAIMED " + claimUuid + " (uuid=" +uuid+") for procDefKey '" + procDefKeyList + "'"); } if (numClaimed == limit) { @@ -293,12 +333,12 @@ public Map> claimHighestPriorityStartReq(String workerId, St if (numClaimed == 0) { // other workers beat us to claiming the rows - log.warn("Attempted to claim " + rowUuids.size() + " rows for procDefKey '" + procDefKey + "', but claimed none! " + + log.warn("Attempted to claim " + rowUuids.size() + " rows for procDefKeys '" + workerProcsList.keySet() + "', but claimed none! " + (attempts < 10 ? "Retrying..." 
: "GIVING UP!")); continue; // retry finding claimable rows } else { - log.debug("Claimed (" + numClaimed + " of " + rowUuids.size() + ") for procDefKey '" + procDefKey + "'"); + log.debug("Claimed (" + numClaimed + " of " + rowUuids.size() + ") for procDefKeys '" + workerProcsList.keySet() + "'"); } } else if (log.isTraceEnabled()) { @@ -335,11 +375,11 @@ else if (log.isTraceEnabled()) { if (numClaimed != claimUuids.size()) { log.error("numUpdated != claimUuids.size()" ); } - + Map> ret = new HashMap>(); ret.put("claimUuids", claimUuids); ret.put("claimedRowUuids", claimedRowUuids); - + return ret; } @@ -356,8 +396,25 @@ public String getProcInstRowStatus(String uuid) { return null; } } - - + + public int getMaxProcsValueForWorker(String workerId) { + return jdbcTemplate.queryForObject( + "SELECT max_num_running_procs FROM cws_worker WHERE id=?", + new Object[] {workerId}, Integer.class); + } + + public int getCountForClaimedProcInstPerKey(String procDefKey, List claimedUuids) { + String listOfClaimUuid = "\"" + String.join("\", \"", claimedUuids) + "\"" ; + String query = "SELECT count(*) FROM cws_sched_worker_proc_inst " + "WHERE proc_def_key='" + procDefKey + "' " + "AND claim_uuid IN (" + listOfClaimUuid + ")"; + return jdbcTemplate.queryForObject(query, Integer.class); + } + + + public String getProcDefKeyFromUuid(String uuid) { + String query = "SELECT proc_def_key FROM cws_sched_worker_proc_inst " + "WHERE uuid='" + uuid + "'"; + return jdbcTemplate.queryForObject(query, String.class); + } + public Map getProcInstRow(String uuid) { List> list = jdbcTemplate.queryForList( "SELECT * FROM cws_sched_worker_proc_inst " + diff --git a/cws-engine-service/src/main/java/jpl/cws/engine/EngineDbService.java b/cws-engine-service/src/main/java/jpl/cws/engine/EngineDbService.java index 703759c8..b8cf38bd 100644 --- a/cws-engine-service/src/main/java/jpl/cws/engine/EngineDbService.java +++ b/cws-engine-service/src/main/java/jpl/cws/engine/EngineDbService.java @@ -26,6 
+26,7 @@ public class EngineDbService extends DbService implements InitializingBean { @Value("${cws.install.type}") private String cwsInstallType; @Value("${cws.worker.type}") private String cwsWorkerType; @Value("${camunda.executor.service.max.pool.size}") private int maxExecutorServicePoolSize; + @Value("${worker.max.num.running.procs}") private int workerMaxNumRunningProcs; private Logger log; @@ -180,8 +181,8 @@ public void createOrUpdateWorkerRow(String lockOwner) { numUpdated = jdbcTemplate.update( "INSERT INTO cws_worker" + " (id, lock_owner, name, install_directory, cws_install_type, cws_worker_type, " + - " status, job_executor_max_pool_size, created_time, last_heartbeat_time) " + - "VALUES (?,?,?,?,?,?,?,?,?,?)", + " status, job_executor_max_pool_size, max_num_running_procs, created_time, last_heartbeat_time) " + + "VALUES (?,?,?,?,?,?,?,?,?,?,?)", new Object[]{ workerId, lockOwner, @@ -191,6 +192,7 @@ public void createOrUpdateWorkerRow(String lockOwner) { cwsWorkerType, null, // status will be changed to null once the worker is fully initialized maxExecutorServicePoolSize, // changeable later via the UI.. 
+ workerMaxNumRunningProcs, tsNow, // created_time tsNow // last_heartbeat_time }); diff --git a/cws-engine-service/src/main/java/jpl/cws/engine/WorkerService.java b/cws-engine-service/src/main/java/jpl/cws/engine/WorkerService.java index c2daf071..54ccdd4e 100644 --- a/cws-engine-service/src/main/java/jpl/cws/engine/WorkerService.java +++ b/cws-engine-service/src/main/java/jpl/cws/engine/WorkerService.java @@ -75,7 +75,9 @@ public class WorkerService implements InitializingBean { @Value("${cws.engine.jobexecutor.enabled}") private boolean jobExecutorEnabled; @Value("${cws.tomcat.lib}") private String cwsTomcatLib; - + + @Value("${worker.max.num.running.procs}") private int workerMaxNumRunningProcs; + private Logger log; public static AtomicInteger processorExecuteCount = new AtomicInteger(0); @@ -84,8 +86,9 @@ public class WorkerService implements InitializingBean { // Map of procDefKey and count of active process instances public static Map processCounters = new HashMap(); - + private static Map workerMaxProcInstances = new HashMap(); + private static Set procStartReqUuidStartedThisWorker = new HashSet(); private static Set acceptingProcDefKeys = new HashSet(); //private static Set runningToCompleteTransitionUuids = new HashSet(); @@ -185,7 +188,7 @@ public void initProcessCountersAndLimits() { } - log.debug("AFTER INIT: limits: " + workerMaxProcInstances + ", counts: " + processCounters); + log.info("AFTER INIT: limits: " + workerMaxProcInstances + ", counts: " + processCounters); } @@ -312,7 +315,7 @@ public boolean updateProcessCountersAndLimits() { // String postConfig = "limits: " + workerMaxProcInstances + ", counts: " + processCounters; if (lastProcCounterStatusMsg == null || !lastProcCounterStatusMsg.equals(postConfig)) { - log.debug("NEW: " + postConfig + ", OLD: " + lastProcCounterStatusMsg); + log.info("NEW: " + postConfig + ", OLD: " + lastProcCounterStatusMsg); lastProcCounterStatusMsg = postConfig; return true; // config changed } @@ -670,59 +673,92 
@@ public List> claimWithCounter(String limitToProcDefKey) { synchronized (procStateLock) { // procCountsLock t1 = System.currentTimeMillis(); + + int procSetSize = 0; + //int totalCurrentRunningProcsOnWorker = 0; + Map currentCounts = new HashMap(); + Map remainders = new HashMap(); + Map queryLimitForProcSet = new HashMap(); + Map limitToProcDefKeyObject = new HashMap(); + for (Entry procMax : workerMaxProcInstances.entrySet()) { String procDefKey = procMax.getKey(); - if (limitToProcDefKey != null && !limitToProcDefKey.equals(procDefKey)) { - continue; - } + int procMaxNumber = procMax.getValue(); if (!acceptingProcDefKeys.contains(procDefKey)) { //log.debug("skipping " + procDefKey + " BECAUSE IT NOT ACCEPTING RIGHT NOW!!!!"); continue; } - + + currentCounts.put(procDefKey, processCounters.get(procDefKey)); + remainders.put(procDefKey, procMaxNumber - currentCounts.get(procDefKey)); + queryLimitForProcSet.put(procDefKey, Math.min(EXEC_SERVICE_MAX_POOL_SIZE, remainders.get(procDefKey))); + //log.trace("getting currentCount for procDefKey " + procDefKey); - int currentCount = processCounters.get(procDefKey); + //int currentCount = processCounters.get(procDefKey); //log.trace("currentCount for " + procDefKey + " is " + currentCount); - int remainder = procMaxNumber - currentCount; + //int remainder = procMaxNumber - currentCount; //log.trace("remainder for " + procDefKey + " is " + remainder); - int queryLimit = Math.min(EXEC_SERVICE_MAX_POOL_SIZE, remainder); + //int queryLimit = Math.min(EXEC_SERVICE_MAX_POOL_SIZE, remainder); // FIXME: needs revisit for proper min //log.trace("queryLimit for " + procDefKey + " is " + queryLimit); - - if (remainder > 0) { - // claim for remainder (marks DB rows as "claimedByWorker") - Map> claimRowData = - schedulerDbService.claimHighestPriorityStartReq( - workerId, procDefKey, queryLimit); - - List claimed = claimRowData.get("claimUuids"); - - if (!claimed.isEmpty()) { - // increment counter by amount that was actually claimed 
- // in anticipation that the start will actually work. - // If the start turns out not to later worker, then this count will be decremented at that time. - // - processCounters.put(procDefKey, processCounters.get(procDefKey) + claimed.size()); - // update uuid list - procStartReqUuidStartedThisWorker.addAll(claimRowData.get("claimedRowUuids")); - //log.debug("procStartReqUuidStartedThisWorker = " + procStartReqUuidStartedThisWorker); - - log.debug("(CLAIMED " + claimed.size() + " / " + queryLimit + ", maxProcs=" + procMaxNumber + ") for procDef '" + procDefKey + "' (limitToProcDefKey="+limitToProcDefKey+")"); - - claimUuids.addAll(claimed); + + } // end for loop + + int totalCurrentRunningProcsOnWorker = 0; + for (Entry entry : processCounters.entrySet()) { + totalCurrentRunningProcsOnWorker += entry.getValue().intValue(); + } + + // rename to workerMaxProcQueryLimit + int MaxNumForProcsOnWorker = schedulerDbService.getMaxProcsValueForWorker(workerId); + // this is for all procDefs cap + int workerMaxProcQueryLimit = MaxNumForProcsOnWorker - totalCurrentRunningProcsOnWorker; + + int remaindersTotal = 0; + for (int r: remainders.values()) { + remaindersTotal += r; + } + + if (remaindersTotal > 0 && workerMaxProcQueryLimit > 0) { + // claim for remainder (marks DB rows as "claimedByWorker") + + int queryLimit = Math.min(MaxNumForProcsOnWorker, workerMaxProcQueryLimit); + + Map> claimRowData = + schedulerDbService.claimHighestPriorityStartReq( + workerId, currentCounts, queryLimitForProcSet, queryLimit); // pass list of procDefkey and a map of queryLimit per procDefKey + + List claimed = claimRowData.get("claimUuids"); + + if (!claimed.isEmpty()) { + // increment counter by amount that was actually claimed + // in anticipation that the start will actually work. + // If the start turns out not to later worker, then this count will be decremented at that time. 
+ // + for (Map.Entry procDefKey : processCounters.entrySet()) { + int claimedInstCount = schedulerDbService.getCountForClaimedProcInstPerKey(procDefKey.getKey(), claimed); + processCounters.put(procDefKey.getKey(), processCounters.get(procDefKey.getKey()) + claimedInstCount); } - //else { - // log.debug("NONE CLAIMED (queryLimit=" + queryLimit + ", max=" + procMaxNumber + ") for procDef '" + procDefKey + "' (limitToProcDefKey="+limitToProcDefKey+")"); - //} - } - else { - log.debug("[" + procDefKey + "] remainder <= 0, so not attempting claim. " + - "(remainder = " + remainder + - ", procMaxNumber = " + procMaxNumber + - ", currentCount = " + currentCount + ")"); + + // update uuid list + procStartReqUuidStartedThisWorker.addAll(claimRowData.get("claimedRowUuids")); + //log.debug("procStartReqUuidStartedThisWorker = " + procStartReqUuidStartedThisWorker); + + log.debug("(CLAIMED " + claimed.size() + " / " + queryLimit + ", maxProcs=" + workerMaxProcInstances.entrySet() + ") for procDefKeys '" + workerMaxProcInstances.keySet() + "' (limitToProcDefKey="+limitToProcDefKey+")" + ", workerMaxNumRunningProcs=" + MaxNumForProcsOnWorker); + claimUuids.addAll(claimed); } - - } // end for loop + //else { + // log.debug("NONE CLAIMED (queryLimit=" + queryLimit + ", max=" + procMaxNumber + ") for procDef '" + procDefKey + "' (limitToProcDefKey="+limitToProcDefKey+")"); + //} + } + else { + log.debug("Remainder for Worker Max Process Limit [" + workerMaxProcQueryLimit + "] workerMaxProcQueryLimit <= 0 OR Total of remainders [" + remaindersTotal + "] is <=0, so not attempting claim. 
" + + "(remainders = " + remainders + + ", procMaxNumbers = " + workerMaxProcInstances.entrySet() + + ", currentCounts = " + currentCounts + ")"); + } + + } // release lock diff --git a/cws-engine-service/src/main/java/jpl/cws/engine/listener/ProcessEventListener.java b/cws-engine-service/src/main/java/jpl/cws/engine/listener/ProcessEventListener.java index a5a0558d..896795ef 100644 --- a/cws-engine-service/src/main/java/jpl/cws/engine/listener/ProcessEventListener.java +++ b/cws-engine-service/src/main/java/jpl/cws/engine/listener/ProcessEventListener.java @@ -70,7 +70,7 @@ public void run() { boolean endedOnThisWorker = workerService.processEndedActions(procDefKey, uuid); if (endedOnThisWorker) { - workerService.procStartReqAction(procDefKey, "processEndEventDetected message received"); + workerService.procStartReqAction(null, "processEndEventDetected message received"); } } else if (eventType.equals("sync")) { @@ -78,10 +78,7 @@ else if (eventType.equals("sync")) { if (processCounterStateChanged) { log.trace(eventType + " :: state changed"); - - // If the process counter state changed, then we potentially have more bandwidth to - // execute more processes of the type that was just completed/failed. 
- workerService.procStartReqAction(procDefKey, "sync message received"); + workerService.procStartReqAction(null, "sync message received"); } } } diff --git a/cws-installer/src/main/java/jpl/cws/task/CwsInstaller.java b/cws-installer/src/main/java/jpl/cws/task/CwsInstaller.java index 3da167b3..0fd50f29 100644 --- a/cws-installer/src/main/java/jpl/cws/task/CwsInstaller.java +++ b/cws-installer/src/main/java/jpl/cws/task/CwsInstaller.java @@ -199,6 +199,7 @@ public class CwsInstaller { private static String user_provided_logstash; private static String history_level; private static String history_days_to_live; + private static String worker_max_num_running_procs; private static String worker_abandoned_days; private static String aws_default_region; @@ -260,6 +261,7 @@ public static void main(String args[]) { setupAwsSqs(); } setupLimitToRemoveAbandonedWorkersByDays(); + setupMaxLimitForNumberOfProcessesPerWorker(); genUniqueWorkerId(); setupStartupAutoregisterProcessDefs(); showInstallationInfo(); @@ -884,6 +886,45 @@ private static void setupHistoryDaysToLive() { } } + + private static void setupMaxLimitForNumberOfProcessesPerWorker() { + worker_max_num_running_procs = getPreset("worker_max_num_running_procs"); + + if (worker_max_num_running_procs == null) { + worker_max_num_running_procs = getPreset("default_worker_max_num_running_procs"); + } + if (worker_max_num_running_procs == null) { + worker_max_num_running_procs = CORES + ""; + } + + // make sure preset is valid positive integer + try { + if (Integer.parseInt(worker_max_num_running_procs) <= 0) { + log.warn("Processes per worker value must be a positive integer. Got: " + worker_max_num_running_procs + ". Defaulting to number of processing cores on machine."); + worker_max_num_running_procs = CORES + ""; + } + } catch (NumberFormatException e) { + log.warn("Processes per worker value failed to parse as an integer. Got: " + worker_max_num_running_procs + ". 
Defaulting to number of processing cores on machine."); + worker_max_num_running_procs = CORES + ""; + } + + if (cws_installer_mode.equals("interactive")) { + boolean done = false; + while (!done) { + worker_max_num_running_procs = readLine("Enter the maximum number of processes that run on worker(s). " + + "Default is " + CORES + "" + ": ", worker_max_num_running_procs); + + // make sure input was valid + try { + done = Integer.parseInt(worker_max_num_running_procs) >= 1; + } catch (NumberFormatException e) { + // bad input, try again + } + } + } + } + + private static void setupLimitToRemoveAbandonedWorkersByDays() { worker_abandoned_days = getPreset("worker_abandoned_days"); @@ -1581,6 +1622,7 @@ private static void showInstallationInfo() { print("CWS Notification Emails = " + cws_notification_emails); print("CWS Token Expiration In Hours = " + cws_token_expiration_hours); print("History Level = " + history_level); + print("Processes per Worker = " + worker_max_num_running_procs); print("Days Remove Abandoned Workers = " + worker_abandoned_days); if (installConsole) { print("History Days to Live = " + history_days_to_live); @@ -2309,6 +2351,7 @@ private static void updateWorkerProperties() throws IOException { content = content.replace("__CWS_DB_PASSWORD__", cws_db_password); content = content.replace("__CAMUNDA_EXEC_SVC_MAX_POOL_SIZE__", CORES+""); content = content.replace("__AWS_DEFAULT_REGION__", aws_default_region); + content = content.replace("__CWS_WORKER_MAX_NUM_RUNNING_PROCS__", worker_max_num_running_procs); // S3 Initiator might not be in use if(aws_sqs_dispatcher_sqsUrl != null) { @@ -2420,6 +2463,7 @@ private static void updateCwsUiProperties() throws IOException { content = content.replace("__CWS_AUTH_SCHEME__", cws_auth_scheme); content = content.replace("__CWS_HISTORY_DAYS_TO_LIVE__", history_days_to_live); content = content.replace("__CWS_HISTORY_LEVEL__", history_level); + content = content.replace("__CWS_WORKER_MAX_NUM_RUNNING_PROCS__", 
worker_max_num_running_procs); content = content.replace("__CWS_WORKER_ABANDONED_DAYS__", worker_abandoned_days); content = content.replace("__AWS_DEFAULT_REGION__", aws_default_region); @@ -2452,6 +2496,7 @@ private static void updateCwsUiConfig() throws IOException { content = content.replace("__CWS_DB_PASSWORD__", cws_db_password); content = content.replace("__JOB_EXECUTOR_ACTIVATE__", "false"); content = content.replace("__HISTORY_LEVEL__", history_level); + content = content.replace("__CWS_WORKER_MAX_NUM_RUNNING_PROCS__", worker_max_num_running_procs); content = content.replace("__CWS_WORKER_ABANDONED_DAYS__", worker_abandoned_days); content = content.replace("__CWS_AMQ_HOST__", cws_amq_host); @@ -2808,6 +2853,7 @@ private static void writeOutConfigurationFile() throws IOException { setPreset("user_provided_logstash", user_provided_logstash); setPreset("history_level", history_level); setPreset("history_days_to_live", history_days_to_live); + setPreset("worker_max_num_running_procs", worker_max_num_running_procs); setPreset("worker_abandoned_days", worker_abandoned_days); setPreset("aws_default_region", aws_default_region); setPreset("aws_sqs_dispatcher_sqsUrl", aws_sqs_dispatcher_sqsUrl); diff --git a/cws-service/src/main/java/jpl/cws/controller/RestService.java b/cws-service/src/main/java/jpl/cws/controller/RestService.java index 9c2474bf..a25d45fa 100644 --- a/cws-service/src/main/java/jpl/cws/controller/RestService.java +++ b/cws-service/src/main/java/jpl/cws/controller/RestService.java @@ -957,6 +957,9 @@ public GsonUTCDateAdapter() { Map procs = cwsConsoleService.getWorkerNumRunningProcs(); + log.info("*** LOG cwsConsoleService.getWorkerNumRunningProcs() : " + procs); + + for (String workerId : procs.keySet()) { int count = Integer.parseInt(procs.get(workerId)); @@ -968,7 +971,10 @@ public GsonUTCDateAdapter() { procs.put(workerId, Integer.toString(total)); } - + + log.info("*** LOG return procs : " + procs); + + return procs; } diff --git a/dev.sh 
b/dev.sh index 14697947..4e0116b2 100755 --- a/dev.sh +++ b/dev.sh @@ -26,7 +26,8 @@ ADMIN_FIRSTNAME=${19} ADMIN_LASTNAME=${20} ADMIN_EMAIL=${21} NUM_WORKERS=${22} -WORKER_ABANDONED_DAYS=${23} +WORKER_MAX_NUM_RUNNING_PROCS=${23} +WORKER_ABANDONED_DAYS=${24} source ${ROOT}/utils.sh diff --git a/install/cws-engine/cws-engine.properties b/install/cws-engine/cws-engine.properties index ef4c24bb..eb09f3af 100644 --- a/install/cws-engine/cws-engine.properties +++ b/install/cws-engine/cws-engine.properties @@ -11,6 +11,7 @@ cws.auth.scheme=__CWS_AUTH_SCHEME__ cws.worker.id=__CWS_WORKER_ID__ cws.worker.type=__CWS_WORKER_TYPE__ cws.worker.abandoned.days=__CWS_WORKER_ABANDONED_DAYS__ +worker.max.num.running.procs=__CWS_WORKER_MAX_NUM_RUNNING_PROCS__ cws.jmx.service.url=service:jmx:rmi:///jndi/rmi://localhost:__CWS_JMX_PORT__/jmxrmi camunda.executor.service.max.pool.size=__CAMUNDA_EXEC_SVC_MAX_POOL_SIZE__ default.smtp.host=smtp.localhost diff --git a/install/cws-ui/cws-ui.properties b/install/cws-ui/cws-ui.properties index 6815235b..957224f0 100644 --- a/install/cws-ui/cws-ui.properties +++ b/install/cws-ui/cws-ui.properties @@ -49,6 +49,7 @@ cws.history.days.to.live=__CWS_HISTORY_DAYS_TO_LIVE__ cws.history.level=__CWS_HISTORY_LEVEL__ cws.worker.abandoned.days=__CWS_WORKER_ABANDONED_DAYS__ +worker.max.num.running.procs=__CWS_WORKER_MAX_NUM_RUNNING_PROCS__ # # AWS CLOUD PROPERTIES # diff --git a/install/example-cws-configuration.properties b/install/example-cws-configuration.properties index 688d5efe..43bf8a7c 100644 --- a/install/example-cws-configuration.properties +++ b/install/example-cws-configuration.properties @@ -153,6 +153,12 @@ history_days_to_live=7 # worker table. Allowed values are whole integers 1 <= n worker_abandoned_days=1 +# Specifies the max number of actively running process instances on a worker. This variable is set for +# each worker in the CWS DB table 'cws_worker' on column 'max_num_running_procs'. 
Once CWS installation +# is complete the value(s) within the database are manually configurable, for each respective worker. +# Allowed values are whole integers 1 <= n +worker_max_num_running_procs=16 + # Specifies the number of hours that a CWS security token is valid for. After this # amount of time it will expire, and the User will be required to authenticate # again to get a new one. diff --git a/install/installerPresets.properties b/install/installerPresets.properties index f66a50b2..e6260081 100644 --- a/install/installerPresets.properties +++ b/install/installerPresets.properties @@ -21,4 +21,4 @@ default_history_level=full default_history_days_to_live=7 default_worker_abandoned_days=1 default_aws_cloudwatch_endpoint=monitoring.us-west-1.amazonaws.com -default_metrics_publishing_interval=10 \ No newline at end of file +default_metrics_publishing_interval=10 diff --git a/install/sql/core.sql.template b/install/sql/core.sql.template index ea0338c3..59c0e37e 100644 --- a/install/sql/core.sql.template +++ b/install/sql/core.sql.template @@ -7,6 +7,7 @@ CREATE TABLE IF NOT EXISTS `cws_worker` ( `cws_worker_type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `status` varchar(255) COLLATE utf8_bin DEFAULT NULL, `job_executor_max_pool_size` int(11) DEFAULT NULL, + `max_num_running_procs` int(11) NOT NULL, `active_count` int(11) DEFAULT NULL, `created_time` datetime DEFAULT NULL, `last_heartbeat_time` datetime NOT NULL, diff --git a/utils.sh b/utils.sh index c066047c..3e513779 100644 --- a/utils.sh +++ b/utils.sh @@ -203,9 +203,10 @@ function auto_conf_data () { ADMIN_LASTNAME=${21} ADMIN_EMAIL=${22} NUM_WORKERS=${23} - WORKER_ABANDONED_DAYS=${24} + WORKER_MAX_NUM_RUNNING_PROCS=${24} + WORKER_ABANDONED_DAYS=${25} - OUTPUT_FILE=${25} + OUTPUT_FILE=${26} source ${ROOT}/utils.sh @@ -297,6 +298,7 @@ function auto_conf_data () { cws_amq_jmx_port=${CWS_AMQ_JMX_PORT} cws_jmx_port=${CWS_JMX_PORT} history_days_to_live=1 + 
worker_max_num_running_procs=${WORKER_MAX_NUM_RUNNING_PROCS} worker_abandoned_days=${WORKER_ABANDONED_DAYS} notify_users_email=y email_subject=[CWS] You have been assigned a task (CWS_TASK_NAME) @@ -316,5 +318,4 @@ function auto_conf_data () { cws_token_expiration_hours=240 user_provided_logstash=n EOF - }